Files
metadata-search/agent.py
Anibal Angulo 82764bd60b Add SSE support
2026-02-22 15:52:35 +00:00

112 lines
3.3 KiB
Python

# ruff: noqa: INP001
"""ADK agent that connects to the knowledge-search MCP server."""
import argparse
import asyncio
import os
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import (
SseConnectionParams,
StdioConnectionParams,
)
from google.genai import types
from mcp import StdioServerParameters
# ADK needs these env vars for Vertex AI; reuse the ones from .env
os.environ.setdefault("GOOGLE_GENAI_USE_VERTEXAI", "True")
if project := os.environ.get("PROJECT_ID"):
os.environ.setdefault("GOOGLE_CLOUD_PROJECT", project)
if location := os.environ.get("LOCATION"):
os.environ.setdefault("GOOGLE_CLOUD_LOCATION", location)
SERVER_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "main.py")
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Knowledge Search Agent")
parser.add_argument(
"--remote",
metavar="URL",
help="Connect to an already-running MCP server at this SSE URL "
"(e.g. http://localhost:8080/sse). Without this flag the agent "
"spawns the server as a subprocess.",
)
return parser.parse_args()
async def async_main() -> None:
args = _parse_args()
if args.remote:
connection_params = SseConnectionParams(url=args.remote)
else:
connection_params = StdioConnectionParams(
server_params=StdioServerParameters(
command="uv",
args=["run", "python", SERVER_SCRIPT],
),
)
toolset = McpToolset(connection_params=connection_params)
agent = LlmAgent(
model="gemini-2.0-flash",
name="knowledge_agent",
instruction=(
"You are a helpful assistant with access to a knowledge base. "
"Use the knowledge_search tool to find relevant information "
"when the user asks questions. Summarize the results clearly."
),
tools=[toolset],
)
session_service = InMemorySessionService()
session = await session_service.create_session(
state={},
app_name="knowledge_agent",
user_id="user",
)
runner = Runner(
app_name="knowledge_agent",
agent=agent,
session_service=session_service,
)
print("Knowledge Search Agent ready. Type your query (Ctrl+C to exit):")
try:
while True:
try:
query = input("\n> ").strip()
except EOFError:
break
if not query:
continue
content = types.Content(
role="user",
parts=[types.Part(text=query)],
)
async for event in runner.run_async(
session_id=session.id,
user_id=session.user_id,
new_message=content,
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if part.text:
print(part.text)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
await toolset.close()
if __name__ == "__main__":
asyncio.run(async_main())