Compare commits

4 Commits

Author SHA1 Message Date
72808b1475 Add filter with metadata using restricts 2026-02-24 03:05:50 +00:00
427de45522 Merge pull request 'Adapt Dockerfile' (#3) from config into main
Reviewed-on: #3
2026-02-24 00:01:30 +00:00
Anibal Angulo
f0b9d1b27a Add local config file support 2026-02-23 23:17:15 +00:00
Anibal Angulo
bf2cc2f556 Rename Dockerfile 2026-02-23 17:48:19 +00:00
7 changed files with 123 additions and 52 deletions

View File

@@ -1,25 +0,0 @@
FROM python:3.12-slim AS builder
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --frozen --no-install-project
COPY main.py .
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /app /app
ENV PATH="/app/.venv/bin:$PATH"
# Cloud Run injects PORT (defaults to 8080)
ENV PORT=8080
EXPOSE ${PORT}
# Shell form so ${PORT} is expanded at runtime
CMD python main.py --transport sse --port ${PORT}

14
DockerfileConnector Normal file
View File

@@ -0,0 +1,14 @@
FROM quay.ocp.banorte.com/golden/python-312:latest
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --frozen
COPY main.py .
ENV PATH="/app/.venv/bin:$PATH"
CMD ["uv", "run", "python", "main.py", "--transport", "sse", "--port", "8000"]

View File

@@ -6,7 +6,24 @@ An MCP (Model Context Protocol) server that exposes a `knowledge_search` tool fo
1. A natural-language query is embedded using a Gemini embedding model.
2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors.
3. The matched document contents are fetched from a GCS bucket and returned to the caller.
3. Optional filters (restricts) can be applied to search only specific source folders.
4. The matched document contents are fetched from a GCS bucket and returned to the caller.
## Filtering by Source Folder
The `knowledge_search` tool supports filtering results by source folder:
```python
# Search all folders
knowledge_search(query="what is a savings account?")
# Search only in specific folders
knowledge_search(
query="what is a savings account?",
source_folders=["Educacion Financiera", "Productos y Servicios"]
)
```
## Prerequisites

View File

@@ -57,9 +57,20 @@ async def async_main() -> None:
model="gemini-2.0-flash",
name="knowledge_agent",
instruction=(
"You are a helpful assistant with access to a knowledge base. "
"Use the knowledge_search tool to find relevant information "
"when the user asks questions. Summarize the results clearly."
"You are a helpful assistant with access to a knowledge base organized by folders. "
"Use the knowledge_search tool to find relevant information when the user asks questions.\n\n"
"Available folders in the knowledge base:\n"
"- 'Educacion Financiera': Educational content about finance, savings, investments, financial concepts\n"
"- 'Funcionalidades de la App Movil': Mobile app features, functionality, usage instructions\n"
"- 'Productos y Servicios': Bank products and services, accounts, procedures\n\n"
"IMPORTANT: When the user asks about a specific topic, analyze which folders are relevant "
"and use the source_folders parameter to filter results for more precise answers.\n\n"
"Examples:\n"
"- User asks about 'cuenta de ahorros' → Use source_folders=['Educacion Financiera', 'Productos y Servicios']\n"
"- User asks about 'cómo usar la app móvil' → Use source_folders=['Funcionalidades de App Movil']\n"
"- User asks about 'transferencias en la app' → Use source_folders=['Funcionalidades de App Movil', 'Productos y Servicios']\n"
"- User asks general question → Don't use source_folders (search all)\n\n"
"Summarize the results clearly in Spanish."
),
tools=[toolset],
)

97
main.py
View File

@@ -5,6 +5,7 @@ import argparse
import asyncio
import io
import logging
import os
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from dataclasses import dataclass
@@ -16,7 +17,7 @@ from gcloud.aio.storage import Storage
from google import genai
from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
logger = logging.getLogger(__name__)
@@ -203,6 +204,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: str,
query: Sequence[float],
limit: int,
restricts: list[dict[str, list[str]]] | None = None,
) -> list[SearchResult]:
"""Run an async similarity search via the REST API.
@@ -228,14 +230,18 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors"
)
payload = {
"deployed_index_id": deployed_index_id,
"queries": [
{
query_payload = {
"datapoint": {"feature_vector": list(query)},
"neighbor_count": limit,
},
],
}
# Add restricts if provided
if restricts:
query_payload["restricts"] = restricts
payload = {
"deployed_index_id": deployed_index_id,
"queries": [query_payload],
}
headers = await self._async_get_auth_headers()
@@ -282,10 +288,29 @@ class GoogleCloudVectorSearch:
# ---------------------------------------------------------------------------
class Settings(BaseSettings):
"""Server configuration populated from environment variables."""
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
model_config = {"env_file": ".env"}
_args = _parse_args()
class Settings(BaseSettings):
"""Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str
location: str
@@ -297,6 +322,23 @@ class Settings(BaseSettings):
embedding_model: str = "gemini-embedding-001"
search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass
class AppContext:
@@ -334,19 +376,6 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
)
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
return parser.parse_args()
_args = _parse_args()
cfg = Settings.model_validate({})
mcp = FastMCP(
@@ -361,12 +390,16 @@ mcp = FastMCP(
async def knowledge_search(
query: str,
ctx: Context,
source_folders: list[str] | None = None,
) -> str:
"""Search a knowledge base using a natural-language query.
Args:
query: The text query to search for.
ctx: MCP request context (injected automatically).
source_folders: Optional list of source folder paths to filter results.
If provided, only documents from these folders will be returned.
Example: ["Educacion Financiera", "Productos y Servicios"]
Returns:
A formatted string containing matched documents with id and content.
@@ -389,13 +422,31 @@ async def knowledge_search(
embedding = response.embeddings[0].values
t_embed = time.perf_counter()
# Build restricts for source folder filtering if provided
restricts = None
if source_folders:
restricts = [
{
"namespace": "source_folder",
"allow": source_folders,
}
]
logger.info(f"Filtering by source_folders: {source_folders}")
else:
logger.info("No filtering - searching all folders")
search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id,
query=embedding,
limit=app.settings.search_limit,
restricts=restricts,
)
t_search = time.perf_counter()
# Log raw results from Vertex AI before similarity filtering
logger.info(f"Raw results from Vertex AI (before similarity filter): {len(search_results)} chunks")
logger.info(f"Raw chunk IDs: {[s['id'] for s in search_results]}")
# Apply similarity filtering
if search_results:
max_sim = max(r["distance"] for r in search_results)

View File

@@ -12,6 +12,7 @@ dependencies = [
"google-genai>=1.64.0",
"mcp[cli]>=1.26.0",
"pydantic-settings>=2.9.1",
"pyyaml>=6.0",
]
[dependency-groups]

2
uv.lock generated
View File

@@ -1356,6 +1356,7 @@ dependencies = [
{ name = "google-genai" },
{ name = "mcp", extra = ["cli"] },
{ name = "pydantic-settings" },
{ name = "pyyaml" },
]
[package.dev-dependencies]
@@ -1374,6 +1375,7 @@ requires-dist = [
{ name = "google-genai", specifier = ">=1.64.0" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.26.0" },
{ name = "pydantic-settings", specifier = ">=2.9.1" },
{ name = "pyyaml", specifier = ">=6.0" },
]
[package.metadata.requires-dev]