Compare commits

10 Commits

Author SHA1 Message Date
72808b1475 Add filter with metadata using restricts 2026-02-24 03:05:50 +00:00
427de45522 Merge pull request 'Adapt Dockerfile' (#3) from config into main
Reviewed-on: #3
2026-02-24 00:01:30 +00:00
Anibal Angulo
f0b9d1b27a Add local config file support 2026-02-23 23:17:15 +00:00
Anibal Angulo
bf2cc2f556 Rename Dockerfile 2026-02-23 17:48:19 +00:00
Anibal Angulo
b92a5d5b0e Move config validation to top-level 2026-02-22 16:34:24 +00:00
Anibal Angulo
bd107a027a Update README 2026-02-22 16:02:46 +00:00
Anibal Angulo
dcc05d697e Add Docker 2026-02-22 15:57:57 +00:00
Anibal Angulo
82764bd60b Add SSE support 2026-02-22 15:52:35 +00:00
Anibal Angulo
54eb6f240c add agent for testing directly 2026-02-22 15:41:16 +00:00
Anibal Angulo
bb19770663 Update MCP defaults 2026-02-22 15:40:59 +00:00
8 changed files with 1873 additions and 14 deletions

9
.dockerignore Normal file
View File

@@ -0,0 +1,9 @@
.git/
.venv/
.ruff_cache/
__pycache__/
*.pyc
.env
agent.py
AGENTS.md
README.md

216
.gitignore vendored Normal file
View File

@@ -0,0 +1,216 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml

14
DockerfileConnector Normal file
View File

@@ -0,0 +1,14 @@
# Container image for the knowledge-search MCP server (SSE transport).
# NOTE(review): both base images use :latest — pin a specific tag/digest for
# reproducible builds once the blessed versions are known.
FROM quay.ocp.banorte.com/golden/python-312:latest

# Bring in the uv binary from the official distroless image.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/

WORKDIR /app

# Copy only the dependency manifests first so the dependency layer is cached
# until pyproject.toml/uv.lock change.
COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --frozen --no-cache

# Application code last — most frequently changing layer.
COPY main.py .

# Put the project venv on PATH so the interpreter below is the synced one.
ENV PATH="/app/.venv/bin:$PATH"

# Documentation-only: the server listens on 8000 (see CMD).
EXPOSE 8000

# NOTE(review): the golden base image may already run as non-root; if not,
# add a USER directive so the server does not run as root.

# Run the venv's python directly (exec form, PID 1): the environment was
# already materialized by `uv sync`, so `uv run` at startup is unnecessary.
CMD ["python", "main.py", "--transport", "sse", "--port", "8000"]

104
README.md
View File

@@ -0,0 +1,104 @@
# knowledge-search-mcp
An MCP (Model Context Protocol) server that exposes a `knowledge_search` tool for semantic search over a knowledge base backed by Vertex AI Vector Search and Google Cloud Storage.
## How it works
1. A natural-language query is embedded using a Gemini embedding model.
2. The embedding is sent to a Vertex AI Matching Engine index endpoint to find nearest neighbors.
3. Optional filters (restricts) can be applied to search only specific source folders.
4. The matched document contents are fetched from a GCS bucket and returned to the caller.
## Filtering by Source Folder
The `knowledge_search` tool supports filtering results by source folder:
```python
# Search all folders
knowledge_search(query="what is a savings account?")
# Search only in specific folders
knowledge_search(
query="what is a savings account?",
source_folders=["Educacion Financiera", "Productos y Servicios"]
)
```
## Prerequisites
- Python ≥ 3.12
- [uv](https://docs.astral.sh/uv/) for dependency management
- A Google Cloud project with:
- A Vertex AI Vector Search index and deployed endpoint
- A GCS bucket containing the indexed document chunks
- Application Default Credentials (or a service account) with appropriate permissions
## Configuration
Create a `.env` file (see `Settings` in `main.py` for all options):
```env
PROJECT_ID=my-gcp-project
LOCATION=us-central1
BUCKET=my-knowledge-bucket
INDEX_NAME=my-index
DEPLOYED_INDEX_ID=my-deployed-index
ENDPOINT_NAME=projects/…/locations/…/indexEndpoints/…
ENDPOINT_DOMAIN=123456789.us-central1-aiplatform.googleapis.com
# optional
EMBEDDING_MODEL=gemini-embedding-001
SEARCH_LIMIT=10
```
## Usage
### Install dependencies
```bash
uv sync
```
### Run the MCP server (stdio)
```bash
uv run python main.py
```
### Run the MCP server (SSE, e.g. for remote clients)
```bash
uv run python main.py --transport sse --port 8080
```
### Run the interactive agent (ADK)
The bundled agent spawns the MCP server as a subprocess and provides a REPL:
```bash
uv run python agent.py
```
Or connect to an already-running SSE server:
```bash
uv run python agent.py --remote http://localhost:8080/sse
```
## Docker
```bash
docker build -t knowledge-search-mcp -f DockerfileConnector .
docker run -p 8000:8000 --env-file .env knowledge-search-mcp
```
The container starts the server in SSE mode on port `8000` (hard-coded in the image's `CMD`).
## Project structure
```
main.py MCP server, vector search client, and GCS storage helper
agent.py Interactive ADK agent that consumes the MCP server
DockerfileConnector Container image build for Cloud Run / containerized deployment
pyproject.toml Project metadata and dependencies
```

122
agent.py Normal file
View File

@@ -0,0 +1,122 @@
# ruff: noqa: INP001
"""ADK agent that connects to the knowledge-search MCP server."""
import argparse
import asyncio
import os
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import (
SseConnectionParams,
StdioConnectionParams,
)
from google.genai import types
from mcp import StdioServerParameters
# ADK resolves its Vertex AI configuration from environment variables;
# mirror the values already supplied via .env so the agent works unmodified.
os.environ.setdefault("GOOGLE_GENAI_USE_VERTEXAI", "True")

_project = os.environ.get("PROJECT_ID")
if _project:
    os.environ.setdefault("GOOGLE_CLOUD_PROJECT", _project)

_location = os.environ.get("LOCATION")
if _location:
    os.environ.setdefault("GOOGLE_CLOUD_LOCATION", _location)

# Absolute path to the MCP server script that lives next to this file.
SERVER_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "main.py")
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Knowledge Search Agent")
parser.add_argument(
"--remote",
metavar="URL",
help="Connect to an already-running MCP server at this SSE URL "
"(e.g. http://localhost:8080/sse). Without this flag the agent "
"spawns the server as a subprocess.",
)
return parser.parse_args()
async def async_main() -> None:
    """Run the interactive knowledge-search agent REPL.

    Connects to the knowledge-search MCP server — over SSE when ``--remote``
    is given, otherwise by spawning ``main.py`` as a stdio subprocess — wires
    it into an ADK ``LlmAgent``, then reads queries from stdin until EOF or
    Ctrl+C and prints the agent's final responses.
    """
    args = _parse_args()

    # Choose the MCP transport: remote SSE endpoint vs. local subprocess.
    if args.remote:
        connection_params = SseConnectionParams(url=args.remote)
    else:
        connection_params = StdioConnectionParams(
            server_params=StdioServerParameters(
                command="uv",
                args=["run", "python", SERVER_SCRIPT],
            ),
        )
    toolset = McpToolset(connection_params=connection_params)

    agent = LlmAgent(
        model="gemini-2.0-flash",
        name="knowledge_agent",
        instruction=(
            "You are a helpful assistant with access to a knowledge base organized by folders. "
            "Use the knowledge_search tool to find relevant information when the user asks questions.\n\n"
            "Available folders in the knowledge base:\n"
            "- 'Educacion Financiera': Educational content about finance, savings, investments, financial concepts\n"
            "- 'Funcionalidades de la App Movil': Mobile app features, functionality, usage instructions\n"
            "- 'Productos y Servicios': Bank products and services, accounts, procedures\n\n"
            "IMPORTANT: When the user asks about a specific topic, analyze which folders are relevant "
            "and use the source_folders parameter to filter results for more precise answers.\n\n"
            "Examples:\n"
            "- User asks about 'cuenta de ahorros' → Use source_folders=['Educacion Financiera', 'Productos y Servicios']\n"
            # FIX: these two examples previously said 'Funcionalidades de App Movil',
            # which does not match the declared folder name above — a filter copied
            # from them would match no documents.
            "- User asks about 'cómo usar la app móvil' → Use source_folders=['Funcionalidades de la App Movil']\n"
            "- User asks about 'transferencias en la app' → Use source_folders=['Funcionalidades de la App Movil', 'Productos y Servicios']\n"
            "- User asks general question → Don't use source_folders (search all)\n\n"
            "Summarize the results clearly in Spanish."
        ),
        tools=[toolset],
    )

    # In-memory session: state is lost when the process exits.
    session_service = InMemorySessionService()
    session = await session_service.create_session(
        state={},
        app_name="knowledge_agent",
        user_id="user",
    )
    runner = Runner(
        app_name="knowledge_agent",
        agent=agent,
        session_service=session_service,
    )

    print("Knowledge Search Agent ready. Type your query (Ctrl+C to exit):")
    try:
        while True:
            try:
                query = input("\n> ").strip()
            except EOFError:
                break  # stdin closed — leave the REPL cleanly
            if not query:
                continue
            content = types.Content(
                role="user",
                parts=[types.Part(text=query)],
            )
            # Stream agent events; print only text parts of the final response.
            async for event in runner.run_async(
                session_id=session.id,
                user_id=session.user_id,
                new_message=content,
            ):
                if event.is_final_response() and event.content and event.content.parts:
                    for part in event.content.parts:
                        if part.text:
                            print(part.text)
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Always release the MCP connection / server subprocess.
        await toolset.close()


if __name__ == "__main__":
    asyncio.run(async_main())

104
main.py
View File

@@ -1,9 +1,11 @@
# ruff: noqa: INP001 # ruff: noqa: INP001
"""Async helpers for querying Vertex AI vector search via MCP.""" """Async helpers for querying Vertex AI vector search via MCP."""
import argparse
import asyncio import asyncio
import io import io
import logging import logging
import os
from collections.abc import AsyncIterator, Sequence from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
@@ -15,7 +17,7 @@ from gcloud.aio.storage import Storage
from google import genai from google import genai
from google.genai import types as genai_types from google.genai import types as genai_types
from mcp.server.fastmcp import Context, FastMCP from mcp.server.fastmcp import Context, FastMCP
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, YamlConfigSettingsSource
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -202,6 +204,7 @@ class GoogleCloudVectorSearch:
deployed_index_id: str, deployed_index_id: str,
query: Sequence[float], query: Sequence[float],
limit: int, limit: int,
restricts: list[dict[str, list[str]]] | None = None,
) -> list[SearchResult]: ) -> list[SearchResult]:
"""Run an async similarity search via the REST API. """Run an async similarity search via the REST API.
@@ -227,14 +230,18 @@ class GoogleCloudVectorSearch:
f"/locations/{self.location}" f"/locations/{self.location}"
f"/indexEndpoints/{endpoint_id}:findNeighbors" f"/indexEndpoints/{endpoint_id}:findNeighbors"
) )
payload = { query_payload = {
"deployed_index_id": deployed_index_id,
"queries": [
{
"datapoint": {"feature_vector": list(query)}, "datapoint": {"feature_vector": list(query)},
"neighbor_count": limit, "neighbor_count": limit,
}, }
],
# Add restricts if provided
if restricts:
query_payload["restricts"] = restricts
payload = {
"deployed_index_id": deployed_index_id,
"queries": [query_payload],
} }
headers = await self._async_get_auth_headers() headers = await self._async_get_auth_headers()
@@ -244,7 +251,10 @@ class GoogleCloudVectorSearch:
json=payload, json=payload,
headers=headers, headers=headers,
) as response: ) as response:
response.raise_for_status() if not response.ok:
body = await response.text()
msg = f"findNeighbors returned {response.status}: {body}"
raise RuntimeError(msg)
data = await response.json() data = await response.json()
neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", [])
@@ -278,8 +288,29 @@ class GoogleCloudVectorSearch:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"--transport",
choices=["stdio", "sse"],
default="stdio",
)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument(
"--config",
default=os.environ.get("CONFIG_FILE", "config.yaml"),
)
return parser.parse_args()
_args = _parse_args()
class Settings(BaseSettings): class Settings(BaseSettings):
"""Server configuration populated from environment variables.""" """Server configuration populated from env vars and a YAML config file."""
model_config = {"env_file": ".env", "yaml_file": _args.config}
project_id: str project_id: str
location: str location: str
@@ -288,9 +319,26 @@ class Settings(BaseSettings):
deployed_index_id: str deployed_index_id: str
endpoint_name: str endpoint_name: str
endpoint_domain: str endpoint_domain: str
embedding_model: str = "text-embedding-005" embedding_model: str = "gemini-embedding-001"
search_limit: int = 10 search_limit: int = 10
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
return (
init_settings,
env_settings,
dotenv_settings,
YamlConfigSettingsSource(settings_cls),
file_secret_settings,
)
@dataclass @dataclass
class AppContext: class AppContext:
@@ -304,8 +352,6 @@ class AppContext:
@asynccontextmanager @asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]: async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Create and configure the vector-search client for the server lifetime.""" """Create and configure the vector-search client for the server lifetime."""
cfg = Settings.model_validate({})
vs = GoogleCloudVectorSearch( vs = GoogleCloudVectorSearch(
project_id=cfg.project_id, project_id=cfg.project_id,
location=cfg.location, location=cfg.location,
@@ -330,19 +376,30 @@ async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
) )
mcp = FastMCP("knowledge-search", lifespan=lifespan) cfg = Settings.model_validate({})
mcp = FastMCP(
"knowledge-search",
host=_args.host,
port=_args.port,
lifespan=lifespan,
)
@mcp.tool() @mcp.tool()
async def knowledge_search( async def knowledge_search(
query: str, query: str,
ctx: Context, ctx: Context,
source_folders: list[str] | None = None,
) -> str: ) -> str:
"""Search a knowledge base using a natural-language query. """Search a knowledge base using a natural-language query.
Args: Args:
query: The text query to search for. query: The text query to search for.
ctx: MCP request context (injected automatically). ctx: MCP request context (injected automatically).
source_folders: Optional list of source folder paths to filter results.
If provided, only documents from these folders will be returned.
Example: ["Educacion Financiera", "Productos y Servicios"]
Returns: Returns:
A formatted string containing matched documents with id and content. A formatted string containing matched documents with id and content.
@@ -359,18 +416,37 @@ async def knowledge_search(
contents=query, contents=query,
config=genai_types.EmbedContentConfig( config=genai_types.EmbedContentConfig(
task_type="RETRIEVAL_QUERY", task_type="RETRIEVAL_QUERY",
), ),
) )
embedding = response.embeddings[0].values embedding = response.embeddings[0].values
t_embed = time.perf_counter() t_embed = time.perf_counter()
# Build restricts for source folder filtering if provided
restricts = None
if source_folders:
restricts = [
{
"namespace": "source_folder",
"allow": source_folders,
}
]
logger.info(f"Filtering by source_folders: {source_folders}")
else:
logger.info("No filtering - searching all folders")
search_results = await app.vector_search.async_run_query( search_results = await app.vector_search.async_run_query(
deployed_index_id=app.settings.deployed_index_id, deployed_index_id=app.settings.deployed_index_id,
query=embedding, query=embedding,
limit=app.settings.search_limit, limit=app.settings.search_limit,
restricts=restricts,
) )
t_search = time.perf_counter() t_search = time.perf_counter()
# Log raw results from Vertex AI before similarity filtering
logger.info(f"Raw results from Vertex AI (before similarity filter): {len(search_results)} chunks")
logger.info(f"Raw chunk IDs: {[s['id'] for s in search_results]}")
# Apply similarity filtering # Apply similarity filtering
if search_results: if search_results:
max_sim = max(r["distance"] for r in search_results) max_sim = max(r["distance"] for r in search_results)
@@ -398,4 +474,4 @@ async def knowledge_search(
if __name__ == "__main__": if __name__ == "__main__":
mcp.run() mcp.run(transport=_args.transport)

View File

@@ -12,10 +12,12 @@ dependencies = [
"google-genai>=1.64.0", "google-genai>=1.64.0",
"mcp[cli]>=1.26.0", "mcp[cli]>=1.26.0",
"pydantic-settings>=2.9.1", "pydantic-settings>=2.9.1",
"pyyaml>=6.0",
] ]
[dependency-groups] [dependency-groups]
dev = [ dev = [
"google-adk>=1.25.1",
"ruff>=0.15.2", "ruff>=0.15.2",
"ty>=0.0.18", "ty>=0.0.18",
] ]

1316
uv.lock generated

File diff suppressed because it is too large Load Diff