"""ID-token auth for Cloud Run → Cloud Run calls.""" import logging import time from google.adk.agents.readonly_context import ReadonlyContext from google.auth import jwt from google.auth.transport.requests import Request as GAuthRequest from google.oauth2 import id_token from va_agent.config import settings logger = logging.getLogger(__name__) _REFRESH_MARGIN = 900 # refresh 15 min before expiry _token: str | None = None _token_exp: float = 0.0 def _fetch_token() -> tuple[str, float]: """Fetch a fresh ID token (blocking I/O).""" tok = id_token.fetch_id_token(GAuthRequest(), settings.mcp_audience) exp = jwt.decode(tok, verify=False)["exp"] return tok, exp def auth_headers_provider(_ctx: ReadonlyContext | None = None) -> dict[str, str]: """Return Authorization headers, refreshing the cached token when needed. With Streamable HTTP transport every tool call is a fresh HTTP request, so returning a valid token here is sufficient — no background refresh loop required. """ global _token, _token_exp if _token is not None and time.time() < _token_exp - _REFRESH_MARGIN: return {"Authorization": f"Bearer {_token}"} tok, exp = _fetch_token() _token, _token_exp = tok, exp return {"Authorization": f"Bearer {tok}"}