Compare commits

1 Commits

Author SHA1 Message Date
72808b1475 Add filter with metadata using restricts 2026-02-24 03:05:50 +00:00
7 changed files with 231 additions and 642 deletions

View File

@@ -8,8 +8,7 @@ COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --frozen RUN uv sync --no-dev --frozen
COPY main.py . COPY main.py .
COPY utils/ utils/
ENV PATH="/app/.venv/bin:$PATH" ENV PATH="/app/.venv/bin:$PATH"
CMD ["uv", "run", "python", "main.py", "--transport", "streamable-http", "--port", "8000"] CMD ["uv", "run", "python", "main.py", "--transport", "sse", "--port", "8000"]

View File

@@ -6,7 +6,24 @@ An MCP (Model Context Protocol) server that exposes a `knowledge_search` tool fo
1. A natural-language query is embedded using a Gemini embedding model. 1. A natural-language query is embedded using a Gemini embedding model.
2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors. 2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors.
3. The matched document contents are fetched from a GCS bucket and returned to the caller. 3. Optional filters (restricts) can be applied to search only specific source folders.
4. The matched document contents are fetched from a GCS bucket and returned to the caller.
## Filtering by Source Folder
The `knowledge_search` tool supports filtering results by source folder:
```python
# Search all folders
knowledge_search(query="what is a savings account?")
# Search only in specific folders
knowledge_search(
query="what is a savings account?",
source_folders=["Educacion Financiera", "Productos y Servicios"]
)
```
## Prerequisites ## Prerequisites

View File

@@ -57,9 +57,20 @@ async def async_main() -> None:
model="gemini-2.0-flash", model="gemini-2.0-flash",
name="knowledge_agent", name="knowledge_agent",
instruction=( instruction=(
"You are a helpful assistant with access to a knowledge base. " "You are a helpful assistant with access to a knowledge base organized by folders. "
"Use the knowledge_search tool to find relevant information " "Use the knowledge_search tool to find relevant information when the user asks questions.\n\n"
"when the user asks questions. Summarize the results clearly." "Available folders in the knowledge base:\n"
"- 'Educacion Financiera': Educational content about finance, savings, investments, financial concepts\n"
"- 'Funcionalidades de la App Movil': Mobile app features, functionality, usage instructions\n"
"- 'Productos y Servicios': Bank products and services, accounts, procedures\n\n"
"IMPORTANT: When the user asks about a specific topic, analyze which folders are relevant "
"and use the source_folders parameter to filter results for more precise answers.\n\n"
"Examples:\n"
"- User asks about 'cuenta de ahorros' → Use source_folders=['Educacion Financiera', 'Productos y Servicios']\n"
"- User asks about 'cómo usar la app móvil' → Use source_folders=['Funcionalidades de la App Movil']\n"
"- User asks about 'transferencias en la app' → Use source_folders=['Funcionalidades de la App Movil', 'Productos y Servicios']\n"
"- User asks general question → Don't use source_folders (search all)\n\n"
"Summarize the results clearly in Spanish."
), ),
tools=[toolset], tools=[toolset],
) )

534
main.py
View File

@@ -1,12 +1,14 @@
# ruff: noqa: INP001 # ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP.""" """Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio import asyncio
import io import io
import logging
import os
from collections.abc import AsyncIterator, Sequence from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum
from typing import BinaryIO, TypedDict from typing import BinaryIO, TypedDict
import aiohttp import aiohttp
@@ -15,21 +17,14 @@ from gcloud.aio.storage import Storage
from google import genai from google import genai
from google.genai import types as genai_types from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
from utils import Settings, _args, cfg, log_structured_entry logger = logging.getLogger(__name__)
HTTP_TOO_MANY_REQUESTS = 429 HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500 HTTP_SERVER_ERROR = 500
class SourceNamespace(str, Enum):
    """Allowed values for the 'source' namespace filter.

    Each member's value is the literal folder name that is sent as the
    ``allow_list`` entry of a Vertex AI vector-search restrict with
    namespace ``"source"``, so the strings must match the index's
    datapoint restricts exactly.
    """

    # Financial-education documents.
    EDUCACION_FINANCIERA = "Educacion Financiera"
    # Bank products and services documents.
    PRODUCTOS_Y_SERVICIOS = "Productos y Servicios"
    # Mobile-app feature documentation.
    FUNCIONALIDADES_APP_MOVIL = "Funcionalidades de la App Movil"
class GoogleCloudFileStorage: class GoogleCloudFileStorage:
"""Cache-aware helper for downloading files from Google Cloud Storage.""" """Cache-aware helper for downloading files from Google Cloud Storage."""
@@ -79,21 +74,10 @@ class GoogleCloudFileStorage:
""" """
if file_name in self._cache: if file_name in self._cache:
log_structured_entry(
"File retrieved from cache",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
file_stream = io.BytesIO(self._cache[file_name]) file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name file_stream.name = file_name
return file_stream return file_stream
log_structured_entry(
"Starting file download from GCS",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
storage_client = self._get_aio_storage() storage_client = self._get_aio_storage()
last_exception: Exception | None = None last_exception: Exception | None = None
@@ -105,22 +89,14 @@ class GoogleCloudFileStorage:
) )
file_stream = io.BytesIO(self._cache[file_name]) file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name file_stream.name = file_name
log_structured_entry(
"File downloaded successfully",
"INFO",
{
"file": file_name,
"bucket": self.bucket_name,
"size_bytes": len(self._cache[file_name]),
"attempt": attempt + 1
}
)
except TimeoutError as exc: except TimeoutError as exc:
last_exception = exc last_exception = exc
log_structured_entry( logger.warning(
f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", "Timeout downloading gs://%s/%s (attempt %d/%d)",
"WARNING", self.bucket_name,
{"error": str(exc)} file_name,
attempt + 1,
max_retries,
) )
except aiohttp.ClientResponseError as exc: except aiohttp.ClientResponseError as exc:
last_exception = exc last_exception = exc
@@ -128,44 +104,27 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR or exc.status >= HTTP_SERVER_ERROR
): ):
log_structured_entry( logger.warning(
f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", "HTTP %d downloading gs://%s/%s (attempt %d/%d)",
"WARNING", exc.status,
{"status": exc.status, "message": str(exc)} self.bucket_name,
file_name,
attempt + 1,
max_retries,
) )
else: else:
log_structured_entry(
f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}",
"ERROR",
{"status": exc.status, "message": str(exc)}
)
raise raise
else: else:
return file_stream return file_stream
if attempt < max_retries - 1: if attempt < max_retries - 1:
delay = 0.5 * (2**attempt) delay = 0.5 * (2**attempt)
log_structured_entry(
"Retrying file download",
"INFO",
{"file": file_name, "delay_seconds": delay}
)
await asyncio.sleep(delay) await asyncio.sleep(delay)
msg = ( msg = (
f"Failed to download gs://{self.bucket_name}/{file_name} " f"Failed to download gs://{self.bucket_name}/{file_name} "
f"after {max_retries} attempts" f"after {max_retries} attempts"
) )
log_structured_entry(
"File download failed after all retries",
"ERROR",
{
"file": file_name,
"bucket": self.bucket_name,
"max_retries": max_retries,
"last_error": str(last_exception)
}
)
raise TimeoutError(msg) from last_exception raise TimeoutError(msg) from last_exception
@@ -245,7 +204,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: str, deployed_index_id: str,
query: Sequence[float], query: Sequence[float],
limit: int, limit: int,
source: SourceNamespace | None = None, restricts: list[dict[str, list[str]]] | None = None,
) -> list[SearchResult]: ) -> list[SearchResult]:
"""Run an async similarity search via the REST API. """Run an async similarity search via the REST API.
@@ -253,7 +212,6 @@ class GoogleCloudVectorSearch:
deployed_index_id: The ID of the deployed index. deployed_index_id: The ID of the deployed index.
query: The embedding vector for the search query. query: The embedding vector for the search query.
limit: Maximum number of nearest neighbors to return. limit: Maximum number of nearest neighbors to return.
source: Optional namespace filter to restrict results by source.
Returns: Returns:
A list of matched items with id, distance, and content. A list of matched items with id, distance, and content.
@@ -264,13 +222,7 @@ class GoogleCloudVectorSearch:
"Missing endpoint metadata. Call " "Missing endpoint metadata. Call "
"`configure_index_endpoint` before querying." "`configure_index_endpoint` before querying."
) )
log_structured_entry(
"Vector search query failed - endpoint not configured",
"ERROR",
{"error": msg}
)
raise RuntimeError(msg) raise RuntimeError(msg)
domain = self._endpoint_domain domain = self._endpoint_domain
endpoint_id = self._endpoint_name.split("/")[-1] endpoint_id = self._endpoint_name.split("/")[-1]
url = ( url = (
@@ -278,34 +230,20 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}" f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors" f"/indexEndpoints/{endpoint_id}:findNeighbors"
) )
query_payload = {
log_structured_entry( "datapoint": {"feature_vector": list(query)},
"Starting vector search query",
"INFO",
{
"deployed_index_id": deployed_index_id,
"neighbor_count": limit, "neighbor_count": limit,
"endpoint_id": endpoint_id,
"embedding_dimension": len(query)
} }
)
datapoint: dict = {"feature_vector": list(query)} # Add restricts if provided
if source is not None: if restricts:
datapoint["restricts"] = [ query_payload["restricts"] = restricts
{"namespace": "source", "allow_list": [source.value]},
]
payload = { payload = {
"deployed_index_id": deployed_index_id, "deployed_index_id": deployed_index_id,
"queries": [ "queries": [query_payload],
{
"datapoint": datapoint,
"neighbor_count": limit,
},
],
} }
try:
headers = await self._async_get_auth_headers() headers = await self._async_get_auth_headers()
session = self._get_aio_session() session = self._get_aio_session()
async with session.post( async with session.post(
@@ -316,37 +254,10 @@ class GoogleCloudVectorSearch:
if not response.ok: if not response.ok:
body = await response.text() body = await response.text()
msg = f"findNeighbors returned {response.status}: {body}" msg = f"findNeighbors returned {response.status}: {body}"
log_structured_entry(
"Vector search API request failed",
"ERROR",
{
"status": response.status,
"response_body": body,
"deployed_index_id": deployed_index_id
}
)
raise RuntimeError(msg) raise RuntimeError(msg)
data = await response.json() data = await response.json()
neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
log_structured_entry(
"Vector search API request successful",
"INFO",
{
"neighbors_found": len(neighbors),
"deployed_index_id": deployed_index_id
}
)
if not neighbors:
log_structured_entry(
"No neighbors found in vector search",
"WARNING",
{"deployed_index_id": deployed_index_id}
)
return []
# Fetch content for all neighbors
content_tasks = [] content_tasks = []
for neighbor in neighbors: for neighbor in neighbors:
datapoint_id = neighbor["datapoint"]["datapointId"] datapoint_id = neighbor["datapoint"]["datapointId"]
@@ -355,12 +266,6 @@ class GoogleCloudVectorSearch:
self.storage.async_get_file_stream(file_path), self.storage.async_get_file_stream(file_path),
) )
log_structured_entry(
"Fetching content for search results",
"INFO",
{"file_count": len(content_tasks)}
)
file_streams = await asyncio.gather(*content_tasks) file_streams = await asyncio.gather(*content_tasks)
results: list[SearchResult] = [] results: list[SearchResult] = []
for neighbor, stream in zip( for neighbor, stream in zip(
@@ -375,35 +280,66 @@ class GoogleCloudVectorSearch:
content=stream.read().decode("utf-8"), content=stream.read().decode("utf-8"),
), ),
) )
log_structured_entry(
"Vector search completed successfully",
"INFO",
{
"results_count": len(results),
"deployed_index_id": deployed_index_id
}
)
return results return results
except Exception as e:
log_structured_entry(
"Vector search query failed with exception",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"deployed_index_id": deployed_index_id
}
)
raise
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# MCP Server # MCP Server
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
    """Parse the server's command-line options.

    Returns:
        Namespace with ``transport`` ("stdio" or "sse", default "stdio"),
        ``host`` (default "0.0.0.0" — binds all interfaces, as expected
        inside a container), ``port`` (default 8080), and ``config``
        (YAML config path; falls back to the CONFIG_FILE environment
        variable, then "config.yaml").
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--transport", choices=["stdio", "sse"], default="stdio")
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8080)
    # Containers inject the config path via the environment instead of argv.
    fallback_config = os.environ.get("CONFIG_FILE", "config.yaml")
    parser.add_argument("--config", default=fallback_config)
    return parser.parse_args()


# Parsed once at import time; NOTE(review): this also runs under test
# runners, which will see their own argv — confirm that is acceptable.
_args = _parse_args()
class Settings(BaseSettings):
    """Server configuration populated from env vars and a YAML config file."""

    # YAML file path comes from the --config flag / CONFIG_FILE env var
    # parsed into the module-level `_args`.
    model_config = {"env_file": ".env", "yaml_file": _args.config}

    # Google Cloud project and region hosting the index and models.
    project_id: str
    location: str
    # GCS bucket holding the source documents returned by searches.
    bucket: str
    # Vertex AI Matching Engine resources.
    index_name: str
    deployed_index_id: str
    endpoint_name: str
    endpoint_domain: str
    # Gemini model used to embed queries.
    embedding_model: str = "gemini-embedding-001"
    # Maximum number of nearest neighbors requested per query.
    search_limit: int = 10

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Insert the YAML source after dotenv.

        Earlier sources take precedence in pydantic-settings, so values
        resolve as: init kwargs > env vars > .env file > YAML > secret files.
        """
        return (
            init_settings,
            env_settings,
            dotenv_settings,
            YamlConfigSettingsSource(settings_cls),
            file_secret_settings,
        )
@dataclass @dataclass
class AppContext: class AppContext:
"""Shared resources initialised once at server startup.""" """Shared resources initialised once at server startup."""
@@ -416,229 +352,31 @@ class AppContext:
@asynccontextmanager @asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime.""" """Create and configure the vector-search client for the server lifetime."""
log_structured_entry(
"Initializing MCP server",
"INFO",
{
"project_id": cfg.project_id,
"location": cfg.location,
"bucket": cfg.bucket,
"index_name": cfg.index_name,
}
)
try:
# Initialize vector search client
log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO")
vs = GoogleCloudVectorSearch( vs = GoogleCloudVectorSearch(
project_id=cfg.project_id, project_id=cfg.project_id,
location=cfg.location, location=cfg.location,
bucket=cfg.bucket, bucket=cfg.bucket,
index_name=cfg.index_name, index_name=cfg.index_name,
) )
# Configure endpoint
log_structured_entry(
"Configuring index endpoint",
"INFO",
{
"endpoint_name": cfg.endpoint_name,
"endpoint_domain": cfg.endpoint_domain,
}
)
vs.configure_index_endpoint( vs.configure_index_endpoint(
name=cfg.endpoint_name, name=cfg.endpoint_name,
public_domain=cfg.endpoint_domain, public_domain=cfg.endpoint_domain,
) )
# Initialize GenAI client
log_structured_entry(
"Creating GenAI client",
"INFO",
{"project_id": cfg.project_id, "location": cfg.location}
)
genai_client = genai.Client( genai_client = genai.Client(
vertexai=True, vertexai=True,
project=cfg.project_id, project=cfg.project_id,
location=cfg.location, location=cfg.location,
) )
# Validate credentials and configuration by testing actual resources
# These validations are non-blocking - errors are logged but won't stop startup
log_structured_entry("Starting validation of credentials and resources", "INFO")
validation_errors = []
# 1. Validate GenAI embedding access
log_structured_entry("Validating GenAI embedding access", "INFO")
try:
test_response = await genai_client.aio.models.embed_content(
model=cfg.embedding_model,
contents="test",
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
if test_response and test_response.embeddings:
embedding_values = test_response.embeddings[0].values
log_structured_entry(
"GenAI embedding validation successful",
"INFO",
{"embedding_dimension": len(embedding_values) if embedding_values else 0}
)
else:
msg = "Embedding validation returned empty response"
log_structured_entry(msg, "WARNING")
validation_errors.append(msg)
except Exception as e:
log_structured_entry(
"Failed to validate GenAI embedding access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__}
)
validation_errors.append(f"GenAI: {str(e)}")
# 2. Validate GCS bucket access
log_structured_entry(
"Validating GCS bucket access",
"INFO",
{"bucket": cfg.bucket}
)
try:
session = vs.storage._get_aio_session()
token_obj = Token(
session=session,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
access_token = await token_obj.get()
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
headers=headers,
) as response:
if response.status == 403:
msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
log_structured_entry(
"GCS bucket validation failed - access denied - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
log_structured_entry(
"GCS bucket validation failed - not found - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
log_structured_entry(
"GCS bucket validation failed - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"GCS bucket validation successful",
"INFO",
{"bucket": cfg.bucket}
)
except Exception as e:
log_structured_entry(
"Failed to validate GCS bucket access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
)
validation_errors.append(f"GCS: {str(e)}")
# 3. Validate vector search endpoint access
log_structured_entry(
"Validating vector search endpoint access",
"INFO",
{"endpoint_name": cfg.endpoint_name}
)
try:
# Try to get endpoint info
headers = await vs._async_get_auth_headers()
session = vs._get_aio_session()
endpoint_url = (
f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
)
async with session.get(endpoint_url, headers=headers) as response:
if response.status == 403:
msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
log_structured_entry(
"Vector search endpoint validation failed - access denied - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
log_structured_entry(
"Vector search endpoint validation failed - not found - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
log_structured_entry(
"Vector search endpoint validation failed - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"Vector search endpoint validation successful",
"INFO",
{"endpoint": cfg.endpoint_name}
)
except Exception as e:
log_structured_entry(
"Failed to validate vector search endpoint access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
)
validation_errors.append(f"Vector Search: {str(e)}")
# Summary of validations
if validation_errors:
log_structured_entry(
"MCP server started with validation errors - service may not work correctly",
"WARNING",
{"validation_errors": validation_errors, "error_count": len(validation_errors)}
)
else:
log_structured_entry("All validations passed - MCP server initialization complete", "INFO")
yield AppContext( yield AppContext(
vector_search=vs, vector_search=vs,
genai_client=genai_client, genai_client=genai_client,
settings=cfg, settings=cfg,
) )
except Exception as e:
log_structured_entry(
"Failed to initialize MCP server",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
}
)
raise
finally:
log_structured_entry("MCP server lifespan ending", "INFO")
# Module-level singleton: loads and validates configuration from env vars,
# .env, and the YAML file at import time (raises if required fields are missing).
cfg = Settings.model_validate({})
mcp = FastMCP( mcp = FastMCP(
"knowledge-search", "knowledge-search",
@@ -652,16 +390,16 @@ mcp = FastMCP(
async def knowledge_search( async def knowledge_search(
query: str, query: str,
ctx: Context, ctx: Context,
source: SourceNamespace | None = None, source_folders: list[str] | None = None,
) -> str: ) -> str:
"""Search a knowledge base using a natural-language query. """Search a knowledge base using a natural-language query.
Args: Args:
query: The text query to search for. query: The text query to search for.
ctx: MCP request context (injected automatically). ctx: MCP request context (injected automatically).
source: Optional filter to restrict results by source. source_folders: Optional list of source folder paths to filter results.
Allowed values: 'Educacion Financiera', If provided, only documents from these folders will be returned.
'Productos y Servicios', 'Funcionalidades de la App Movil'. Example: ["Educacion Financiera", "Productos y Servicios"]
Returns: Returns:
A formatted string containing matched documents with id and content. A formatted string containing matched documents with id and content.
@@ -673,79 +411,41 @@ async def knowledge_search(
t0 = time.perf_counter() t0 = time.perf_counter()
min_sim = 0.6 min_sim = 0.6
log_structured_entry(
"knowledge_search request received",
"INFO",
{"query": query[:100]} # Log first 100 chars of query
)
try:
# Generate embedding for the query
log_structured_entry("Generating query embedding", "INFO")
try:
response = await app.genai_client.aio.models.embed_content( response = await app.genai_client.aio.models.embed_content(
model=app.settings.embedding_model, model=app.settings.embedding_model,
contents=query, contents=query,
config=genai_types.EmbedContentConfig( config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY", task_type="RETRIEVAL_QUERY",
), ),
) )
embedding = response.embeddings[0].values embedding = response.embeddings[0].values
t_embed = time.perf_counter() t_embed = time.perf_counter()
log_structured_entry(
"Query embedding generated successfully",
"INFO",
{"time_ms": round((t_embed - t0) * 1000, 1)}
)
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
# Check if it's a rate limit error # Build restricts for source folder filtering if provided
if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg: restricts = None
log_structured_entry( if source_folders:
"Rate limit exceeded while generating embedding", restricts = [
"WARNING",
{ {
"error": error_msg, "namespace": "source_folder",
"error_type": error_type, "allow": source_folders,
"query": query[:100]
} }
) ]
return "Error: API rate limit exceeded. Please try again later." logger.info(f"Filtering by source_folders: {source_folders}")
else: else:
log_structured_entry( logger.info("No filtering - searching all folders")
"Failed to generate query embedding",
"ERROR",
{
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
)
return f"Error generating embedding: {error_msg}"
# Perform vector search
log_structured_entry("Performing vector search", "INFO")
try:
search_results = await app.vector_search.async_run_query( search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id, deployed_index_id=app.settings.deployed_index_id,
query=embedding, query=embedding,
limit=app.settings.search_limit, limit=app.settings.search_limit,
source=source, restricts=restricts,
) )
t_search = time.perf_counter() t_search = time.perf_counter()
except Exception as e:
log_structured_entry( # Log raw results from Vertex AI before similarity filtering
"Vector search failed", logger.info(f"Raw results from Vertex AI (before similarity filter): {len(search_results)} chunks")
"ERROR", logger.info(f"Raw chunk IDs: {[s['id'] for s in search_results]}")
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Error performing vector search: {str(e)}"
# Apply similarity filtering # Apply similarity filtering
if search_results: if search_results:
@@ -757,47 +457,21 @@ async def knowledge_search(
if s["distance"] > cutoff and s["distance"] > min_sim if s["distance"] > cutoff and s["distance"] > min_sim
] ]
log_structured_entry( logger.info(
"knowledge_search completed successfully", "knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
"INFO", round((t_embed - t0) * 1000, 1),
{ round((t_search - t_embed) * 1000, 1),
"embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms", round((t_search - t0) * 1000, 1),
"vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms", [s["id"] for s in search_results],
"total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
"source_filter": source.value if source is not None else None,
"results_count": len(search_results),
"chunks": [s["id"] for s in search_results]
}
) )
# Format results as XML-like documents # Format results as XML-like documents
if not search_results:
log_structured_entry(
"No results found for query",
"INFO",
{"query": query[:100]}
)
return "No relevant documents found for your query."
formatted_results = [ formatted_results = [
f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>" f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
for i, result in enumerate(search_results, start=1) for i, result in enumerate(search_results, start=1)
] ]
return "\n".join(formatted_results) return "\n".join(formatted_results)
except Exception as e:
# Catch-all for any unexpected errors
log_structured_entry(
"Unexpected error in knowledge_search",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Unexpected error during search: {str(e)}"
if __name__ == "__main__": if __name__ == "__main__":
mcp.run(transport=_args.transport) mcp.run(transport=_args.transport)

View File

@@ -1,4 +0,0 @@
from .config import Settings, _args, cfg
from .logging_setup import log_structured_entry
__all__ = ['Settings', '_args', 'cfg', 'log_structured_entry']

View File

@@ -1,60 +0,0 @@
import os
import argparse
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
def _parse_args() -> argparse.Namespace:
    """Parse the server's command-line options.

    Returns:
        Namespace with ``transport`` ("stdio", "sse", or
        "streamable-http", default "stdio"), ``host`` (default
        "0.0.0.0"), ``port`` (default 8080), and ``config`` (YAML
        config path; falls back to the CONFIG_FILE environment
        variable, then "config.yaml").
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse", "streamable-http"],
        default="stdio",
    )
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8080)
    # Containers inject the config path via the environment instead of argv.
    parser.add_argument("--config", default=os.environ.get("CONFIG_FILE", "config.yaml"))
    return parser.parse_args()


# Parsed once at import time so Settings.model_config can read the path.
_args = _parse_args()
class Settings(BaseSettings):
    """Server configuration populated from env vars and a YAML config file."""

    # YAML file path comes from the --config flag / CONFIG_FILE env var
    # parsed into the module-level `_args`.
    model_config = {"env_file": ".env", "yaml_file": _args.config}

    # Google Cloud project and region hosting the index and models.
    project_id: str
    location: str
    # GCS bucket holding the source documents returned by searches.
    bucket: str
    # Vertex AI Matching Engine resources.
    index_name: str
    deployed_index_id: str
    endpoint_name: str
    endpoint_domain: str
    # Gemini model used to embed queries.
    embedding_model: str = "gemini-embedding-001"
    # Maximum number of nearest neighbors requested per query.
    search_limit: int = 10
    # Cloud Logging log name and minimum level for the evaluation logger.
    log_name: str = "va_agent_evaluation_logs"
    log_level: str = "INFO"

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        """Insert the YAML source after dotenv.

        Earlier sources take precedence in pydantic-settings, so values
        resolve as: init kwargs > env vars > .env file > YAML > secret files.
        """
        return (
            init_settings,
            env_settings,
            dotenv_settings,
            YamlConfigSettingsSource(settings_cls),
            file_secret_settings,
        )


# Singleton instance of Settings
cfg = Settings.model_validate({})

View File

@@ -1,48 +0,0 @@
"""
Centralized Cloud Logging setup.
Uses CloudLoggingHandler (background thread) so logging does not add latency
"""
import logging
from typing import Optional, Dict, Literal
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from .config import cfg
def _setup_logger() -> logging.Logger:
    """Return the shared evaluation logger, attaching Cloud Logging at most once.

    If creating the Cloud Logging client fails (e.g. local development
    without credentials), fall back to plain console logging and record a
    warning on the same logger.
    """
    eval_logger = logging.getLogger(cfg.log_name)
    already_attached = any(
        isinstance(existing, CloudLoggingHandler) for existing in eval_logger.handlers
    )
    if already_attached:
        return eval_logger
    try:
        gcp_client = google.cloud.logging.Client(project=cfg.project_id)
        # CloudLoggingHandler ships entries via a background transport,
        # so logging does not add request latency.
        eval_logger.addHandler(CloudLoggingHandler(gcp_client, name=cfg.log_name))
        eval_logger.setLevel(getattr(logging, cfg.log_level.upper()))
    except Exception as err:
        # Best-effort fallback to the console when Cloud Logging is unavailable.
        logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
        eval_logger = logging.getLogger(cfg.log_name)
        eval_logger.warning("Cloud Logging setup failed; using console. Error: %s", err)
    return eval_logger
# Module-level singleton logger, configured once at import time.
_eval_log = _setup_logger()


def log_structured_entry(
    message: str,
    severity: Literal["INFO", "WARNING", "ERROR"],
    custom_log: Optional[Dict] = None,
) -> None:
    """Emit one JSON-structured log row on the evaluation logger.

    Args:
        message: Short label for the row (e.g., "Final agent turn").
        severity: "INFO" | "WARNING" | "ERROR"; unknown values fall back
            to INFO.
        custom_log: Optional dict carrying the structured payload.
    """
    payload = {"message": message, "custom": custom_log if custom_log else {}}
    numeric_level = getattr(logging, severity.upper(), logging.INFO)
    _eval_log.log(numeric_level, message, extra={"json_fields": payload})