Add SSE support

This commit is contained in:
Anibal Angulo
2026-02-22 15:52:35 +00:00
parent 54eb6f240c
commit 82764bd60b
2 changed files with 48 additions and 7 deletions

View File

@@ -1,6 +1,7 @@
# ruff: noqa: INP001
"""ADK agent that connects to the knowledge-search MCP server."""
import argparse
import asyncio
import os
@@ -8,7 +9,10 @@ from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from google.adk.tools.mcp_tool.mcp_session_manager import (
SseConnectionParams,
StdioConnectionParams,
)
from google.genai import types
from mcp import StdioServerParameters
@@ -22,15 +26,32 @@ if location := os.environ.get("LOCATION"):
SERVER_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "main.py")
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Knowledge Search Agent")
parser.add_argument(
"--remote",
metavar="URL",
help="Connect to an already-running MCP server at this SSE URL "
"(e.g. http://localhost:8080/sse). Without this flag the agent "
"spawns the server as a subprocess.",
)
return parser.parse_args()
async def async_main() -> None:
toolset = McpToolset(
connection_params=StdioConnectionParams(
args = _parse_args()
if args.remote:
connection_params = SseConnectionParams(url=args.remote)
else:
connection_params = StdioConnectionParams(
server_params=StdioServerParameters(
command="uv",
args=["run", "python", SERVER_SCRIPT],
),
),
)
)
toolset = McpToolset(connection_params=connection_params)
agent = LlmAgent(
model="gemini-2.0-flash",

24
main.py
View File

@@ -1,6 +1,7 @@
# ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio
import io
import logging
@@ -335,7 +336,26 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
)
mcp = FastMCP("knowledge-search", lifespan=lifespan)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
return parser.parse_args()
_args = _parse_args()
mcp = FastMCP(
"knowledge-search",
host=_args.host,
port=_args.port,
lifespan=lifespan,
)
@mcp.tool()
@@ -404,4 +424,4 @@ async def knowledge_search(
if __name__ == "__main__":
mcp.run()
mcp.run(transport=_args.transport)