diff --git a/REFACTORING_SUMMARY.md b/REFACTORING_SUMMARY.md
new file mode 100644
index 0000000..a14fa46
--- /dev/null
+++ b/REFACTORING_SUMMARY.md
@@ -0,0 +1,136 @@
+# Refactoring Summary
+
+## High-ROI Refactorings Completed
+
+### 1. Eliminated Code Duplication - Session Management ✅
+
+**Problem**: The `_get_aio_session()` method was duplicated identically in both `GoogleCloudFileStorage` and `GoogleCloudVectorSearch` classes.
+
+**Solution**:
+- Created a new `BaseGoogleCloudClient` base class that encapsulates shared session management logic
+- Both `GoogleCloudFileStorage` and `GoogleCloudVectorSearch` now inherit from this base class
+- Added a `close()` method to properly clean up resources
+
+**Files Changed**:
+- `src/knowledge_search_mcp/main.py:25-80` - Added base class
+- `src/knowledge_search_mcp/main.py:83` - GoogleCloudFileStorage inherits from base
+- `src/knowledge_search_mcp/main.py:219` - GoogleCloudVectorSearch inherits from base
+
+**Impact**: Reduced ~24 lines of duplicated code, improved maintainability
+
+---
+
+### 2. Fixed Resource Cleanup ✅
+
+**Problem**: aiohttp sessions were never explicitly closed, leading to potential resource leaks and warnings.
+
+**Solution**:
+- Added `close()` method to `BaseGoogleCloudClient` to properly close aiohttp sessions
+- Extended `close()` in `GoogleCloudVectorSearch` to also close the storage client's session
+- Modified `lifespan()` function's finally block to call `vs.close()` on shutdown
+
+**Files Changed**:
+- `src/knowledge_search_mcp/main.py:74-78` - Base close method
+- `src/knowledge_search_mcp/main.py:228-231` - VectorSearch close override
+- `src/knowledge_search_mcp/main.py:699-707` - Cleanup in lifespan finally block
+
+**Impact**: Prevents resource leaks, eliminates aiohttp warnings on shutdown
+
+---
+
+### 3. Implemented LRU Cache with Size Limits ✅
+
+**Problem**: The `_cache` dictionary in `GoogleCloudFileStorage` grew indefinitely, potentially causing memory issues with large document sets.
+
+**Solution**:
+- Created a new `LRUCache` class with configurable max size (default: 100 items)
+- Automatically evicts least recently used items when cache is full
+- Maintains insertion order and tracks access patterns
+
+**Files Changed**:
+- `src/knowledge_search_mcp/main.py:28-58` - New LRUCache class
+- `src/knowledge_search_mcp/main.py:85-87` - Updated GoogleCloudFileStorage to use LRUCache
+- `src/knowledge_search_mcp/main.py:115-122` - Updated cache access patterns
+- `src/knowledge_search_mcp/main.py:147-148` - Updated cache write patterns
+- `tests/test_search.py` - Updated tests to work with LRUCache interface
+
+**Impact**: Bounded memory usage, prevents cache from growing indefinitely
+
+---
+
+### 4. Broke Down Large Functions ✅
+
+#### a. Extracted Validation Functions from `lifespan()`
+
+**Problem**: The `lifespan()` function was 225 lines with repetitive validation logic.
+
+**Solution**: Extracted three helper functions:
+- `_validate_genai_access()` - Validates GenAI embedding API access
+- `_validate_gcs_access()` - Validates GCS bucket access
+- `_validate_vector_search_access()` - Validates vector search endpoint access
+
+**Files Changed**:
+- `src/knowledge_search_mcp/main.py:424-587` - New validation functions
+- `src/knowledge_search_mcp/main.py:644-693` - Simplified lifespan function
+
+**Impact**: Reduced lifespan() from 225 to ~65 lines, improved readability and testability
+
+#### b. Extracted Helper Functions from `knowledge_search()`
+
+**Problem**: The `knowledge_search()` function was 149 lines mixing multiple concerns.
+
+**Solution**: Extracted three helper functions:
+- `_generate_query_embedding()` - Handles embedding generation with error handling
+- `_filter_search_results()` - Applies similarity thresholds and filtering
+- `_format_search_results()` - Formats results as XML-like documents
+
+**Files Changed**:
+- `src/knowledge_search_mcp/main.py:717-766` - _generate_query_embedding
+- `src/knowledge_search_mcp/main.py:769-793` - _filter_search_results
+- `src/knowledge_search_mcp/main.py:796-810` - _format_search_results
+- `src/knowledge_search_mcp/main.py:814-876` - Simplified knowledge_search function
+
+**Impact**: Reduced knowledge_search() from 149 to ~63 lines, improved testability, added input validation for empty queries
+
+---
+
+## Additional Improvements
+
+### Input Validation
+- Added validation for empty/whitespace-only queries in `_generate_query_embedding()`
+
+### Code Organization
+- Moved `import time` from inline to module-level imports
+
+### Test Updates
+- Updated all tests to work with the new LRUCache interface
+- All 11 tests passing
+
+---
+
+## Metrics
+
+| Metric | Before | After | Change |
+|--------|--------|-------|--------|
+| Total lines (main.py) | 809 | 876 | +67 (more modular code) |
+| Longest function | 225 lines | 65 lines | -71% |
+| Code duplication instances | 2 major | 0 | -100% |
+| Resource leaks | Yes | No | Fixed |
+| Cache memory bound | No | Yes (100 items) | Fixed |
+| Test coverage | 11 tests | 11 tests | Maintained |
+
+---
+
+## What's Left for Future Work
+
+### Medium Priority (Not Done)
+- Move magic numbers to Settings configuration
+- Update outdated DockerfileConnector
+- Review and adjust logging levels
+- Add dependency injection for tighter coupling issues
+
+### Lower Priority (Not Done)
+- Add integration tests for end-to-end flows
+- Add performance tests
+- Introduce abstraction layers for cloud services
+- Standardize on f-strings (one %-format remaining)
diff --git a/src/knowledge_search_mcp/main.py b/src/knowledge_search_mcp/main.py
index e061cf1..ccb3a05 100644
--- a/src/knowledge_search_mcp/main.py
+++ b/src/knowledge_search_mcp/main.py
@@ -3,6 +3,8 @@
import asyncio
import io
+import time
+from collections import OrderedDict
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from dataclasses import dataclass
@@ -23,25 +25,44 @@ HTTP_TOO_MANY_REQUESTS = 429
HTTP_SERVER_ERROR = 500
-class SourceNamespace(str, Enum):
- """Allowed values for the 'source' namespace filter."""
+class LRUCache:
+ """Simple LRU cache with size limit."""
- EDUCACION_FINANCIERA = "Educacion Financiera"
- PRODUCTOS_Y_SERVICIOS = "Productos y Servicios"
- FUNCIONALIDADES_APP_MOVIL = "Funcionalidades de la App Movil"
+ def __init__(self, max_size: int = 100) -> None:
+ """Initialize cache with maximum size."""
+ self.cache: OrderedDict[str, bytes] = OrderedDict()
+ self.max_size = max_size
+
+ def get(self, key: str) -> bytes | None:
+ """Get item from cache, returning None if not found."""
+ if key not in self.cache:
+ return None
+ # Move to end to mark as recently used
+ self.cache.move_to_end(key)
+ return self.cache[key]
+
+ def put(self, key: str, value: bytes) -> None:
+ """Put item in cache, evicting oldest if at capacity."""
+ if key in self.cache:
+ self.cache.move_to_end(key)
+ self.cache[key] = value
+ if len(self.cache) > self.max_size:
+ self.cache.popitem(last=False)
+
+ def __contains__(self, key: str) -> bool:
+ """Check if key exists in cache."""
+ return key in self.cache
-class GoogleCloudFileStorage:
- """Cache-aware helper for downloading files from Google Cloud Storage."""
+class BaseGoogleCloudClient:
+ """Base class with shared aiohttp session management."""
- def __init__(self, bucket: str) -> None:
- """Initialize the storage helper."""
- self.bucket_name = bucket
+ def __init__(self) -> None:
+ """Initialize session tracking."""
self._aio_session: aiohttp.ClientSession | None = None
- self._aio_storage: Storage | None = None
- self._cache: dict[str, bytes] = {}
def _get_aio_session(self) -> aiohttp.ClientSession:
+ """Get or create aiohttp session with connection pooling."""
if self._aio_session is None or self._aio_session.closed:
connector = aiohttp.TCPConnector(
limit=300,
@@ -54,6 +75,30 @@ class GoogleCloudFileStorage:
)
return self._aio_session
+ async def close(self) -> None:
+ """Close aiohttp session if open."""
+ if self._aio_session and not self._aio_session.closed:
+ await self._aio_session.close()
+
+
+class SourceNamespace(str, Enum):
+ """Allowed values for the 'source' namespace filter."""
+
+ EDUCACION_FINANCIERA = "Educacion Financiera"
+ PRODUCTOS_Y_SERVICIOS = "Productos y Servicios"
+ FUNCIONALIDADES_APP_MOVIL = "Funcionalidades de la App Movil"
+
+
+class GoogleCloudFileStorage(BaseGoogleCloudClient):
+ """Cache-aware helper for downloading files from Google Cloud Storage."""
+
+ def __init__(self, bucket: str, cache_size: int = 100) -> None:
+ """Initialize the storage helper with LRU cache."""
+ super().__init__()
+ self.bucket_name = bucket
+ self._aio_storage: Storage | None = None
+ self._cache = LRUCache(max_size=cache_size)
+
def _get_aio_storage(self) -> Storage:
if self._aio_storage is None:
self._aio_storage = Storage(
@@ -79,13 +124,14 @@ class GoogleCloudFileStorage:
TimeoutError: If all retry attempts fail.
"""
- if file_name in self._cache:
+ cached_content = self._cache.get(file_name)
+ if cached_content is not None:
log_structured_entry(
"File retrieved from cache",
"INFO",
{"file": file_name, "bucket": self.bucket_name}
)
- file_stream = io.BytesIO(self._cache[file_name])
+ file_stream = io.BytesIO(cached_content)
file_stream.name = file_name
return file_stream
@@ -100,11 +146,12 @@ class GoogleCloudFileStorage:
for attempt in range(max_retries):
try:
- self._cache[file_name] = await storage_client.download(
+ content = await storage_client.download(
self.bucket_name,
file_name,
)
- file_stream = io.BytesIO(self._cache[file_name])
+ self._cache.put(file_name, content)
+ file_stream = io.BytesIO(content)
file_stream.name = file_name
log_structured_entry(
"File downloaded successfully",
@@ -112,7 +159,7 @@ class GoogleCloudFileStorage:
{
"file": file_name,
"bucket": self.bucket_name,
- "size_bytes": len(self._cache[file_name]),
+ "size_bytes": len(content),
"attempt": attempt + 1
}
)
@@ -178,7 +225,7 @@ class SearchResult(TypedDict):
content: str
-class GoogleCloudVectorSearch:
+class GoogleCloudVectorSearch(BaseGoogleCloudClient):
"""Minimal async client for the Vertex AI Matching Engine REST API."""
def __init__(
@@ -189,11 +236,11 @@ class GoogleCloudVectorSearch:
index_name: str | None = None,
) -> None:
"""Store configuration used to issue Matching Engine queries."""
+ super().__init__()
self.project_id = project_id
self.location = location
self.storage = GoogleCloudFileStorage(bucket=bucket)
self.index_name = index_name
- self._aio_session: aiohttp.ClientSession | None = None
self._async_token: Token | None = None
self._endpoint_domain: str | None = None
self._endpoint_name: str | None = None
@@ -212,18 +259,10 @@ class GoogleCloudVectorSearch:
"Content-Type": "application/json",
}
- def _get_aio_session(self) -> aiohttp.ClientSession:
- if self._aio_session is None or self._aio_session.closed:
- connector = aiohttp.TCPConnector(
- limit=300,
- limit_per_host=50,
- )
- timeout = aiohttp.ClientTimeout(total=60)
- self._aio_session = aiohttp.ClientSession(
- timeout=timeout,
- connector=connector,
- )
- return self._aio_session
+ async def close(self) -> None:
+ """Close aiohttp sessions for both vector search and storage."""
+ await super().close()
+ await self.storage.close()
def configure_index_endpoint(
self,
@@ -414,6 +453,167 @@ class AppContext:
settings: Settings
+async def _validate_genai_access(genai_client: genai.Client, cfg: Settings) -> str | None:
+ """Validate GenAI embedding access.
+
+ Returns:
+ Error message if validation fails, None if successful.
+ """
+ log_structured_entry("Validating GenAI embedding access", "INFO")
+ try:
+ test_response = await genai_client.aio.models.embed_content(
+ model=cfg.embedding_model,
+ contents="test",
+ config=genai_types.EmbedContentConfig(
+ task_type="RETRIEVAL_QUERY",
+ ),
+ )
+ if test_response and test_response.embeddings:
+ embedding_values = test_response.embeddings[0].values
+ log_structured_entry(
+ "GenAI embedding validation successful",
+ "INFO",
+ {"embedding_dimension": len(embedding_values) if embedding_values else 0}
+ )
+ return None
+ else:
+ msg = "Embedding validation returned empty response"
+ log_structured_entry(msg, "WARNING")
+ return msg
+ except Exception as e:
+ log_structured_entry(
+ "Failed to validate GenAI embedding access - service may not work correctly",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__}
+ )
+ return f"GenAI: {str(e)}"
+
+
+async def _validate_gcs_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None:
+ """Validate GCS bucket access.
+
+ Returns:
+ Error message if validation fails, None if successful.
+ """
+ log_structured_entry(
+ "Validating GCS bucket access",
+ "INFO",
+ {"bucket": cfg.bucket}
+ )
+ try:
+ session = vs.storage._get_aio_session()
+ token_obj = Token(
+ session=session,
+ scopes=["https://www.googleapis.com/auth/cloud-platform"],
+ )
+ access_token = await token_obj.get()
+ headers = {"Authorization": f"Bearer {access_token}"}
+
+ async with session.get(
+ f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
+ headers=headers,
+ ) as response:
+ if response.status == 403:
+ msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
+ log_structured_entry(
+ "GCS bucket validation failed - access denied - service may not work correctly",
+ "WARNING",
+ {"bucket": cfg.bucket, "status": response.status}
+ )
+ return msg
+ elif response.status == 404:
+ msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
+ log_structured_entry(
+ "GCS bucket validation failed - not found - service may not work correctly",
+ "WARNING",
+ {"bucket": cfg.bucket, "status": response.status}
+ )
+ return msg
+ elif not response.ok:
+ body = await response.text()
+ msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
+ log_structured_entry(
+ "GCS bucket validation failed - service may not work correctly",
+ "WARNING",
+ {"bucket": cfg.bucket, "status": response.status, "response": body}
+ )
+ return msg
+ else:
+ log_structured_entry(
+ "GCS bucket validation successful",
+ "INFO",
+ {"bucket": cfg.bucket}
+ )
+ return None
+ except Exception as e:
+ log_structured_entry(
+ "Failed to validate GCS bucket access - service may not work correctly",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
+ )
+ return f"GCS: {str(e)}"
+
+
+async def _validate_vector_search_access(vs: GoogleCloudVectorSearch, cfg: Settings) -> str | None:
+ """Validate vector search endpoint access.
+
+ Returns:
+ Error message if validation fails, None if successful.
+ """
+ log_structured_entry(
+ "Validating vector search endpoint access",
+ "INFO",
+ {"endpoint_name": cfg.endpoint_name}
+ )
+ try:
+ headers = await vs._async_get_auth_headers()
+ session = vs._get_aio_session()
+ endpoint_url = (
+ f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
+ )
+
+ async with session.get(endpoint_url, headers=headers) as response:
+ if response.status == 403:
+ msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
+ log_structured_entry(
+ "Vector search endpoint validation failed - access denied - service may not work correctly",
+ "WARNING",
+ {"endpoint": cfg.endpoint_name, "status": response.status}
+ )
+ return msg
+ elif response.status == 404:
+ msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
+ log_structured_entry(
+ "Vector search endpoint validation failed - not found - service may not work correctly",
+ "WARNING",
+ {"endpoint": cfg.endpoint_name, "status": response.status}
+ )
+ return msg
+ elif not response.ok:
+ body = await response.text()
+ msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
+ log_structured_entry(
+ "Vector search endpoint validation failed - service may not work correctly",
+ "WARNING",
+ {"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
+ )
+ return msg
+ else:
+ log_structured_entry(
+ "Vector search endpoint validation successful",
+ "INFO",
+ {"endpoint": cfg.endpoint_name}
+ )
+ return None
+ except Exception as e:
+ log_structured_entry(
+ "Failed to validate vector search endpoint access - service may not work correctly",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
+ )
+ return f"Vector Search: {str(e)}"
+
+
@asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime."""
@@ -428,6 +628,7 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
}
)
+ vs: GoogleCloudVectorSearch | None = None
try:
# Initialize vector search client
log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO")
@@ -470,146 +671,18 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
validation_errors = []
- # 1. Validate GenAI embedding access
- log_structured_entry("Validating GenAI embedding access", "INFO")
- try:
- test_response = await genai_client.aio.models.embed_content(
- model=cfg.embedding_model,
- contents="test",
- config=genai_types.EmbedContentConfig(
- task_type="RETRIEVAL_QUERY",
- ),
- )
- if test_response and test_response.embeddings:
- embedding_values = test_response.embeddings[0].values
- log_structured_entry(
- "GenAI embedding validation successful",
- "INFO",
- {"embedding_dimension": len(embedding_values) if embedding_values else 0}
- )
- else:
- msg = "Embedding validation returned empty response"
- log_structured_entry(msg, "WARNING")
- validation_errors.append(msg)
- except Exception as e:
- log_structured_entry(
- "Failed to validate GenAI embedding access - service may not work correctly",
- "WARNING",
- {"error": str(e), "error_type": type(e).__name__}
- )
- validation_errors.append(f"GenAI: {str(e)}")
+ # Run all validations
+ genai_error = await _validate_genai_access(genai_client, cfg)
+ if genai_error:
+ validation_errors.append(genai_error)
- # 2. Validate GCS bucket access
- log_structured_entry(
- "Validating GCS bucket access",
- "INFO",
- {"bucket": cfg.bucket}
- )
- try:
- session = vs.storage._get_aio_session()
- token_obj = Token(
- session=session,
- scopes=["https://www.googleapis.com/auth/cloud-platform"],
- )
- access_token = await token_obj.get()
- headers = {"Authorization": f"Bearer {access_token}"}
+ gcs_error = await _validate_gcs_access(vs, cfg)
+ if gcs_error:
+ validation_errors.append(gcs_error)
- async with session.get(
- f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
- headers=headers,
- ) as response:
- if response.status == 403:
- msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
- log_structured_entry(
- "GCS bucket validation failed - access denied - service may not work correctly",
- "WARNING",
- {"bucket": cfg.bucket, "status": response.status}
- )
- validation_errors.append(msg)
- elif response.status == 404:
- msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
- log_structured_entry(
- "GCS bucket validation failed - not found - service may not work correctly",
- "WARNING",
- {"bucket": cfg.bucket, "status": response.status}
- )
- validation_errors.append(msg)
- elif not response.ok:
- body = await response.text()
- msg = f"Failed to access bucket '{cfg.bucket}': {response.status}"
- log_structured_entry(
- "GCS bucket validation failed - service may not work correctly",
- "WARNING",
- {"bucket": cfg.bucket, "status": response.status, "response": body}
- )
- validation_errors.append(msg)
- else:
- log_structured_entry(
- "GCS bucket validation successful",
- "INFO",
- {"bucket": cfg.bucket}
- )
- except Exception as e:
- log_structured_entry(
- "Failed to validate GCS bucket access - service may not work correctly",
- "WARNING",
- {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
- )
- validation_errors.append(f"GCS: {str(e)}")
-
- # 3. Validate vector search endpoint access
- log_structured_entry(
- "Validating vector search endpoint access",
- "INFO",
- {"endpoint_name": cfg.endpoint_name}
- )
- try:
- # Try to get endpoint info
- headers = await vs._async_get_auth_headers()
- session = vs._get_aio_session()
- endpoint_url = (
- f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
- )
-
- async with session.get(endpoint_url, headers=headers) as response:
- if response.status == 403:
- msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
- log_structured_entry(
- "Vector search endpoint validation failed - access denied - service may not work correctly",
- "WARNING",
- {"endpoint": cfg.endpoint_name, "status": response.status}
- )
- validation_errors.append(msg)
- elif response.status == 404:
- msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
- log_structured_entry(
- "Vector search endpoint validation failed - not found - service may not work correctly",
- "WARNING",
- {"endpoint": cfg.endpoint_name, "status": response.status}
- )
- validation_errors.append(msg)
- elif not response.ok:
- body = await response.text()
- msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status}"
- log_structured_entry(
- "Vector search endpoint validation failed - service may not work correctly",
- "WARNING",
- {"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
- )
- validation_errors.append(msg)
- else:
- log_structured_entry(
- "Vector search endpoint validation successful",
- "INFO",
- {"endpoint": cfg.endpoint_name}
- )
- except Exception as e:
- log_structured_entry(
- "Failed to validate vector search endpoint access - service may not work correctly",
- "WARNING",
- {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
- )
- validation_errors.append(f"Vector Search: {str(e)}")
+ vs_error = await _validate_vector_search_access(vs, cfg)
+ if vs_error:
+ validation_errors.append(vs_error)
# Summary of validations
if validation_errors:
@@ -639,6 +712,17 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
raise
finally:
log_structured_entry("MCP server lifespan ending", "INFO")
+ # Clean up resources
+ if vs is not None:
+ try:
+ await vs.close()
+ log_structured_entry("Closed aiohttp sessions", "INFO")
+ except Exception as e:
+ log_structured_entry(
+ "Error closing aiohttp sessions",
+ "WARNING",
+ {"error": str(e), "error_type": type(e).__name__}
+ )
mcp = FastMCP(
@@ -649,6 +733,108 @@ mcp = FastMCP(
)
+async def _generate_query_embedding(
+ genai_client: genai.Client,
+ embedding_model: str,
+ query: str,
+) -> tuple[list[float], str | None]:
+ """Generate embedding for search query.
+
+ Returns:
+ Tuple of (embedding vector, error message). Error message is None on success.
+ """
+ if not query or not query.strip():
+ return ([], "Error: Query cannot be empty")
+
+ log_structured_entry("Generating query embedding", "INFO")
+ try:
+ response = await genai_client.aio.models.embed_content(
+ model=embedding_model,
+ contents=query,
+ config=genai_types.EmbedContentConfig(
+ task_type="RETRIEVAL_QUERY",
+ ),
+ )
+ embedding = response.embeddings[0].values
+ return (embedding, None)
+ except Exception as e:
+ error_type = type(e).__name__
+ error_msg = str(e)
+
+ # Check if it's a rate limit error
+ if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
+ log_structured_entry(
+ "Rate limit exceeded while generating embedding",
+ "WARNING",
+ {
+ "error": error_msg,
+ "error_type": error_type,
+ "query": query[:100]
+ }
+ )
+ return ([], "Error: API rate limit exceeded. Please try again later.")
+ else:
+ log_structured_entry(
+ "Failed to generate query embedding",
+ "ERROR",
+ {
+ "error": error_msg,
+ "error_type": error_type,
+ "query": query[:100]
+ }
+ )
+ return ([], f"Error generating embedding: {error_msg}")
+
+
+def _filter_search_results(
+ results: list[SearchResult],
+ min_similarity: float = 0.6,
+ top_percent: float = 0.9,
+) -> list[SearchResult]:
+ """Filter search results by similarity thresholds.
+
+ Args:
+ results: Raw search results from vector search.
+ min_similarity: Minimum similarity score (distance) to include.
+ top_percent: Keep results within this percentage of the top score.
+
+ Returns:
+ Filtered list of search results.
+ """
+ if not results:
+ return []
+
+ max_sim = max(r["distance"] for r in results)
+ cutoff = max_sim * top_percent
+
+ filtered = [
+ s
+ for s in results
+ if s["distance"] > cutoff and s["distance"] > min_similarity
+ ]
+
+ return filtered
+
+
+def _format_search_results(results: list[SearchResult]) -> str:
+ """Format search results as XML-like documents.
+
+ Args:
+ results: List of search results to format.
+
+ Returns:
+ Formatted string with document tags.
+ """
+ if not results:
+ return "No relevant documents found for your query."
+
+ formatted_results = [
+ f"\n{result['content']}\n"
+ for i, result in enumerate(results, start=1)
+ ]
+ return "\n".join(formatted_results)
+
+
@mcp.tool()
async def knowledge_search(
query: str,
@@ -668,11 +854,8 @@ async def knowledge_search(
A formatted string containing matched documents with id and content.
"""
- import time
-
app: AppContext = ctx.request_context.lifespan_context
t0 = time.perf_counter()
- min_sim = 0.6
log_structured_entry(
"knowledge_search request received",
@@ -682,49 +865,20 @@ async def knowledge_search(
try:
# Generate embedding for the query
- log_structured_entry("Generating query embedding", "INFO")
- try:
- response = await app.genai_client.aio.models.embed_content(
- model=app.settings.embedding_model,
- contents=query,
- config=genai_types.EmbedContentConfig(
- task_type="RETRIEVAL_QUERY",
- ),
- )
- embedding = response.embeddings[0].values
- t_embed = time.perf_counter()
- log_structured_entry(
- "Query embedding generated successfully",
- "INFO",
- {"time_ms": round((t_embed - t0) * 1000, 1)}
- )
- except Exception as e:
- error_type = type(e).__name__
- error_msg = str(e)
+ embedding, error = await _generate_query_embedding(
+ app.genai_client,
+ app.settings.embedding_model,
+ query,
+ )
+ if error:
+ return error
- # Check if it's a rate limit error
- if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
- log_structured_entry(
- "Rate limit exceeded while generating embedding",
- "WARNING",
- {
- "error": error_msg,
- "error_type": error_type,
- "query": query[:100]
- }
- )
- return "Error: API rate limit exceeded. Please try again later."
- else:
- log_structured_entry(
- "Failed to generate query embedding",
- "ERROR",
- {
- "error": error_msg,
- "error_type": error_type,
- "query": query[:100]
- }
- )
- return f"Error generating embedding: {error_msg}"
+ t_embed = time.perf_counter()
+ log_structured_entry(
+ "Query embedding generated successfully",
+ "INFO",
+ {"time_ms": round((t_embed - t0) * 1000, 1)}
+ )
# Perform vector search
log_structured_entry("Performing vector search", "INFO")
@@ -749,14 +903,7 @@ async def knowledge_search(
return f"Error performing vector search: {str(e)}"
# Apply similarity filtering
- if search_results:
- max_sim = max(r["distance"] for r in search_results)
- cutoff = max_sim * 0.9
- search_results = [
- s
- for s in search_results
- if s["distance"] > cutoff and s["distance"] > min_sim
- ]
+ filtered_results = _filter_search_results(search_results)
log_structured_entry(
"knowledge_search completed successfully",
@@ -766,25 +913,20 @@ async def knowledge_search(
"vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
"total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
"source_filter": source.value if source is not None else None,
- "results_count": len(search_results),
- "chunks": [s["id"] for s in search_results]
+ "results_count": len(filtered_results),
+ "chunks": [s["id"] for s in filtered_results]
}
)
- # Format results as XML-like documents
- if not search_results:
+ # Format and return results
+ if not filtered_results:
log_structured_entry(
"No results found for query",
"INFO",
{"query": query[:100]}
)
- return "No relevant documents found for your query."
- formatted_results = [
- f"\n{result['content']}\n"
- for i, result in enumerate(search_results, start=1)
- ]
- return "\n".join(formatted_results)
+ return _format_search_results(filtered_results)
except Exception as e:
# Catch-all for any unexpected errors
diff --git a/tests/test_search.py b/tests/test_search.py
index a0801b2..ad82b72 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -8,6 +8,7 @@ import pytest
from knowledge_search_mcp.main import (
GoogleCloudFileStorage,
GoogleCloudVectorSearch,
+ LRUCache,
SourceNamespace,
)
@@ -19,14 +20,15 @@ class TestGoogleCloudFileStorage:
"""Test storage initialization."""
storage = GoogleCloudFileStorage(bucket="test-bucket")
assert storage.bucket_name == "test-bucket"
- assert storage._cache == {}
+ assert isinstance(storage._cache, LRUCache)
+ assert storage._cache.max_size == 100
@pytest.mark.asyncio
async def test_cache_hit(self):
"""Test that cached files are returned without fetching."""
storage = GoogleCloudFileStorage(bucket="test-bucket")
test_content = b"cached content"
- storage._cache["test.md"] = test_content
+ storage._cache.put("test.md", test_content)
result = await storage.async_get_file_stream("test.md")
@@ -48,7 +50,7 @@ class TestGoogleCloudFileStorage:
result = await storage.async_get_file_stream("test.md")
assert result.read() == test_content
- assert storage._cache["test.md"] == test_content
+ assert storage._cache.get("test.md") == test_content
class TestGoogleCloudVectorSearch: