From 13c8e122deb9678cf12a8daf7e3ee4c9e2ea67d0 Mon Sep 17 00:00:00 2001 From: A8080816 Date: Tue, 24 Feb 2026 17:40:37 +0000 Subject: [PATCH] fix: add more validations for bd connection --- main.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 127 insertions(+), 9 deletions(-) diff --git a/main.py b/main.py index 5bb2117..4f6858e 100644 --- a/main.py +++ b/main.py @@ -447,8 +447,11 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: location=cfg.location, ) - # Validate credentials by testing a simple operation - log_structured_entry("Validating credentials with test embedding", "INFO") + # Validate credentials and configuration by testing actual resources + log_structured_entry("Starting validation of credentials and resources", "INFO") + + # 1. Validate GenAI embedding access + log_structured_entry("Validating GenAI embedding access", "INFO") try: test_response = await genai_client.aio.models.embed_content( model=cfg.embedding_model, @@ -459,24 +462,139 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: ) if test_response and test_response.embeddings: log_structured_entry( - "Credentials validated successfully", + "GenAI embedding validation successful", "INFO", {"embedding_dimension": len(test_response.embeddings[0].values)} ) else: - log_structured_entry( - "Credential validation returned empty response", - "WARNING" - ) + msg = "Embedding validation returned empty response" + log_structured_entry(msg, "ERROR") + raise RuntimeError(msg) except Exception as e: log_structured_entry( - "Failed to validate credentials - embedding test failed", + "Failed to validate GenAI embedding access", "ERROR", {"error": str(e), "error_type": type(e).__name__} ) raise - log_structured_entry("MCP server initialization complete", "INFO") + # 2. Validate GCS bucket access + log_structured_entry( + "Validating GCS bucket access", + "INFO", + {"bucket": cfg.bucket} + ) + try: + session = vs.storage._get_aio_session() + token_obj = Token( + session=session, + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + access_token = await token_obj.get() + headers = {"Authorization": f"Bearer {access_token}"} + + async with session.get( + f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1", + headers=headers, + ) as response: + if response.status == 403: + msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions." + log_structured_entry( + "GCS bucket validation failed - access denied", + "ERROR", + {"bucket": cfg.bucket, "status": response.status} + ) + raise RuntimeError(msg) + elif response.status == 404: + msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project." + log_structured_entry( + "GCS bucket validation failed - not found", + "ERROR", + {"bucket": cfg.bucket, "status": response.status} + ) + raise RuntimeError(msg) + elif not response.ok: + body = await response.text() + msg = f"Failed to access bucket '{cfg.bucket}': {response.status} - {body}" + log_structured_entry( + "GCS bucket validation failed", + "ERROR", + {"bucket": cfg.bucket, "status": response.status, "response": body} + ) + raise RuntimeError(msg) + + log_structured_entry( + "GCS bucket validation successful", + "INFO", + {"bucket": cfg.bucket} + ) + except RuntimeError: + raise + except Exception as e: + log_structured_entry( + "Failed to validate GCS bucket access", + "ERROR", + {"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket} + ) + raise + + # 3. Validate vector search endpoint access + log_structured_entry( + "Validating vector search endpoint access", + "INFO", + {"endpoint_name": cfg.endpoint_name} + ) + try: + # Try to get endpoint info + headers = await vs._async_get_auth_headers() + session = vs._get_aio_session() + endpoint_url = ( + f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}" + ) + + async with session.get(endpoint_url, headers=headers) as response: + if response.status == 403: + msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions." + log_structured_entry( + "Vector search endpoint validation failed - access denied", + "ERROR", + {"endpoint": cfg.endpoint_name, "status": response.status} + ) + raise RuntimeError(msg) + elif response.status == 404: + msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project." + log_structured_entry( + "Vector search endpoint validation failed - not found", + "ERROR", + {"endpoint": cfg.endpoint_name, "status": response.status} + ) + raise RuntimeError(msg) + elif not response.ok: + body = await response.text() + msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status} - {body}" + log_structured_entry( + "Vector search endpoint validation failed", + "ERROR", + {"endpoint": cfg.endpoint_name, "status": response.status, "response": body} + ) + raise RuntimeError(msg) + + log_structured_entry( + "Vector search endpoint validation successful", + "INFO", + {"endpoint": cfg.endpoint_name} + ) + except RuntimeError: + raise + except Exception as e: + log_structured_entry( + "Failed to validate vector search endpoint access", + "ERROR", + {"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name} + ) + raise + + log_structured_entry("All validations passed - MCP server initialization complete", "INFO") yield AppContext( vector_search=vs,