"""MCP server for semantic search over Vertex AI Vector Search.""" import time from mcp.server.fastmcp import Context, FastMCP from .config import _args from .logging import log_structured_entry from .models import AppContext, SourceNamespace from .server import lifespan from .services.search import ( filter_search_results, format_search_results, generate_query_embedding, ) mcp = FastMCP( "knowledge-search", host=_args.host, port=_args.port, lifespan=lifespan, ) @mcp.tool() async def knowledge_search( query: str, ctx: Context, source: SourceNamespace | None = None, ) -> str: """Search a knowledge base using a natural-language query. Args: query: The text query to search for. ctx: MCP request context (injected automatically). source: Optional filter to restrict results by source. Allowed values: 'Educacion Financiera', 'Productos y Servicios', 'Funcionalidades de la App Movil'. Returns: A formatted string containing matched documents with id and content. """ app: AppContext = ctx.request_context.lifespan_context t0 = time.perf_counter() log_structured_entry( "knowledge_search request received", "INFO", {"query": query[:100]}, # Log first 100 chars of query ) try: # Generate embedding for the query embedding, error = await generate_query_embedding( app.genai_client, app.settings.embedding_model, query, ) if error: return error t_embed = time.perf_counter() log_structured_entry( "Query embedding generated successfully", "INFO", {"time_ms": round((t_embed - t0) * 1000, 1)}, ) # Check semantic cache before vector search if app.semantic_cache is not None and source is None: cached = await app.semantic_cache.check(embedding) if cached is not None: t_cache = time.perf_counter() log_structured_entry( "knowledge_search completed from cache", "INFO", { "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms", "cache_check_ms": f"{round((t_cache - t_embed) * 1000, 1)}ms", "total_ms": f"{round((t_cache - t0) * 1000, 1)}ms", "cache_hit": True, }, ) return cached # Perform vector search log_structured_entry("Performing vector search", "INFO") try: search_results = await app.vector_search.async_run_query( deployed_index_id=app.settings.deployed_index_id, query=embedding, limit=app.settings.search_limit, source=source, ) t_search = time.perf_counter() except Exception as e: # noqa: BLE001 log_structured_entry( "Vector search failed", "ERROR", {"error": str(e), "error_type": type(e).__name__, "query": query[:100]}, ) return f"Error performing vector search: {e!s}" # Apply similarity filtering filtered_results = filter_search_results(search_results) log_structured_entry( "knowledge_search completed successfully", "INFO", { "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms", "vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms", "total_ms": f"{round((t_search - t0) * 1000, 1)}ms", "source_filter": source.value if source is not None else None, "results_count": len(filtered_results), "chunks": [s["id"] for s in filtered_results], "cache_hit": False, }, ) # Format and return results formatted = format_search_results(filtered_results) if not filtered_results: log_structured_entry( "No results found for query", "INFO", {"query": query[:100]} ) # Store in semantic cache (only for unfiltered queries with results) if app.semantic_cache is not None and source is None and filtered_results: await app.semantic_cache.store(query, formatted, embedding) return formatted except Exception as e: # noqa: BLE001 # Catch-all for any unexpected errors log_structured_entry( "Unexpected error in knowledge_search", 
"ERROR", {"error": str(e), "error_type": type(e).__name__, "query": query[:100]}, ) return f"Unexpected error during search: {e!s}" def main() -> None: """Entry point for the MCP server.""" mcp.run(transport=_args.transport) if __name__ == "__main__": main()