fix: add more validations for bd connection

This commit is contained in:
2026-02-24 17:40:37 +00:00
parent 753b5c7871
commit 13c8e122de

136
main.py
View File

@@ -447,8 +447,11 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
location=cfg.location, location=cfg.location,
) )
# Validate credentials by testing a simple operation # Validate credentials and configuration by testing actual resources
log_structured_entry("Validating credentials with test embedding", "INFO") log_structured_entry("Starting validation of credentials and resources", "INFO")
# 1. Validate GenAI embedding access
log_structured_entry("Validating GenAI embedding access", "INFO")
try: try:
test_response = await genai_client.aio.models.embed_content( test_response = await genai_client.aio.models.embed_content(
model=cfg.embedding_model, model=cfg.embedding_model,
@@ -459,24 +462,139 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
) )
if test_response and test_response.embeddings: if test_response and test_response.embeddings:
log_structured_entry( log_structured_entry(
"Credentials validated successfully", "GenAI embedding validation successful",
"INFO", "INFO",
{"embedding_dimension": len(test_response.embeddings[0].values)} {"embedding_dimension": len(test_response.embeddings[0].values)}
) )
else: else:
log_structured_entry( msg = "Embedding validation returned empty response"
"Credential validation returned empty response", log_structured_entry(msg, "ERROR")
"WARNING" raise RuntimeError(msg)
)
except Exception as e: except Exception as e:
log_structured_entry( log_structured_entry(
"Failed to validate credentials - embedding test failed", "Failed to validate GenAI embedding access",
"ERROR", "ERROR",
{"error": str(e), "error_type": type(e).__name__} {"error": str(e), "error_type": type(e).__name__}
) )
raise raise
log_structured_entry("MCP server initialization complete", "INFO") # 2. Validate GCS bucket access
log_structured_entry(
"Validating GCS bucket access",
"INFO",
{"bucket": cfg.bucket}
)
try:
session = vs.storage._get_aio_session()
token_obj = Token(
session=session,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
access_token = await token_obj.get()
headers = {"Authorization": f"Bearer {access_token}"}
async with session.get(
f"https://storage.googleapis.com/storage/v1/b/{cfg.bucket}/o?maxResults=1",
headers=headers,
) as response:
if response.status == 403:
msg = f"Access denied to bucket '{cfg.bucket}'. Check permissions."
log_structured_entry(
"GCS bucket validation failed - access denied",
"ERROR",
{"bucket": cfg.bucket, "status": response.status}
)
raise RuntimeError(msg)
elif response.status == 404:
msg = f"Bucket '{cfg.bucket}' not found. Check bucket name and project."
log_structured_entry(
"GCS bucket validation failed - not found",
"ERROR",
{"bucket": cfg.bucket, "status": response.status}
)
raise RuntimeError(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access bucket '{cfg.bucket}': {response.status} - {body}"
log_structured_entry(
"GCS bucket validation failed",
"ERROR",
{"bucket": cfg.bucket, "status": response.status, "response": body}
)
raise RuntimeError(msg)
log_structured_entry(
"GCS bucket validation successful",
"INFO",
{"bucket": cfg.bucket}
)
except RuntimeError:
raise
except Exception as e:
log_structured_entry(
"Failed to validate GCS bucket access",
"ERROR",
{"error": str(e), "error_type": type(e).__name__, "bucket": cfg.bucket}
)
raise
# 3. Validate vector search endpoint access
log_structured_entry(
"Validating vector search endpoint access",
"INFO",
{"endpoint_name": cfg.endpoint_name}
)
try:
# Try to get endpoint info
headers = await vs._async_get_auth_headers()
session = vs._get_aio_session()
endpoint_url = (
f"https://{cfg.location}-aiplatform.googleapis.com/v1/{cfg.endpoint_name}"
)
async with session.get(endpoint_url, headers=headers) as response:
if response.status == 403:
msg = f"Access denied to endpoint '{cfg.endpoint_name}'. Check permissions."
log_structured_entry(
"Vector search endpoint validation failed - access denied",
"ERROR",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
raise RuntimeError(msg)
elif response.status == 404:
msg = f"Endpoint '{cfg.endpoint_name}' not found. Check endpoint name and project."
log_structured_entry(
"Vector search endpoint validation failed - not found",
"ERROR",
{"endpoint": cfg.endpoint_name, "status": response.status}
)
raise RuntimeError(msg)
elif not response.ok:
body = await response.text()
msg = f"Failed to access endpoint '{cfg.endpoint_name}': {response.status} - {body}"
log_structured_entry(
"Vector search endpoint validation failed",
"ERROR",
{"endpoint": cfg.endpoint_name, "status": response.status, "response": body}
)
raise RuntimeError(msg)
log_structured_entry(
"Vector search endpoint validation successful",
"INFO",
{"endpoint": cfg.endpoint_name}
)
except RuntimeError:
raise
except Exception as e:
log_structured_entry(
"Failed to validate vector search endpoint access",
"ERROR",
{"error": str(e), "error_type": type(e).__name__, "endpoint": cfg.endpoint_name}
)
raise
log_structured_entry("All validations passed - MCP server initialization complete", "INFO")
yield AppContext( yield AppContext(
vector_search=vs, vector_search=vs,