153 lines
4.9 KiB
Python
153 lines
4.9 KiB
Python
"""MCP server for semantic search over Vertex AI Vector Search."""
|
|
|
|
import time
|
|
|
|
from mcp.server.fastmcp import Context, FastMCP
|
|
|
|
from .config import _args
|
|
from .logging import log_structured_entry
|
|
from .models import AppContext, SourceNamespace
|
|
from .server import lifespan
|
|
from .services.search import (
|
|
filter_search_results,
|
|
format_search_results,
|
|
generate_query_embedding,
|
|
)
|
|
|
|
# Module-level FastMCP server instance. Host/port come from the parsed
# arguments in .config (`_args`); `lifespan` (from .server) builds the
# shared application context that tool handlers read via
# ctx.request_context.lifespan_context.
mcp = FastMCP(
    "knowledge-search",
    host=_args.host,
    port=_args.port,
    lifespan=lifespan,
)
|
|
|
|
|
|
@mcp.tool()
async def knowledge_search(
    query: str,
    ctx: Context,
    source: SourceNamespace | None = None,
) -> str:
    """Search a knowledge base using a natural-language query.

    Pipeline: embed the query, consult the semantic cache (unfiltered
    queries only), run a vector search, filter results by similarity,
    format them, and store the formatted answer back in the cache.
    Errors are reported to the caller as plain strings rather than
    raised exceptions, so this tool never propagates an exception.

    Args:
        query: The text query to search for.
        ctx: MCP request context (injected automatically).
        source: Optional filter to restrict results by source.
            Allowed values: 'Educacion Financiera',
            'Productos y Servicios', 'Funcionalidades de la App Movil'.

    Returns:
        A formatted string containing matched documents with id and content,
        or a human-readable error message if embedding or search failed.

    """
    # Shared app state (clients, settings, cache) built by the server lifespan.
    app: AppContext = ctx.request_context.lifespan_context
    t0 = time.perf_counter()

    log_structured_entry(
        "knowledge_search request received",
        "INFO",
        {"query": query[:100]},  # Log first 100 chars of query
    )

    try:
        # Generate embedding for the query. The helper returns an
        # (embedding, error) pair instead of raising; a non-empty error
        # string is passed straight back to the caller.
        embedding, error = await generate_query_embedding(
            app.genai_client,
            app.settings.embedding_model,
            query,
        )
        if error:
            return error

        t_embed = time.perf_counter()
        log_structured_entry(
            "Query embedding generated successfully",
            "INFO",
            {"time_ms": round((t_embed - t0) * 1000, 1)},
        )

        # Check semantic cache before vector search. Gated on
        # `source is None` to mirror the store() call below, which only
        # caches unfiltered queries — presumably so a source-filtered
        # request can never be answered from an unfiltered cached result.
        # NOTE(review): timing values here are "<x>ms" strings while the
        # embedding log above uses a bare number — confirm which format
        # downstream log consumers expect.
        if app.semantic_cache is not None and source is None:
            cached = await app.semantic_cache.check(embedding)
            if cached is not None:
                t_cache = time.perf_counter()
                log_structured_entry(
                    "knowledge_search completed from cache",
                    "INFO",
                    {
                        "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
                        "cache_check_ms": f"{round((t_cache - t_embed) * 1000, 1)}ms",
                        "total_ms": f"{round((t_cache - t0) * 1000, 1)}ms",
                        "cache_hit": True,
                    },
                )
                # Cache stores the already-formatted answer string, so it
                # can be returned as-is.
                return cached

        # Perform vector search. Wrapped in its own try/except so a search
        # failure yields a specific error message (distinct from the
        # outer catch-all).
        log_structured_entry("Performing vector search", "INFO")
        try:
            search_results = await app.vector_search.async_run_query(
                deployed_index_id=app.settings.deployed_index_id,
                query=embedding,
                limit=app.settings.search_limit,
                source=source,
            )
            t_search = time.perf_counter()
        except Exception as e:  # noqa: BLE001
            log_structured_entry(
                "Vector search failed",
                "ERROR",
                {"error": str(e), "error_type": type(e).__name__, "query": query[:100]},
            )
            return f"Error performing vector search: {e!s}"

        # Apply similarity filtering (threshold logic lives in
        # services.search.filter_search_results).
        filtered_results = filter_search_results(search_results)

        log_structured_entry(
            "knowledge_search completed successfully",
            "INFO",
            {
                "embedding_ms": f"{round((t_embed - t0) * 1000, 1)}ms",
                "vector_search_ms": f"{round((t_search - t_embed) * 1000, 1)}ms",
                "total_ms": f"{round((t_search - t0) * 1000, 1)}ms",
                "source_filter": source.value if source is not None else None,
                "results_count": len(filtered_results),
                # Matched chunk ids, for traceability of what was returned.
                "chunks": [s["id"] for s in filtered_results],
                "cache_hit": False,
            },
        )

        # Format and return results (an empty result list still produces a
        # formatted response string).
        formatted = format_search_results(filtered_results)

        if not filtered_results:
            log_structured_entry(
                "No results found for query", "INFO", {"query": query[:100]}
            )

        # Store in semantic cache (only for unfiltered queries with results)
        if app.semantic_cache is not None and source is None and filtered_results:
            await app.semantic_cache.store(query, formatted, embedding)

        return formatted

    except Exception as e:  # noqa: BLE001
        # Catch-all boundary: any unexpected error is logged and converted
        # to a string so the MCP tool call itself never fails.
        log_structured_entry(
            "Unexpected error in knowledge_search",
            "ERROR",
            {"error": str(e), "error_type": type(e).__name__, "query": query[:100]},
        )
        return f"Unexpected error during search: {e!s}"
|
|
|
|
|
|
def main() -> None:
    """Start the MCP server on the transport chosen via command-line args."""
    transport = _args.transport
    mcp.run(transport=transport)
|
|
|
|
|
|
# Script entry point: allows running the module directly (python -m / script).
if __name__ == "__main__":
    main()
|