Compare commits

15 Commits

Author SHA1 Message Date
3e2386b9b6 Merge pull request 'Update Dockerfile' (#7) from update-dockerfile into main
Reviewed-on: #7
2026-02-25 22:01:38 +00:00
Anibal Angulo
42e1660143 Update Dockerfile 2026-02-25 20:05:03 +00:00
208d5ebebf Merge pull request 'Add streamable-http transport option' (#6) from streamable-http into main
Reviewed-on: #6
2026-02-25 19:59:53 +00:00
83ed64326f Add streamable-http transport option 2026-02-25 19:59:14 +00:00
4c59da0c22 Merge pull request 'Add metadata filtering' (#5) from categories into main
Reviewed-on: #5
2026-02-24 23:05:51 +00:00
Anibal Angulo
cbf3ca7df4 Add metadata filtering 2026-02-24 22:57:16 +00:00
e9b4c93a20 Merge pull request 'Fix: Agrega logs para las operaciones en la base de datos' (#4) from fix/logs into main
Reviewed-on: #4
2026-02-24 22:37:14 +00:00
b95bb72b24 feat: attended PR comments 2026-02-24 22:14:29 +00:00
a3ba340224 fix: pass all checks for ruff and ty 2026-02-24 18:28:37 +00:00
0fd97a31a5 fix: add robust handling for exceptions 2026-02-24 18:08:04 +00:00
a8c611fbec feat: do not exit app and save errors 2026-02-24 18:00:41 +00:00
13c8e122de fix: add more validations for bd connection 2026-02-24 17:40:37 +00:00
753b5c7871 fix: refactor correct import and add more logs 2026-02-24 17:23:24 +00:00
6feeeff4f3 feat: use a new logger 2026-02-24 17:02:25 +00:00
3b7dd91a71 feat: add 'log_structured_entry' for JSON structured logs
Move config into own file
2026-02-24 17:02:06 +00:00
7 changed files with 634 additions and 223 deletions

View File

@@ -8,7 +8,8 @@ COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --frozen
COPY main.py .
COPY utils/ utils/
ENV PATH="/app/.venv/bin:$PATH"
CMD ["uv", "run", "python", "main.py", "--transport", "sse", "--port", "8000"]
CMD ["uv", "run", "python", "main.py", "--transport", "streamable-http", "--port", "8000"]

View File

@@ -6,24 +6,7 @@ An MCP (Model Context Protocol) server that exposes a `knowledge_search` tool fo
1. A natural-language query is embedded using a Gemini embedding model.
2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors.
3. Optional filters (restricts) can be applied to search only specific source folders.
4. The matched document contents are fetched from a GCS bucket and returned to the caller.
## Filtering by Source Folder
The `knowledge_search` tool supports filtering results by source folder:
```python
# Search all folders
knowledge_search(query="what is a savings account?")
# Search only in specific folders
knowledge_search(
query="what is a savings account?",
source_folders=["Educacion Financiera", "Productos y Servicios"]
)
```
3. The matched document contents are fetched from a GCS bucket and returned to the caller.
## Prerequisites

View File

@@ -57,20 +57,9 @@ async def async_main() -> None:
model="gemini-2.0-flash",
name="knowledge_agent",
instruction=(
"You are a helpful assistant with access to a knowledge base organized by folders. "
"Use the knowledge_search tool to find relevant information when the user asks questions.\n\n"
"Available folders in the knowledge base:\n"
"- 'Educacion Financiera': Educational content about finance, savings, investments, financial concepts\n"
"- 'Funcionalidades de la App Movil': Mobile app features, functionality, usage instructions\n"
"- 'Productos y Servicios': Bank products and services, accounts, procedures\n\n"
"IMPORTANT: When the user asks about a specific topic, analyze which folders are relevant "
"and use the source_folders parameter to filter results for more precise answers.\n\n"
"Examples:\n"
"- User asks about 'cuenta de ahorros' → Use source_folders=['Educacion Financiera', 'Productos y Servicios']\n"
"- User asks about 'cómo usar la app móvil' → Use source_folders=['Funcionalidades de App Movil']\n"
"- User asks about 'transferencias en la app' → Use source_folders=['Funcionalidades de App Movil', 'Productos y Servicios']\n"
"- User asks general question → Don't use source_folders (search all)\n\n"
"Summarize the results clearly in Spanish."
"You are a helpful assistant with access to a knowledge base. "
"Use the knowledge_search tool to find relevant information "
"when the user asks questions. Summarize the results clearly."
),
tools=[toolset],
)

534
main.py
View File

@@ -1,14 +1,12 @@
# ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio
import io
import logging
import os
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from dataclasses import dataclass
from enum import Enum
from typing import BinaryIO, TypedDict
import aiohttp
@@ -17,14 +15,21 @@ from gcloud.aio.storage import Storage
from google import genai
from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
logger = logging.getLogger(__name__)
from utils import Settings, _args, cfg, log_structured_entry
HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500
class SourceNamespace(str, Enum):
"""Allowed values for the 'source' namespace filter."""
EDUCACION_FINANCIERA = "Educacion Financiera"
PRODUCTOS_Y_SERVICIOS = "Productos y Servicios"
FUNCIONALIDADES_APP_MOVIL = "Funcionalidades de la App Movil"
class GoogleCloudFileStorage:
"""Cache-aware helper for downloading files from Google Cloud Storage."""
@@ -74,10 +79,21 @@ class GoogleCloudFileStorage:
"""
if file_name in self._cache:
log_structured_entry(
"File retrieved from cache",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name
return file_stream
log_structured_entry(
"Starting file download from GCS",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
storage_client = self._get_aio_storage()
last_exception: Exception | None = None
@@ -89,14 +105,22 @@ class GoogleCloudFileStorage:
)
file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name
log_structured_entry(
"File downloaded successfully",
"INFO",
{
"file": file_name,
"bucket": self.bucket_name,
"size_bytes": len(self._cache[file_name]),
"attempt": attempt + 1
}
)
except TimeoutError as exc:
last_exception = exc
logger.warning(
"Timeout downloading gs://%s/%s (attempt %d/%d)",
self.bucket_name,
file_name,
attempt + 1,
max_retries,
log_structured_entry(
f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
"WARNING",
{"error": str(exc)}
)
except aiohttp.ClientResponseError as exc:
last_exception = exc
@@ -104,27 +128,44 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR
):
logger.warning(
"HTTP %d downloading gs://%s/%s (attempt %d/%d)",
exc.status,
self.bucket_name,
file_name,
attempt + 1,
max_retries,
log_structured_entry(
f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})",
"WARNING",
{"status": exc.status, "message": str(exc)}
)
else:
log_structured_entry(
f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}",
"ERROR",
{"status": exc.status, "message": str(exc)}
)
raise
else:
return file_stream
if attempt < max_retries - 1:
delay = 0.5 * (2**attempt)
log_structured_entry(
"Retrying file download",
"INFO",
{"file": file_name, "delay_seconds": delay}
)
await asyncio.sleep(delay)
msg = (
f"Failed to download gs://{self.bucket_name}/{file_name} "
f"after {max_retries} attempts"
)
log_structured_entry(
"File download failed after all retries",
"ERROR",
{
"file": file_name,
"bucket": self.bucket_name,
"max_retries": max_retries,
"last_error": str(last_exception)
}
)
raise TimeoutError(msg) from last_exception
@@ -204,7 +245,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: str,
query: Sequence[float],
limit: int,
restricts: list[dict[str, list[str]]] | None = None,
source: SourceNamespace | None = None,
) -> list[SearchResult]:
"""Run an async similarity search via the REST API.
@@ -212,6 +253,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: The ID of the deployed index.
query: The embedding vector for the search query.
limit: Maximum number of nearest neighbors to return.
source: Optional namespace filter to restrict results by source.
Returns:
A list of matched items with id, distance, and content.
@@ -222,7 +264,13 @@ class GoogleCloudVectorSearch:
"Missing endpoint metadata. Call "
"`configure_index_endpoint` before querying."
)
log_structured_entry(
"Vector search query failed - endpoint not configured",
"ERROR",
{"error": msg}
)
raise RuntimeError(msg)
domain = self._endpoint_domain
endpoint_id = self._endpoint_name.split("/")[-1]
url = (
@@ -230,20 +278,34 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors"
)
query_payload = {
"datapoint": {"feature_vector": list(query)},
log_structured_entry(
"Starting vector search query",
"INFO",
{
"deployed_index_id": deployed_index_id,
"neighbor_count": limit,
"endpoint_id": endpoint_id,
"embedding_dimension": len(query)
}
)
# Add restricts if provided
if restricts:
query_payload["restricts"] = restricts
datapoint: dict = {"feature_vector": list(query)}
if source is not None:
datapoint["restricts"] = [
{"namespace": "source", "allow_list": [source.value]},
]
payload = {
"deployed_index_id": deployed_index_id,
"queries": [query_payload],
"queries": [
{
"datapoint": datapoint,
"neighbor_count": limit,
},
],
}
try:
headers = await self._async_get_auth_headers()
session = self._get_aio_session()
async with session.post(
@@ -254,10 +316,37 @@ class GoogleCloudVectorSearch:
if not response.ok:
body = await response.text()
msg = f"findNeighbors returned {response.status}: {body}"
log_structured_entry(
"Vector search API request failed",
"ERROR",
{
"status": response.status,
"response_body": body,
"deployed_index_id": deployed_index_id
}
)
raise RuntimeError(msg)
data = await response.json()
neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
log_structured_entry(
"Vector search API request successful",
"INFO",
{
"neighbors_found": len(neighbors),
"deployed_index_id": deployed_index_id
}
)
if not neighbors:
log_structured_entry(
"No neighbors found in vector search",
"WARNING",
{"deployed_index_id": deployed_index_id}
)
return []
# Fetch content for all neighbors
content_tasks = []
for neighbor in neighbors:
datapoint_id = neighbor["datapoint"]["datapointId"]
@@ -266,6 +355,12 @@ class GoogleCloudVectorSearch:
self.storage.async_get_file_stream(file_path),
)
log_structured_entry(
"Fetching content for search results",
"INFO",
{"file_count": len(content_tasks)}
)
file_streams = await asyncio.gather(*content_tasks)
results: list[SearchResult] = []
for neighbor, stream in zip(
@@ -280,66 +375,35 @@ class GoogleCloudVectorSearch:
content=stream.read().decode("utf-8"),
),
)
log_structured_entry(
"Vector search completed successfully",
"INFO",
{
"results_count": len(results),
"deployed_index_id": deployed_index_id
}
)
return results
except Exception as e:
log_structured_entry(
"Vector search query failed with exception",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"deployed_index_id": deployed_index_id
}
)
raise
# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass
class AppContext:
"""Shared resources initialised once at server startup."""
@@ -352,31 +416,229 @@ class AppContext:
@asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime."""
log_structured_entry(
"Initializing MCP server",
"INFO",
{
"project_id": cfg.project_id,
"location": cfg.location,
"bucket": cfg.bucket,
"index_name": cfg.index_name,
}
)
try:
# Initialize vector search client
log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO")
vs = GoogleCloudVectorSearch(
project_id=cfg.project_id,
location=cfg.location,
bucket=cfg.bucket,
index_name=cfg.index_name,
)
# Configure endpoint
log_structured_entry(
"Configuring index endpoint",
"INFO",
{
"endpoint_name": cfg.endpoint_name,
"endpoint_domain": cfg.endpoint_domain,
}
)
vs.configure_index_endpoint(
name=cfg.endpoint_name,
public_domain=cfg.endpoint_domain,
)
# Initialize GenAI client
log_structured_entry(
"Creating GenAI client",
"INFO",
{"project_id": cfg.project_id, "location": cfg.location}
)
genai_client = genai.Client(
vertexai=True,
project=cfg.project_id,
location=cfg.location,
)
# Validate credentials and configuration by testing actual resources
# These validations are non-blocking - errors are logged but won't stop startup
log_structured_entry("Starting validation of credentials and resources", "INFO")
validation_errors = []
# 1. Validate GenAI embedding access
log_structured_entry("Validating GenAI embedding access", "INFO")
try:
test_response = await genai_client.aio.models.embed_content(
model=cfg.embedding_model,
contents="test",
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
if test_response and test_response.embeddings:
embedding_values = test_response.embeddings[0].values
log_structured_entry(
"GenAI embedding validation successful",
"INFO",
{"embedding_dimension": len(embedding_values) if embedding_values else 0}
)
else:
msg = "Embedding validation returned empty response"
log_structured_entry(msg, "WARNING")
validation_errors.append(msg)
except Exception as e:
log_structured_entry(
"Failed to validate GenAI embedding access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__}
)
validation_errors.append(f"GenAI: {str(e)}")
# 2. Validate GCS bucket access
log_structured_entry(
"Validating GCS bucket access",
"INFO",
{"bucket": cfg.bucket}
)
try:
session = vs.storage._get_aio_session()
token_obj = Token(
session=session,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
access_token = await token_obj.get()
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
headers=headers,
) as response:
if response.status == 403:
msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
log_structured_entry(
"GCS bucket validation failed - access denied - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
log_structured_entry(
"GCS bucket validation failed - not found - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
log_structured_entry(
"GCS bucket validation failed - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"GCS bucket validation successful",
"INFO",
{"bucket": cfg.bucket}
)
except Exception as e:
log_structured_entry(
"Failed to validate GCS bucket access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
)
validation_errors.append(f"GCS: {str(e)}")
# 3. Validate vector search endpoint access
log_structured_entry(
"Validating vector search endpoint access",
"INFO",
{"endpoint_name": cfg.endpoint_name}
)
try:
# Try to get endpoint info
headers = await vs._async_get_auth_headers()
session = vs._get_aio_session()
endpoint_url = (
f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
)
async with session.get(endpoint_url, headers=headers) as response:
if response.status == 403:
msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
log_structured_entry(
"Vector search endpoint validation failed - access denied - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
log_structured_entry(
"Vector search endpoint validation failed - not found - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
log_structured_entry(
"Vector search endpoint validation failed - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"Vector search endpoint validation successful",
"INFO",
{"endpoint": cfg.endpoint_name}
)
except Exception as e:
log_structured_entry(
"Failed to validate vector search endpoint access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
)
validation_errors.append(f"Vector Search: {str(e)}")
# Summary of validations
if validation_errors:
log_structured_entry(
"MCP server started with validation errors - service may not work correctly",
"WARNING",
{"validation_errors": validation_errors, "error_count": len(validation_errors)}
)
else:
log_structured_entry("All validations passed - MCP server initialization complete", "INFO")
yield AppContext(
vector_search=vs,
genai_client=genai_client,
settings=cfg,
)
except Exception as e:
log_structured_entry(
"Failed to initialize MCP server",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
}
)
raise
finally:
log_structured_entry("MCP server lifespan ending", "INFO")
cfg = Settings.model_validate({})
mcp = FastMCP(
"knowledge-search",
@@ -390,16 +652,16 @@ mcp = FastMCP(
async def knowledge_search(
query: str,
ctx: Context,
source_folders: list[str] | None = None,
source: SourceNamespace | None = None,
) -> str:
"""Search a knowledge base using a natural-language query.
Args:
query: The text query to search for.
ctx: MCP request context (injected automatically).
source_folders: Optional list of source folder paths to filter results.
If provided, only documents from these folders will be returned.
Example: ["Educacion Financiera", "Productos y Servicios"]
source: Optional filter to restrict results by source.
Allowed values: 'Educacion Financiera',
'Productos y Servicios', 'Funcionalidades de la App Movil'.
Returns:
A formatted string containing matched documents with id and content.
@@ -411,41 +673,79 @@ async def knowledge_search(
t0 = time.perf_counter()
min_sim = 0.6
log_structured_entry(
"knowledge_search request received",
"INFO",
{"query": query[:100]} # Log first 100 chars of query
)
try:
# Generate embedding for the query
log_structured_entry("Generating query embedding", "INFO")
try:
response = await app.genai_client.aio.models.embed_content(
model=app.settings.embedding_model,
contents=query,
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
embedding = response.embeddings[0].values
t_embed = time.perf_counter()
log_structured_entry(
"Query embedding generated successfully",
"INFO",
{"time_ms": round((t_embed - t0) * 1000, 1)}
)
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
# Build restricts for source folder filtering if provided
restricts = None
if source_folders:
restricts = [
# Check if it's a rate limit error
if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
log_structured_entry(
"Rate limit exceeded while generating embedding",
"WARNING",
{
"namespace": "source_folder",
"allow": source_folders,
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
]
logger.info(f"Filtering by source_folders: {source_folders}")
)
return "Error: API rate limit exceeded. Please try again later."
else:
logger.info("No filtering - searching all folders")
log_structured_entry(
"Failed to generate query embedding",
"ERROR",
{
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
)
return f"Error generating embedding: {error_msg}"
# Perform vector search
log_structured_entry("Performing vector search", "INFO")
try:
search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id,
query=embedding,
limit=app.settings.search_limit,
restricts=restricts,
source=source,
)
t_search = time.perf_counter()
# Log raw results from Vertex AI before similarity filtering
logger.info(f"Raw results from Vertex AI (before similarity filter): {len(search_results)} chunks")
logger.info(f"Raw chunk IDs: {[s['id'] for s in search_results]}")
except Exception as e:
log_structured_entry(
"Vector search failed",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Error performing vector search: {str(e)}"
# Apply similarity filtering
if search_results:
@@ -457,21 +757,47 @@ async def knowledge_search(
if s["distance"] > cutoff and s["distance"] > min_sim
]
logger.info(
"knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
round((t_embed - t0) * 1000, 1),
round((t_search - t_embed) * 1000, 1),
round((t_search - t0) * 1000, 1),
[s["id"] for s in search_results],
log_structured_entry(
"knowledge_search completed successfully",
"INFO",
{
"embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
"vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
"total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
"source_filter": source.value if source is not None else None,
"results_count": len(search_results),
"chunks": [s["id"] for s in search_results]
}
)
# Format results as XML-like documents
if not search_results:
log_structured_entry(
"No results found for query",
"INFO",
{"query": query[:100]}
)
return "No relevant documents found for your query."
formatted_results = [
f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
for i, result in enumerate(search_results, start=1)
]
return "\n".join(formatted_results)
except Exception as e:
# Catch-all for any unexpected errors
log_structured_entry(
"Unexpected error in knowledge_search",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Unexpected error during search: {str(e)}"
if __name__ == "__main__":
mcp.run(transport=_args.transport)

4
utils/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .config import Settings, _args, cfg
from .logging_setup import log_structured_entry
__all__ = ['Settings', '_args', 'cfg', 'log_structured_entry']

60
utils/config.py Normal file
View File

@@ -0,0 +1,60 @@
import os
import argparse
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse", "streamable-http"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
log_name: str = "va_agent_evaluation_logs"
log_level: str = "INFO"
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
# Singleton instance of Settings
cfg = Settings.model_validate({})

48
utils/logging_setup.py Normal file
View File

@@ -0,0 +1,48 @@
"""
Centralized Cloud Logging setup.
Uses CloudLoggingHandler (background thread) so logging does not add latency
"""
import logging
from typing import Optional, Dict, Literal
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from .config import cfg
def _setup_logger() -> logging.Logger:
"""Create or return the singleton evaluation logger."""
logger = logging.getLogger(cfg.log_name)
if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
return logger
try:
client = google.cloud.logging.Client(project=cfg.project_id)
handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport
logger.addHandler(handler)
logger.setLevel(getattr(logging, cfg.log_level.upper()))
except Exception as e:
# Fallback to console if Cloud Logging is unavailable (local dev)
logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
logger = logging.getLogger(cfg.log_name)
logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
return logger
_eval_log = _setup_logger()
def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None:
"""
Emit a JSON-structured log row.
Args:
message: Short label for the row (e.g., "Final agent turn").
severity: "INFO" | "WARNING" | "ERROR"
custom_log: A dict with your structured payload.
"""
level = getattr(logging, severity.upper(), logging.INFO)
_eval_log.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}})