fix: add robust exception handling to knowledge_search

commit 0fd97a31a5
parent a8c611fbec
Date:   2026-02-24 18:08:04 +00:00

main.py | 87

@@ -654,23 +654,78 @@ async def knowledge_search(
     t0 = time.perf_counter()
     min_sim = 0.6
+    log_structured_entry(
+        "knowledge_search request received",
+        "INFO",
+        {"query": query[:100]}  # Log first 100 chars of query
+    )
+    try:
+        # Generate embedding for the query
+        log_structured_entry("Generating query embedding", "INFO")
+        try:
-    response = await app.genai_client.aio.models.embed_content(
-        model=app.settings.embedding_model,
-        contents=query,
-        config=genai_types.EmbedContentConfig(
-            task_type="RETRIEVAL_QUERY",
-        ),
-    )
-    embedding = response.embeddings[0].values
-    t_embed = time.perf_counter()
+            response = await app.genai_client.aio.models.embed_content(
+                model=app.settings.embedding_model,
+                contents=query,
+                config=genai_types.EmbedContentConfig(
+                    task_type="RETRIEVAL_QUERY",
+                ),
+            )
+            embedding = response.embeddings[0].values
+            t_embed = time.perf_counter()
+            log_structured_entry(
+                "Query embedding generated successfully",
+                "INFO",
+                {"time_ms": round((t_embed - t0) * 1000, 1)}
+            )
+        except Exception as e:
+            error_type = type(e).__name__
+            error_msg = str(e)
+            # Check if it's a rate limit error
+            if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
+                log_structured_entry(
+                    "Rate limit exceeded while generating embedding",
+                    "WARNING",
+                    {
+                        "error": error_msg,
+                        "error_type": error_type,
+                        "query": query[:100]
+                    }
+                )
+                return "Error: API rate limit exceeded. Please try again later."
+            else:
+                log_structured_entry(
+                    "Failed to generate query embedding",
+                    "ERROR",
+                    {
+                        "error": error_msg,
+                        "error_type": error_type,
+                        "query": query[:100]
+                    }
+                )
+                return f"Error generating embedding: {error_msg}"
+        # Perform vector search
+        log_structured_entry("Performing vector search", "INFO")
+        try:
-    search_results = await app.vector_search.async_run_query(
-        deployed_index_id=app.settings.deployed_index_id,
-        query=embedding,
-        limit=app.settings.search_limit,
-    )
-    t_search = time.perf_counter()
+            search_results = await app.vector_search.async_run_query(
+                deployed_index_id=app.settings.deployed_index_id,
+                query=embedding,
+                limit=app.settings.search_limit,
+            )
+            t_search = time.perf_counter()
+        except Exception as e:
+            log_structured_entry(
+                "Vector search failed",
+                "ERROR",
+                {
+                    "error": str(e),
+                    "error_type": type(e).__name__,
+                    "query": query[:100]
+                }
+            )
+            return f"Error performing vector search: {str(e)}"
-    # Apply similarity filtering
-    if search_results:
+        # Apply similarity filtering
+        if search_results:
@@ -683,23 +738,45 @@ async def knowledge_search(
-        ]
+            ]
-    log_structured_entry(
-        "knowledge_search timing",
-        "INFO",
-        {
-            "embedding": f"{round((t_embed - t0) * 1000, 1)}ms",
-            "vector_search": f"{round((t_search - t_embed) * 1000, 1)}ms",
-            "total": f"{round((t_search - t0) * 1000, 1)}ms",
-            "chunks": [s["id"] for s in search_results]
-        }
-    )
+        log_structured_entry(
+            "knowledge_search completed successfully",
+            "INFO",
+            {
+                "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
+                "vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
+                "total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
+                "results_count": len(search_results),
+                "chunks": [s["id"] for s in search_results]
+            }
+        )
-    # Format results as XML-like documents
-    formatted_results = [
-        f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
-        for i, result in enumerate(search_results, start=1)
-    ]
-    return "\n".join(formatted_results)
+        # Format results as XML-like documents
+        if not search_results:
+            log_structured_entry(
+                "No results found for query",
+                "INFO",
+                {"query": query[:100]}
+            )
+            return "No relevant documents found for your query."
+        formatted_results = [
+            f"<document {i} name={result['id']}>\n{result['content']}\n</document {i}>"
+            for i, result in enumerate(search_results, start=1)
+        ]
+        return "\n".join(formatted_results)
+    except Exception as e:
+        # Catch-all for any unexpected errors
+        log_structured_entry(
+            "Unexpected error in knowledge_search",
+            "ERROR",
+            {
+                "error": str(e),
+                "error_type": type(e).__name__,
+                "query": query[:100]
+            }
+        )
+        return f"Unexpected error during search: {str(e)}"
 if __name__ == "__main__":
     mcp.run(transport=_args.transport)
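
Note: the two inner except blocks above (embedding and vector search) repeat the same classify-log-return steps. A minimal sketch of how that logic could be factored into one helper follows; the name describe_search_error is hypothetical and not part of this commit, and it assumes the module's log_structured_entry(message, level, context) signature as used in the diff:

def describe_search_error(e: Exception, query: str, stage: str) -> str:
    """Log a failure from one search stage and return a user-facing message."""
    error_msg = str(e)
    error_type = type(e).__name__
    context = {"error": error_msg, "error_type": error_type, "query": query[:100]}
    # Quota errors get a WARNING and a retry hint; everything else is an ERROR.
    if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
        log_structured_entry(f"Rate limit exceeded during {stage}", "WARNING", context)
        return "Error: API rate limit exceeded. Please try again later."
    log_structured_entry(f"{stage} failed", "ERROR", context)
    return f"Error during {stage}: {error_msg}"

With such a helper, each inner handler would reduce to a single line, e.g. return describe_search_error(e, query, "embedding generation").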