From 86ed34887b5dd561a77148e66dcc128764e2b8ce Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Thu, 5 Mar 2026 21:04:42 +0000 Subject: [PATCH 1/2] Make cloud logging optional --- src/knowledge_search_mcp/config.py | 1 + src/knowledge_search_mcp/logging.py | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/knowledge_search_mcp/config.py b/src/knowledge_search_mcp/config.py index 1844142..2fa348b 100644 --- a/src/knowledge_search_mcp/config.py +++ b/src/knowledge_search_mcp/config.py @@ -52,6 +52,7 @@ class Settings(BaseSettings): search_limit: int = 10 log_name: str = "va_agent_evaluation_logs" log_level: str = "INFO" + cloud_logging_enabled: bool = False @classmethod def settings_customise_sources( diff --git a/src/knowledge_search_mcp/logging.py b/src/knowledge_search_mcp/logging.py index b2f667d..6fef29b 100644 --- a/src/knowledge_search_mcp/logging.py +++ b/src/knowledge_search_mcp/logging.py @@ -27,16 +27,20 @@ def _get_logger() -> logging.Logger: _eval_log = logger return logger - try: - client = google.cloud.logging.Client(project=cfg.project_id) - handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport - logger.addHandler(handler) - logger.setLevel(getattr(logging, cfg.log_level.upper())) - except Exception as e: - # Fallback to console if Cloud Logging is unavailable (local dev) + if cfg.cloud_logging_enabled: + try: + client = google.cloud.logging.Client(project=cfg.project_id) + handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport + logger.addHandler(handler) + logger.setLevel(getattr(logging, cfg.log_level.upper())) + except Exception as e: + # Fallback to console if Cloud Logging is unavailable (local dev) + logging.basicConfig(level=getattr(logging, cfg.log_level.upper())) + logger = logging.getLogger(cfg.log_name) + logger.warning("Cloud Logging setup failed; using console. Error: %s", e) + else: logging.basicConfig(level=getattr(logging, cfg.log_level.upper())) logger = logging.getLogger(cfg.log_name) - logger.warning("Cloud Logging setup failed; using console. Error: %s", e) _eval_log = logger return logger From d39b8a6ea7519d423835757faabff12bb674558f Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Thu, 5 Mar 2026 21:43:15 +0000 Subject: [PATCH 2/2] Add CI --- .gitea/workflows/ci.yml | 43 +++++ CLAUDE.md | 3 + pyproject.toml | 16 ++ agent.py => scripts/agent.py | 0 src/knowledge_search_mcp/__init__.py | 6 +- src/knowledge_search_mcp/__main__.py | 39 ++-- src/knowledge_search_mcp/clients/base.py | 1 - src/knowledge_search_mcp/clients/storage.py | 38 ++-- .../clients/vector_search.py | 37 ++-- src/knowledge_search_mcp/config.py | 23 ++- src/knowledge_search_mcp/logging.py | 29 +-- src/knowledge_search_mcp/models.py | 5 +- src/knowledge_search_mcp/server.py | 74 +++++--- src/knowledge_search_mcp/services/__init__.py | 14 +- src/knowledge_search_mcp/services/search.py | 45 ++--- .../services/validation.py | 173 +++++++++++------- src/knowledge_search_mcp/utils/cache.py | 1 - 17 files changed, 337 insertions(+), 210 deletions(-) create mode 100644 .gitea/workflows/ci.yml create mode 100644 CLAUDE.md rename agent.py => scripts/agent.py (100%) diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..0a9cd1e --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,43 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v6 + with: + python-version: "3.12" + - run: uv sync --frozen + - name: Ruff check + run: uv run ruff check + - name: Ruff format check + run: uv run ruff format --check + + typecheck: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v6 + with: + python-version: "3.12" + - run: uv sync --frozen + - name: Type check + run: uv run ty check + + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v6 + with: + python-version: "3.12" + - run: uv sync --frozen + - name: Run tests + run: uv run pytest --cov diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..e4751be --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,3 @@ +Use `uv` for project management +Linter: `uv run ruff check` +Type-checking: `uv run ty check` diff --git a/pyproject.toml b/pyproject.toml index abc579f..1d7e50e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,3 +36,19 @@ pythonpath = ["."] [build-system] requires = ["uv_build>=0.8.3,<0.9.0"] build-backend = "uv_build" + +[tool.ruff] +exclude = ["scripts", "tests"] + +[tool.ty.src] +exclude = ["scripts", "tests"] + +[tool.ruff.lint] +select = ['ALL'] +ignore = [ + 'D203', # one-blank-line-before-class + 'D213', # multi-line-summary-second-line + 'COM812', # missing-trailing-comma + 'ANN401', # dynamically-typed-any + 'ERA001', # commented-out-code +] diff --git a/agent.py b/scripts/agent.py similarity index 100% rename from agent.py rename to scripts/agent.py diff --git a/src/knowledge_search_mcp/__init__.py b/src/knowledge_search_mcp/__init__.py index 3c3648f..666b87f 100644 --- a/src/knowledge_search_mcp/__init__.py +++ b/src/knowledge_search_mcp/__init__.py @@ -6,10 +6,10 @@ from .models import AppContext, SearchResult, SourceNamespace from .utils.cache import LRUCache __all__ = [ + "AppContext", "GoogleCloudFileStorage", "GoogleCloudVectorSearch", - "SourceNamespace", - "SearchResult", - "AppContext", "LRUCache", + "SearchResult", + "SourceNamespace", ] diff --git a/src/knowledge_search_mcp/__main__.py b/src/knowledge_search_mcp/__main__.py index 4c98112..03faf9f 100644 --- a/src/knowledge_search_mcp/__main__.py +++ b/src/knowledge_search_mcp/__main__.py @@ -1,4 +1,3 @@ -# ruff: noqa: INP001 """MCP server for semantic search over Vertex AI Vector Search.""" import time @@ -9,7 +8,11 @@ from .config import _args from .logging import log_structured_entry from .models import AppContext, SourceNamespace from .server import lifespan -from .services.search import filter_search_results, format_search_results, generate_query_embedding +from .services.search import ( + filter_search_results, + format_search_results, + generate_query_embedding, +) mcp = FastMCP( "knowledge-search", @@ -44,7 +47,7 @@ async def knowledge_search( log_structured_entry( "knowledge_search request received", "INFO", - {"query": query[:100]} # Log first 100 chars of query + {"query": query[:100]}, # Log first 100 chars of query ) try: @@ -61,7 +64,7 @@ async def knowledge_search( log_structured_entry( "Query embedding generated successfully", "INFO", - {"time_ms": round((t_embed - t0) * 1000, 1)} + {"time_ms": round((t_embed - t0) * 1000, 1)}, ) # Perform vector search @@ -74,17 +77,13 @@ async def knowledge_search( source=source, ) t_search = time.perf_counter() - except Exception as e: + except Exception as e: # noqa: BLE001 log_structured_entry( "Vector search failed", "ERROR", - { - "error": str(e), - "error_type": type(e).__name__, - "query": query[:100] - } + {"error": str(e), "error_type": type(e).__name__, "query": query[:100]}, ) - return f"Error performing vector search: {str(e)}" + return f"Error performing vector search: {e!s}" # Apply similarity filtering filtered_results = filter_search_results(search_results) @@ -98,32 +97,26 @@ async def knowledge_search( "total_ms": f"{round((t_search - t0) * 1000, 1)}ms", "source_filter": source.value if source is not None else None, "results_count": len(filtered_results), - "chunks": [s["id"] for s in filtered_results] - } + "chunks": [s["id"] for s in filtered_results], + }, ) # Format and return results if not filtered_results: log_structured_entry( - "No results found for query", - "INFO", - {"query": query[:100]} + "No results found for query", "INFO", {"query": query[:100]} ) return format_search_results(filtered_results) - except Exception as e: + except Exception as e: # noqa: BLE001 # Catch-all for any unexpected errors log_structured_entry( "Unexpected error in knowledge_search", "ERROR", - { - "error": str(e), - "error_type": type(e).__name__, - "query": query[:100] - } + {"error": str(e), "error_type": type(e).__name__, "query": query[:100]}, ) - return f"Unexpected error during search: {str(e)}" + return f"Unexpected error during search: {e!s}" def main() -> None: diff --git a/src/knowledge_search_mcp/clients/base.py b/src/knowledge_search_mcp/clients/base.py index 4e4b0b2..7a371f8 100644 --- a/src/knowledge_search_mcp/clients/base.py +++ b/src/knowledge_search_mcp/clients/base.py @@ -1,4 +1,3 @@ -# ruff: noqa: INP001 """Base client with shared aiohttp session management.""" import aiohttp diff --git a/src/knowledge_search_mcp/clients/storage.py b/src/knowledge_search_mcp/clients/storage.py index 6004df8..c603ded 100644 --- a/src/knowledge_search_mcp/clients/storage.py +++ b/src/knowledge_search_mcp/clients/storage.py @@ -1,4 +1,3 @@ -# ruff: noqa: INP001 """Google Cloud Storage client with caching.""" import asyncio @@ -8,8 +7,9 @@ from typing import BinaryIO import aiohttp from gcloud.aio.storage import Storage -from ..logging import log_structured_entry -from ..utils.cache import LRUCache +from knowledge_search_mcp.logging import log_structured_entry +from knowledge_search_mcp.utils.cache import LRUCache + from .base import BaseGoogleCloudClient HTTP_TOO_MANY_REQUESTS = 429 @@ -56,7 +56,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient): log_structured_entry( "File retrieved from cache", "INFO", - {"file": file_name, "bucket": self.bucket_name} + {"file": file_name, "bucket": self.bucket_name}, ) file_stream = io.BytesIO(cached_content) file_stream.name = file_name @@ -65,7 +65,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient): log_structured_entry( "Starting file download from GCS", "INFO", - {"file": file_name, "bucket": self.bucket_name} + {"file": file_name, "bucket": self.bucket_name}, ) storage_client = self._get_aio_storage() @@ -87,15 +87,18 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient): "file": file_name, "bucket": self.bucket_name, "size_bytes": len(content), - "attempt": attempt + 1 - } + "attempt": attempt + 1, + }, ) except TimeoutError as exc: last_exception = exc log_structured_entry( - f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", + ( + f"Timeout downloading gs://{self.bucket_name}/{file_name} " + f"(attempt {attempt + 1}/{max_retries})" + ), "WARNING", - {"error": str(exc)} + {"error": str(exc)}, ) except aiohttp.ClientResponseError as exc: last_exception = exc @@ -103,16 +106,19 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient): exc.status == HTTP_TOO_MANY_REQUESTS or exc.status >= HTTP_SERVER_ERROR ): - log_structured_entry( - f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", + log_structured_entry( + ( + f"HTTP {exc.status} downloading gs://{self.bucket_name}/" + f"{file_name} (attempt {attempt + 1}/{max_retries})" + ), "WARNING", - {"status": exc.status, "message": str(exc)} + {"status": exc.status, "message": str(exc)}, ) else: log_structured_entry( f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}", "ERROR", - {"status": exc.status, "message": str(exc)} + {"status": exc.status, "message": str(exc)}, ) raise else: @@ -123,7 +129,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient): log_structured_entry( "Retrying file download", "INFO", - {"file": file_name, "delay_seconds": delay} + {"file": file_name, "delay_seconds": delay}, ) await asyncio.sleep(delay) @@ -138,7 +144,7 @@ class GoogleCloudFileStorage(BaseGoogleCloudClient): "file": file_name, "bucket": self.bucket_name, "max_retries": max_retries, - "last_error": str(last_exception) - } + "last_error": str(last_exception), + }, ) raise TimeoutError(msg) from last_exception diff --git a/src/knowledge_search_mcp/clients/vector_search.py b/src/knowledge_search_mcp/clients/vector_search.py index bd80585..94819d9 100644 --- a/src/knowledge_search_mcp/clients/vector_search.py +++ b/src/knowledge_search_mcp/clients/vector_search.py @@ -1,4 +1,3 @@ -# ruff: noqa: INP001 """Google Cloud Vector Search client.""" import asyncio @@ -6,8 +5,9 @@ from collections.abc import Sequence from gcloud.aio.auth import Token -from ..logging import log_structured_entry -from ..models import SearchResult, SourceNamespace +from knowledge_search_mcp.logging import log_structured_entry +from knowledge_search_mcp.models import SearchResult, SourceNamespace + from .base import BaseGoogleCloudClient from .storage import GoogleCloudFileStorage @@ -94,7 +94,7 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): log_structured_entry( "Vector search query failed - endpoint not configured", "ERROR", - {"error": msg} + {"error": msg}, ) raise RuntimeError(msg) @@ -113,8 +113,8 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): "deployed_index_id": deployed_index_id, "neighbor_count": limit, "endpoint_id": endpoint_id, - "embedding_dimension": len(query) - } + "embedding_dimension": len(query), + }, ) datapoint: dict = {"feature_vector": list(query)} @@ -149,10 +149,10 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): { "status": response.status, "response_body": body, - "deployed_index_id": deployed_index_id - } + "deployed_index_id": deployed_index_id, + }, ) - raise RuntimeError(msg) + raise RuntimeError(msg) # noqa: TRY301 data = await response.json() neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) @@ -161,15 +161,15 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): "INFO", { "neighbors_found": len(neighbors), - "deployed_index_id": deployed_index_id - } + "deployed_index_id": deployed_index_id, + }, ) if not neighbors: log_structured_entry( "No neighbors found in vector search", "WARNING", - {"deployed_index_id": deployed_index_id} + {"deployed_index_id": deployed_index_id}, ) return [] @@ -185,7 +185,7 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): log_structured_entry( "Fetching content for search results", "INFO", - {"file_count": len(content_tasks)} + {"file_count": len(content_tasks)}, ) file_streams = await asyncio.gather(*content_tasks) @@ -206,12 +206,9 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): log_structured_entry( "Vector search completed successfully", "INFO", - { - "results_count": len(results), - "deployed_index_id": deployed_index_id - } + {"results_count": len(results), "deployed_index_id": deployed_index_id}, ) - return results + return results # noqa: TRY300 except Exception as e: log_structured_entry( @@ -220,7 +217,7 @@ class GoogleCloudVectorSearch(BaseGoogleCloudClient): { "error": str(e), "error_type": type(e).__name__, - "deployed_index_id": deployed_index_id - } + "deployed_index_id": deployed_index_id, + }, ) raise diff --git a/src/knowledge_search_mcp/config.py b/src/knowledge_search_mcp/config.py index 2fa348b..d46d92d 100644 --- a/src/knowledge_search_mcp/config.py +++ b/src/knowledge_search_mcp/config.py @@ -1,7 +1,14 @@ +"""Configuration management for the MCP server.""" + +import argparse import os import sys -import argparse -from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource + +from pydantic_settings import ( + BaseSettings, + PydanticBaseSettingsSource, + YamlConfigSettingsSource, +) def _parse_args() -> argparse.Namespace: @@ -14,7 +21,7 @@ def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() return argparse.Namespace( transport="stdio", - host="0.0.0.0", + host="0.0.0.0", # noqa: S104 port=8080, config=os.environ.get("CONFIG_FILE", "config.yaml"), ) @@ -25,7 +32,7 @@ def _parse_args() -> argparse.Namespace: choices=["stdio", "sse", "streamable-http"], default="stdio", ) - parser.add_argument("--host", default="0.0.0.0") + parser.add_argument("--host", default="0.0.0.0") # noqa: S104 parser.add_argument("--port", type=int, default=8080) parser.add_argument( "--config", @@ -36,6 +43,7 @@ def _parse_args() -> argparse.Namespace: _args = _parse_args() + class Settings(BaseSettings): """Server configuration populated from env vars and a YAML config file.""" @@ -63,6 +71,7 @@ class Settings(BaseSettings): dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource, ) -> tuple[PydanticBaseSettingsSource, ...]: + """Customize the order of settings sources to include YAML config.""" return ( init_settings, env_settings, @@ -78,7 +87,7 @@ _cfg: Settings | None = None def get_config() -> Settings: """Get or create the singleton Settings instance.""" - global _cfg + global _cfg # noqa: PLW0603 if _cfg is None: _cfg = Settings.model_validate({}) return _cfg @@ -88,8 +97,8 @@ def get_config() -> Settings: class _ConfigProxy: """Proxy object that lazily loads config on attribute access.""" - def __getattr__(self, name: str): + def __getattr__(self, name: str) -> object: return getattr(get_config(), name) -cfg = _ConfigProxy() # type: ignore[assignment] +cfg = _ConfigProxy() diff --git a/src/knowledge_search_mcp/logging.py b/src/knowledge_search_mcp/logging.py index 6fef29b..a54be5f 100644 --- a/src/knowledge_search_mcp/logging.py +++ b/src/knowledge_search_mcp/logging.py @@ -1,23 +1,22 @@ -""" -Centralized Cloud Logging setup. -Uses CloudLoggingHandler (background thread) so logging does not add latency +"""Centralized Cloud Logging setup. + +Uses CloudLoggingHandler (background thread) so logging does not add latency. """ import logging -from typing import Optional, Dict, Literal +from typing import Literal import google.cloud.logging from google.cloud.logging.handlers import CloudLoggingHandler from .config import get_config - _eval_log: logging.Logger | None = None def _get_logger() -> logging.Logger: """Get or create the singleton evaluation logger.""" - global _eval_log + global _eval_log # noqa: PLW0603 if _eval_log is not None: return _eval_log @@ -33,7 +32,7 @@ def _get_logger() -> logging.Logger: handler = CloudLoggingHandler(client, name=cfg.log_name) # async transport logger.addHandler(handler) logger.setLevel(getattr(logging, cfg.log_level.upper())) - except Exception as e: + except Exception as e: # noqa: BLE001 # Fallback to console if Cloud Logging is unavailable (local dev) logging.basicConfig(level=getattr(logging, cfg.log_level.upper())) logger = logging.getLogger(cfg.log_name) @@ -46,15 +45,23 @@ def _get_logger() -> logging.Logger: return logger -def log_structured_entry(message: str, severity: Literal["INFO", "WARNING", "ERROR"], custom_log: Optional[Dict] = None) -> None: - """ - Emit a JSON-structured log row. +def log_structured_entry( + message: str, + severity: Literal["INFO", "WARNING", "ERROR"], + custom_log: dict | None = None, +) -> None: + """Emit a JSON-structured log row. Args: message: Short label for the row (e.g., "Final agent turn"). severity: "INFO" | "WARNING" | "ERROR" custom_log: A dict with your structured payload. + """ level = getattr(logging, severity.upper(), logging.INFO) logger = _get_logger() - logger.log(level, message, extra={"json_fields": {"message": message, "custom": custom_log or {}}}) + logger.log( + level, + message, + extra={"json_fields": {"message": message, "custom": custom_log or {}}}, + ) diff --git a/src/knowledge_search_mcp/models.py b/src/knowledge_search_mcp/models.py index 37e412e..b47176f 100644 --- a/src/knowledge_search_mcp/models.py +++ b/src/knowledge_search_mcp/models.py @@ -1,8 +1,7 @@ -# ruff: noqa: INP001 """Domain models for knowledge search MCP server.""" from dataclasses import dataclass -from enum import Enum +from enum import StrEnum from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: @@ -12,7 +11,7 @@ if TYPE_CHECKING: from .config import Settings -class SourceNamespace(str, Enum): +class SourceNamespace(StrEnum): """Allowed values for the 'source' namespace filter.""" EDUCACION_FINANCIERA = "Educacion Financiera" diff --git a/src/knowledge_search_mcp/server.py b/src/knowledge_search_mcp/server.py index ca7591e..8f15b78 100644 --- a/src/knowledge_search_mcp/server.py +++ b/src/knowledge_search_mcp/server.py @@ -1,4 +1,3 @@ -# ruff: noqa: INP001 """MCP server lifecycle management.""" from collections.abc import AsyncIterator @@ -8,12 +7,12 @@ from google import genai from mcp.server.fastmcp import FastMCP from .clients.vector_search import GoogleCloudVectorSearch -from .config import Settings, cfg +from .config import get_config from .logging import log_structured_entry from .models import AppContext from .services.validation import ( - validate_genai_access, validate_gcs_access, + validate_genai_access, validate_vector_search_access, ) @@ -21,15 +20,18 @@ from .services.validation import ( @asynccontextmanager async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: """Create and configure the vector-search client for the server lifetime.""" + # Get config with proper types for initialization + config_for_init = get_config() + log_structured_entry( "Initializing MCP server", "INFO", { - "project_id": cfg.project_id, - "location": cfg.location, - "bucket": cfg.bucket, - "index_name": cfg.index_name, - } + "project_id": config_for_init.project_id, + "location": config_for_init.location, + "bucket": config_for_init.bucket, + "index_name": config_for_init.index_name, + }, ) vs: GoogleCloudVectorSearch | None = None @@ -37,10 +39,10 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: # Initialize vector search client log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO") vs = GoogleCloudVectorSearch( - project_id=cfg.project_id, - location=cfg.location, - bucket=cfg.bucket, - index_name=cfg.index_name, + project_id=config_for_init.project_id, + location=config_for_init.location, + bucket=config_for_init.bucket, + index_name=config_for_init.index_name, ) # Configure endpoint @@ -48,25 +50,28 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: "Configuring index endpoint", "INFO", { - "endpoint_name": cfg.endpoint_name, - "endpoint_domain": cfg.endpoint_domain, - } + "endpoint_name": config_for_init.endpoint_name, + "endpoint_domain": config_for_init.endpoint_domain, + }, ) vs.configure_index_endpoint( - name=cfg.endpoint_name, - public_domain=cfg.endpoint_domain, + name=config_for_init.endpoint_name, + public_domain=config_for_init.endpoint_domain, ) # Initialize GenAI client log_structured_entry( "Creating GenAI client", "INFO", - {"project_id": cfg.project_id, "location": cfg.location} + { + "project_id": config_for_init.project_id, + "location": config_for_init.location, + }, ) genai_client = genai.Client( vertexai=True, - project=cfg.project_id, - location=cfg.location, + project=config_for_init.project_id, + location=config_for_init.location, ) # Validate credentials and configuration by testing actual resources @@ -76,32 +81,41 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: validation_errors = [] # Run all validations - genai_error = await validate_genai_access(genai_client, cfg) + config = get_config() + genai_error = await validate_genai_access(genai_client, config) if genai_error: validation_errors.append(genai_error) - gcs_error = await validate_gcs_access(vs, cfg) + gcs_error = await validate_gcs_access(vs, config) if gcs_error: validation_errors.append(gcs_error) - vs_error = await validate_vector_search_access(vs, cfg) + vs_error = await validate_vector_search_access(vs, config) if vs_error: validation_errors.append(vs_error) # Summary of validations if validation_errors: log_structured_entry( - "MCP server started with validation errors - service may not work correctly", + ( + "MCP server started with validation errors - " + "service may not work correctly" + ), "WARNING", - {"validation_errors": validation_errors, "error_count": len(validation_errors)} + { + "validation_errors": validation_errors, + "error_count": len(validation_errors), + }, ) else: - log_structured_entry("All validations passed - MCP server initialization complete", "INFO") + log_structured_entry( + "All validations passed - MCP server initialization complete", "INFO" + ) yield AppContext( vector_search=vs, genai_client=genai_client, - settings=cfg, + settings=config, ) except Exception as e: @@ -111,7 +125,7 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: { "error": str(e), "error_type": type(e).__name__, - } + }, ) raise finally: @@ -121,9 +135,9 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: try: await vs.close() log_structured_entry("Closed aiohttp sessions", "INFO") - except Exception as e: + except Exception as e: # noqa: BLE001 log_structured_entry( "Error closing aiohttp sessions", "WARNING", - {"error": str(e), "error_type": type(e).__name__} + {"error": str(e), "error_type": type(e).__name__}, ) diff --git a/src/knowledge_search_mcp/services/__init__.py b/src/knowledge_search_mcp/services/__init__.py index 6ea8345..4199edc 100644 --- a/src/knowledge_search_mcp/services/__init__.py +++ b/src/knowledge_search_mcp/services/__init__.py @@ -1,13 +1,21 @@ """Service modules for business logic.""" -from .search import filter_search_results, format_search_results, generate_query_embedding -from .validation import validate_genai_access, validate_gcs_access, validate_vector_search_access +from .search import ( + filter_search_results, + format_search_results, + generate_query_embedding, +) +from .validation import ( + validate_gcs_access, + validate_genai_access, + validate_vector_search_access, +) __all__ = [ "filter_search_results", "format_search_results", "generate_query_embedding", - "validate_genai_access", "validate_gcs_access", + "validate_genai_access", "validate_vector_search_access", ] diff --git a/src/knowledge_search_mcp/services/search.py b/src/knowledge_search_mcp/services/search.py index b33dd9e..8f55eeb 100644 --- a/src/knowledge_search_mcp/services/search.py +++ b/src/knowledge_search_mcp/services/search.py @@ -1,11 +1,10 @@ -# ruff: noqa: INP001 """Search helper functions.""" from google import genai from google.genai import types as genai_types -from ..logging import log_structured_entry -from ..models import SearchResult +from knowledge_search_mcp.logging import log_structured_entry +from knowledge_search_mcp.models import SearchResult async def generate_query_embedding( @@ -17,6 +16,7 @@ async def generate_query_embedding( Returns: Tuple of (embedding vector, error message). Error message is None on success. + """ if not query or not query.strip(): return ([], "Error: Query cannot be empty") @@ -30,9 +30,11 @@ async def generate_query_embedding( task_type="RETRIEVAL_QUERY", ), ) + if not response.embeddings or not response.embeddings[0].values: + return ([], "Error: Failed to generate embedding - empty response") embedding = response.embeddings[0].values - return (embedding, None) - except Exception as e: + return (embedding, None) # noqa: TRY300 + except Exception as e: # noqa: BLE001 error_type = type(e).__name__ error_msg = str(e) @@ -41,24 +43,15 @@ async def generate_query_embedding( log_structured_entry( "Rate limit exceeded while generating embedding", "WARNING", - { - "error": error_msg, - "error_type": error_type, - "query": query[:100] - } + {"error": error_msg, "error_type": error_type, "query": query[:100]}, ) return ([], "Error: API rate limit exceeded. Please try again later.") - else: - log_structured_entry( - "Failed to generate query embedding", - "ERROR", - { - "error": error_msg, - "error_type": error_type, - "query": query[:100] - } - ) - return ([], f"Error generating embedding: {error_msg}") + log_structured_entry( + "Failed to generate query embedding", + "ERROR", + {"error": error_msg, "error_type": error_type, "query": query[:100]}, + ) + return ([], f"Error generating embedding: {error_msg}") def filter_search_results( @@ -75,6 +68,7 @@ def filter_search_results( Returns: Filtered list of search results. + """ if not results: return [] @@ -82,14 +76,10 @@ def filter_search_results( max_sim = max(r["distance"] for r in results) cutoff = max_sim * top_percent - filtered = [ - s - for s in results - if s["distance"] > cutoff and s["distance"] > min_similarity + return [ + s for s in results if s["distance"] > cutoff and s["distance"] > min_similarity ] - return filtered - def format_search_results(results: list[SearchResult]) -> str: """Format search results as XML-like documents. @@ -99,6 +89,7 @@ def format_search_results(results: list[SearchResult]) -> str: Returns: Formatted string with document tags. + """ if not results: return "No relevant documents found for your query." diff --git a/src/knowledge_search_mcp/services/validation.py b/src/knowledge_search_mcp/services/validation.py index 23cbb7f..e7651e0 100644 --- a/src/knowledge_search_mcp/services/validation.py +++ b/src/knowledge_search_mcp/services/validation.py @@ -1,20 +1,26 @@ -# ruff: noqa: INP001 """Validation functions for Google Cloud services.""" from gcloud.aio.auth import Token from google import genai from google.genai import types as genai_types -from ..clients.vector_search import GoogleCloudVectorSearch -from ..config import Settings -from ..logging import log_structured_entry +from knowledge_search_mcp.clients.vector_search import GoogleCloudVectorSearch +from knowledge_search_mcp.config import Settings +from knowledge_search_mcp.logging import log_structured_entry + +# HTTP status codes +HTTP_FORBIDDEN = 403 +HTTP_NOT_FOUND = 404 -async def validate_genai_access(genai_client: genai.Client, cfg: Settings) -> str | None: +async def validate_genai_access( + genai_client: genai.Client, cfg: Settings +) -> str | None: """Validate GenAI embedding access. Returns: Error message if validation fails, None if successful. + """ log_structured_entry("Validating GenAI embedding access", "INFO") try: @@ -30,20 +36,26 @@ async def validate_genai_access(genai_client: genai.Client, cfg: Settings) -> st log_structured_entry( "GenAI embedding validation successful", "INFO", - {"embedding_dimension": len(embedding_values) if embedding_values else 0} + { + "embedding_dimension": len(embedding_values) + if embedding_values + else 0 + }, ) return None - else: - msg = "Embedding validation returned empty response" - log_structured_entry(msg, "WARNING") - return msg - except Exception as e: + msg = "Embedding validation returned empty response" + log_structured_entry(msg, "WARNING") + return msg # noqa: TRY300 + except Exception as e: # noqa: BLE001 log_structured_entry( - "Failed to validate GenAI embedding access - service may not work correctly", + ( + "Failed to validate GenAI embedding access - " + "service may not work correctly" + ), "WARNING", - {"error": str(e), "error_type": type(e).__name__} + {"error": str(e), "error_type": type(e).__name__}, ) - return f"GenAI: {str(e)}" + return f"GenAI: {e!s}" async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None: @@ -51,14 +63,11 @@ async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str Returns: Error message if validation fails, None if successful. + """ - log_structured_entry( - "Validating GCS bucket access", - "INFO", - {"bucket": cfg.bucket} - ) + log_structured_entry("Validating GCS bucket access", "INFO", {"bucket": cfg.bucket}) try: - session = vs.storage._get_aio_session() + session = vs.storage._get_aio_session() # noqa: SLF001 token_obj = Token( session=session, scopes=["https://www.googleapis.com/auth/cloud-platform"], @@ -70,102 +79,136 @@ async def validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1", headers=headers, ) as response: - if response.status == 403: + if response.status == HTTP_FORBIDDEN: msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions." log_structured_entry( - "GCS bucket validation failed - access denied - service may not work correctly", + ( + "GCS bucket validation failed - access denied - " + "service may not work correctly" + ), "WARNING", - {"bucket": cfg.bucket, "status": response.status} + {"bucket": cfg.bucket, "status": response.status}, ) return msg - elif response.status == 404: + if response.status == HTTP_NOT_FOUND: msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project." log_structured_entry( - "GCS bucket validation failed - not found - service may not work correctly", + ( + "GCS bucket validation failed - not found - " + "service may not work correctly" + ), "WARNING", - {"bucket": cfg.bucket, "status": response.status} + {"bucket": cfg.bucket, "status": response.status}, ) return msg - elif not response.ok: + if not response.ok: body = await response.text() msg = f"Failed to access bucket '{cfg.bucket}': {response.status}" log_structured_entry( "GCS bucket validation failed - service may not work correctly", "WARNING", - {"bucket": cfg.bucket, "status": response.status, "response": body} + {"bucket": cfg.bucket, "status": response.status, "response": body}, ) return msg - else: - log_structured_entry( - "GCS bucket validation successful", - "INFO", - {"bucket": cfg.bucket} - ) - return None - except Exception as e: + log_structured_entry( + "GCS bucket validation successful", "INFO", {"bucket": cfg.bucket} + ) + return None + except Exception as e: # noqa: BLE001 log_structured_entry( "Failed to validate GCS bucket access - service may not work correctly", "WARNING", - {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket} + {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}, ) - return f"GCS: {str(e)}" + return f"GCS: {e!s}" -async def validate_vector_search_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None: +async def validate_vector_search_access( + vs: GoogleCloudVectorSearch, cfg: Settings +) -> str | None: """Validate vector search endpoint access. Returns: Error message if validation fails, None if successful. + """ log_structured_entry( "Validating vector search endpoint access", "INFO", - {"endpoint_name": cfg.endpoint_name} + {"endpoint_name": cfg.endpoint_name}, ) try: - headers = await vs._async_get_auth_headers() - session = vs._get_aio_session() + headers = await vs._async_get_auth_headers() # noqa: SLF001 + session = vs._get_aio_session() # noqa: SLF001 endpoint_url = ( f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}" ) async with session.get(endpoint_url, headers=headers) as response: - if response.status == 403: - msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions." + if response.status == HTTP_FORBIDDEN: + msg = ( + f"Access denied to endpoint '{cfg.endpoint_name}'. " + "Check permissions." + ) log_structured_entry( - "Vector search endpoint validation failed - access denied - service may not work correctly", + ( + "Vector search endpoint validation failed - " + "access denied - service may not work correctly" + ), "WARNING", - {"endpoint": cfg.endpoint_name, "status": response.status} + {"endpoint": cfg.endpoint_name, "status": response.status}, ) return msg - elif response.status == 404: - msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project." + if response.status == HTTP_NOT_FOUND: + msg = ( + f"Endpoint '{cfg.endpoint_name}' not found. " + "Check endpoint name and project." + ) log_structured_entry( - "Vector search endpoint validation failed - not found - service may not work correctly", + ( + "Vector search endpoint validation failed - " + "not found - service may not work correctly" + ), "WARNING", - {"endpoint": cfg.endpoint_name, "status": response.status} + {"endpoint": cfg.endpoint_name, "status": response.status}, ) return msg - elif not response.ok: + if not response.ok: body = await response.text() - msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}" + msg = ( + f"Failed to access endpoint '{cfg.endpoint_name}': " + f"{response.status}" + ) log_structured_entry( - "Vector search endpoint validation failed - service may not work correctly", + ( + "Vector search endpoint validation failed - " + "service may not work correctly" + ), "WARNING", - {"endpoint": cfg.endpoint_name, "status": response.status, "response": body} + { + "endpoint": cfg.endpoint_name, + "status": response.status, + "response": body, + }, ) return msg - else: - log_structured_entry( - "Vector search endpoint validation successful", - "INFO", - {"endpoint": cfg.endpoint_name} - ) - return None - except Exception as e: + log_structured_entry( + "Vector search endpoint validation successful", + "INFO", + {"endpoint": cfg.endpoint_name}, + ) + return None + except Exception as e: # noqa: BLE001 log_structured_entry( - "Failed to validate vector search endpoint access - service may not work correctly", + ( + "Failed to validate vector search endpoint access - " + "service may not work correctly" + ), "WARNING", - {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name} + { + "error": str(e), + "error_type": type(e).__name__, + "endpoint": cfg.endpoint_name, + }, ) - return f"Vector Search: {str(e)}" + return f"Vector Search: {e!s}" diff --git a/src/knowledge_search_mcp/utils/cache.py b/src/knowledge_search_mcp/utils/cache.py index 2235f66..3e3b662 100644 --- a/src/knowledge_search_mcp/utils/cache.py +++ b/src/knowledge_search_mcp/utils/cache.py @@ -1,4 +1,3 @@ -# ruff: noqa: INP001 """LRU cache implementation.""" from collections import OrderedDict