Compare commits
2 Commits
feature/me
...
6feeeff4f3
| Author | SHA1 | Date | |
|---|---|---|---|
| 6feeeff4f3 | |||
| 3b7dd91a71 |
92
main.py
92
main.py
@@ -1,11 +1,8 @@
|
|||||||
# ruff: noqa: INP001
|
# ruff: noqa: INP001
|
||||||
"""Async helpers for querying Vertex AI vector search via MCP."""
|
"""Async helpers for querying Vertex AI vector search via MCP."""
|
||||||
|
|
||||||
import argparse
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import io
|
import io
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
from collections.abc import AsyncIterator, Sequence
|
from collections.abc import AsyncIterator, Sequence
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
@@ -17,9 +14,8 @@ from gcloud.aio.storage import Storage
|
|||||||
from google import genai
|
from google import genai
|
||||||
from google.genai import types as genai_types
|
from google.genai import types as genai_types
|
||||||
from mcp.server.fastmcp import Context, FastMCP
|
from mcp.server.fastmcp import Context, FastMCP
|
||||||
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
from .utils import Settings, _args, log_structured_entry
|
||||||
|
|
||||||
HTTP_TOO_MANY_REQUESTS = 429
|
HTTP_TOO_MANY_REQUESTS = 429
|
||||||
HTTP_SERVER_ERROR = 500
|
HTTP_SERVER_ERROR = 500
|
||||||
@@ -91,12 +87,9 @@ class GoogleCloudFileStorage:
|
|||||||
file_stream.name = file_name
|
file_stream.name = file_name
|
||||||
except TimeoutError as exc:
|
except TimeoutError as exc:
|
||||||
last_exception = exc
|
last_exception = exc
|
||||||
logger.warning(
|
log_structured_entry(
|
||||||
"Timeout downloading gs://%s/%s (attempt %d/%d)",
|
f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})"
|
||||||
self.bucket_name,
|
"WARNING"
|
||||||
file_name,
|
|
||||||
attempt + 1,
|
|
||||||
max_retries,
|
|
||||||
)
|
)
|
||||||
except aiohttp.ClientResponseError as exc:
|
except aiohttp.ClientResponseError as exc:
|
||||||
last_exception = exc
|
last_exception = exc
|
||||||
@@ -104,13 +97,9 @@ class GoogleCloudFileStorage:
|
|||||||
exc.status == HTTP_TOO_MANY_REQUESTS
|
exc.status == HTTP_TOO_MANY_REQUESTS
|
||||||
or exc.status >= HTTP_SERVER_ERROR
|
or exc.status >= HTTP_SERVER_ERROR
|
||||||
):
|
):
|
||||||
logger.warning(
|
log_structured_entry(
|
||||||
"HTTP %d downloading gs://%s/%s (attempt %d/%d)",
|
f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})"
|
||||||
exc.status,
|
"WARNING"
|
||||||
self.bucket_name,
|
|
||||||
file_name,
|
|
||||||
attempt + 1,
|
|
||||||
max_retries,
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
@@ -283,58 +272,6 @@ class GoogleCloudVectorSearch:
|
|||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def _parse_args() -> argparse.Namespace:
|
|
||||||
parser = argparse.ArgumentParser()
|
|
||||||
parser.add_argument(
|
|
||||||
"--transport",
|
|
||||||
choices=["stdio", "sse"],
|
|
||||||
default="stdio",
|
|
||||||
)
|
|
||||||
parser.add_argument("--host", default="0.0.0.0")
|
|
||||||
parser.add_argument("--port", type=int, default=8080)
|
|
||||||
parser.add_argument(
|
|
||||||
"--config",
|
|
||||||
default=os.environ.get("CONFIG_FILE", "config.yaml"),
|
|
||||||
)
|
|
||||||
return parser.parse_args()
|
|
||||||
|
|
||||||
|
|
||||||
_args = _parse_args()
|
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
|
||||||
"""Server configuration populated from env vars and a YAML config file."""
|
|
||||||
|
|
||||||
model_config = {"env_file": ".env", "yaml_file": _args.config}
|
|
||||||
|
|
||||||
project_id: str
|
|
||||||
location: str
|
|
||||||
bucket: str
|
|
||||||
index_name: str
|
|
||||||
deployed_index_id: str
|
|
||||||
endpoint_name: str
|
|
||||||
endpoint_domain: str
|
|
||||||
embedding_model: str = "gemini-embedding-001"
|
|
||||||
search_limit: int = 10
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def settings_customise_sources(
|
|
||||||
cls,
|
|
||||||
settings_cls: type[BaseSettings],
|
|
||||||
init_settings: PydanticBaseSettingsSource,
|
|
||||||
env_settings: PydanticBaseSettingsSource,
|
|
||||||
dotenv_settings: PydanticBaseSettingsSource,
|
|
||||||
file_secret_settings: PydanticBaseSettingsSource,
|
|
||||||
) -> tuple[PydanticBaseSettingsSource, ...]:
|
|
||||||
return (
|
|
||||||
init_settings,
|
|
||||||
env_settings,
|
|
||||||
dotenv_settings,
|
|
||||||
YamlConfigSettingsSource(settings_cls),
|
|
||||||
file_secret_settings,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class AppContext:
|
class AppContext:
|
||||||
"""Shared resources initialised once at server startup."""
|
"""Shared resources initialised once at server startup."""
|
||||||
@@ -430,12 +367,15 @@ async def knowledge_search(
|
|||||||
if s["distance"] > cutoff and s["distance"] > min_sim
|
if s["distance"] > cutoff and s["distance"] > min_sim
|
||||||
]
|
]
|
||||||
|
|
||||||
logger.info(
|
log_structured_entry(
|
||||||
"knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
|
"knowledge_search timing",
|
||||||
round((t_embed - t0) * 1000, 1),
|
"INFO",
|
||||||
round((t_search - t_embed) * 1000, 1),
|
{
|
||||||
round((t_search - t0) * 1000, 1),
|
"embedding": f"{round((t_embed - t0) * 1000, 1)}ms",
|
||||||
[s["id"] for s in search_results],
|
"vector_serach": f"{round((t_search - t_embed) * 1000, 1)}ms",
|
||||||
|
"total": f"{round((t_search - t0) * 1000, 1)}ms",
|
||||||
|
"chunks": {[s["id"] for s in search_results]}
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Format results as XML-like documents
|
# Format results as XML-like documents
|
||||||
|
|||||||
4
utils/__init__.py
Normal file
4
utils/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
from .config import Settings, _args
|
||||||
|
from .logging_setup import log_structured_entry
|
||||||
|
|
||||||
|
__all__ = ['Settings', '_args', 'log_structured_entry']
|
||||||
54
utils/config.py
Normal file
54
utils/config.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_args() -> argparse.Namespace:
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
"--transport",
|
||||||
|
choices=["stdio", "sse"],
|
||||||
|
default="stdio",
|
||||||
|
)
|
||||||
|
parser.add_argument("--host", default="0.0.0.0")
|
||||||
|
parser.add_argument("--port", type=int, default=8080)
|
||||||
|
parser.add_argument(
|
||||||
|
"--config",
|
||||||
|
default=os.environ.get("CONFIG_FILE", "config.yaml"),
|
||||||
|
)
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
_args = _parse_args()
|
||||||
|
|
||||||
|
class Settings(BaseSettings):
|
||||||
|
"""Server configuration populated from env vars and a YAML config file."""
|
||||||
|
|
||||||
|
model_config = {"env_file": ".env", "yaml_file": _args.config}
|
||||||
|
|
||||||
|
project_id: str
|
||||||
|
location: str
|
||||||
|
bucket: str
|
||||||
|
index_name: str
|
||||||
|
deployed_index_id: str
|
||||||
|
endpoint_name: str
|
||||||
|
endpoint_domain: str
|
||||||
|
embedding_model: str = "gemini-embedding-001"
|
||||||
|
search_limit: int = 10
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def settings_customise_sources(
|
||||||
|
cls,
|
||||||
|
settings_cls: type[BaseSettings],
|
||||||
|
init_settings: PydanticBaseSettingsSource,
|
||||||
|
env_settings: PydanticBaseSettingsSource,
|
||||||
|
dotenv_settings: PydanticBaseSettingsSource,
|
||||||
|
file_secret_settings: PydanticBaseSettingsSource,
|
||||||
|
) -> tuple[PydanticBaseSettingsSource, ...]:
|
||||||
|
return (
|
||||||
|
init_settings,
|
||||||
|
env_settings,
|
||||||
|
dotenv_settings,
|
||||||
|
YamlConfigSettingsSource(settings_cls),
|
||||||
|
file_secret_settings,
|
||||||
|
)
|
||||||
50
utils/logging_setup.py
Normal file
50
utils/logging_setup.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
"""
|
||||||
|
Centralized Cloud Logging setup.
|
||||||
|
Uses CloudLoggingHandler (background thread) so logging does not add latency
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Optional, Dict, Literal
|
||||||
|
|
||||||
|
import google.cloud.logging
|
||||||
|
from google.cloud.logging.handlers import CloudLoggingHandler
|
||||||
|
|
||||||
|
from .config import Settings
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_logger() -> logging.Logger:
|
||||||
|
"""Create or return the singleton evaluation logger."""
|
||||||
|
log_name = "va_agent-evaluation-logs"
|
||||||
|
logger = logging.getLogger(log_name)
|
||||||
|
cfg = Settings.model_validate({})
|
||||||
|
if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
|
||||||
|
return logger
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = google.cloud.logging.Client(project=cfg.project_id)
|
||||||
|
handler = CloudLoggingHandler(client, name=log_name) # async transport
|
||||||
|
logger.addHandler(handler)
|
||||||
|
logger.setLevel(logging.INFO)
|
||||||
|
except Exception as e:
|
||||||
|
# Fallback to console if Cloud Logging is unavailable (local dev)
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
logger = logging.getLogger(log_name)
|
||||||
|
logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
|
||||||
|
|
||||||
|
return logger
|
||||||
|
|
||||||
|
|
||||||
|
_eval_log = _setup_logger()
|
||||||
|
|
||||||
|
|
||||||
|
def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None:
|
||||||
|
"""
|
||||||
|
Emit a JSON-structured log row.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: Short label for the row (e.g., "Final agent turn").
|
||||||
|
severity: "INFO" | "WARNING" | "ERROR" etc.
|
||||||
|
custom_log: A dict with your structured payload.
|
||||||
|
"""
|
||||||
|
level = getattr(logging, severity.upper(), logging.INFO)
|
||||||
|
_eval_log.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}})
|
||||||
Reference in New Issue
Block a user