Files
ai-stack-deployer/test_mcp.py
Oussama Douhou 8977a6fdee New design v0.2
New design and framework
2026-01-13 10:49:47 +01:00

28 lines
1.1 KiB
Python

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.http import HttpClientTransport
async def main():
async with HttpClientTransport("http://10.100.0.17:8000/mcp/") as transport:
async with ClientSession(transport) as session:
await session.initialize()
# Clear failures
result = await session.call_tool("clear_queue_failures", {})
print(f"Clear Failures: {result}")
# Add memory
result = await session.call_tool("add_memory", {
"name": "MCP Health Verification - 2026-01-11",
"episode_body": "Graphiti MCP server audit completed. Server healthy, queue cleared. Pending: SEMAPHORE_LIMIT and timeout changes require scheduled maintenance window.",
"group_id": "global"
})
print(f"Add Memory: {result}")
# Get status
result = await session.call_tool("get_queue_status", {})
print(f"Queue Status: {result}")
if __name__ == "__main__":
asyncio.run(main())