Files
knowledge-search-mcp/agent.py
2026-02-22 15:41:16 +00:00

91 lines
2.7 KiB
Python

# ruff: noqa: INP001
"""ADK agent that connects to the knowledge-search MCP server."""
import asyncio
import os
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from google.genai import types
from mcp import StdioServerParameters
# ADK needs these env vars for Vertex AI; reuse the ones from .env
os.environ.setdefault("GOOGLE_GENAI_USE_VERTEXAI", "True")
if project := os.environ.get("PROJECT_ID"):
os.environ.setdefault("GOOGLE_CLOUD_PROJECT", project)
if location := os.environ.get("LOCATION"):
os.environ.setdefault("GOOGLE_CLOUD_LOCATION", location)
SERVER_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "main.py")
async def async_main() -> None:
toolset = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="uv",
args=["run", "python", SERVER_SCRIPT],
),
),
)
agent = LlmAgent(
model="gemini-2.0-flash",
name="knowledge_agent",
instruction=(
"You are a helpful assistant with access to a knowledge base. "
"Use the knowledge_search tool to find relevant information "
"when the user asks questions. Summarize the results clearly."
),
tools=[toolset],
)
session_service = InMemorySessionService()
session = await session_service.create_session(
state={},
app_name="knowledge_agent",
user_id="user",
)
runner = Runner(
app_name="knowledge_agent",
agent=agent,
session_service=session_service,
)
print("Knowledge Search Agent ready. Type your query (Ctrl+C to exit):")
try:
while True:
try:
query = input("\n> ").strip()
except EOFError:
break
if not query:
continue
content = types.Content(
role="user",
parts=[types.Part(text=query)],
)
async for event in runner.run_async(
session_id=session.id,
user_id=session.user_id,
new_message=content,
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if part.text:
print(part.text)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
await toolset.close()
if __name__ == "__main__":
asyncio.run(async_main())