Compare commits

1 Commits

Author SHA1 Message Date
72808b1475 Add filter with metadata using restricts 2026-02-24 03:05:50 +00:00
6 changed files with 226 additions and 614 deletions

View File

@@ -6,7 +6,24 @@ An MCP (Model Context Protocol) server that exposes a `knowledge_search` tool fo
1. A natural-language query is embedded using a Gemini embedding model. 1. A natural-language query is embedded using a Gemini embedding model.
2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors. 2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors.
3. The matched document contents are fetched from a GCS bucket and returned to the caller. 3. Optional filters (restricts) can be applied to search only specific source folders.
4. The matched document contents are fetched from a GCS bucket and returned to the caller.
## Filtering by Source Folder
The `knowledge_search` tool supports filtering results by source folder:
```python
# Search all folders
knowledge_search(query="what is a savings account?")
# Search only in specific folders
knowledge_search(
query="what is a savings account?",
source_folders=["Educacion Financiera", "Productos y Servicios"]
)
```
## Prerequisites ## Prerequisites

View File

@@ -57,9 +57,20 @@ async def async_main() -> None:
model="gemini-2.0-flash", model="gemini-2.0-flash",
name="knowledge_agent", name="knowledge_agent",
instruction=( instruction=(
"You are a helpful assistant with access to a knowledge base. " "You are a helpful assistant with access to a knowledge base organized by folders. "
"Use the knowledge_search tool to find relevant information " "Use the knowledge_search tool to find relevant information when the user asks questions.\n\n"
"when the user asks questions. Summarize the results clearly." "Available folders in the knowledge base:\n"
"- 'Educacion Financiera': Educational content about finance, savings, investments, financial concepts\n"
"- 'Funcionalidades de la App Movil': Mobile app features, functionality, usage instructions\n"
"- 'Productos y Servicios': Bank products and services, accounts, procedures\n\n"
"IMPORTANT: When the user asks about a specific topic, analyze which folders are relevant "
"and use the source_folders parameter to filter results for more precise answers.\n\n"
"Examples:\n"
"- User asks about 'cuenta de ahorros' → Use source_folders=['Educacion Financiera', 'Productos y Servicios']\n"
"- User asks about 'cómo usar la app móvil' → Use source_folders=['Funcionalidades de App Movil']\n"
"- User asks about 'transferencias en la app' → Use source_folders=['Funcionalidades de App Movil', 'Productos y Servicios']\n"
"- User asks general question → Don't use source_folders (search all)\n\n"
"Summarize the results clearly in Spanish."
), ),
tools=[toolset], tools=[toolset],
) )

692
main.py
View File

@@ -1,8 +1,11 @@
# ruff: noqa: INP001 # ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP.""" """Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio import asyncio
import io import io
import logging
import os
from collections.abc import AsyncIterator, Sequence from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
@@ -14,8 +17,9 @@ from gcloud.aio.storage import Storage
from google import genai from google import genai
from google.genai import types as genai_types from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
from utils import Settings, _args, cfg, log_structured_entry logger = logging.getLogger(__name__)
HTTP_TOO_MANY_REQUESTS = 429 HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500 HTTP_SERVER_ERROR = 500
@@ -70,21 +74,10 @@ class GoogleCloudFileStorage:
""" """
if file_name in self._cache: if file_name in self._cache:
log_structured_entry(
"File retrieved from cache",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
file_stream = io.BytesIO(self._cache[file_name]) file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name file_stream.name = file_name
return file_stream return file_stream
log_structured_entry(
"Starting file download from GCS",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
storage_client = self._get_aio_storage() storage_client = self._get_aio_storage()
last_exception: Exception | None = None last_exception: Exception | None = None
@@ -96,22 +89,14 @@ class GoogleCloudFileStorage:
) )
file_stream = io.BytesIO(self._cache[file_name]) file_stream = io.BytesIO(self._cache[file_name])
file_stream.name = file_name file_stream.name = file_name
log_structured_entry(
"File downloaded successfully",
"INFO",
{
"file": file_name,
"bucket": self.bucket_name,
"size_bytes": len(self._cache[file_name]),
"attempt": attempt + 1
}
)
except TimeoutError as exc: except TimeoutError as exc:
last_exception = exc last_exception = exc
log_structured_entry( logger.warning(
f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", "Timeout downloading gs://%s/%s (attempt %d/%d)",
"WARNING", self.bucket_name,
{"error": str(exc)} file_name,
attempt + 1,
max_retries,
) )
except aiohttp.ClientResponseError as exc: except aiohttp.ClientResponseError as exc:
last_exception = exc last_exception = exc
@@ -119,44 +104,27 @@ class GoogleCloudFileStorage:
exc.status == HTTP_TOO_MANY_REQUESTS exc.status == HTTP_TOO_MANY_REQUESTS
or exc.status >= HTTP_SERVER_ERROR or exc.status >= HTTP_SERVER_ERROR
): ):
log_structured_entry( logger.warning(
f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", "HTTP %d downloading gs://%s/%s (attempt %d/%d)",
"WARNING", exc.status,
{"status": exc.status, "message": str(exc)} self.bucket_name,
file_name,
attempt + 1,
max_retries,
) )
else: else:
log_structured_entry(
f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}",
"ERROR",
{"status": exc.status, "message": str(exc)}
)
raise raise
else: else:
return file_stream return file_stream
if attempt < max_retries - 1: if attempt < max_retries - 1:
delay = 0.5 * (2**attempt) delay = 0.5 * (2**attempt)
log_structured_entry(
"Retrying file download",
"INFO",
{"file": file_name, "delay_seconds": delay}
)
await asyncio.sleep(delay) await asyncio.sleep(delay)
msg = ( msg = (
f"Failed to download gs://{self.bucket_name}/{file_name} " f"Failed to download gs://{self.bucket_name}/{file_name} "
f"after {max_retries} attempts" f"after {max_retries} attempts"
) )
log_structured_entry(
"File download failed after all retries",
"ERROR",
{
"file": file_name,
"bucket": self.bucket_name,
"max_retries": max_retries,
"last_error": str(last_exception)
}
)
raise TimeoutError(msg) from last_exception raise TimeoutError(msg) from last_exception
@@ -236,6 +204,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: str, deployed_index_id: str,
query: Sequence[float], query: Sequence[float],
limit: int, limit: int,
restricts: list[dict[str, list[str]]] | None = None,
) -> list[SearchResult]: ) -> list[SearchResult]:
"""Run an async similarity search via the REST API. """Run an async similarity search via the REST API.
@@ -253,13 +222,7 @@ class GoogleCloudVectorSearch:
"Missing endpoint metadata. Call " "Missing endpoint metadata. Call "
"`configure_index_endpoint` before querying." "`configure_index_endpoint` before querying."
) )
log_structured_entry(
"Vector search query failed - endpoint not configured",
"ERROR",
{"error": msg}
)
raise RuntimeError(msg) raise RuntimeError(msg)
domain = self._endpoint_domain domain = self._endpoint_domain
endpoint_id = self._endpoint_name.split("/")[-1] endpoint_id = self._endpoint_name.split("/")[-1]
url = ( url = (
@@ -267,120 +230,57 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}" f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors" f"/indexEndpoints/{endpoint_id}:findNeighbors"
) )
query_payload = {
"datapoint": {"feature_vector": list(query)},
"neighbor_count": limit,
}
log_structured_entry( # Add restricts if provided
"Starting vector search query", if restricts:
"INFO", query_payload["restricts"] = restricts
{
"deployed_index_id": deployed_index_id,
"neighbor_count": limit,
"endpoint_id": endpoint_id,
"embedding_dimension": len(query)
}
)
payload = { payload = {
"deployed_index_id": deployed_index_id, "deployed_index_id": deployed_index_id,
"queries": [ "queries": [query_payload],
{
"datapoint": {"feature_vector": list(query)},
"neighbor_count": limit,
},
],
} }
try: headers = await self._async_get_auth_headers()
headers = await self._async_get_auth_headers() session = self._get_aio_session()
session = self._get_aio_session() async with session.post(
async with session.post( url,
url, json=payload,
json=payload, headers=headers,
headers=headers, ) as response:
) as response: if not response.ok:
if not response.ok: body = await response.text()
body = await response.text() msg = f"findNeighbors returned {response.status}: {body}"
msg = f"findNeighbors returned {response.status}: {body}" raise RuntimeError(msg)
log_structured_entry( data = await response.json()
"Vector search API request failed",
"ERROR",
{
"status": response.status,
"response_body": body,
"deployed_index_id": deployed_index_id
}
)
raise RuntimeError(msg)
data = await response.json()
neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
log_structured_entry( content_tasks = []
"Vector search API request successful", for neighbor in neighbors:
"INFO", datapoint_id = neighbor["datapoint"]["datapointId"]
{ file_path = f"{self.index_name}/contents/{datapoint_id}.md"
"neighbors_found": len(neighbors), content_tasks.append(
"deployed_index_id": deployed_index_id self.storage.async_get_file_stream(file_path),
}
) )
if not neighbors: file_streams = await asyncio.gather(*content_tasks)
log_structured_entry( results: list[SearchResult] = []
"No neighbors found in vector search", for neighbor, stream in zip(
"WARNING", neighbors,
{"deployed_index_id": deployed_index_id} file_streams,
) strict=True,
return [] ):
results.append(
# Fetch content for all neighbors SearchResult(
content_tasks = [] id=neighbor["datapoint"]["datapointId"],
for neighbor in neighbors: distance=neighbor["distance"],
datapoint_id = neighbor["datapoint"]["datapointId"] content=stream.read().decode("utf-8"),
file_path = f"{self.index_name}/contents/{datapoint_id}.md" ),
content_tasks.append(
self.storage.async_get_file_stream(file_path),
)
log_structured_entry(
"Fetching content for search results",
"INFO",
{"file_count": len(content_tasks)}
) )
return results
file_streams = await asyncio.gather(*content_tasks)
results: list[SearchResult] = []
for neighbor, stream in zip(
neighbors,
file_streams,
strict=True,
):
results.append(
SearchResult(
id=neighbor["datapoint"]["datapointId"],
distance=neighbor["distance"],
content=stream.read().decode("utf-8"),
),
)
log_structured_entry(
"Vector search completed successfully",
"INFO",
{
"results_count": len(results),
"deployed_index_id": deployed_index_id
}
)
return results
except Exception as e:
log_structured_entry(
"Vector search query failed with exception",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"deployed_index_id": deployed_index_id
}
)
raise
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -388,6 +288,58 @@ class GoogleCloudVectorSearch:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass @dataclass
class AppContext: class AppContext:
"""Shared resources initialised once at server startup.""" """Shared resources initialised once at server startup."""
@@ -400,229 +352,31 @@ class AppContext:
@asynccontextmanager @asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime.""" """Create and configure the vector-search client for the server lifetime."""
log_structured_entry( vs = GoogleCloudVectorSearch(
"Initializing MCP server", project_id=cfg.project_id,
"INFO", location=cfg.location,
{ bucket=cfg.bucket,
"project_id": cfg.project_id, index_name=cfg.index_name,
"location": cfg.location, )
"bucket": cfg.bucket, vs.configure_index_endpoint(
"index_name": cfg.index_name, name=cfg.endpoint_name,
} public_domain=cfg.endpoint_domain,
) )
try: genai_client = genai.Client(
# Initialize vector search client vertexai=True,
log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO") project=cfg.project_id,
vs = GoogleCloudVectorSearch( location=cfg.location,
project_id=cfg.project_id, )
location=cfg.location,
bucket=cfg.bucket,
index_name=cfg.index_name,
)
# Configure endpoint yield AppContext(
log_structured_entry( vector_search=vs,
"Configuring index endpoint", genai_client=genai_client,
"INFO", settings=cfg,
{ )
"endpoint_name": cfg.endpoint_name,
"endpoint_domain": cfg.endpoint_domain,
}
)
vs.configure_index_endpoint(
name=cfg.endpoint_name,
public_domain=cfg.endpoint_domain,
)
# Initialize GenAI client
log_structured_entry(
"Creating GenAI client",
"INFO",
{"project_id": cfg.project_id, "location": cfg.location}
)
genai_client = genai.Client(
vertexai=True,
project=cfg.project_id,
location=cfg.location,
)
# Validate credentials and configuration by testing actual resources
# These validations are non-blocking - errors are logged but won't stop startup
log_structured_entry("Starting validation of credentials and resources", "INFO")
validation_errors = []
# 1. Validate GenAI embedding access
log_structured_entry("Validating GenAI embedding access", "INFO")
try:
test_response = await genai_client.aio.models.embed_content(
model=cfg.embedding_model,
contents="test",
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
if test_response and test_response.embeddings:
embedding_values = test_response.embeddings[0].values
log_structured_entry(
"GenAI embedding validation successful",
"INFO",
{"embedding_dimension": len(embedding_values) if embedding_values else 0}
)
else:
msg = "Embedding validation returned empty response"
log_structured_entry(msg, "WARNING")
validation_errors.append(msg)
except Exception as e:
log_structured_entry(
"Failed to validate GenAI embedding access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__}
)
validation_errors.append(f"GenAI: {str(e)}")
# 2. Validate GCS bucket access
log_structured_entry(
"Validating GCS bucket access",
"INFO",
{"bucket": cfg.bucket}
)
try:
session = vs.storage._get_aio_session()
token_obj = Token(
session=session,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
access_token = await token_obj.get()
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
headers=headers,
) as response:
if response.status == 403:
msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
log_structured_entry(
"GCS bucket validation failed - access denied - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
log_structured_entry(
"GCS bucket validation failed - not found - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
log_structured_entry(
"GCS bucket validation failed - service may not work correctly",
"WARNING",
{"bucket": cfg.bucket, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"GCS bucket validation successful",
"INFO",
{"bucket": cfg.bucket}
)
except Exception as e:
log_structured_entry(
"Failed to validate GCS bucket access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
)
validation_errors.append(f"GCS: {str(e)}")
# 3. Validate vector search endpoint access
log_structured_entry(
"Validating vector search endpoint access",
"INFO",
{"endpoint_name": cfg.endpoint_name}
)
try:
# Try to get endpoint info
headers = await vs._async_get_auth_headers()
session = vs._get_aio_session()
endpoint_url = (
f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
)
async with session.get(endpoint_url, headers=headers) as response:
if response.status == 403:
msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
log_structured_entry(
"Vector search endpoint validation failed - access denied - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif response.status == 404:
msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
log_structured_entry(
"Vector search endpoint validation failed - not found - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
validation_errors.append(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
log_structured_entry(
"Vector search endpoint validation failed - service may not work correctly",
"WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
)
validation_errors.append(msg)
else:
log_structured_entry(
"Vector search endpoint validation successful",
"INFO",
{"endpoint": cfg.endpoint_name}
)
except Exception as e:
log_structured_entry(
"Failed to validate vector search endpoint access - service may not work correctly",
"WARNING",
{"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
)
validation_errors.append(f"Vector Search: {str(e)}")
# Summary of validations
if validation_errors:
log_structured_entry(
"MCP server started with validation errors - service may not work correctly",
"WARNING",
{"validation_errors": validation_errors, "error_count": len(validation_errors)}
)
else:
log_structured_entry("All validations passed - MCP server initialization complete", "INFO")
yield AppContext(
vector_search=vs,
genai_client=genai_client,
settings=cfg,
)
except Exception as e:
log_structured_entry(
"Failed to initialize MCP server",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
}
)
raise
finally:
log_structured_entry("MCP server lifespan ending", "INFO")
cfg = Settings.model_validate({})
mcp = FastMCP( mcp = FastMCP(
"knowledge-search", "knowledge-search",
@@ -636,12 +390,16 @@ mcp = FastMCP(
async def knowledge_search( async def knowledge_search(
query: str, query: str,
ctx: Context, ctx: Context,
source_folders: list[str] | None = None,
) -> str: ) -> str:
"""Search a knowledge base using a natural-language query. """Search a knowledge base using a natural-language query.
Args: Args:
query: The text query to search for. query: The text query to search for.
ctx: MCP request context (injected automatically). ctx: MCP request context (injected automatically).
source_folders: Optional list of source folder paths to filter results.
If provided, only documents from these folders will be returned.
Example: ["Educacion Financiera", "Productos y Servicios"]
Returns: Returns:
A formatted string containing matched documents with id and content. A formatted string containing matched documents with id and content.
@@ -653,128 +411,66 @@ async def knowledge_search(
t0 = time.perf_counter() t0 = time.perf_counter()
min_sim = 0.6 min_sim = 0.6
log_structured_entry( response = await app.genai_client.aio.models.embed_content(
"knowledge_search request received", model=app.settings.embedding_model,
"INFO", contents=query,
{"query": query[:100]} # Log first 100 chars of query config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
embedding = response.embeddings[0].values
t_embed = time.perf_counter()
# Build restricts for source folder filtering if provided
restricts = None
if source_folders:
restricts = [
{
"namespace": "source_folder",
"allow": source_folders,
}
]
logger.info(f"Filtering by source_folders: {source_folders}")
else:
logger.info("No filtering - searching all folders")
search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id,
query=embedding,
limit=app.settings.search_limit,
restricts=restricts,
)
t_search = time.perf_counter()
# Log raw results from Vertex AI before similarity filtering
logger.info(f"Raw results from Vertex AI (before similarity filter): {len(search_results)} chunks")
logger.info(f"Raw chunk IDs: {[s['id'] for s in search_results]}")
# Apply similarity filtering
if search_results:
max_sim = max(r["distance"] for r in search_results)
cutoff = max_sim * 0.9
search_results = [
s
for s in search_results
if s["distance"] > cutoff and s["distance"] > min_sim
]
logger.info(
"knowledge_search timing: embedding=%sms, vector_search=%sms, total=%sms, chunks=%s",
round((t_embed - t0) * 1000, 1),
round((t_search - t_embed) * 1000, 1),
round((t_search - t0) * 1000, 1),
[s["id"] for s in search_results],
) )
try: # Format results as XML-like documents
# Generate embedding for the query formatted_results = [
log_structured_entry("Generating query embedding", "INFO") f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
try: for i, result in enumerate(search_results, start=1)
response = await app.genai_client.aio.models.embed_content( ]
model=app.settings.embedding_model, return "\n".join(formatted_results)
contents=query,
config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY",
),
)
embedding = response.embeddings[0].values
t_embed = time.perf_counter()
log_structured_entry(
"Query embedding generated successfully",
"INFO",
{"time_ms": round((t_embed - t0) * 1000, 1)}
)
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
# Check if it's a rate limit error
if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
log_structured_entry(
"Rate limit exceeded while generating embedding",
"WARNING",
{
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
)
return "Error: API rate limit exceeded. Please try again later."
else:
log_structured_entry(
"Failed to generate query embedding",
"ERROR",
{
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
)
return f"Error generating embedding: {error_msg}"
# Perform vector search
log_structured_entry("Performing vector search", "INFO")
try:
search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id,
query=embedding,
limit=app.settings.search_limit,
)
t_search = time.perf_counter()
except Exception as e:
log_structured_entry(
"Vector search failed",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Error performing vector search: {str(e)}"
# Apply similarity filtering
if search_results:
max_sim = max(r["distance"] for r in search_results)
cutoff = max_sim * 0.9
search_results = [
s
for s in search_results
if s["distance"] > cutoff and s["distance"] > min_sim
]
log_structured_entry(
"knowledge_search completed successfully",
"INFO",
{
"embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
"vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
"total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
"results_count": len(search_results),
"chunks": [s["id"] for s in search_results]
}
)
# Format results as XML-like documents
if not search_results:
log_structured_entry(
"No results found for query",
"INFO",
{"query": query[:100]}
)
return "No relevant documents found for your query."
formatted_results = [
f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
for i, result in enumerate(search_results, start=1)
]
return "\n".join(formatted_results)
except Exception as e:
# Catch-all for any unexpected errors
log_structured_entry(
"Unexpected error in knowledge_search",
"ERROR",
{
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
)
return f"Unexpected error during search: {str(e)}"
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,4 +0,0 @@
from .config import Settings, _args, cfg
from .logging_setup import log_structured_entry
__all__ = ['Settings', '_args', 'cfg', 'log_structured_entry']

View File

@@ -1,60 +0,0 @@
import os
import argparse
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
bucket: str
index_name: str
deployed_index_id: str
endpoint_name: str
endpoint_domain: str
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
log_name: str = "va_agent_evaluation_logs"
log_level: str = "INFO"
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
# Singleton instance of Settings
cfg = Settings.model_validate({})

View File

@@ -1,48 +0,0 @@
"""
Centralized Cloud Logging setup.
Uses CloudLoggingHandler (background thread) so logging does not add latency
"""
import logging
from typing import Optional, Dict, Literal
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from .config import cfg
def _setup_logger() -> logging.Logger:
"""Create or return the singleton evaluation logger."""
logger = logging.getLogger(cfg.log_name)
if any(isinstance(h, CloudLoggingHandler) for h in logger.handlers):
return logger
try:
client = google.cloud.logging.Client(project=cfg.project_id)
handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport
logger.addHandler(handler)
logger.setLevel(getattr(logging, cfg.log_level.upper()))
except Exception as e:
# Fallback to console if Cloud Logging is unavailable (local dev)
logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
logger = logging.getLogger(cfg.log_name)
logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
return logger
_eval_log = _setup_logger()
def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None:
"""
Emit a JSON-structured log row.
Args:
message: Short label for the row (e.g., "Final agent turn").
severity: "INFO" | "WARNING" | "ERROR"
custom_log: A dict with your structured payload.
"""
level = getattr(logging, severity.upper(), logging.INFO)
_eval_log.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}})