Files
agent/src/va_agent/agent.py
2026-02-25 02:01:04 -06:00

61 lines
1.7 KiB
Python

"""ADK agent with vector search RAG tool."""
from google import genai
from google.adk.agents.llm_agent import Agent
from google.adk.runners import Runner
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import SseConnectionParams
from google.cloud.firestore_v1.async_client import AsyncClient
from va_agent.config import settings
from va_agent.session import FirestoreSessionService
# --- Autenticación Cloud Run → Cloud Run (ID Token) ---
from google.oauth2 import id_token
from google.auth.transport.requests import Request as GAuthRequest
def _fetch_id_token(audience: str) -> str:
"""Emite un ID Token para invocar un servicio Cloud Run protegido."""
return id_token.fetch_id_token(GAuthRequest(), audience)
# Audience = URL del MCP remoto
_MCP_URL = settings.mcp_remote_url
_MCP_AUDIENCE = getattr(settings, "mcp_audience", None) or _MCP_URL
def _auth_headers_provider() -> dict[str, str]:
token = _fetch_id_token(_MCP_AUDIENCE)
return {"Authorization": f"Bearer {token}"}
connection_params = SseConnectionParams(
url=_MCP_URL,
headers=_auth_headers_provider()
)
# connection_params = SseConnectionParams(url=settings.mcp_remote_url)
toolset = McpToolset(connection_params=connection_params)
agent = Agent(
model=settings.agent_model,
name=settings.agent_name,
instruction=settings.agent_instructions,
tools=[toolset],
)
session_service = FirestoreSessionService(
db=AsyncClient(database=settings.firestore_db),
compaction_token_threshold=10_000,
genai_client=genai.Client(),
)
runner = Runner(
app_name="va_agent",
agent=agent,
session_service=session_service,
auto_create_session=True,
)