From 3b7dd91a7169e556142fba8dd3406b82f4184deb Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 17:02:06 +0000 Subject: [PATCH 1/8] feat: add 'log_structured_entry' for JSON structured logs Move config into own file --- utils/__init__.py | 4 ++++ utils/config.py | 54 ++++++++++++++++++++++++++++++++++++++++++ utils/logging_setup.py | 50 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+) create mode 100644 utils/__init__.py create mode 100644 utils/config.py create mode 100644 utils/logging_setup.py diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..dcbcb26 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,4 @@ +from .config import Settings, _args +from .logging_setup import log_structured_entry + +__all__ = ['Settings', '_args', 'log_structured_entry'] diff --git a/utils/config.py b/utils/config.py new file mode 100644 index 0000000..07b5c01 --- /dev/null +++ b/utils/config.py @@ -0,0 +1,54 @@ +import os +import argparse +from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument( + "--transport", + choices=["stdio", "sse"], + default="stdio", + ) + parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--port", type=int, default=8080) + parser.add_argument( + "--config", + default=os.environ.get("CONFIG_FILE", "config.yaml"), + ) + return parser.parse_args() + + +_args = _parse_args() + +class Settings(BaseSettings): + """Server configuration populated from env vars and a YAML config file.""" + + model_config = {"env_file": ".env", "yaml_file": _args.config} + + project_id: str + location: str + bucket: str + index_name: str + deployed_index_id: str + endpoint_name: str + endpoint_domain: str + embedding_model: str = "gemini-embedding-001" + search_limit: int = 10 + + @classmethod + def settings_customise_sources( + cls, + settings_cls: type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> tuple[PydanticBaseSettingsSource, ...]: + return ( + init_settings, + env_settings, + dotenv_settings, + YamlConfigSettingsSource(settings_cls), + file_secret_settings, + ) diff --git a/utils/logging_setup.py b/utils/logging_setup.py new file mode 100644 index 0000000..686da1f --- /dev/null +++ b/utils/logging_setup.py @@ -0,0 +1,50 @@ +""" +Centralized Cloud Logging setup. +Uses CloudLoggingHandler (background thread) so logging does not add latency +""" + +import logging +from typing import Optional, Dict, Literal + +import google.cloud.logging +from google.cloud.logging.handlers import CloudLoggingHandler + +from .config import Settings + + +def _setup_logger() -> logging.Logger: + """Create or return the singleton evaluation logger.""" + log_name = "va_agent-evaluation-logs" + logger = logging.getLogger(log_name) + cfg = Settings.model_validate({}) + if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers): + return logger + + try: + client = google.cloud.logging.Client(project=cfg.project_id) + handler = CloudLoggingHandler(client, name=log_name) # async transport + logger.addHandler(handler) + logger.setLevel(logging.INFO) + except Exception as e: + # Fallback to console if Cloud Logging is unavailable (local dev) + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(log_name) + logger.warning("Cloud Logging setup failed; using console. Error: %s", e) + + return logger + + +_eval_log = _setup_logger() + + +def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None: + """ + Emit a JSON-structured log row. + + Args: + message: Short label for the row (e.g., "Final agent turn"). + severity: "INFO" | "WARNING" | "ERROR" etc. + custom_log: A dict with your structured payload. + """ + level = getattr(logging, severity.upper(), logging.INFO) + _eval_log.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}}) -- 2.49.1 From 6feeeff4f382eaf8e439a2d220c613192645e2b6 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 17:02:25 +0000 Subject: [PATCH 2/8] feat: use a new logger --- main.py | 94 +++++++++++---------------------------------------------- 1 file changed, 17 insertions(+), 77 deletions(-) diff --git a/main.py b/main.py index dfb5d96..e99f9d8 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,8 @@ # ruff: noqa: INP001 """Async helpers for querying Vertex AI vector search via MCP.""" -import argparse import asyncio import io -import logging -import os from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager from dataclasses import dataclass @@ -17,9 +14,8 @@ from gcloud.aio.storage import Storage from google import genai from google.genai import types as genai_types from mcp.server.fastmcp import Context, FastMCP -from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource -logger = logging.getLogger(__name__) +from .utils import Settings, _args, log_structured_entry HTTP_TOO_MANY_REQUESTS = 429 HTTP_SERVER_ERROR = 500 @@ -91,12 +87,9 @@ class GoogleCloudFileStorage: file_stream.name = file_name except TimeoutError as exc: last_exception = exc - logger.warning( - "Timeout downloading gs://%s/%s (attempt %d/%d)", - self.bucket_name, - file_name, - attempt + 1, - max_retries, + log_structured_entry( + f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})" + "WARNING" ) except aiohttp.ClientResponseError as exc: last_exception = exc @@ -104,13 +97,9 @@ class GoogleCloudFileStorage: exc.status == HTTP_TOO_MANY_REQUESTS or exc.status >= HTTP_SERVER_ERROR ): - logger.warning( - "HTTP %d downloading gs://%s/%s (attempt %d/%d)", - exc.status, - self.bucket_name, - file_name, - attempt + 1, - max_retries, + log_structured_entry( + f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})" + "WARNING" ) else: raise @@ -283,58 +272,6 @@ class GoogleCloudVectorSearch: # --------------------------------------------------------------------------- -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser() - parser.add_argument( - "--transport", - choices=["stdio", "sse"], - default="stdio", - ) - parser.add_argument("--host", default="0.0.0.0") - parser.add_argument("--port", type=int, default=8080) - parser.add_argument( - "--config", - default=os.environ.get("CONFIG_FILE", "config.yaml"), - ) - return parser.parse_args() - - -_args = _parse_args() - - -class Settings(BaseSettings): - """Server configuration populated from env vars and a YAML config file.""" - - model_config = {"env_file": ".env", "yaml_file": _args.config} - - project_id: str - location: str - bucket: str - index_name: str - deployed_index_id: str - endpoint_name: str - endpoint_domain: str - embedding_model: str = "gemini-embedding-001" - search_limit: int = 10 - - @classmethod - def settings_customise_sources( - cls, - settings_cls: type[BaseSettings], - init_settings: PydanticBaseSettingsSource, - env_settings: PydanticBaseSettingsSource, - dotenv_settings: PydanticBaseSettingsSource, - file_secret_settings: PydanticBaseSettingsSource, - ) -> tuple[PydanticBaseSettingsSource, ...]: - return ( - init_settings, - env_settings, - dotenv_settings, - YamlConfigSettingsSource(settings_cls), - file_secret_settings, - ) - - @dataclass class AppContext: """Shared resources initialised once at server startup.""" @@ -429,13 +366,16 @@ async def knowledge_search( for s in search_results if s["distance"] > cutoff and s["distance"] > min_sim ] - - logger.info( - "knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s", - round((t_embed - t0) * 1000, 1), - round((t_search - t_embed) * 1000, 1), - round((t_search - t0) * 1000, 1), - [s["id"] for s in search_results], + + log_structured_entry( + "knowledge_search timing", + "INFO", + { + "embedding": f"{round((t_embed - t0) * 1000, 1)}ms", + "vector_serach": f"{round((t_search - t_embed) * 1000, 1)}ms", + "total": f"{round((t_search - t0) * 1000, 1)}ms", + "chunks": {[s["id"] for s in search_results]} + } ) # Format results as XML-like documents -- 2.49.1 From 753b5c7871e018cb325c6299fc9d08f707456b20 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 17:23:24 +0000 Subject: [PATCH 3/8] fix: refactor correct import and add more logs --- main.py | 308 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 249 insertions(+), 59 deletions(-) diff --git a/main.py b/main.py index e99f9d8..5bb2117 100644 --- a/main.py +++ b/main.py @@ -15,7 +15,7 @@ from google import genai from google.genai import types as genai_types from mcp.server.fastmcp import Context, FastMCP -from .utils import Settings, _args, log_structured_entry +from utils import Settings, _args, log_structured_entry HTTP_TOO_MANY_REQUESTS = 429 HTTP_SERVER_ERROR = 500 @@ -70,10 +70,21 @@ class GoogleCloudFileStorage: """ if file_name in self._cache: + log_structured_entry( + "File retrieved from cache", + "INFO", + {"file": file_name, "bucket": self.bucket_name} + ) file_stream = io.BytesIO(self._cache[file_name]) file_stream.name = file_name return file_stream + log_structured_entry( + "Starting file download from GCS", + "INFO", + {"file": file_name, "bucket": self.bucket_name} + ) + storage_client = self._get_aio_storage() last_exception: Exception | None = None @@ -85,11 +96,22 @@ class GoogleCloudFileStorage: ) file_stream = io.BytesIO(self._cache[file_name]) file_stream.name = file_name + log_structured_entry( + "File downloaded successfully", + "INFO", + { + "file": file_name, + "bucket": self.bucket_name, + "size_bytes": len(self._cache[file_name]), + "attempt": attempt + 1 + } + ) except TimeoutError as exc: last_exception = exc log_structured_entry( - f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})" - "WARNING" + f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", + "WARNING", + {"error": str(exc)} ) except aiohttp.ClientResponseError as exc: last_exception = exc @@ -98,22 +120,43 @@ class GoogleCloudFileStorage: or exc.status >= HTTP_SERVER_ERROR ): log_structured_entry( - f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})" - "WARNING" + f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", + "WARNING", + {"status": exc.status, "message": str(exc)} ) else: + log_structured_entry( + f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}", + "ERROR", + {"status": exc.status, "message": str(exc)} + ) raise else: return file_stream if attempt < max_retries - 1: delay = 0.5 * (2**attempt) + log_structured_entry( + "Retrying file download", + "INFO", + {"file": file_name, "delay_seconds": delay} + ) await asyncio.sleep(delay) msg = ( f"Failed to download gs://{self.bucket_name}/{file_name} " f"after {max_retries} attempts" ) + log_structured_entry( + "File download failed after all retries", + "ERROR", + { + "file": file_name, + "bucket": self.bucket_name, + "max_retries": max_retries, + "last_error": str(last_exception) + } + ) raise TimeoutError(msg) from last_exception @@ -210,7 +253,13 @@ class GoogleCloudVectorSearch: "Missing endpoint metadata. Call " "`configure_index_endpoint` before querying." ) + log_structured_entry( + "Vector search query failed - endpoint not configured", + "ERROR", + {"error": msg} + ) raise RuntimeError(msg) + domain = self._endpoint_domain endpoint_id = self._endpoint_name.split("/")[-1] url = ( @@ -218,6 +267,18 @@ class GoogleCloudVectorSearch: f"/locations/{self.location}" f"/indexEndpoints/{endpoint_id}:findNeighbors" ) + + log_structured_entry( + "Starting vector search query", + "INFO", + { + "deployed_index_id": deployed_index_id, + "neighbor_count": limit, + "endpoint_id": endpoint_id, + "embedding_dimension": len(query) + } + ) + payload = { "deployed_index_id": deployed_index_id, "queries": [ @@ -228,43 +289,98 @@ class GoogleCloudVectorSearch: ], } - headers = await self._async_get_auth_headers() - session = self._get_aio_session() - async with session.post( - url, - json=payload, - headers=headers, - ) as response: - if not response.ok: - body = await response.text() - msg = f"findNeighbors returned {response.status}: {body}" - raise RuntimeError(msg) - data = await response.json() + try: + headers = await self._async_get_auth_headers() + session = self._get_aio_session() + async with session.post( + url, + json=payload, + headers=headers, + ) as response: + if not response.ok: + body = await response.text() + msg = f"findNeighbors returned {response.status}: {body}" + log_structured_entry( + "Vector search API request failed", + "ERROR", + { + "status": response.status, + "response_body": body, + "deployed_index_id": deployed_index_id + } + ) + raise RuntimeError(msg) + data = await response.json() - neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) - content_tasks = [] - for neighbor in neighbors: - datapoint_id = neighbor["datapoint"]["datapointId"] - file_path = f"{self.index_name}/contents/{datapoint_id}.md" - content_tasks.append( - self.storage.async_get_file_stream(file_path), + neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) + log_structured_entry( + "Vector search API request successful", + "INFO", + { + "neighbors_found": len(neighbors), + "deployed_index_id": deployed_index_id + } ) - file_streams = await asyncio.gather(*content_tasks) - results: list[SearchResult] = [] - for neighbor, stream in zip( - neighbors, - file_streams, - strict=True, - ): - results.append( - SearchResult( - id=neighbor["datapoint"]["datapointId"], - distance=neighbor["distance"], - content=stream.read().decode("utf-8"), - ), + if not neighbors: + log_structured_entry( + "No neighbors found in vector search", + "WARNING", + {"deployed_index_id": deployed_index_id} + ) + return [] + + # Fetch content for all neighbors + content_tasks = [] + for neighbor in neighbors: + datapoint_id = neighbor["datapoint"]["datapointId"] + file_path = f"{self.index_name}/contents/{datapoint_id}.md" + content_tasks.append( + self.storage.async_get_file_stream(file_path), + ) + + log_structured_entry( + "Fetching content for search results", + "INFO", + {"file_count": len(content_tasks)} ) - return results + + file_streams = await asyncio.gather(*content_tasks) + results: list[SearchResult] = [] + for neighbor, stream in zip( + neighbors, + file_streams, + strict=True, + ): + results.append( + SearchResult( + id=neighbor["datapoint"]["datapointId"], + distance=neighbor["distance"], + content=stream.read().decode("utf-8"), + ), + ) + + log_structured_entry( + "Vector search completed successfully", + "INFO", + { + "results_count": len(results), + "deployed_index_id": deployed_index_id + } + ) + return results + + except Exception as e: + log_structured_entry( + "Vector search query failed with exception", + "ERROR", + { + "error": str(e), + "error_type": type(e).__name__, + "deployed_index_id": deployed_index_id + } + ) + raise # --------------------------------------------------------------------------- @@ -284,28 +400,102 @@ class AppContext: @asynccontextmanager async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: """Create and configure the vector-search client for the server lifetime.""" - vs = GoogleCloudVectorSearch( - project_id=cfg.project_id, - location=cfg.location, - bucket=cfg.bucket, - index_name=cfg.index_name, - ) - vs.configure_index_endpoint( - name=cfg.endpoint_name, - public_domain=cfg.endpoint_domain, + log_structured_entry( + "Initializing MCP server", + "INFO", + { + "project_id": cfg.project_id, + "location": cfg.location, + "bucket": cfg.bucket, + "index_name": cfg.index_name, + } ) - genai_client = genai.Client( - vertexai=True, - project=cfg.project_id, - location=cfg.location, - ) + try: + # Initialize vector search client + log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO") + vs = GoogleCloudVectorSearch( + project_id=cfg.project_id, + location=cfg.location, + bucket=cfg.bucket, + index_name=cfg.index_name, + ) - yield AppContext( - vector_search=vs, - genai_client=genai_client, - settings=cfg, - ) + # Configure endpoint + log_structured_entry( + "Configuring index endpoint", + "INFO", + { + "endpoint_name": cfg.endpoint_name, + "endpoint_domain": cfg.endpoint_domain, + } + ) + vs.configure_index_endpoint( + name=cfg.endpoint_name, + public_domain=cfg.endpoint_domain, + ) + + # Initialize GenAI client + log_structured_entry( + "Creating GenAI client", + "INFO", + {"project_id": cfg.project_id, "location": cfg.location} + ) + genai_client = genai.Client( + vertexai=True, + project=cfg.project_id, + location=cfg.location, + ) + + # Validate credentials by testing a simple operation + log_structured_entry("Validating credentials with test embedding", "INFO") + try: + test_response = await genai_client.aio.models.embed_content( + model=cfg.embedding_model, + contents="test", + config=genai_types.EmbedContentConfig( + task_type="RETRIEVAL_QUERY", + ), + ) + if test_response and test_response.embeddings: + log_structured_entry( + "Credentials validated successfully", + "INFO", + {"embedding_dimension": len(test_response.embeddings[0].values)} + ) + else: + log_structured_entry( + "Credential validation returned empty response", + "WARNING" + ) + except Exception as e: + log_structured_entry( + "Failed to validate credentials - embedding test failed", + "ERROR", + {"error": str(e), "error_type": type(e).__name__} + ) + raise + + log_structured_entry("MCP server initialization complete", "INFO") + + yield AppContext( + vector_search=vs, + genai_client=genai_client, + settings=cfg, + ) + + except Exception as e: + log_structured_entry( + "Failed to initialize MCP server", + "ERROR", + { + "error": str(e), + "error_type": type(e).__name__, + } + ) + raise + finally: + log_structured_entry("MCP server lifespan ending", "INFO") cfg = Settings.model_validate({}) @@ -372,9 +562,9 @@ async def knowledge_search( "INFO", { "embedding": f"{round((t_embed - t0) * 1000, 1)}ms", - "vector_serach": f"{round((t_search - t_embed) * 1000, 1)}ms", + "vector_search": f"{round((t_search - t_embed) * 1000, 1)}ms", "total": f"{round((t_search - t0) * 1000, 1)}ms", - "chunks": {[s["id"] for s in search_results]} + "chunks": [s["id"] for s in search_results] } ) -- 2.49.1 From 13c8e122deb9678cf12a8daf7e3ee4c9e2ea67d0 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 17:40:37 +0000 Subject: [PATCH 4/8] fix: add more validations for bd connection --- main.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 127 insertions(+), 9 deletions(-) diff --git a/main.py b/main.py index 5bb2117..4f6858e 100644 --- a/main.py +++ b/main.py @@ -447,8 +447,11 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: location=cfg.location, ) - # Validate credentials by testing a simple operation - log_structured_entry("Validating credentials with test embedding", "INFO") + # Validate credentials and configuration by testing actual resources + log_structured_entry("Starting validation of credentials and resources", "INFO") + + # 1. Validate GenAI embedding access + log_structured_entry("Validating GenAI embedding access", "INFO") try: test_response = await genai_client.aio.models.embed_content( model=cfg.embedding_model, @@ -459,24 +462,139 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: ) if test_response and test_response.embeddings: log_structured_entry( - "Credentials validated successfully", + "GenAI embedding validation successful", "INFO", {"embedding_dimension": len(test_response.embeddings[0].values)} ) else: - log_structured_entry( - "Credential validation returned empty response", - "WARNING" - ) + msg = "Embedding validation returned empty response" + log_structured_entry(msg, "ERROR") + raise RuntimeError(msg) except Exception as e: log_structured_entry( - "Failed to validate credentials - embedding test failed", + "Failed to validate GenAI embedding access", "ERROR", {"error": str(e), "error_type": type(e).__name__} ) raise - log_structured_entry("MCP server initialization complete", "INFO") + # 2. Validate GCS bucket access + log_structured_entry( + "Validating GCS bucket access", + "INFO", + {"bucket": cfg.bucket} + ) + try: + session = vs.storage._get_aio_session() + token_obj = Token( + session=session, + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + access_token = await token_obj.get() + headers = {"Authorization": f"Bearer {access_token}"} + + async with session.get( + f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1", + headers=headers, + ) as response: + if response.status == 403: + msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions." + log_structured_entry( + "GCS bucket validation failed - access denied", + "ERROR", + {"bucket": cfg.bucket, "status": response.status} + ) + raise RuntimeError(msg) + elif response.status == 404: + msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project." + log_structured_entry( + "GCS bucket validation failed - not found", + "ERROR", + {"bucket": cfg.bucket, "status": response.status} + ) + raise RuntimeError(msg) + elif not response.ok: + body = await response.text() + msg = f"Failed to access bucket '{cfg.bucket}': {response.status} - {body}" + log_structured_entry( + "GCS bucket validation failed", + "ERROR", + {"bucket": cfg.bucket, "status": response.status, "response": body} + ) + raise RuntimeError(msg) + + log_structured_entry( + "GCS bucket validation successful", + "INFO", + {"bucket": cfg.bucket} + ) + except RuntimeError: + raise + except Exception as e: + log_structured_entry( + "Failed to validate GCS bucket access", + "ERROR", + {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket} + ) + raise + + # 3. Validate vector search endpoint access + log_structured_entry( + "Validating vector search endpoint access", + "INFO", + {"endpoint_name": cfg.endpoint_name} + ) + try: + # Try to get endpoint info + headers = await vs._async_get_auth_headers() + session = vs._get_aio_session() + endpoint_url = ( + f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}" + ) + + async with session.get(endpoint_url, headers=headers) as response: + if response.status == 403: + msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions." + log_structured_entry( + "Vector search endpoint validation failed - access denied", + "ERROR", + {"endpoint": cfg.endpoint_name, "status": response.status} + ) + raise RuntimeError(msg) + elif response.status == 404: + msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project." + log_structured_entry( + "Vector search endpoint validation failed - not found", + "ERROR", + {"endpoint": cfg.endpoint_name, "status": response.status} + ) + raise RuntimeError(msg) + elif not response.ok: + body = await response.text() + msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status} - {body}" + log_structured_entry( + "Vector search endpoint validation failed", + "ERROR", + {"endpoint": cfg.endpoint_name, "status": response.status, "response": body} + ) + raise RuntimeError(msg) + + log_structured_entry( + "Vector search endpoint validation successful", + "INFO", + {"endpoint": cfg.endpoint_name} + ) + except RuntimeError: + raise + except Exception as e: + log_structured_entry( + "Failed to validate vector search endpoint access", + "ERROR", + {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name} + ) + raise + + log_structured_entry("All validations passed - MCP server initialization complete", "INFO") yield AppContext( vector_search=vs, -- 2.49.1 From a8c611fbeca75254db76377dc2cb493560022887 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 18:00:41 +0000 Subject: [PATCH 5/8] feat: do not exit app and save errors --- main.py | 103 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 55 insertions(+), 48 deletions(-) diff --git a/main.py b/main.py index 4f6858e..ffac63f 100644 --- a/main.py +++ b/main.py @@ -448,8 +448,11 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: ) # Validate credentials and configuration by testing actual resources + # These validations are non-blocking - errors are logged but won't stop startup log_structured_entry("Starting validation of credentials and resources", "INFO") + validation_errors = [] + # 1. Validate GenAI embedding access log_structured_entry("Validating GenAI embedding access", "INFO") try: @@ -468,15 +471,15 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: ) else: msg = "Embedding validation returned empty response" - log_structured_entry(msg, "ERROR") - raise RuntimeError(msg) + log_structured_entry(msg, "WARNING") + validation_errors.append(msg) except Exception as e: log_structured_entry( - "Failed to validate GenAI embedding access", - "ERROR", + "Failed to validate GenAI embedding access - service may not work correctly", + "WARNING", {"error": str(e), "error_type": type(e).__name__} ) - raise + validation_errors.append(f"GenAI: {str(e)}") # 2. Validate GCS bucket access log_structured_entry( @@ -500,43 +503,41 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: if response.status == 403: msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions." log_structured_entry( - "GCS bucket validation failed - access denied", - "ERROR", + "GCS bucket validation failed - access denied - service may not work correctly", + "WARNING", {"bucket": cfg.bucket, "status": response.status} ) - raise RuntimeError(msg) + validation_errors.append(msg) elif response.status == 404: msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project." log_structured_entry( - "GCS bucket validation failed - not found", - "ERROR", + "GCS bucket validation failed - not found - service may not work correctly", + "WARNING", {"bucket": cfg.bucket, "status": response.status} ) - raise RuntimeError(msg) + validation_errors.append(msg) elif not response.ok: body = await response.text() - msg = f"Failed to access bucket '{cfg.bucket}': {response.status} - {body}" + msg = f"Failed to access bucket '{cfg.bucket}': {response.status}" log_structured_entry( - "GCS bucket validation failed", - "ERROR", + "GCS bucket validation failed - service may not work correctly", + "WARNING", {"bucket": cfg.bucket, "status": response.status, "response": body} ) - raise RuntimeError(msg) - - log_structured_entry( - "GCS bucket validation successful", - "INFO", - {"bucket": cfg.bucket} - ) - except RuntimeError: - raise + validation_errors.append(msg) + else: + log_structured_entry( + "GCS bucket validation successful", + "INFO", + {"bucket": cfg.bucket} + ) except Exception as e: log_structured_entry( - "Failed to validate GCS bucket access", - "ERROR", + "Failed to validate GCS bucket access - service may not work correctly", + "WARNING", {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket} ) - raise + validation_errors.append(f"GCS: {str(e)}") # 3. Validate vector search endpoint access log_structured_entry( @@ -556,45 +557,51 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: if response.status == 403: msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions." log_structured_entry( - "Vector search endpoint validation failed - access denied", - "ERROR", + "Vector search endpoint validation failed - access denied - service may not work correctly", + "WARNING", {"endpoint": cfg.endpoint_name, "status": response.status} ) - raise RuntimeError(msg) + validation_errors.append(msg) elif response.status == 404: msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project." log_structured_entry( - "Vector search endpoint validation failed - not found", - "ERROR", + "Vector search endpoint validation failed - not found - service may not work correctly", + "WARNING", {"endpoint": cfg.endpoint_name, "status": response.status} ) - raise RuntimeError(msg) + validation_errors.append(msg) elif not response.ok: body = await response.text() - msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status} - {body}" + msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}" log_structured_entry( - "Vector search endpoint validation failed", - "ERROR", + "Vector search endpoint validation failed - service may not work correctly", + "WARNING", {"endpoint": cfg.endpoint_name, "status": response.status, "response": body} ) - raise RuntimeError(msg) - - log_structured_entry( - "Vector search endpoint validation successful", - "INFO", - {"endpoint": cfg.endpoint_name} - ) - except RuntimeError: - raise + validation_errors.append(msg) + else: + log_structured_entry( + "Vector search endpoint validation successful", + "INFO", + {"endpoint": cfg.endpoint_name} + ) except Exception as e: log_structured_entry( - "Failed to validate vector search endpoint access", - "ERROR", + "Failed to validate vector search endpoint access - service may not work correctly", + "WARNING", {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name} ) - raise + validation_errors.append(f"Vector Search: {str(e)}") - log_structured_entry("All validations passed - MCP server initialization complete", "INFO") + # Summary of validations + if validation_errors: + log_structured_entry( + "MCP server started with validation errors - service may not work correctly", + "WARNING", + {"validation_errors": validation_errors, "error_count": len(validation_errors)} + ) + else: + log_structured_entry("All validations passed - MCP server initialization complete", "INFO") yield AppContext( vector_search=vs, -- 2.49.1 From 0fd97a31a534a434c90fa21f500c66b81860b420 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 18:08:04 +0000 Subject: [PATCH 6/8] fix: add robust handling for exceptions --- main.py | 159 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 118 insertions(+), 41 deletions(-) diff --git a/main.py b/main.py index ffac63f..e5b053b 100644 --- a/main.py +++ b/main.py @@ -654,51 +654,128 @@ async def knowledge_search( t0 = time.perf_counter() min_sim = 0.6 - response = await app.genai_client.aio.models.embed_content( - model=app.settings.embedding_model, - contents=query, - config=genai_types.EmbedContentConfig( - task_type="RETRIEVAL_QUERY", - - ), - ) - embedding = response.embeddings[0].values - t_embed = time.perf_counter() - - search_results = await app.vector_search.async_run_query( - deployed_index_id=app.settings.deployed_index_id, - query=embedding, - limit=app.settings.search_limit, - ) - t_search = time.perf_counter() - - # Apply similarity filtering - if search_results: - max_sim = max(r["distance"] for r in search_results) - cutoff = max_sim * 0.9 - search_results = [ - s - for s in search_results - if s["distance"] > cutoff and s["distance"] > min_sim - ] - log_structured_entry( - "knowledge_search timing", + "knowledge_search request received", "INFO", - { - "embedding": f"{round((t_embed - t0) * 1000, 1)}ms", - "vector_search": f"{round((t_search - t_embed) * 1000, 1)}ms", - "total": f"{round((t_search - t0) * 1000, 1)}ms", - "chunks": [s["id"] for s in search_results] - } + {"query": query[:100]} # Log first 100 chars of query ) - # Format results as XML-like documents - formatted_results = [ - f"\n{result['content']}\n" - for i, result in enumerate(search_results, start=1) - ] - return "\n".join(formatted_results) + try: + # Generate embedding for the query + log_structured_entry("Generating query embedding", "INFO") + try: + response = await app.genai_client.aio.models.embed_content( + model=app.settings.embedding_model, + contents=query, + config=genai_types.EmbedContentConfig( + task_type="RETRIEVAL_QUERY", + ), + ) + embedding = response.embeddings[0].values + t_embed = time.perf_counter() + log_structured_entry( + "Query embedding generated successfully", + "INFO", + {"time_ms": round((t_embed - t0) * 1000, 1)} + ) + except Exception as e: + error_type = type(e).__name__ + error_msg = str(e) + + # Check if it's a rate limit error + if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg: + log_structured_entry( + "Rate limit exceeded while generating embedding", + "WARNING", + { + "error": error_msg, + "error_type": error_type, + "query": query[:100] + } + ) + return "Error: API rate limit exceeded. Please try again later." + else: + log_structured_entry( + "Failed to generate query embedding", + "ERROR", + { + "error": error_msg, + "error_type": error_type, + "query": query[:100] + } + ) + return f"Error generating embedding: {error_msg}" + + # Perform vector search + log_structured_entry("Performing vector search", "INFO") + try: + search_results = await app.vector_search.async_run_query( + deployed_index_id=app.settings.deployed_index_id, + query=embedding, + limit=app.settings.search_limit, + ) + t_search = time.perf_counter() + except Exception as e: + log_structured_entry( + "Vector search failed", + "ERROR", + { + "error": str(e), + "error_type": type(e).__name__, + "query": query[:100] + } + ) + return f"Error performing vector search: {str(e)}" + + # Apply similarity filtering + if search_results: + max_sim = max(r["distance"] for r in search_results) + cutoff = max_sim * 0.9 + search_results = [ + s + for s in search_results + if s["distance"] > cutoff and s["distance"] > min_sim + ] + + log_structured_entry( + "knowledge_search completed successfully", + "INFO", + { + "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms", + "vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms", + "total_ms": f"{round((t_search - t0) * 1000, 1)}ms", + "results_count": len(search_results), + "chunks": [s["id"] for s in search_results] + } + ) + + # Format results as XML-like documents + if not search_results: + log_structured_entry( + "No results found for query", + "INFO", + {"query": query[:100]} + ) + return "No relevant documents found for your query." + + formatted_results = [ + f"\n{result['content']}\n" + for i, result in enumerate(search_results, start=1) + ] + return "\n".join(formatted_results) + + except Exception as e: + # Catch-all for any unexpected errors + log_structured_entry( + "Unexpected error in knowledge_search", + "ERROR", + { + "error": str(e), + "error_type": type(e).__name__, + "query": query[:100] + } + ) + return f"Unexpected error during search: {str(e)}" if __name__ == "__main__": -- 2.49.1 From a3ba340224888593fd5ca21d8992bb70ed508687 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 18:28:37 +0000 Subject: [PATCH 7/8] fix: pass all checks for ruff and ty --- main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.py b/main.py index e5b053b..2fcab4c 100644 --- a/main.py +++ b/main.py @@ -464,10 +464,11 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: ), ) if test_response and test_response.embeddings: + embedding_values = test_response.embeddings[0].values log_structured_entry( "GenAI embedding validation successful", "INFO", - {"embedding_dimension": len(test_response.embeddings[0].values)} + {"embedding_dimension": len(embedding_values) if embedding_values else 0} ) else: msg = "Embedding validation returned empty response" -- 2.49.1 From b95bb72b24e379f193350ca34e936c1acc4a753e Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 22:14:29 +0000 Subject: [PATCH 8/8] feat: attended PR comments --- main.py | 4 +--- utils/__init__.py | 4 ++-- utils/config.py | 6 ++++++ utils/logging_setup.py | 16 +++++++--------- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index 2fcab4c..c70a6c9 100644 --- a/main.py +++ b/main.py @@ -15,7 +15,7 @@ from google import genai from google.genai import types as genai_types from mcp.server.fastmcp import Context, FastMCP -from utils import Settings, _args, log_structured_entry +from utils import Settings, _args, cfg, log_structured_entry HTTP_TOO_MANY_REQUESTS = 429 HTTP_SERVER_ERROR = 500 @@ -624,8 +624,6 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: log_structured_entry("MCP server lifespan ending", "INFO") -cfg = Settings.model_validate({}) - mcp = FastMCP( "knowledge-search", host=_args.host, diff --git a/utils/__init__.py b/utils/__init__.py index dcbcb26..17f1feb 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -1,4 +1,4 @@ -from .config import Settings, _args +from .config import Settings, _args, cfg from .logging_setup import log_structured_entry -__all__ = ['Settings', '_args', 'log_structured_entry'] +__all__ = ['Settings', '_args', 'cfg', 'log_structured_entry'] diff --git a/utils/config.py b/utils/config.py index 07b5c01..04f81d4 100644 --- a/utils/config.py +++ b/utils/config.py @@ -35,6 +35,8 @@ class Settings(BaseSettings): endpoint_domain: str embedding_model: str = "gemini-embedding-001" search_limit: int = 10 + log_name: str = "va_agent_evaluation_logs" + log_level: str = "INFO" @classmethod def settings_customise_sources( @@ -52,3 +54,7 @@ class Settings(BaseSettings): YamlConfigSettingsSource(settings_cls), file_secret_settings, ) + + +# Singleton instance of Settings +cfg = Settings.model_validate({}) diff --git a/utils/logging_setup.py b/utils/logging_setup.py index 686da1f..fb1519a 100644 --- a/utils/logging_setup.py +++ b/utils/logging_setup.py @@ -9,26 +9,24 @@ from typing import Optional, Dict, Literal import google.cloud.logging from google.cloud.logging.handlers import CloudLoggingHandler -from .config import Settings +from .config import cfg def _setup_logger() -> logging.Logger: """Create or return the singleton evaluation logger.""" - log_name = "va_agent-evaluation-logs" - logger = logging.getLogger(log_name) - cfg = Settings.model_validate({}) + logger = logging.getLogger(cfg.log_name) if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers): return logger try: client = google.cloud.logging.Client(project=cfg.project_id) - handler = CloudLoggingHandler(client, name=log_name) # async transport + handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport logger.addHandler(handler) - logger.setLevel(logging.INFO) + logger.setLevel(getattr(logging, cfg.log_level.upper())) except Exception as e: # Fallback to console if Cloud Logging is unavailable (local dev) - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger(log_name) + logging.basicConfig(level=getattr(logging, cfg.log_level.upper())) + logger = logging.getLogger(cfg.log_name) logger.warning("Cloud Logging setup failed; using console. Error: %s", e) return logger @@ -43,7 +41,7 @@ def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERR Args: message: Short label for the row (e.g., "Final agent turn"). - severity: "INFO" | "WARNING" | "ERROR" etc. + severity: "INFO" | "WARNING" | "ERROR" custom_log: A dict with your structured payload. """ level = getattr(logging, severity.upper(), logging.INFO) -- 2.49.1