"""Search a deployed Vertex AI Vector Search index. Embeds a query, finds nearest neighbors, and retrieves chunk contents from GCS. Usage: uv run python utils/search_index.py "your search query" \ [--source SOURCE] [--top-k 5] [--project PROJECT] [--location LOCATION] Examples: # Basic search uv run python utils/search_index.py "¿Cómo funciona el proceso?" 123456 blue_ivy_deployed # Filter by source folder uv run python utils/search_index.py "requisitos" 123456 blue_ivy_deployed --source "manuales" # Return more results uv run python utils/search_index.py "políticas" 123456 blue_ivy_deployed --top-k 10 """ import argparse from google.cloud import aiplatform, storage from pydantic_ai import Embedder def search_index( query: str, endpoint_id: str, deployed_index_id: str, project: str, location: str, embedding_model: str, contents_dir: str, top_k: int, source: str | None, ) -> None: aiplatform.init(project=project, location=location) embedder = Embedder(f"google-vertex:{embedding_model}") query_embedding = embedder.embed_documents_sync([query]).embeddings[0] endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_id) restricts = None if source: restricts = [ aiplatform.matching_engine.matching_engine_index_endpoint.Namespace( name="source", allow_tokens=[source], ) ] response = endpoint.find_neighbors( deployed_index_id=deployed_index_id, queries=[list(query_embedding)], num_neighbors=top_k, filter=restricts, ) if not response or not response[0]: print("No results found.") return gcs_client = storage.Client() neighbors = response[0] print(f"Found {len(neighbors)} results for: {query!r}\n") for i, neighbor in enumerate(neighbors, 1): chunk_id = neighbor.id distance = neighbor.distance content = _fetch_chunk_content(gcs_client, contents_dir, chunk_id) print(f"--- Result {i} (id={chunk_id}, distance={distance:.4f}) ---") print(content) print() def _fetch_chunk_content( gcs_client: storage.Client, contents_dir: str, chunk_id: str ) -> str: """Fetches a chunk's markdown content from GCS.""" uri = f"{contents_dir}/{chunk_id}.md" bucket_name, _, obj_path = uri.removeprefix("gs://").partition("/") blob = gcs_client.bucket(bucket_name).blob(obj_path) if not blob.exists(): return f"[content not found: {uri}]" return blob.download_as_text() def main(): parser = argparse.ArgumentParser( description="Search a deployed Vertex AI Vector Search index." ) parser.add_argument("query", help="The search query text.") parser.add_argument("endpoint_id", help="The deployed endpoint ID.") parser.add_argument("deployed_index_id", help="The deployed index ID.") parser.add_argument( "--source", default=None, help="Filter results by source folder (metadata namespace).", ) parser.add_argument( "--top-k", type=int, default=5, help="Number of results to return (default: 5)." ) parser.add_argument("--project", default="bnt-orquestador-cognitivo-dev") parser.add_argument("--location", default="us-central1") parser.add_argument( "--embedding-model", default="gemini-embedding-001", help="Embedding model name." ) parser.add_argument( "--contents-dir", default="gs://bnt_orquestador_cognitivo_gcs_configs_dev/blue-ivy/contents", help="GCS URI of the contents directory.", ) args = parser.parse_args() search_index( query=args.query, endpoint_id=args.endpoint_id, deployed_index_id=args.deployed_index_id, project=args.project, location=args.location, embedding_model=args.embedding_model, contents_dir=args.contents_dir, top_k=args.top_k, source=args.source, ) if __name__ == "__main__": main()