Compare commits

...

2 Commits

Author SHA1 Message Date
6feeeff4f3 feat: use a new logger 2026-02-24 17:02:25 +00:00
3b7dd91a71 feat: add 'log_structured_entry' for JSON structured logs
Move config into own file
2026-02-24 17:02:06 +00:00
4 changed files with 125 additions and 77 deletions

92
main.py
View File

@@ -1,11 +1,8 @@
# ruff: noqa: INP001 # ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP.""" """Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio import asyncio
import io import io
import logging
import os
from collections.abc import AsyncIterator, Sequence from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
@@ -17,9 +14,8 @@ from gcloud.aio.storage import Storage
from google import genai from google import genai
from google.genai import types as genai_types from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
logger = logging.getLogger(__name__) from .utils import Settings, _args, log_structured_entry
HTTP_TOO_MANY_REQUESTS = 429 HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500 HTTP_SERVER_ERROR = 500
@@ -91,12 +87,9 @@ class GoogleCloudFileStorage:
file_stream.name = file_name file_stream.name = file_name
except TimeoutError as exc: except TimeoutError as exc:
last_exception = exc last_exception = exc
logger.warning( log_structured_entry(
"Timeout downloading gs://%s/%s (attempt %d/%d)", f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})"
self.bucket_name, "WARNING"
file_name,
attempt + 1,
max_retries,
) )
except aiohttp.ClientResponseError as exc: except aiohttp.ClientResponseError as exc:
last_exception = exc last_exception = exc
@@ -104,13 +97,9 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR or exc.status >= HTTP_SERVER_ERROR
): ):
logger.warning( log_structured_entry(
"HTTP %d downloading gs://%s/%s (attempt %d/%d)", f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})"
exc.status, "WARNING"
self.bucket_name,
file_name,
attempt + 1,
max_retries,
) )
else: else:
raise raise
@@ -283,58 +272,6 @@ class GoogleCloudVectorSearch:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass @dataclass
class AppContext: class AppContext:
"""Shared resources initialised once at server startup.""" """Shared resources initialised once at server startup."""
@@ -430,12 +367,15 @@ async def knowledge_search(
if s["distance"] > cutoff and s["distance"] > min_sim if s["distance"] > cutoff and s["distance"] > min_sim
] ]
logger.info( log_structured_entry(
"knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s", "knowledge_search timing",
round((t_embed - t0) * 1000, 1), "INFO",
round((t_search - t_embed) * 1000, 1), {
round((t_search - t0) * 1000, 1), "embedding": f"{round((t_embed - t0) * 1000, 1)}ms",
[s["id"] for s in search_results], "vector_serach": f"{round((t_search - t_embed) * 1000, 1)}ms",
"total": f"{round((t_search - t0) * 1000, 1)}ms",
"chunks": {[s["id"] for s in search_results]}
}
) )
# Format results as XML-like documents # Format results as XML-like documents

4
utils/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
# Public surface of the utils package: configuration and structured logging.
# NOTE(review): `_args` is a private name re-exported for main.py — consider
# exposing a public accessor instead.
from .config import Settings, _args
from .logging_setup import log_structured_entry

__all__ = ['Settings', '_args', 'log_structured_entry']

54
utils/config.py Normal file
View File

@@ -0,0 +1,54 @@
import os
import argparse
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
# Parsed once at import time so Settings below can resolve the YAML path.
# NOTE(review): parse_args() on import will SystemExit under any runner that
# injects its own argv (e.g. pytest) — confirm this module is only imported
# from the server entry point, or switch to parse_known_args().
_args = _parse_args()
class Settings(BaseSettings):
    """Server configuration populated from env vars and a YAML config file.

    Source precedence (highest first): init kwargs, environment variables,
    ``.env`` dotenv file, the YAML file named by ``--config``, then file
    secrets — see ``settings_customise_sources``.
    """

    # YAML path is fixed at import time from the CLI args parsed above.
    model_config = {"env_file": ".env", "yaml_file": _args.config}

    # Required GCP identifiers — validation fails if none of the sources
    # supplies them.
    project_id: str
    location: str
    bucket: str
    index_name: str
    deployed_index_id: str
    endpoint_name: str
    endpoint_domain: str
    # Tunables with defaults.
    embedding_model: str = "gemini-embedding-001"
    search_limit: int = 10

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Insert the YAML source between dotenv and file-secret sources."""
        return (
            init_settings,
            env_settings,
            dotenv_settings,
            YamlConfigSettingsSource(settings_cls),
            file_secret_settings,
        )

50
utils/logging_setup.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Centralized Cloud Logging setup.
Uses CloudLoggingHandler (background thread) so logging does not add latency
"""
import logging
from typing import Optional, Dict, Literal
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from .config import Settings
def _setup_logger() -> logging.Logger:
    """Create or return the singleton evaluation logger.

    Attaches a single ``CloudLoggingHandler`` (background-thread transport,
    so logging does not add request latency) to a named logger. Any failure
    during setup — including Settings validation errors from missing
    env/YAML values — falls back to plain console logging for local dev.

    Returns:
        The shared ``logging.Logger`` for evaluation logs.
    """
    log_name = "va_agent-evaluation-logs"
    logger = logging.getLogger(log_name)
    # Idempotency first: a repeat call must not attach a duplicate handler
    # (and should not pay for Settings validation again).
    if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
        return logger
    try:
        # Settings loading can itself raise (missing required values), so it
        # belongs inside the fallback guard, not before it.
        cfg = Settings.model_validate({})
        client = google.cloud.logging.Client(project=cfg.project_id)
        handler = CloudLoggingHandler(client, name=log_name)  # async transport
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    except Exception as e:
        # Fallback to console if Cloud Logging is unavailable (local dev).
        # basicConfig gives the root logger a console handler; this named
        # logger propagates to it.
        logging.basicConfig(level=logging.INFO)
        logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
    return logger
# Module-level singleton: handler + background transport are created once
# at import and shared by every log_structured_entry() call.
_eval_log = _setup_logger()
def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None:
    """
    Emit a JSON-structured log row.
    Args:
        message: Short label for the row (e.g., "Final agent turn").
        severity: "INFO" | "WARNING" | "ERROR" etc.
        custom_log: A dict with your structured payload.
    """
    # Unknown severity strings degrade to INFO rather than raising.
    level = getattr(logging, severity.upper(), logging.INFO)
    # CloudLoggingHandler picks "json_fields" out of `extra` and ships it
    # as the structured jsonPayload.
    payload = {"message": message, "custom": custom_log if custom_log else {}}
    _eval_log.log(level, message, extra={"json_fields": payload})