Merge pull request 'Add CI' (#10) from push-ooqqtrvlvqxn into main
All checks were successful
CI / lint (push) Successful in 11s
CI / typecheck (push) Successful in 12s
CI / test (push) Successful in 25s

Reviewed-on: #10
This commit was merged in pull request #10.
This commit is contained in:
2026-03-05 22:00:48 +00:00
17 changed files with 349 additions and 217 deletions

43
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,43 @@
# CI pipeline: three independent jobs (lint, typecheck, test) run in
# parallel on every push to main and on every pull request targeting main.
name: CI
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  # Static style checks via Ruff (lint rules + formatting).
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Installs uv and pins the Python toolchain it manages.
      - uses: astral-sh/setup-uv@v6
        with:
          python-version: "3.12"
      # --frozen: install exactly what the lockfile records; fail if out of date.
      - run: uv sync --frozen
      - name: Ruff check
        run: uv run ruff check
      - name: Ruff format check
        run: uv run ruff format --check
  # Type checking with `ty` (configured in pyproject under [tool.ty.src]).
  typecheck:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v6
        with:
          python-version: "3.12"
      - run: uv sync --frozen
      - name: Type check
        run: uv run ty check
  # Test suite with coverage reporting.
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v6
        with:
          python-version: "3.12"
      - run: uv sync --frozen
      - name: Run tests
        run: uv run pytest --cov

3
CLAUDE.md Normal file
View File

@@ -0,0 +1,3 @@
Use `uv` for project management
Linter: `uv run ruff check`
Type-checking: `uv run ty check`

View File

@@ -36,3 +36,19 @@ pythonpath = ["."]
[build-system] [build-system]
requires = ["uv_build>=0.8.3,<0.9.0"] requires = ["uv_build>=0.8.3,<0.9.0"]
build-backend = "uv_build" build-backend = "uv_build"
# Lint only the package source; scripts and tests are exempt from the
# strict rule set below.
[tool.ruff]
exclude = ["scripts", "tests"]
# Keep the type checker's scope aligned with the linter's.
[tool.ty.src]
exclude = ["scripts", "tests"]
[tool.ruff.lint]
# Enable every Ruff rule, then opt out of the few that conflict with the
# chosen docstring/formatter conventions or are too noisy for this codebase.
select = ['ALL']
ignore = [
'D203', # one-blank-line-before-class
'D213', # multi-line-summary-second-line
'COM812', # missing-trailing-comma
'ANN401', # dynamically-typed-any
'ERA001', # commented-out-code
]

View File

@@ -6,10 +6,10 @@ from .models import AppContext, SearchResult, SourceNamespace
from .utils.cache import LRUCache from .utils.cache import LRUCache
__all__ = [ __all__ = [
"AppContext",
"GoogleCloudFileStorage", "GoogleCloudFileStorage",
"GoogleCloudVectorSearch", "GoogleCloudVectorSearch",
"SourceNamespace",
"SearchResult",
"AppContext",
"LRUCache", "LRUCache",
"SearchResult",
"SourceNamespace",
] ]

View File

@@ -1,4 +1,3 @@
# ruff: noqa: INP001
"""MCP server for semantic search over Vertex AI Vector Search.""" """MCP server for semantic search over Vertex AI Vector Search."""
import time import time
@@ -9,7 +8,11 @@ from .config import _args
from .logging import log_structured_entry from .logging import log_structured_entry
from .models import AppContext, SourceNamespace from .models import AppContext, SourceNamespace
from .server import lifespan from .server import lifespan
from .services.search import filter_search_results, format_search_results, generate_query_embedding from .services.search import (
filter_search_results,
format_search_results,
generate_query_embedding,
)
mcp = FastMCP( mcp = FastMCP(
"knowledge-search", "knowledge-search",
@@ -44,7 +47,7 @@ async def knowledge_search(
log_structured_entry( log_structured_entry(
"knowledge_search request received", "knowledge_search request received",
"INFO", "INFO",
{"query": query[:100]} # Log first 100 chars of query {"query": query[:100]}, # Log first 100 chars of query
) )
try: try:
@@ -61,7 +64,7 @@ async def knowledge_search(
log_structured_entry( log_structured_entry(
"Query embedding generated successfully", "Query embedding generated successfully",
"INFO", "INFO",
{"time_ms": round((t_embed - t0) * 1000, 1)} {"time_ms": round((t_embed - t0) * 1000, 1)},
) )
# Perform vector search # Perform vector search
@@ -74,17 +77,13 @@ async def knowledge_search(
source=source, source=source,
) )
t_search = time.perf_counter() t_search = time.perf_counter()
except Exception as e: except Exception as e: # noqa: BLE001
log_structured_entry( log_structured_entry(
"Vector search failed", "Vector search failed",
"ERROR", "ERROR",
{ {"error": str(e), "error_type": type(e).__name__, "query": query[:100]},
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
) )
return f"Error performing vector search: {str(e)}" return f"Error performing vector search: {e!s}"
# Apply similarity filtering # Apply similarity filtering
filtered_results = filter_search_results(search_results) filtered_results = filter_search_results(search_results)
@@ -98,32 +97,26 @@ async def knowledge_search(
"total_ms": f"{round((t_search - t0) * 1000, 1)}ms", "total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
"source_filter": source.value if source is not None else None, "source_filter": source.value if source is not None else None,
"results_count": len(filtered_results), "results_count": len(filtered_results),
"chunks": [s["id"] for s in filtered_results] "chunks": [s["id"] for s in filtered_results],
} },
) )
# Format and return results # Format and return results
if not filtered_results: if not filtered_results:
log_structured_entry( log_structured_entry(
"No results found for query", "No results found for query", "INFO", {"query": query[:100]}
"INFO",
{"query": query[:100]}
) )
return format_search_results(filtered_results) return format_search_results(filtered_results)
except Exception as e: except Exception as e: # noqa: BLE001
# Catch-all for any unexpected errors # Catch-all for any unexpected errors
log_structured_entry( log_structured_entry(
"Unexpected error in knowledge_search", "Unexpected error in knowledge_search",
"ERROR", "ERROR",
{ {"error": str(e), "error_type": type(e).__name__, "query": query[:100]},
"error": str(e),
"error_type": type(e).__name__,
"query": query[:100]
}
) )
return f"Unexpected error during search: {str(e)}" return f"Unexpected error during search: {e!s}"
def main() -> None: def main() -> None:

View File

@@ -1,4 +1,3 @@
# ruff: noqa: INP001
"""Base client with shared aiohttp session management.""" """Base client with shared aiohttp session management."""
import aiohttp import aiohttp

View File

@@ -1,4 +1,3 @@
# ruff: noqa: INP001
"""Google Cloud Storage client with caching.""" """Google Cloud Storage client with caching."""
import asyncio import asyncio
@@ -8,8 +7,9 @@ from typing import BinaryIO
import aiohttp import aiohttp
from gcloud.aio.storage import Storage from gcloud.aio.storage import Storage
from ..logging import log_structured_entry from knowledge_search_mcp.logging import log_structured_entry
from ..utils.cache import LRUCache from knowledge_search_mcp.utils.cache import LRUCache
from .base import BaseGoogleCloudClient from .base import BaseGoogleCloudClient
HTTP_TOO_MANY_REQUESTS = 429 HTTP_TOO_MANY_REQUESTS = 429
@@ -56,7 +56,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient):
log_structured_entry( log_structured_entry(
"File retrieved from cache", "File retrieved from cache",
"INFO", "INFO",
{"file": file_name, "bucket": self.bucket_name} {"file": file_name, "bucket": self.bucket_name},
) )
file_stream = io.BytesIO(cached_content) file_stream = io.BytesIO(cached_content)
file_stream.name = file_name file_stream.name = file_name
@@ -65,7 +65,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient):
log_structured_entry( log_structured_entry(
"Starting file download from GCS", "Starting file download from GCS",
"INFO", "INFO",
{"file": file_name, "bucket": self.bucket_name} {"file": file_name, "bucket": self.bucket_name},
) )
storage_client = self._get_aio_storage() storage_client = self._get_aio_storage()
@@ -87,15 +87,18 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient):
"file": file_name, "file": file_name,
"bucket": self.bucket_name, "bucket": self.bucket_name,
"size_bytes": len(content), "size_bytes": len(content),
"attempt": attempt + 1 "attempt": attempt + 1,
} },
) )
except TimeoutError as exc: except TimeoutError as exc:
last_exception = exc last_exception = exc
log_structured_entry( log_structured_entry(
f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", (
f"Timeout downloading gs://{self.bucket_name}/{file_name} "
f"(attempt {attempt + 1}/{max_retries})"
),
"WARNING", "WARNING",
{"error": str(exc)} {"error": str(exc)},
) )
except aiohttp.ClientResponseError as exc: except aiohttp.ClientResponseError as exc:
last_exception = exc last_exception = exc
@@ -104,15 +107,18 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient):
or exc.status >= HTTP_SERVER_ERROR or exc.status >= HTTP_SERVER_ERROR
): ):
log_structured_entry( log_structured_entry(
f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", (
f"HTTP {exc.status} downloading gs://{self.bucket_name}/"
f"{file_name} (attempt {attempt + 1}/{max_retries})"
),
"WARNING", "WARNING",
{"status": exc.status, "message": str(exc)} {"status": exc.status, "message": str(exc)},
) )
else: else:
log_structured_entry( log_structured_entry(
f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}", f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}",
"ERROR", "ERROR",
{"status": exc.status, "message": str(exc)} {"status": exc.status, "message": str(exc)},
) )
raise raise
else: else:
@@ -123,7 +129,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient):
log_structured_entry( log_structured_entry(
"Retrying file download", "Retrying file download",
"INFO", "INFO",
{"file": file_name, "delay_seconds": delay} {"file": file_name, "delay_seconds": delay},
) )
await asyncio.sleep(delay) await asyncio.sleep(delay)
@@ -138,7 +144,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient):
"file": file_name, "file": file_name,
"bucket": self.bucket_name, "bucket": self.bucket_name,
"max_retries": max_retries, "max_retries": max_retries,
"last_error": str(last_exception) "last_error": str(last_exception),
} },
) )
raise TimeoutError(msg) from last_exception raise TimeoutError(msg) from last_exception

View File

@@ -1,4 +1,3 @@
# ruff: noqa: INP001
"""Google Cloud Vector Search client.""" """Google Cloud Vector Search client."""
import asyncio import asyncio
@@ -6,8 +5,9 @@ from collections.abc import Sequence
from gcloud.aio.auth import Token from gcloud.aio.auth import Token
from ..logging import log_structured_entry from knowledge_search_mcp.logging import log_structured_entry
from ..models import SearchResult, SourceNamespace from knowledge_search_mcp.models import SearchResult, SourceNamespace
from .base import BaseGoogleCloudClient from .base import BaseGoogleCloudClient
from .storage import GoogleCloudFileStorage from .storage import GoogleCloudFileStorage
@@ -94,7 +94,7 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
log_structured_entry( log_structured_entry(
"Vector search query failed - endpoint not configured", "Vector search query failed - endpoint not configured",
"ERROR", "ERROR",
{"error": msg} {"error": msg},
) )
raise RuntimeError(msg) raise RuntimeError(msg)
@@ -113,8 +113,8 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
"deployed_index_id": deployed_index_id, "deployed_index_id": deployed_index_id,
"neighbor_count": limit, "neighbor_count": limit,
"endpoint_id": endpoint_id, "endpoint_id": endpoint_id,
"embedding_dimension": len(query) "embedding_dimension": len(query),
} },
) )
datapoint: dict = {"feature_vector": list(query)} datapoint: dict = {"feature_vector": list(query)}
@@ -149,10 +149,10 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
{ {
"status": response.status, "status": response.status,
"response_body": body, "response_body": body,
"deployed_index_id": deployed_index_id "deployed_index_id": deployed_index_id,
} },
) )
raise RuntimeError(msg) raise RuntimeError(msg) # noqa: TRY301
data = await response.json() data = await response.json()
neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
@@ -161,15 +161,15 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
"INFO", "INFO",
{ {
"neighbors_found": len(neighbors), "neighbors_found": len(neighbors),
"deployed_index_id": deployed_index_id "deployed_index_id": deployed_index_id,
} },
) )
if not neighbors: if not neighbors:
log_structured_entry( log_structured_entry(
"No neighbors found in vector search", "No neighbors found in vector search",
"WARNING", "WARNING",
{"deployed_index_id": deployed_index_id} {"deployed_index_id": deployed_index_id},
) )
return [] return []
@@ -185,7 +185,7 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
log_structured_entry( log_structured_entry(
"Fetching content for search results", "Fetching content for search results",
"INFO", "INFO",
{"file_count": len(content_tasks)} {"file_count": len(content_tasks)},
) )
file_streams = await asyncio.gather(*content_tasks) file_streams = await asyncio.gather(*content_tasks)
@@ -206,12 +206,9 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
log_structured_entry( log_structured_entry(
"Vector search completed successfully", "Vector search completed successfully",
"INFO", "INFO",
{ {"results_count": len(results), "deployed_index_id": deployed_index_id},
"results_count": len(results),
"deployed_index_id": deployed_index_id
}
) )
return results return results # noqa: TRY300
except Exception as e: except Exception as e:
log_structured_entry( log_structured_entry(
@@ -220,7 +217,7 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient):
{ {
"error": str(e), "error": str(e),
"error_type": type(e).__name__, "error_type": type(e).__name__,
"deployed_index_id": deployed_index_id "deployed_index_id": deployed_index_id,
} },
) )
raise raise

View File

@@ -1,7 +1,14 @@
"""Configuration management for the MCP server."""
import argparse
import os import os
import sys import sys
import argparse
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
YamlConfigSettingsSource,
)
def _parse_args() -> argparse.Namespace: def _parse_args() -> argparse.Namespace:
@@ -14,7 +21,7 @@ def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
return argparse.Namespace( return argparse.Namespace(
transport="stdio", transport="stdio",
host="0.0.0.0", host="0.0.0.0", # noqa: S104
port=8080, port=8080,
config=os.environ.get("CONFIG_FILE", "config.yaml"), config=os.environ.get("CONFIG_FILE", "config.yaml"),
) )
@@ -25,7 +32,7 @@ def _parse_args() -> argparse.Namespace:
choices=["stdio", "sse", "streamable-http"], choices=["stdio", "sse", "streamable-http"],
default="stdio", default="stdio",
) )
parser.add_argument("--host", default="0.0.0.0") parser.add_argument("--host", default="0.0.0.0") # noqa: S104
parser.add_argument("--port", type=int, default=8080) parser.add_argument("--port", type=int, default=8080)
parser.add_argument( parser.add_argument(
"--config", "--config",
@@ -36,6 +43,7 @@ def _parse_args() -> argparse.Namespace:
_args = _parse_args() _args = _parse_args()
class Settings(BaseSettings): class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file.""" """Server configuration populated from env vars and a YAML config file."""
@@ -52,6 +60,7 @@ class Settings(BaseSettings):
search_limit: int = 10 search_limit: int = 10
log_name: str = "va_agent_evaluation_logs" log_name: str = "va_agent_evaluation_logs"
log_level: str = "INFO" log_level: str = "INFO"
cloud_logging_enabled: bool = False
@classmethod @classmethod
def settings_customise_sources( def settings_customise_sources(
@@ -62,6 +71,7 @@ class Settings(BaseSettings):
dotenv_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]: ) -> tuple[PydanticBaseSettingsSource, ...]:
"""Customize the order of settings sources to include YAML config."""
return ( return (
init_settings, init_settings,
env_settings, env_settings,
@@ -77,7 +87,7 @@ _cfg: Settings | None = None
def get_config() -> Settings: def get_config() -> Settings:
"""Get or create the singleton Settings instance.""" """Get or create the singleton Settings instance."""
global _cfg global _cfg # noqa: PLW0603
if _cfg is None: if _cfg is None:
_cfg = Settings.model_validate({}) _cfg = Settings.model_validate({})
return _cfg return _cfg
@@ -87,8 +97,8 @@ def get_config() -> Settings:
class _ConfigProxy: class _ConfigProxy:
"""Proxy object that lazily loads config on attribute access.""" """Proxy object that lazily loads config on attribute access."""
def __getattr__(self, name: str): def __getattr__(self, name: str) -> object:
return getattr(get_config(), name) return getattr(get_config(), name)
cfg = _ConfigProxy() # type: ignore[assignment] cfg = _ConfigProxy()

View File

@@ -1,23 +1,22 @@
""" """Centralized Cloud Logging setup.
Centralized Cloud Logging setup.
Uses CloudLoggingHandler (background thread) so logging does not add latency Uses CloudLoggingHandler (background thread) so logging does not add latency.
""" """
import logging import logging
from typing import Optional, Dict, Literal from typing import Literal
import google.cloud.logging import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler from google.cloud.logging.handlers import CloudLoggingHandler
from .config import get_config from .config import get_config
_eval_log: logging.Logger | None = None _eval_log: logging.Logger | None = None
def _get_logger() -> logging.Logger: def _get_logger() -> logging.Logger:
"""Get or create the singleton evaluation logger.""" """Get or create the singleton evaluation logger."""
global _eval_log global _eval_log # noqa: PLW0603
if _eval_log is not None: if _eval_log is not None:
return _eval_log return _eval_log
@@ -27,30 +26,42 @@ def _get_logger() -> logging.Logger:
_eval_log = logger _eval_log = logger
return logger return logger
if cfg.cloud_logging_enabled:
try: try:
client = google.cloud.logging.Client(project=cfg.project_id) client = google.cloud.logging.Client(project=cfg.project_id)
handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport
logger.addHandler(handler) logger.addHandler(handler)
logger.setLevel(getattr(logging, cfg.log_level.upper())) logger.setLevel(getattr(logging, cfg.log_level.upper()))
except Exception as e: except Exception as e: # noqa: BLE001
# Fallback to console if Cloud Logging is unavailable (local dev) # Fallback to console if Cloud Logging is unavailable (local dev)
logging.basicConfig(level=getattr(logging, cfg.log_level.upper())) logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
logger = logging.getLogger(cfg.log_name) logger = logging.getLogger(cfg.log_name)
logger.warning("Cloud Logging setup failed; using console. Error: %s", e) logger.warning("Cloud Logging setup failed; using console. Error: %s", e)
else:
logging.basicConfig(level=getattr(logging, cfg.log_level.upper()))
logger = logging.getLogger(cfg.log_name)
_eval_log = logger _eval_log = logger
return logger return logger
def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None: def log_structured_entry(
""" message: str,
Emit a JSON-structured log row. severity: Literal["INFO", "WARNING", "ERROR"],
custom_log: dict | None = None,
) -> None:
"""Emit a JSON-structured log row.
Args: Args:
message: Short label for the row (e.g., "Final agent turn"). message: Short label for the row (e.g., "Final agent turn").
severity: "INFO" | "WARNING" | "ERROR" severity: "INFO" | "WARNING" | "ERROR"
custom_log: A dict with your structured payload. custom_log: A dict with your structured payload.
""" """
level = getattr(logging, severity.upper(), logging.INFO) level = getattr(logging, severity.upper(), logging.INFO)
logger = _get_logger() logger = _get_logger()
logger.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}}) logger.log(
level,
message,
extra={"json_fields": {"message": message, "custom": custom_log or {}}},
)

View File

@@ -1,8 +1,7 @@
# ruff: noqa: INP001
"""Domain models for knowledge search MCP server.""" """Domain models for knowledge search MCP server."""
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import StrEnum
from typing import TYPE_CHECKING, TypedDict from typing import TYPE_CHECKING, TypedDict
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -12,7 +11,7 @@ if TYPE_CHECKING:
from .config import Settings from .config import Settings
class SourceNamespace(str, Enum): class SourceNamespace(StrEnum):
"""Allowed values for the 'source' namespace filter.""" """Allowed values for the 'source' namespace filter."""
EDUCACION_FINANCIERA = "Educacion Financiera" EDUCACION_FINANCIERA = "Educacion Financiera"

View File

@@ -1,4 +1,3 @@
# ruff: noqa: INP001
"""MCP server lifecycle management.""" """MCP server lifecycle management."""
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
@@ -8,12 +7,12 @@ from google import genai
from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import FastMCP
from .clients.vector_search import GoogleCloudVectorSearch from .clients.vector_search import GoogleCloudVectorSearch
from .config import Settings, cfg from .config import get_config
from .logging import log_structured_entry from .logging import log_structured_entry
from .models import AppContext from .models import AppContext
from .services.validation import ( from .services.validation import (
validate_genai_access,
validate_gcs_access, validate_gcs_access,
validate_genai_access,
validate_vector_search_access, validate_vector_search_access,
) )
@@ -21,15 +20,18 @@ from .services.validation import (
@asynccontextmanager @asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime.""" """Create and configure the vector-search client for the server lifetime."""
# Get config with proper types for initialization
config_for_init = get_config()
log_structured_entry( log_structured_entry(
"Initializing MCP server", "Initializing MCP server",
"INFO", "INFO",
{ {
"project_id": cfg.project_id, "project_id": config_for_init.project_id,
"location": cfg.location, "location": config_for_init.location,
"bucket": cfg.bucket, "bucket": config_for_init.bucket,
"index_name": cfg.index_name, "index_name": config_for_init.index_name,
} },
) )
vs: GoogleCloudVectorSearch | None = None vs: GoogleCloudVectorSearch | None = None
@@ -37,10 +39,10 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
# Initialize vector search client # Initialize vector search client
log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO") log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO")
vs = GoogleCloudVectorSearch( vs = GoogleCloudVectorSearch(
project_id=cfg.project_id, project_id=config_for_init.project_id,
location=cfg.location, location=config_for_init.location,
bucket=cfg.bucket, bucket=config_for_init.bucket,
index_name=cfg.index_name, index_name=config_for_init.index_name,
) )
# Configure endpoint # Configure endpoint
@@ -48,25 +50,28 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"Configuring index endpoint", "Configuring index endpoint",
"INFO", "INFO",
{ {
"endpoint_name": cfg.endpoint_name, "endpoint_name": config_for_init.endpoint_name,
"endpoint_domain": cfg.endpoint_domain, "endpoint_domain": config_for_init.endpoint_domain,
} },
) )
vs.configure_index_endpoint( vs.configure_index_endpoint(
name=cfg.endpoint_name, name=config_for_init.endpoint_name,
public_domain=cfg.endpoint_domain, public_domain=config_for_init.endpoint_domain,
) )
# Initialize GenAI client # Initialize GenAI client
log_structured_entry( log_structured_entry(
"Creating GenAI client", "Creating GenAI client",
"INFO", "INFO",
{"project_id": cfg.project_id, "location": cfg.location} {
"project_id": config_for_init.project_id,
"location": config_for_init.location,
},
) )
genai_client = genai.Client( genai_client = genai.Client(
vertexai=True, vertexai=True,
project=cfg.project_id, project=config_for_init.project_id,
location=cfg.location, location=config_for_init.location,
) )
# Validate credentials and configuration by testing actual resources # Validate credentials and configuration by testing actual resources
@@ -76,32 +81,41 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
validation_errors = [] validation_errors = []
# Run all validations # Run all validations
genai_error = await validate_genai_access(genai_client, cfg) config = get_config()
genai_error = await validate_genai_access(genai_client, config)
if genai_error: if genai_error:
validation_errors.append(genai_error) validation_errors.append(genai_error)
gcs_error = await validate_gcs_access(vs, cfg) gcs_error = await validate_gcs_access(vs, config)
if gcs_error: if gcs_error:
validation_errors.append(gcs_error) validation_errors.append(gcs_error)
vs_error = await validate_vector_search_access(vs, cfg) vs_error = await validate_vector_search_access(vs, config)
if vs_error: if vs_error:
validation_errors.append(vs_error) validation_errors.append(vs_error)
# Summary of validations # Summary of validations
if validation_errors: if validation_errors:
log_structured_entry( log_structured_entry(
"MCP server started with validation errors - service may not work correctly", (
"MCP server started with validation errors - "
"service may not work correctly"
),
"WARNING", "WARNING",
{"validation_errors": validation_errors, "error_count": len(validation_errors)} {
"validation_errors": validation_errors,
"error_count": len(validation_errors),
},
) )
else: else:
log_structured_entry("All validations passed - MCP server initialization complete", "INFO") log_structured_entry(
"All validations passed - MCP server initialization complete", "INFO"
)
yield AppContext( yield AppContext(
vector_search=vs, vector_search=vs,
genai_client=genai_client, genai_client=genai_client,
settings=cfg, settings=config,
) )
except Exception as e: except Exception as e:
@@ -111,7 +125,7 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
{ {
"error": str(e), "error": str(e),
"error_type": type(e).__name__, "error_type": type(e).__name__,
} },
) )
raise raise
finally: finally:
@@ -121,9 +135,9 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
try: try:
await vs.close() await vs.close()
log_structured_entry("Closed aiohttp sessions", "INFO") log_structured_entry("Closed aiohttp sessions", "INFO")
except Exception as e: except Exception as e: # noqa: BLE001
log_structured_entry( log_structured_entry(
"Error closing aiohttp sessions", "Error closing aiohttp sessions",
"WARNING", "WARNING",
{"error": str(e), "error_type": type(e).__name__} {"error": str(e), "error_type": type(e).__name__},
) )

View File

@@ -1,13 +1,21 @@
"""Service modules for business logic.""" """Service modules for business logic."""
from .search import filter_search_results, format_search_results, generate_query_embedding from .search import (
from .validation import validate_genai_access, validate_gcs_access, validate_vector_search_access filter_search_results,
format_search_results,
generate_query_embedding,
)
from .validation import (
validate_gcs_access,
validate_genai_access,
validate_vector_search_access,
)
__all__ = [ __all__ = [
"filter_search_results", "filter_search_results",
"format_search_results", "format_search_results",
"generate_query_embedding", "generate_query_embedding",
"validate_genai_access",
"validate_gcs_access", "validate_gcs_access",
"validate_genai_access",
"validate_vector_search_access", "validate_vector_search_access",
] ]

View File

@@ -1,11 +1,10 @@
# ruff: noqa: INP001
"""Search helper functions.""" """Search helper functions."""
from google import genai from google import genai
from google.genai import types as genai_types from google.genai import types as genai_types
from ..logging import log_structured_entry from knowledge_search_mcp.logging import log_structured_entry
from ..models import SearchResult from knowledge_search_mcp.models import SearchResult
async def generate_query_embedding( async def generate_query_embedding(
@@ -17,6 +16,7 @@ async def generate_query_embedding(
Returns: Returns:
Tuple of (embedding vector, error message). Error message is None on success. Tuple of (embedding vector, error message). Error message is None on success.
""" """
if not query or not query.strip(): if not query or not query.strip():
return ([], "Error: Query cannot be empty") return ([], "Error: Query cannot be empty")
@@ -30,9 +30,11 @@ async def generate_query_embedding(
task_type="RETRIEVAL_QUERY", task_type="RETRIEVAL_QUERY",
), ),
) )
if not response.embeddings or not response.embeddings[0].values:
return ([], "Error: Failed to generate embedding - empty response")
embedding = response.embeddings[0].values embedding = response.embeddings[0].values
return (embedding, None) return (embedding, None) # noqa: TRY300
except Exception as e: except Exception as e: # noqa: BLE001
error_type = type(e).__name__ error_type = type(e).__name__
error_msg = str(e) error_msg = str(e)
@@ -41,22 +43,13 @@ async def generate_query_embedding(
log_structured_entry( log_structured_entry(
"Rate limit exceeded while generating embedding", "Rate limit exceeded while generating embedding",
"WARNING", "WARNING",
{ {"error": error_msg, "error_type": error_type, "query": query[:100]},
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
) )
return ([], "Error: API rate limit exceeded. Please try again later.") return ([], "Error: API rate limit exceeded. Please try again later.")
else:
log_structured_entry( log_structured_entry(
"Failed to generate query embedding", "Failed to generate query embedding",
"ERROR", "ERROR",
{ {"error": error_msg, "error_type": error_type, "query": query[:100]},
"error": error_msg,
"error_type": error_type,
"query": query[:100]
}
) )
return ([], f"Error generating embedding: {error_msg}") return ([], f"Error generating embedding: {error_msg}")
@@ -75,6 +68,7 @@ def filter_search_results(
Returns: Returns:
Filtered list of search results. Filtered list of search results.
""" """
if not results: if not results:
return [] return []
@@ -82,14 +76,10 @@ def filter_search_results(
max_sim = max(r["distance"] for r in results) max_sim = max(r["distance"] for r in results)
cutoff = max_sim * top_percent cutoff = max_sim * top_percent
filtered = [ return [
s s for s in results if s["distance"] > cutoff and s["distance"] > min_similarity
for s in results
if s["distance"] > cutoff and s["distance"] > min_similarity
] ]
return filtered
def format_search_results(results: list[SearchResult]) -> str: def format_search_results(results: list[SearchResult]) -> str:
"""Format search results as XML-like documents. """Format search results as XML-like documents.
@@ -99,6 +89,7 @@ def format_search_results(results: list[SearchResult]) -> str:
Returns: Returns:
Formatted string with document tags. Formatted string with document tags.
""" """
if not results: if not results:
return "No relevant documents found for your query." return "No relevant documents found for your query."

View File

@@ -1,20 +1,26 @@
# ruff: noqa: INP001
"""Validation functions for Google Cloud services.""" """Validation functions for Google Cloud services."""
from gcloud.aio.auth import Token from gcloud.aio.auth import Token
from google import genai from google import genai
from google.genai import types as genai_types from google.genai import types as genai_types
from ..clients.vector_search import GoogleCloudVectorSearch from knowledge_search_mcp.clients.vector_search import GoogleCloudVectorSearch
from ..config import Settings from knowledge_search_mcp.config import Settings
from ..logging import log_structured_entry from knowledge_search_mcp.logging import log_structured_entry
# HTTP status codes
HTTP_FORBIDDEN = 403
HTTP_NOT_FOUND = 404
async def validate_genai_access(genai_client: genai.Client, cfg: Settings) -> str | None: async def validate_genai_access(
genai_client: genai.Client, cfg: Settings
) -> str | None:
"""Validate GenAI embedding access. """Validate GenAI embedding access.
Returns: Returns:
Error message if validation fails, None if successful. Error message if validation fails, None if successful.
""" """
log_structured_entry("Validating GenAI embedding access", "INFO") log_structured_entry("Validating GenAI embedding access", "INFO")
try: try:
@@ -30,20 +36,26 @@ async def validate_genai_access(genai_client: genai.Client, cfg: Settings) -> st
log_structured_entry( log_structured_entry(
"GenAI embedding validation successful", "GenAI embedding validation successful",
"INFO", "INFO",
{"embedding_dimension": len(embedding_values) if embedding_values else 0} {
"embedding_dimension": len(embedding_values)
if embedding_values
else 0
},
) )
return None return None
else:
msg = "Embedding validation returned empty response" msg = "Embedding validation returned empty response"
log_structured_entry(msg, "WARNING") log_structured_entry(msg, "WARNING")
return msg return msg # noqa: TRY300
except Exception as e: except Exception as e: # noqa: BLE001
log_structured_entry( log_structured_entry(
"Failed to validate GenAI embedding access - service may not work correctly", (
"Failed to validate GenAI embedding access - "
"service may not work correctly"
),
"WARNING", "WARNING",
{"error": str(e), "error_type": type(e).__name__} {"error": str(e), "error_type": type(e).__name__},
) )
return f"GenAI: {str(e)}" return f"GenAI: {e!s}"
async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None: async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None:
@@ -51,14 +63,11 @@ async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str
Returns: Returns:
Error message if validation fails, None if successful. Error message if validation fails, None if successful.
""" """
log_structured_entry( log_structured_entry("Validating GCS bucket access", "INFO", {"bucket": cfg.bucket})
"Validating GCS bucket access",
"INFO",
{"bucket": cfg.bucket}
)
try: try:
session = vs.storage._get_aio_session() session = vs.storage._get_aio_session() # noqa: SLF001
token_obj = Token( token_obj = Token(
session=session, session=session,
scopes=["https://www.googleapis.com/auth/cloud-platform"], scopes=["https://www.googleapis.com/auth/cloud-platform"],
@@ -70,102 +79,136 @@ async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str
f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1", f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
headers=headers, headers=headers,
) as response: ) as response:
if response.status == 403: if response.status == HTTP_FORBIDDEN:
msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions." msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
log_structured_entry( log_structured_entry(
"GCS bucket validation failed - access denied - service may not work correctly", (
"GCS bucket validation failed - access denied - "
"service may not work correctly"
),
"WARNING", "WARNING",
{"bucket": cfg.bucket, "status": response.status} {"bucket": cfg.bucket, "status": response.status},
) )
return msg return msg
elif response.status == 404: if response.status == HTTP_NOT_FOUND:
msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project." msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
log_structured_entry( log_structured_entry(
"GCS bucket validation failed - not found - service may not work correctly", (
"GCS bucket validation failed - not found - "
"service may not work correctly"
),
"WARNING", "WARNING",
{"bucket": cfg.bucket, "status": response.status} {"bucket": cfg.bucket, "status": response.status},
) )
return msg return msg
elif not response.ok: if not response.ok:
body = await response.text() body = await response.text()
msg = f"Failed to access bucket '{cfg.bucket}': {response.status}" msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
log_structured_entry( log_structured_entry(
"GCS bucket validation failed - service may not work correctly", "GCS bucket validation failed - service may not work correctly",
"WARNING", "WARNING",
{"bucket": cfg.bucket, "status": response.status, "response": body} {"bucket": cfg.bucket, "status": response.status, "response": body},
) )
return msg return msg
else:
log_structured_entry( log_structured_entry(
"GCS bucket validation successful", "GCS bucket validation successful", "INFO", {"bucket": cfg.bucket}
"INFO",
{"bucket": cfg.bucket}
) )
return None return None
except Exception as e: except Exception as e: # noqa: BLE001
log_structured_entry( log_structured_entry(
"Failed to validate GCS bucket access - service may not work correctly", "Failed to validate GCS bucket access - service may not work correctly",
"WARNING", "WARNING",
{"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket} {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket},
) )
return f"GCS: {str(e)}" return f"GCS: {e!s}"
async def validate_vector_search_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None: async def validate_vector_search_access(
vs: GoogleCloudVectorSearch, cfg: Settings
) -> str | None:
"""Validate vector search endpoint access. """Validate vector search endpoint access.
Returns: Returns:
Error message if validation fails, None if successful. Error message if validation fails, None if successful.
""" """
log_structured_entry( log_structured_entry(
"Validating vector search endpoint access", "Validating vector search endpoint access",
"INFO", "INFO",
{"endpoint_name": cfg.endpoint_name} {"endpoint_name": cfg.endpoint_name},
) )
try: try:
headers = await vs._async_get_auth_headers() headers = await vs._async_get_auth_headers() # noqa: SLF001
session = vs._get_aio_session() session = vs._get_aio_session() # noqa: SLF001
endpoint_url = ( endpoint_url = (
f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}" f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
) )
async with session.get(endpoint_url, headers=headers) as response: async with session.get(endpoint_url, headers=headers) as response:
if response.status == 403: if response.status == HTTP_FORBIDDEN:
msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions." msg = (
f"Access denied to endpoint '{cfg.endpoint_name}'. "
"Check permissions."
)
log_structured_entry( log_structured_entry(
"Vector search endpoint validation failed - access denied - service may not work correctly", (
"Vector search endpoint validation failed - "
"access denied - service may not work correctly"
),
"WARNING", "WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status} {"endpoint": cfg.endpoint_name, "status": response.status},
) )
return msg return msg
elif response.status == 404: if response.status == HTTP_NOT_FOUND:
msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project." msg = (
f"Endpoint '{cfg.endpoint_name}' not found. "
"Check endpoint name and project."
)
log_structured_entry( log_structured_entry(
"Vector search endpoint validation failed - not found - service may not work correctly", (
"Vector search endpoint validation failed - "
"not found - service may not work correctly"
),
"WARNING", "WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status} {"endpoint": cfg.endpoint_name, "status": response.status},
) )
return msg return msg
elif not response.ok: if not response.ok:
body = await response.text() body = await response.text()
msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}" msg = (
f"Failed to access endpoint '{cfg.endpoint_name}': "
f"{response.status}"
)
log_structured_entry( log_structured_entry(
"Vector search endpoint validation failed - service may not work correctly", (
"Vector search endpoint validation failed - "
"service may not work correctly"
),
"WARNING", "WARNING",
{"endpoint": cfg.endpoint_name, "status": response.status, "response": body} {
"endpoint": cfg.endpoint_name,
"status": response.status,
"response": body,
},
) )
return msg return msg
else:
log_structured_entry( log_structured_entry(
"Vector search endpoint validation successful", "Vector search endpoint validation successful",
"INFO", "INFO",
{"endpoint": cfg.endpoint_name} {"endpoint": cfg.endpoint_name},
) )
return None return None
except Exception as e: except Exception as e: # noqa: BLE001
log_structured_entry( log_structured_entry(
"Failed to validate vector search endpoint access - service may not work correctly", (
"Failed to validate vector search endpoint access - "
"service may not work correctly"
),
"WARNING", "WARNING",
{"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name} {
"error": str(e),
"error_type": type(e).__name__,
"endpoint": cfg.endpoint_name,
},
) )
return f"Vector Search: {str(e)}" return f"Vector Search: {e!s}"

View File

@@ -1,4 +1,3 @@
# ruff: noqa: INP001
"""LRU cache implementation.""" """LRU cache implementation."""
from collections import OrderedDict from collections import OrderedDict