feat: use a new logger

2026-02-24 17:02:25 +00:00
parent 3b7dd91a71
commit 6feeeff4f3

main.py (92 changed lines)

@@ -1,11 +1,8 @@
# ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio
import io
import logging
import os
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from dataclasses import dataclass
@@ -17,9 +14,8 @@ from gcloud.aio.storage import Storage
from google import genai
from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
logger = logging.getLogger(__name__)
from .utils import Settings, _args, log_structured_entry
HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500
@@ -91,12 +87,9 @@ class GoogleCloudFileStorage:
file_stream.name = file_name
except TimeoutError as exc:
last_exception = exc
logger.warning(
"Timeout downloading gs://%s/%s (attempt %d/%d)",
self.bucket_name,
file_name,
attempt + 1,
max_retries,
log_structured_entry(
    f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
    "WARNING",
)
except aiohttp.ClientResponseError as exc:
last_exception = exc
@@ -104,13 +97,9 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR
):
logger.warning(
"HTTP %d downloading gs://%s/%s (attempt %d/%d)",
exc.status,
self.bucket_name,
file_name,
attempt + 1,
max_retries,
log_structured_entry(
    f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
    "WARNING",
)
else:
raise
@@ -283,58 +272,6 @@ class GoogleCloudVectorSearch:
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass
class AppContext:
"""Shared resources initialised once at server startup."""
@@ -430,12 +367,15 @@ async def knowledge_search(
if s["distance"] > cutoff and s["distance"] > min_sim
]
logger.info(
"knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
round((t_embed - t0) * 1000, 1),
round((t_search - t_embed) * 1000, 1),
round((t_search - t0) * 1000, 1),
[s["id"] for s in search_results],
log_structured_entry(
    "knowledge_search timing",
    "INFO",
    {
        "embedding": f"{round((t_embed - t0) * 1000, 1)}ms",
        "vector_search": f"{round((t_search - t_embed) * 1000, 1)}ms",
        "total": f"{round((t_search - t0) * 1000, 1)}ms",
        "chunks": [s["id"] for s in search_results],
    },
)
# Format results as XML-like documents
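
Note: the log_structured_entry helper is imported from .utils, but its body is not part of this diff. A minimal sketch consistent with the call sites above (a message, a severity string, and an optional dict of extra fields), assuming the common convention of printing one JSON object per line for Cloud Logging to ingest. The names below are illustrative, not the actual .utils implementation:

import json
import sys
from typing import Any

def log_structured_entry(
    message: str,
    severity: str = "INFO",
    fields: dict[str, Any] | None = None,
) -> None:
    """Emit one JSON log line; Cloud Logging reads the severity/message keys."""
    # Assumed shape: top-level "message" and "severity", with any extra
    # fields merged in so the logging agent surfaces them as jsonPayload.
    entry: dict[str, Any] = {"message": message, "severity": severity}
    if fields:
        entry.update(fields)
    print(json.dumps(entry, default=str), file=sys.stderr)

Under that shape, the two WARNING calls pass only a message and severity, and the knowledge_search timing call passes its metrics dict as the third argument.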