Improve auth implementation

This commit is contained in:
2026-02-25 17:28:20 +00:00
parent e77a2ba2ed
commit 9a2643a029
6 changed files with 125 additions and 58 deletions

27
src/va_agent/auth.py Normal file
View File

@@ -0,0 +1,27 @@
"""ID-token auth for Cloud Run → Cloud Run calls."""
import threading
import time
from google.adk.agents.readonly_context import ReadonlyContext
from google.auth import jwt
from google.auth.transport.requests import Request as GAuthRequest
from google.oauth2 import id_token
from va_agent.config import settings
_REFRESH_MARGIN = 300 # refresh 5 min before expiry
_lock = threading.Lock()
_token: str | None = None
_token_exp: float = 0.0
def auth_headers_provider(_ctx: ReadonlyContext | None = None) -> dict[str, str]:
"""Return Authorization headers with a cached ID token."""
global _token, _token_exp # noqa: PLW0603
with _lock:
if _token is None or time.time() >= _token_exp - _REFRESH_MARGIN:
_token = id_token.fetch_id_token(GAuthRequest(), settings.mcp_audience)
_token_exp = jwt.decode(_token, verify=False)["exp"]
return {"Authorization": f"Bearer {_token}"}