Compare commits

12 Commits

4 changed files with 626 additions and 161 deletions

511
main.py
View File

@@ -1,14 +1,12 @@
# ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio
import io
import logging
import os
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from dataclasses import dataclass
from enum import Enum
from typing import BinaryIO, TypedDict
import aiohttp
@@ -17,14 +15,21 @@ from gcloud.aio.storage import Storage
from google import genai
from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
logger = logging.getLogger(__name__)
from utils import Settings, _args, cfg, log_structured_entry
HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500
class SourceNamespace(str, Enum):
"""Allowed values for the 'source' namespace filter."""
EDUCACION_FINANCIERA = "Educacion Financiera"
PRODUCTOS_Y_SERVICIOS = "Productos y Servicios"
FUNCIONALIDADES_APP_MOVIL = "Funcionalidades de la App Movil"
class GoogleCloudFileStorage:
"""Cache-aware helper for downloading files from Google Cloud Storage."""
@@ -74,10 +79,21 @@ class GoogleCloudFileStorage:
"""
if file_name in self._cache:
log_structured_entry(
"File retrieved from cache",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name
return file_stream
log_structured_entry(
"Starting file download from GCS",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
storage_client = self._get_aio_storage()
last_exception: Exception | None = None
@@ -89,14 +105,22 @@ class GoogleCloudFileStorage:
)
file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name
log_structured_entry(
"File downloaded successfully",
"INFO",
{
"file": file_name,
"bucket": self.bucket_name,
"size_bytes": len(self._cache[file_name]),
"attempt": attempt + 1
}
)
except TimeoutError as exc:
last_exception = exc
logger.warning(
"Timeout downloading gs://%s/%s (attempt %d/%d)",
self.bucket_name,
file_name,
attempt + 1,
max_retries,
log_structured_entry(
f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
"WARNING",
{"error": str(exc)}
)
except aiohttp.ClientResponseError as exc:
last_exception = exc
@@ -104,27 +128,44 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR
):
logger.warning(
"HTTP %d downloading gs://%s/%s (attempt %d/%d)",
exc.status,
self.bucket_name,
file_name,
attempt + 1,
max_retries,
log_structured_entry(
f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
"WARNING",
{"status": exc.status, "message": str(exc)}
)
else:
log_structured_entry(
f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}",
"ERROR",
{"status": exc.status, "message": str(exc)}
)
raise
else:
return file_stream
if attempt < max_retries - 1:
delay = 0.5 * (2**attempt)
log_structured_entry(
"Retrying file download",
"INFO",
{"file": file_name, "delay_seconds": delay}
)
await asyncio.sleep(delay)
msg = (
f"Failed to download gs://{self.bucket_name}/{file_name} "
f"after {max_retries} attempts"
)
log_structured_entry(
"File download failed after all retries",
"ERROR",
{
"file": file_name,
"bucket": self.bucket_name,
"max_retries": max_retries,
"last_error": str(last_exception)
}
)
raise TimeoutError(msg) from last_exception
@@ -204,6 +245,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: str,
query: Sequence[float],
limit: int,
source: SourceNamespace | None = None,
) -> list[SearchResult]:
"""Run an async similarity search via the REST API.
@@ -211,6 +253,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: The ID of the deployed index.
query: The embedding vector for the search query.
limit: Maximum number of nearest neighbors to return.
source: Optional namespace filter to restrict results by source.
Returns:
A list of matched items with id, distance, and content.
@@ -221,7 +264,13 @@ class GoogleCloudVectorSearch:
"Missing endpoint metadata. Call "
"`configure_index_endpoint` before querying."
)
log_structured_entry(
"Vector search query failed - endpoint not configured",
"ERROR",
{"error": msg}
)
raise RuntimeError(msg)
domain = self._endpoint_domain
endpoint_id = self._endpoint_name.split("/")[-1]
url = (
@@ -229,16 +278,34 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors"
)
log_structured_entry(
"Starting vector search query",
"INFO",
{
"deployed_index_id": deployed_index_id,
"neighbor_count": limit,
"endpoint_id": endpoint_id,
"embedding_dimension": len(query)
}
)
datapoint: dict = {"feature_vector": list(query)}
if source is not None:
datapoint["restricts"] = [
{"namespace": "source", "allow_list": [source.value]},
]
payload = {
"deployed_index_id": deployed_index_id,
"queries": [
{
"datapoint": {"feature_vector": list(query)},
"datapoint": datapoint,
"neighbor_count": limit,
},
],
}
try:
headers = await self._async_get_auth_headers()
session = self._get_aio_session()
async with session.post(
@@ -249,10 +316,37 @@ class GoogleCloudVectorSearch:
if not response.ok:
body = await response.text()
msg = f"findNeighbors returned {response.status}: {body}"
log_structured_entry(
"Vector search API request failed",
"ERROR",
{
"status": response.status,
"response_body": body,
"deployed_index_id": deployed_index_id
}
)
raise RuntimeError(msg)
data = await response.json()
neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
log_structured_entry(
"Vector search API request successful",
"INFO",
{
"neighbors_found": len(neighbors),
"deployed_index_id": deployed_index_id
}
)
if not neighbors:
log_structured_entry(
"No neighbors found in vector search",
"WARNING",
{"deployed_index_id": deployed_index_id}
)
return []
# Fetch content for all neighbors
content_tasks = []
for neighbor in neighbors:
datapoint_id = neighbor["datapoint"]["datapointId"]
@@ -261,6 +355,12 @@ class GoogleCloudVectorSearch:
self.storage.async_get_file_stream(file_path),
)
log_structured_entry(
"Fetching content for search results",
"INFO",
{"file_count": len(content_tasks)}
)
file_streams = await asyncio.gather(*content_tasks)
results: list[SearchResult] = []
for neighbor, stream in zip(
@@ -275,66 +375,35 @@ class GoogleCloudVectorSearch:
content=stream.read().decode("utf-8"),
),
)
log_structured_entry(
"Vector search completed successfully",
"INFO",
{
"results_count": len(results),
"deployed_index_id": deployed_index_id
}
)
return results
except Exception as e:
log_structured_entry(
"Vector search query failed with exception",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"deployed_index_id": deployed_index_id
}
)
raise
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass
class AppContext:
"""Shared resources initialised once at server startup."""
@@ -347,31 +416,229 @@ class AppContext:
@asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime."""
log_structured_entry(
"Initializing MCP server",
"INFO",
{
"project_id": cfg.project_id,
"location": cfg.location,
"bucket": cfg.bucket,
"index_name": cfg.index_name,
}
)
try:
# Initialize vector search client
log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO")
vs = GoogleCloudVectorSearch(
project_id=cfg.project_id,
location=cfg.location,
bucket=cfg.bucket,
index_name=cfg.index_name,
)
# Configure endpoint
log_structured_entry(
"Configuring index endpoint",
"INFO",
{
"endpoint_name": cfg.endpoint_name,
"endpoint_domain": cfg.endpoint_domain,
}
)
vs.configure_index_endpoint(
name=cfg.endpoint_name,
public_domain=cfg.endpoint_domain,
)
# Initialize GenAI client
log_structured_entry(
"Creating GenAI client",
"INFO",
{"project_id": cfg.project_id, "location": cfg.location}
)
genai_client = genai.Client(
vertexai=True,
project=cfg.project_id,
location=cfg.location,
)
# Validate credentials and configuration by testing actual resources
# These validations are non-blocking - errors are logged but won't stop startup
log_structured_entry("Starting validation of credentials and resources", "INFO")
validation_errors = []
# 1. Validate GenAI embedding access
log_structured_entry("Validating GenAI embedding access", "INFO")
try:
test_response = await genai_client.aio.models.embed_content(
model=cfg.embedding_model,
contents="test",
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
if test_response and test_response.embeddings:
embedding_values = test_response.embeddings[0].values
log_structured_entry(
"GenAI embedding validation successful",
"INFO",
{"embedding_dimension": len(embedding_values) if embedding_values else 0}
)
else:
msg = "Embedding validation returned empty response"
log_structured_entry(msg, "WARNING")
validation_errors.append(msg)
except Exception as e:
log_structured_entry(
"Failed to validate GenAI embedding access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__}
)
validation_errors.append(f"GenAI: {str(e)}")
# 2. Validate GCS bucket access
log_structured_entry(
"Validating GCS bucket access",
"INFO",
{"bucket": cfg.bucket}
)
try:
session = vs.storage._get_aio_session()
token_obj = Token(
session=session,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
access_token = await token_obj.get()
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
headers=headers,
) as response:
if response.status == 403:
msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
log_structured_entry(
"GCS bucket validation failed - access denied - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
log_structured_entry(
"GCS bucket validation failed - not found - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
log_structured_entry(
"GCS bucket validation failed - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"GCS bucket validation successful",
"INFO",
{"bucket": cfg.bucket}
)
except Exception as e:
log_structured_entry(
"Failed to validate GCS bucket access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
)
validation_errors.append(f"GCS: {str(e)}")
# 3. Validate vector search endpoint access
log_structured_entry(
"Validating vector search endpoint access",
"INFO",
{"endpoint_name": cfg.endpoint_name}
)
try:
# Try to get endpoint info
headers = await vs._async_get_auth_headers()
session = vs._get_aio_session()
endpoint_url = (
f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
)
async with session.get(endpoint_url, headers=headers) as response:
if response.status == 403:
msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
log_structured_entry(
"Vector search endpoint validation failed - access denied - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
log_structured_entry(
"Vector search endpoint validation failed - not found - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
log_structured_entry(
"Vector search endpoint validation failed - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"Vector search endpoint validation successful",
"INFO",
{"endpoint": cfg.endpoint_name}
)
except Exception as e:
log_structured_entry(
"Failed to validate vector search endpoint access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
)
validation_errors.append(f"Vector Search: {str(e)}")
# Summary of validations
if validation_errors:
log_structured_entry(
"MCP server started with validation errors - service may not work correctly",
"WARNING",
{"validation_errors": validation_errors, "error_count": len(validation_errors)}
)
else:
log_structured_entry("All validations passed - MCP server initialization complete", "INFO")
yield AppContext(
vector_search=vs,
genai_client=genai_client,
settings=cfg,
)
except Exception as e:
log_structured_entry(
"Failed to initialize MCP server",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
}
)
raise
finally:
log_structured_entry("MCP server lifespan ending", "INFO")
cfg = Settings.model_validate({})
mcp = FastMCP(
"knowledge-search",
@@ -385,12 +652,16 @@ mcp = FastMCP(
async def knowledge_search(
query: str,
ctx: Context,
source: SourceNamespace | None = None,
) -> str:
"""Search a knowledge base using a natural-language query.
Args:
query: The text query to search for.
ctx: MCP request context (injected automatically).
source: Optional filter to restrict results by source.
Allowed values: 'Educacion Financiera',
'Productos y Servicios', 'Funcionalidades de la App Movil'.
Returns:
A formatted string containing matched documents with id and content.
@@ -402,23 +673,79 @@ async def knowledge_search(
t0 = time.perf_counter()
min_sim = 0.6
log_structured_entry(
"knowledge_search request received",
"INFO",
{"query": query[:100]} # Log first 100 chars of query
)
try:
# Generate embedding for the query
log_structured_entry("Generating query embedding", "INFO")
try:
response = await app.genai_client.aio.models.embed_content(
model=app.settings.embedding_model,
contents=query,
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
embedding = response.embeddings[0].values
t_embed = time.perf_counter()
log_structured_entry(
"Query embedding generated successfully",
"INFO",
{"time_ms": round((t_embed - t0) * 1000, 1)}
)
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
# Check if it's a rate limit error
if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
log_structured_entry(
"Rate limit exceeded while generating embedding",
"WARNING",
{
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
)
return "Error: API rate limit exceeded. Please try again later."
else:
log_structured_entry(
"Failed to generate query embedding",
"ERROR",
{
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
)
return f"Error generating embedding: {error_msg}"
# Perform vector search
log_structured_entry("Performing vector search", "INFO")
try:
search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id,
query=embedding,
limit=app.settings.search_limit,
source=source,
)
t_search = time.perf_counter()
except Exception as e:
log_structured_entry(
"Vector search failed",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Error performing vector search: {str(e)}"
# Apply similarity filtering
if search_results:
@@ -430,21 +757,47 @@ async def knowledge_search(
if s["distance"] > cutoff and s["distance"] > min_sim
]
logger.info(
"knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
round((t_embed - t0) * 1000, 1),
round((t_search - t_embed) * 1000, 1),
round((t_search - t0) * 1000, 1),
[s["id"] for s in search_results],
log_structured_entry(
"knowledge_search completed successfully",
"INFO",
{
"embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
"vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
"total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
"source_filter": source.value if source is not None else None,
"results_count": len(search_results),
"chunks": [s["id"] for s in search_results]
}
)
# Format results as XML-like documents
if not search_results:
log_structured_entry(
"No results found for query",
"INFO",
{"query": query[:100]}
)
return "No relevant documents found for your query."
formatted_results = [
f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
for i, result in enumerate(search_results, start=1)
]
return "\n".join(formatted_results)
except Exception as e:
# Catch-all for any unexpected errors
log_structured_entry(
"Unexpected error in knowledge_search",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Unexpected error during search: {str(e)}"
if __name__ == "__main__":
mcp.run(transport=_args.transport)

4
utils/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .config import Settings, _args, cfg
from .logging_setup import log_structured_entry
__all__ = ['Settings', '_args', 'cfg', 'log_structured_entry']

60
utils/config.py Normal file
View File

@@ -0,0 +1,60 @@
import os
import argparse
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse", "streamable-http"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
log_name: str = "va_agent_evaluation_logs"
log_level: str = "INFO"
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
# Singleton instance of Settings
cfg = Settings.model_validate({})

48
utils/logging_setup.py Normal file
View File

@@ -0,0 +1,48 @@
"""
Centralized Cloud Logging setup.
Uses CloudLoggingHandler (background thread) so logging does not add latency
"""
import logging
from typing import Optional, Dict, Literal
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from .config import cfg
def _setup_logger() -> logging.Logger:
"""Create or return the singleton evaluation logger."""
logger = logging.getLogger(cfg.log_name)
if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
return logger
try:
client = google.cloud.logging.Client(project=cfg.project_id)
handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport
logger.addHandler(handler)
logger.setLevel(getattr(logging, cfg.log_level.upper()))
except Exception as e:
# Fallback to console if Cloud Logging is unavailable (local dev)
logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
logger = logging.getLogger(cfg.log_name)
logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
return logger
_eval_log = _setup_logger()
def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None:
"""
Emit a JSON-structured log row.
Args:
message: Short label for the row (e.g., "Final agent turn").
severity: "INFO" | "WARNING" | "ERROR"
custom_log: A dict with your structured payload.
"""
level = getattr(logging, severity.upper(), logging.INFO)
_eval_log.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}})