diff --git a/main.py b/main.py
index dfb5d96..c70a6c9 100644
--- a/main.py
+++ b/main.py
@@ -1,11 +1,8 @@
# ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP."""
-import argparse
import asyncio
import io
-import logging
-import os
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from dataclasses import dataclass
@@ -17,9 +14,8 @@ from gcloud.aio.storage import Storage
from google import genai
from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP
-from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
-logger = logging.getLogger(__name__)
+from utils import Settings, _args, cfg, log_structured_entry
HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500
@@ -74,10 +70,21 @@ class GoogleCloudFileStorage:
"""
if file_name in self._cache:
+ log_structured_entry(
+ "File retrieved from cache",
+ "INFO",
+ {"file": file_name, "bucket": self.bucket_name}
+ )
file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name
return file_stream
+ log_structured_entry(
+ "Starting file download from GCS",
+ "INFO",
+ {"file": file_name, "bucket": self.bucket_name}
+ )
+
storage_client = self._get_aio_storage()
last_exception: Exception | None = None
@@ -89,14 +96,22 @@ class GoogleCloudFileStorage:
)
file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name
+ log_structured_entry(
+ "File downloaded successfully",
+ "INFO",
+ {
+ "file": file_name,
+ "bucket": self.bucket_name,
+ "size_bytes": len(self._cache[file_name]),
+ "attempt": attempt + 1
+ }
+ )
except TimeoutError as exc:
last_exception = exc
- logger.warning(
- "Timeout downloading gs://%s/%s (attempt %d/%d)",
- self.bucket_name,
- file_name,
- attempt + 1,
- max_retries,
+ log_structured_entry(
+ f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
+ "WARNING",
+ {"error": str(exc)}
)
except aiohttp.ClientResponseError as exc:
last_exception = exc
@@ -104,27 +119,44 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR
):
- logger.warning(
- "HTTP %d downloading gs://%s/%s (attempt %d/%d)",
- exc.status,
- self.bucket_name,
- file_name,
- attempt + 1,
- max_retries,
+ log_structured_entry(
+ f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
+ "WARNING",
+ {"status": exc.status, "message": str(exc)}
)
else:
+ log_structured_entry(
+ f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}",
+ "ERROR",
+ {"status": exc.status, "message": str(exc)}
+ )
raise
else:
return file_stream
if attempt < max_retries - 1:
delay = 0.5 * (2**attempt)
+ log_structured_entry(
+ "Retrying file download",
+ "INFO",
+ {"file": file_name, "delay_seconds": delay}
+ )
await asyncio.sleep(delay)
msg = (
f"Failed to download gs://{self.bucket_name}/{file_name} "
f"after {max_retries} attempts"
)
+ log_structured_entry(
+ "File download failed after all retries",
+ "ERROR",
+ {
+ "file": file_name,
+ "bucket": self.bucket_name,
+ "max_retries": max_retries,
+ "last_error": str(last_exception)
+ }
+ )
raise TimeoutError(msg) from last_exception
@@ -221,7 +253,13 @@ class GoogleCloudVectorSearch:
"Missing endpoint metadata. Call "
"`configure_index_endpoint` before querying."
)
+ log_structured_entry(
+ "Vector search query failed - endpoint not configured",
+ "ERROR",
+ {"error": msg}
+ )
raise RuntimeError(msg)
+
domain = self._endpoint_domain
endpoint_id = self._endpoint_name.split("/")[-1]
url = (
@@ -229,6 +267,18 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors"
)
+
+ log_structured_entry(
+ "Starting vector search query",
+ "INFO",
+ {
+ "deployed_index_id": deployed_index_id,
+ "neighbor_count": limit,
+ "endpoint_id": endpoint_id,
+ "embedding_dimension": len(query)
+ }
+ )
+
payload = {
"deployed_index_id": deployed_index_id,
"queries": [
@@ -239,43 +289,98 @@ class GoogleCloudVectorSearch:
],
}
- headers = await self._async_get_auth_headers()
- session = self._get_aio_session()
- async with session.post(
- url,
- json=payload,
- headers=headers,
- ) as response:
- if not response.ok:
- body = await response.text()
- msg = f"findNeighbors returned {response.status}: {body}"
- raise RuntimeError(msg)
- data = await response.json()
+ try:
+ headers = await self._async_get_auth_headers()
+ session = self._get_aio_session()
+ async with session.post(
+ url,
+ json=payload,
+ headers=headers,
+ ) as response:
+ if not response.ok:
+ body = await response.text()
+ msg = f"findNeighbors returned {response.status}: {body}"
+ log_structured_entry(
+ "Vector search API request failed",
+ "ERROR",
+ {
+ "status": response.status,
+ "response_body": body,
+ "deployed_index_id": deployed_index_id
+ }
+ )
+ raise RuntimeError(msg)
+ data = await response.json()
- neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
- content_tasks = []
- for neighbor in neighbors:
- datapoint_id = neighbor["datapoint"]["datapointId"]
- file_path = f"{self.index_name}/contents/{datapoint_id}.md"
- content_tasks.append(
- self.storage.async_get_file_stream(file_path),
+ neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
+ log_structured_entry(
+ "Vector search API request successful",
+ "INFO",
+ {
+ "neighbors_found": len(neighbors),
+ "deployed_index_id": deployed_index_id
+ }
)
- file_streams = await asyncio.gather(*content_tasks)
- results: list[SearchResult] = []
- for neighbor, stream in zip(
- neighbors,
- file_streams,
- strict=True,
- ):
- results.append(
- SearchResult(
- id=neighbor["datapoint"]["datapointId"],
- distance=neighbor["distance"],
- content=stream.read().decode("utf-8"),
- ),
+ if not neighbors:
+ log_structured_entry(
+ "No neighbors found in vector search",
+ "WARNING",
+ {"deployed_index_id": deployed_index_id}
+ )
+ return []
+
+ # Fetch content for all neighbors
+ content_tasks = []
+ for neighbor in neighbors:
+ datapoint_id = neighbor["datapoint"]["datapointId"]
+ file_path = f"{self.index_name}/contents/{datapoint_id}.md"
+ content_tasks.append(
+ self.storage.async_get_file_stream(file_path),
+ )
+
+ log_structured_entry(
+ "Fetching content for search results",
+ "INFO",
+ {"file_count": len(content_tasks)}
)
- return results
+
+ file_streams = await asyncio.gather(*content_tasks)
+ results: list[SearchResult] = []
+ for neighbor, stream in zip(
+ neighbors,
+ file_streams,
+ strict=True,
+ ):
+ results.append(
+ SearchResult(
+ id=neighbor["datapoint"]["datapointId"],
+ distance=neighbor["distance"],
+ content=stream.read().decode("utf-8"),
+ ),
+ )
+
+ log_structured_entry(
+ "Vector search completed successfully",
+ "INFO",
+ {
+ "results_count": len(results),
+ "deployed_index_id": deployed_index_id
+ }
+ )
+ return results
+
+ except Exception as e:
+ log_structured_entry(
+ "Vector search query failed with exception",
+ "ERROR",
+ {
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "deployed_index_id": deployed_index_id
+ }
+ )
+ raise
# ---------------------------------------------------------------------------
@@ -283,58 +388,6 @@ class GoogleCloudVectorSearch:
# ---------------------------------------------------------------------------
-def _parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "--transport",
- choices=["stdio", "sse"],
- default="stdio",
- )
- parser.add_argument("--host", default="0.0.0.0")
- parser.add_argument("--port", type=int, default=8080)
- parser.add_argument(
- "--config",
- default=os.environ.get("CONFIG_FILE", "config.yaml"),
- )
- return parser.parse_args()
-
-
-_args = _parse_args()
-
-
-class Settings(BaseSettings):
- """Server configuration populated from env vars and a YAML config file."""
-
- model_config = {"env_file": ".env", "yaml_file": _args.config}
-
- project_id: str
- location: str
- bucket: str
- index_name: str
- deployed_index_id: str
- endpoint_name: str
- endpoint_domain: str
- embedding_model: str = "gemini-embedding-001"
- search_limit: int = 10
-
- @classmethod
- def settings_customise_sources(
- cls,
- settings_cls: type[BaseSettings],
- init_settings: PydanticBaseSettingsSource,
- env_settings: PydanticBaseSettingsSource,
- dotenv_settings: PydanticBaseSettingsSource,
- file_secret_settings: PydanticBaseSettingsSource,
- ) -> tuple[PydanticBaseSettingsSource, ...]:
- return (
- init_settings,
- env_settings,
- dotenv_settings,
- YamlConfigSettingsSource(settings_cls),
- file_secret_settings,
- )
-
-
@dataclass
class AppContext:
"""Shared resources initialised once at server startup."""
@@ -347,31 +400,229 @@ class AppContext:
@asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime."""
- vs = GoogleCloudVectorSearch(
- project_id=cfg.project_id,
- location=cfg.location,
- bucket=cfg.bucket,
- index_name=cfg.index_name,
- )
- vs.configure_index_endpoint(
- name=cfg.endpoint_name,
- public_domain=cfg.endpoint_domain,
+ log_structured_entry(
+ "Initializing MCP server",
+ "INFO",
+ {
+ "project_id": cfg.project_id,
+ "location": cfg.location,
+ "bucket": cfg.bucket,
+ "index_name": cfg.index_name,
+ }
)
- genai_client = genai.Client(
- vertexai=True,
- project=cfg.project_id,
- location=cfg.location,
- )
+ try:
+ # Initialize vector search client
+ log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO")
+ vs = GoogleCloudVectorSearch(
+ project_id=cfg.project_id,
+ location=cfg.location,
+ bucket=cfg.bucket,
+ index_name=cfg.index_name,
+ )
- yield AppContext(
- vector_search=vs,
- genai_client=genai_client,
- settings=cfg,
- )
+ # Configure endpoint
+ log_structured_entry(
+ "Configuring index endpoint",
+ "INFO",
+ {
+ "endpoint_name": cfg.endpoint_name,
+ "endpoint_domain": cfg.endpoint_domain,
+ }
+ )
+ vs.configure_index_endpoint(
+ name=cfg.endpoint_name,
+ public_domain=cfg.endpoint_domain,
+ )
+ # Initialize GenAI client
+ log_structured_entry(
+ "Creating GenAI client",
+ "INFO",
+ {"project_id": cfg.project_id, "location": cfg.location}
+ )
+ genai_client = genai.Client(
+ vertexai=True,
+ project=cfg.project_id,
+ location=cfg.location,
+ )
+
+ # Validate credentials and configuration by testing actual resources
+ # These validations are non-blocking - errors are logged but won't stop startup
+ log_structured_entry("Starting validation of credentials and resources", "INFO")
+
+ validation_errors = []
+
+ # 1. Validate GenAI embedding access
+ log_structured_entry("Validating GenAI embedding access", "INFO")
+ try:
+ test_response = await genai_client.aio.models.embed_content(
+ model=cfg.embedding_model,
+ contents="test",
+ config=genai_types.EmbedContentConfig(
+ task_type="RETRIEVAL_QUERY",
+ ),
+ )
+ if test_response and test_response.embeddings:
+ embedding_values = test_response.embeddings[0].values
+ log_structured_entry(
+ "GenAI embedding validation successful",
+ "INFO",
+ {"embedding_dimension": len(embedding_values) if embedding_values else 0}
+ )
+ else:
+ msg = "Embedding validation returned empty response"
+ log_structured_entry(msg, "WARNING")
+ validation_errors.append(msg)
+ except Exception as e:
+ log_structured_entry(
+ "Failed to validate GenAI embedding access - service may not work correctly",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__}
+ )
+ validation_errors.append(f"GenAI: {str(e)}")
+
+ # 2. Validate GCS bucket access
+ log_structured_entry(
+ "Validating GCS bucket access",
+ "INFO",
+ {"bucket": cfg.bucket}
+ )
+ try:
+ session = vs.storage._get_aio_session()
+ token_obj = Token(
+ session=session,
+ scopes=["https://www.googleapis.com/auth/cloud-platform"],
+ )
+ access_token = await token_obj.get()
+ headers = {"Authorization": f"Bearer {access_token}"}
+
+ async with session.get(
+ f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
+ headers=headers,
+ ) as response:
+ if response.status == 403:
+ msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
+ log_structured_entry(
+ "GCS bucket validation failed - access denied - service may not work correctly",
+ "WARNING",
+ {"bucket": cfg.bucket, "status": response.status}
+ )
+ validation_errors.append(msg)
+ elif response.status == 404:
+ msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
+ log_structured_entry(
+ "GCS bucket validation failed - not found - service may not work correctly",
+ "WARNING",
+ {"bucket": cfg.bucket, "status": response.status}
+ )
+ validation_errors.append(msg)
+ elif not response.ok:
+ body = await response.text()
+ msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
+ log_structured_entry(
+ "GCS bucket validation failed - service may not work correctly",
+ "WARNING",
+ {"bucket": cfg.bucket, "status": response.status, "response": body}
+ )
+ validation_errors.append(msg)
+ else:
+ log_structured_entry(
+ "GCS bucket validation successful",
+ "INFO",
+ {"bucket": cfg.bucket}
+ )
+ except Exception as e:
+ log_structured_entry(
+ "Failed to validate GCS bucket access - service may not work correctly",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
+ )
+ validation_errors.append(f"GCS: {str(e)}")
+
+ # 3. Validate vector search endpoint access
+ log_structured_entry(
+ "Validating vector search endpoint access",
+ "INFO",
+ {"endpoint_name": cfg.endpoint_name}
+ )
+ try:
+ # Try to get endpoint info
+ headers = await vs._async_get_auth_headers()
+ session = vs._get_aio_session()
+ endpoint_url = (
+ f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
+ )
+
+ async with session.get(endpoint_url, headers=headers) as response:
+ if response.status == 403:
+ msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
+ log_structured_entry(
+ "Vector search endpoint validation failed - access denied - service may not work correctly",
+ "WARNING",
+ {"endpoint": cfg.endpoint_name, "status": response.status}
+ )
+ validation_errors.append(msg)
+ elif response.status == 404:
+ msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
+ log_structured_entry(
+ "Vector search endpoint validation failed - not found - service may not work correctly",
+ "WARNING",
+ {"endpoint": cfg.endpoint_name, "status": response.status}
+ )
+ validation_errors.append(msg)
+ elif not response.ok:
+ body = await response.text()
+ msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
+ log_structured_entry(
+ "Vector search endpoint validation failed - service may not work correctly",
+ "WARNING",
+ {"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
+ )
+ validation_errors.append(msg)
+ else:
+ log_structured_entry(
+ "Vector search endpoint validation successful",
+ "INFO",
+ {"endpoint": cfg.endpoint_name}
+ )
+ except Exception as e:
+ log_structured_entry(
+ "Failed to validate vector search endpoint access - service may not work correctly",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
+ )
+ validation_errors.append(f"Vector Search: {str(e)}")
+
+ # Summary of validations
+ if validation_errors:
+ log_structured_entry(
+ "MCP server started with validation errors - service may not work correctly",
+ "WARNING",
+ {"validation_errors": validation_errors, "error_count": len(validation_errors)}
+ )
+ else:
+ log_structured_entry("All validations passed - MCP server initialization complete", "INFO")
+
+ yield AppContext(
+ vector_search=vs,
+ genai_client=genai_client,
+ settings=cfg,
+ )
+
+ except Exception as e:
+ log_structured_entry(
+ "Failed to initialize MCP server",
+ "ERROR",
+ {
+ "error": str(e),
+ "error_type": type(e).__name__,
+ }
+ )
+ raise
+ finally:
+ log_structured_entry("MCP server lifespan ending", "INFO")
-cfg = Settings.model_validate({})
mcp = FastMCP(
"knowledge-search",
@@ -402,48 +653,128 @@ async def knowledge_search(
t0 = time.perf_counter()
min_sim = 0.6
- response = await app.genai_client.aio.models.embed_content(
- model=app.settings.embedding_model,
- contents=query,
- config=genai_types.EmbedContentConfig(
- task_type="RETRIEVAL_QUERY",
-
- ),
+ log_structured_entry(
+ "knowledge_search request received",
+ "INFO",
+ {"query": query[:100]} # Log first 100 chars of query
)
- embedding = response.embeddings[0].values
- t_embed = time.perf_counter()
- search_results = await app.vector_search.async_run_query(
- deployed_index_id=app.settings.deployed_index_id,
- query=embedding,
- limit=app.settings.search_limit,
- )
- t_search = time.perf_counter()
+ try:
+ # Generate embedding for the query
+ log_structured_entry("Generating query embedding", "INFO")
+ try:
+ response = await app.genai_client.aio.models.embed_content(
+ model=app.settings.embedding_model,
+ contents=query,
+ config=genai_types.EmbedContentConfig(
+ task_type="RETRIEVAL_QUERY",
+ ),
+ )
+ embedding = response.embeddings[0].values
+ t_embed = time.perf_counter()
+ log_structured_entry(
+ "Query embedding generated successfully",
+ "INFO",
+ {"time_ms": round((t_embed - t0) * 1000, 1)}
+ )
+ except Exception as e:
+ error_type = type(e).__name__
+ error_msg = str(e)
- # Apply similarity filtering
- if search_results:
- max_sim = max(r["distance"] for r in search_results)
- cutoff = max_sim * 0.9
- search_results = [
- s
- for s in search_results
- if s["distance"] > cutoff and s["distance"] > min_sim
+ # Check if it's a rate limit error
+ if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
+ log_structured_entry(
+ "Rate limit exceeded while generating embedding",
+ "WARNING",
+ {
+ "error": error_msg,
+ "error_type": error_type,
+ "query": query[:100]
+ }
+ )
+ return "Error: API rate limit exceeded. Please try again later."
+ else:
+ log_structured_entry(
+ "Failed to generate query embedding",
+ "ERROR",
+ {
+ "error": error_msg,
+ "error_type": error_type,
+ "query": query[:100]
+ }
+ )
+ return f"Error generating embedding: {error_msg}"
+
+ # Perform vector search
+ log_structured_entry("Performing vector search", "INFO")
+ try:
+ search_results = await app.vector_search.async_run_query(
+ deployed_index_id=app.settings.deployed_index_id,
+ query=embedding,
+ limit=app.settings.search_limit,
+ )
+ t_search = time.perf_counter()
+ except Exception as e:
+ log_structured_entry(
+ "Vector search failed",
+ "ERROR",
+ {
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "query": query[:100]
+ }
+ )
+ return f"Error performing vector search: {str(e)}"
+
+ # Apply similarity filtering
+ if search_results:
+ max_sim = max(r["distance"] for r in search_results)
+ cutoff = max_sim * 0.9
+ search_results = [
+ s
+ for s in search_results
+ if s["distance"] > cutoff and s["distance"] > min_sim
+ ]
+
+ log_structured_entry(
+ "knowledge_search completed successfully",
+ "INFO",
+ {
+ "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
+ "vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
+ "total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
+ "results_count": len(search_results),
+ "chunks": [s["id"] for s in search_results]
+ }
+ )
+
+ # Format results as XML-like documents
+ if not search_results:
+ log_structured_entry(
+ "No results found for query",
+ "INFO",
+ {"query": query[:100]}
+ )
+ return "No relevant documents found for your query."
+
+ formatted_results = [
+ f"\n{result['content']}\n"
+ for i, result in enumerate(search_results, start=1)
]
+ return "\n".join(formatted_results)
- logger.info(
- "knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
- round((t_embed - t0) * 1000, 1),
- round((t_search - t_embed) * 1000, 1),
- round((t_search - t0) * 1000, 1),
- [s["id"] for s in search_results],
- )
-
- # Format results as XML-like documents
- formatted_results = [
- f"\n{result['content']}\n"
- for i, result in enumerate(search_results, start=1)
- ]
- return "\n".join(formatted_results)
+ except Exception as e:
+ # Catch-all for any unexpected errors
+ log_structured_entry(
+ "Unexpected error in knowledge_search",
+ "ERROR",
+ {
+ "error": str(e),
+ "error_type": type(e).__name__,
+ "query": query[:100]
+ }
+ )
+ return f"Unexpected error during search: {str(e)}"
if __name__ == "__main__":
diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 0000000..17f1feb
--- /dev/null
+++ b/utils/__init__.py
@@ -0,0 +1,4 @@
+from .config import Settings, _args, cfg
+from .logging_setup import log_structured_entry
+
+__all__ = ['Settings', '_args', 'cfg', 'log_structured_entry']
diff --git a/utils/config.py b/utils/config.py
new file mode 100644
index 0000000..04f81d4
--- /dev/null
+++ b/utils/config.py
@@ -0,0 +1,60 @@
+import os
+import argparse
+from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
+
+
+def _parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--transport",
+ choices=["stdio", "sse"],
+ default="stdio",
+ )
+ parser.add_argument("--host", default="0.0.0.0")
+ parser.add_argument("--port", type=int, default=8080)
+ parser.add_argument(
+ "--config",
+ default=os.environ.get("CONFIG_FILE", "config.yaml"),
+ )
+ return parser.parse_args()
+
+
+_args = _parse_args()
+
+class Settings(BaseSettings):
+ """Server configuration populated from env vars and a YAML config file."""
+
+ model_config = {"env_file": ".env", "yaml_file": _args.config}
+
+ project_id: str
+ location: str
+ bucket: str
+ index_name: str
+ deployed_index_id: str
+ endpoint_name: str
+ endpoint_domain: str
+ embedding_model: str = "gemini-embedding-001"
+ search_limit: int = 10
+ log_name: str = "va_agent_evaluation_logs"
+ log_level: str = "INFO"
+
+ @classmethod
+ def settings_customise_sources(
+ cls,
+ settings_cls: type[BaseSettings],
+ init_settings: PydanticBaseSettingsSource,
+ env_settings: PydanticBaseSettingsSource,
+ dotenv_settings: PydanticBaseSettingsSource,
+ file_secret_settings: PydanticBaseSettingsSource,
+ ) -> tuple[PydanticBaseSettingsSource, ...]:
+ return (
+ init_settings,
+ env_settings,
+ dotenv_settings,
+ YamlConfigSettingsSource(settings_cls),
+ file_secret_settings,
+ )
+
+
+# Singleton instance of Settings
+cfg = Settings.model_validate({})
diff --git a/utils/logging_setup.py b/utils/logging_setup.py
new file mode 100644
index 0000000..fb1519a
--- /dev/null
+++ b/utils/logging_setup.py
@@ -0,0 +1,48 @@
+"""
+Centralized Cloud Logging setup.
+Uses CloudLoggingHandler (background thread) so logging does not add latency
+"""
+
+import logging
+from typing import Optional, Dict, Literal
+
+import google.cloud.logging
+from google.cloud.logging.handlers import CloudLoggingHandler
+
+from .config import cfg
+
+
+def _setup_logger() -> logging.Logger:
+ """Create or return the singleton evaluation logger."""
+ logger = logging.getLogger(cfg.log_name)
+ if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
+ return logger
+
+ try:
+ client = google.cloud.logging.Client(project=cfg.project_id)
+ handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport
+ logger.addHandler(handler)
+ logger.setLevel(getattr(logging, cfg.log_level.upper()))
+ except Exception as e:
+ # Fallback to console if Cloud Logging is unavailable (local dev)
+ logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
+ logger = logging.getLogger(cfg.log_name)
+ logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
+
+ return logger
+
+
+_eval_log = _setup_logger()
+
+
+def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None:
+ """
+ Emit a JSON-structured log row.
+
+ Args:
+ message: Short label for the row (e.g., "Final agent turn").
+ severity: "INFO" | "WARNING" | "ERROR"
+ custom_log: A dict with your structured payload.
+ """
+ level = getattr(logging, severity.upper(), logging.INFO)
+ _eval_log.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}})