From 753b5c7871e018cb325c6299fc9d08f707456b20 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 17:23:24 +0000 Subject: [PATCH] fix: refactor correct import and add more logs --- main.py | 308 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 249 insertions(+), 59 deletions(-) diff --git a/main.py b/main.py index e99f9d8..5bb2117 100644 --- a/main.py +++ b/main.py @@ -15,7 +15,7 @@ from google import genai from google.genai import types as genai_types from mcp.server.fastmcp import Context, FastMCP -from .utils import Settings, _args, log_structured_entry +from utils import Settings, _args, log_structured_entry HTTP_TOO_MANY_REQUESTS = 429 HTTP_SERVER_ERROR = 500 @@ -70,10 +70,21 @@ class GoogleCloudFileStorage: """ if file_name in self._cache: + log_structured_entry( + "File retrieved from cache", + "INFO", + {"file": file_name, "bucket": self.bucket_name} + ) file_stream = io.BytesIO(self._cache[file_name]) file_stream.name = file_name return file_stream + log_structured_entry( + "Starting file download from GCS", + "INFO", + {"file": file_name, "bucket": self.bucket_name} + ) + storage_client = self._get_aio_storage() last_exception: Exception | None = None @@ -85,11 +96,22 @@ class GoogleCloudFileStorage: ) file_stream = io.BytesIO(self._cache[file_name]) file_stream.name = file_name + log_structured_entry( + "File downloaded successfully", + "INFO", + { + "file": file_name, + "bucket": self.bucket_name, + "size_bytes": len(self._cache[file_name]), + "attempt": attempt + 1 + } + ) except TimeoutError as exc: last_exception = exc log_structured_entry( - f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})" - "WARNING" + f"Timeout downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", + "WARNING", + {"error": str(exc)} ) except aiohttp.ClientResponseError as exc: last_exception = exc @@ -98,22 +120,43 @@ class GoogleCloudFileStorage: or exc.status >= HTTP_SERVER_ERROR ): log_structured_entry( - f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})" - "WARNING" + f"HTTP {exc.status} downloading gs://{self.bucket_name}/{file_name} (attempt {attempt + 1}/{max_retries})", + "WARNING", + {"status": exc.status, "message": str(exc)} ) else: + log_structured_entry( + f"Non-retryable HTTP error downloading gs://{self.bucket_name}/{file_name}", + "ERROR", + {"status": exc.status, "message": str(exc)} + ) raise else: return file_stream if attempt < max_retries - 1: delay = 0.5 * (2**attempt) + log_structured_entry( + "Retrying file download", + "INFO", + {"file": file_name, "delay_seconds": delay} + ) await asyncio.sleep(delay) msg = ( f"Failed to download gs://{self.bucket_name}/{file_name} " f"after {max_retries} attempts" ) + log_structured_entry( + "File download failed after all retries", + "ERROR", + { + "file": file_name, + "bucket": self.bucket_name, + "max_retries": max_retries, + "last_error": str(last_exception) + } + ) raise TimeoutError(msg) from last_exception @@ -210,7 +253,13 @@ class GoogleCloudVectorSearch: "Missing endpoint metadata. Call " "`configure_index_endpoint` before querying." ) + log_structured_entry( + "Vector search query failed - endpoint not configured", + "ERROR", + {"error": msg} + ) raise RuntimeError(msg) + domain = self._endpoint_domain endpoint_id = self._endpoint_name.split("/")[-1] url = ( @@ -218,6 +267,18 @@ class GoogleCloudVectorSearch: f"/locations/{self.location}" f"/indexEndpoints/{endpoint_id}:findNeighbors" ) + + log_structured_entry( + "Starting vector search query", + "INFO", + { + "deployed_index_id": deployed_index_id, + "neighbor_count": limit, + "endpoint_id": endpoint_id, + "embedding_dimension": len(query) + } + ) + payload = { "deployed_index_id": deployed_index_id, "queries": [ @@ -228,43 +289,98 @@ class GoogleCloudVectorSearch: ], } - headers = await self._async_get_auth_headers() - session = self._get_aio_session() - async with session.post( - url, - json=payload, - headers=headers, - ) as response: - if not response.ok: - body = await response.text() - msg = f"findNeighbors returned {response.status}: {body}" - raise RuntimeError(msg) - data = await response.json() + try: + headers = await self._async_get_auth_headers() + session = self._get_aio_session() + async with session.post( + url, + json=payload, + headers=headers, + ) as response: + if not response.ok: + body = await response.text() + msg = f"findNeighbors returned {response.status}: {body}" + log_structured_entry( + "Vector search API request failed", + "ERROR", + { + "status": response.status, + "response_body": body, + "deployed_index_id": deployed_index_id + } + ) + raise RuntimeError(msg) + data = await response.json() - neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) - content_tasks = [] - for neighbor in neighbors: - datapoint_id = neighbor["datapoint"]["datapointId"] - file_path = f"{self.index_name}/contents/{datapoint_id}.md" - content_tasks.append( - self.storage.async_get_file_stream(file_path), + neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) + log_structured_entry( + "Vector search API request successful", + "INFO", + { + "neighbors_found": len(neighbors), + "deployed_index_id": deployed_index_id + } ) - file_streams = await asyncio.gather(*content_tasks) - results: list[SearchResult] = [] - for neighbor, stream in zip( - neighbors, - file_streams, - strict=True, - ): - results.append( - SearchResult( - id=neighbor["datapoint"]["datapointId"], - distance=neighbor["distance"], - content=stream.read().decode("utf-8"), - ), + if not neighbors: + log_structured_entry( + "No neighbors found in vector search", + "WARNING", + {"deployed_index_id": deployed_index_id} + ) + return [] + + # Fetch content for all neighbors + content_tasks = [] + for neighbor in neighbors: + datapoint_id = neighbor["datapoint"]["datapointId"] + file_path = f"{self.index_name}/contents/{datapoint_id}.md" + content_tasks.append( + self.storage.async_get_file_stream(file_path), + ) + + log_structured_entry( + "Fetching content for search results", + "INFO", + {"file_count": len(content_tasks)} ) - return results + + file_streams = await asyncio.gather(*content_tasks) + results: list[SearchResult] = [] + for neighbor, stream in zip( + neighbors, + file_streams, + strict=True, + ): + results.append( + SearchResult( + id=neighbor["datapoint"]["datapointId"], + distance=neighbor["distance"], + content=stream.read().decode("utf-8"), + ), + ) + + log_structured_entry( + "Vector search completed successfully", + "INFO", + { + "results_count": len(results), + "deployed_index_id": deployed_index_id + } + ) + return results + + except Exception as e: + log_structured_entry( + "Vector search query failed with exception", + "ERROR", + { + "error": str(e), + "error_type": type(e).__name__, + "deployed_index_id": deployed_index_id + } + ) + raise # --------------------------------------------------------------------------- @@ -284,28 +400,102 @@ class AppContext: @asynccontextmanager async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: """Create and configure the vector-search client for the server lifetime.""" - vs = GoogleCloudVectorSearch( - project_id=cfg.project_id, - location=cfg.location, - bucket=cfg.bucket, - index_name=cfg.index_name, - ) - vs.configure_index_endpoint( - name=cfg.endpoint_name, - public_domain=cfg.endpoint_domain, + log_structured_entry( + "Initializing MCP server", + "INFO", + { + "project_id": cfg.project_id, + "location": cfg.location, + "bucket": cfg.bucket, + "index_name": cfg.index_name, + } ) - genai_client = genai.Client( - vertexai=True, - project=cfg.project_id, - location=cfg.location, - ) + try: + # Initialize vector search client + log_structured_entry("Creating GoogleCloudVectorSearch client", "INFO") + vs = GoogleCloudVectorSearch( + project_id=cfg.project_id, + location=cfg.location, + bucket=cfg.bucket, + index_name=cfg.index_name, + ) - yield AppContext( - vector_search=vs, - genai_client=genai_client, - settings=cfg, - ) + # Configure endpoint + log_structured_entry( + "Configuring index endpoint", + "INFO", + { + "endpoint_name": cfg.endpoint_name, + "endpoint_domain": cfg.endpoint_domain, + } + ) + vs.configure_index_endpoint( + name=cfg.endpoint_name, + public_domain=cfg.endpoint_domain, + ) + + # Initialize GenAI client + log_structured_entry( + "Creating GenAI client", + "INFO", + {"project_id": cfg.project_id, "location": cfg.location} + ) + genai_client = genai.Client( + vertexai=True, + project=cfg.project_id, + location=cfg.location, + ) + + # Validate credentials by testing a simple operation + log_structured_entry("Validating credentials with test embedding", "INFO") + try: + test_response = await genai_client.aio.models.embed_content( + model=cfg.embedding_model, + contents="test", + config=genai_types.EmbedContentConfig( + task_type="RETRIEVAL_QUERY", + ), + ) + if test_response and test_response.embeddings: + log_structured_entry( + "Credentials validated successfully", + "INFO", + {"embedding_dimension": len(test_response.embeddings[0].values)} + ) + else: + log_structured_entry( + "Credential validation returned empty response", + "WARNING" + ) + except Exception as e: + log_structured_entry( + "Failed to validate credentials - embedding test failed", + "ERROR", + {"error": str(e), "error_type": type(e).__name__} + ) + raise + + log_structured_entry("MCP server initialization complete", "INFO") + + yield AppContext( + vector_search=vs, + genai_client=genai_client, + settings=cfg, + ) + + except Exception as e: + log_structured_entry( + "Failed to initialize MCP server", + "ERROR", + { + "error": str(e), + "error_type": type(e).__name__, + } + ) + raise + finally: + log_structured_entry("MCP server lifespan ending", "INFO") cfg = Settings.model_validate({}) @@ -372,9 +562,9 @@ async def knowledge_search( "INFO", { "embedding": f"{round((t_embed - t0) * 1000, 1)}ms", - "vector_serach": f"{round((t_search - t_embed) * 1000, 1)}ms", + "vector_search": f"{round((t_search - t_embed) * 1000, 1)}ms", "total": f"{round((t_search - t0) * 1000, 1)}ms", - "chunks": {[s["id"] for s in search_results]} + "chunks": [s["id"] for s in search_results] } )