diff --git a/src/knowledge_pipeline/config.py b/src/knowledge_pipeline/config.py index 3b7a5a3..723b953 100644 --- a/src/knowledge_pipeline/config.py +++ b/src/knowledge_pipeline/config.py @@ -48,7 +48,7 @@ class Settings(BaseSettings): @property def index_data(self) -> str: - return self.index_destination + self.index_name + return f"{self.index_destination}/{self.index_name}" @property def index_contents_dir(self) -> str: diff --git a/utils/delete_endpoint.py b/utils/delete_endpoint.py new file mode 100644 index 0000000..a61104f --- /dev/null +++ b/utils/delete_endpoint.py @@ -0,0 +1,40 @@ +"""Delete a GCP Vector Search endpoint by ID. + +Undeploys any deployed indexes before deleting the endpoint. + +Usage: + uv run python utils/delete_endpoint.py [--project PROJECT] [--location LOCATION] +""" + +import argparse + +from google.cloud import aiplatform + + +def delete_endpoint(endpoint_id: str, project: str, location: str) -> None: + aiplatform.init(project=project, location=location) + endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_id) + + print(f"Endpoint: {endpoint.display_name}") + + for deployed in endpoint.deployed_indexes: + print(f"Undeploying index: {deployed.id}") + endpoint.undeploy_index(deployed_index_id=deployed.id) + print(f"Undeployed: {deployed.id}") + + endpoint.delete() + print(f"Endpoint {endpoint_id} deleted successfully.") + + +def main(): + parser = argparse.ArgumentParser(description="Delete a GCP Vector Search endpoint.") + parser.add_argument("endpoint_id", help="The endpoint ID to delete.") + parser.add_argument("--project", default="bnt-orquestador-cognitivo-dev") + parser.add_argument("--location", default="us-central1") + args = parser.parse_args() + + delete_endpoint(args.endpoint_id, args.project, args.location) + + +if __name__ == "__main__": + main() diff --git a/utils/search_index.py b/utils/search_index.py new file mode 100644 index 0000000..c7cd577 --- /dev/null +++ b/utils/search_index.py @@ -0,0 +1,132 @@ +"""Search a deployed Vertex AI Vector Search index. + +Embeds a query, finds nearest neighbors, and retrieves chunk contents from GCS. + +Usage: + uv run python utils/search_index.py "your search query" \ + [--source SOURCE] [--top-k 5] [--project PROJECT] [--location LOCATION] + +Examples: + # Basic search + uv run python utils/search_index.py "¿Cómo funciona el proceso?" 123456 blue_ivy_deployed + + # Filter by source folder + uv run python utils/search_index.py "requisitos" 123456 blue_ivy_deployed --source "manuales" + + # Return more results + uv run python utils/search_index.py "políticas" 123456 blue_ivy_deployed --top-k 10 +""" + +import argparse + +from google.cloud import aiplatform, storage +from pydantic_ai import Embedder + + +def search_index( + query: str, + endpoint_id: str, + deployed_index_id: str, + project: str, + location: str, + embedding_model: str, + contents_dir: str, + top_k: int, + source: str | None, +) -> None: + aiplatform.init(project=project, location=location) + + embedder = Embedder(f"google-vertex:{embedding_model}") + query_embedding = embedder.embed_documents_sync([query]).embeddings[0] + + endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_id) + + restricts = None + if source: + restricts = [ + aiplatform.matching_engine.matching_engine_index_endpoint.Namespace( + name="source", + allow_tokens=[source], + ) + ] + + response = endpoint.find_neighbors( + deployed_index_id=deployed_index_id, + queries=[list(query_embedding)], + num_neighbors=top_k, + filter=restricts, + ) + + if not response or not response[0]: + print("No results found.") + return + + gcs_client = storage.Client() + neighbors = response[0] + + print(f"Found {len(neighbors)} results for: {query!r}\n") + for i, neighbor in enumerate(neighbors, 1): + chunk_id = neighbor.id + distance = neighbor.distance + + content = _fetch_chunk_content(gcs_client, contents_dir, chunk_id) + + print(f"--- Result {i} (id={chunk_id}, distance={distance:.4f}) ---") + print(content) + print() + + +def _fetch_chunk_content( + gcs_client: storage.Client, contents_dir: str, chunk_id: str +) -> str: + """Fetches a chunk's markdown content from GCS.""" + uri = f"{contents_dir}/{chunk_id}.md" + bucket_name, _, obj_path = uri.removeprefix("gs://").partition("/") + blob = gcs_client.bucket(bucket_name).blob(obj_path) + if not blob.exists(): + return f"[content not found: {uri}]" + return blob.download_as_text() + + +def main(): + parser = argparse.ArgumentParser( + description="Search a deployed Vertex AI Vector Search index." + ) + parser.add_argument("query", help="The search query text.") + parser.add_argument("endpoint_id", help="The deployed endpoint ID.") + parser.add_argument("deployed_index_id", help="The deployed index ID.") + parser.add_argument( + "--source", + default=None, + help="Filter results by source folder (metadata namespace).", + ) + parser.add_argument( + "--top-k", type=int, default=5, help="Number of results to return (default: 5)." + ) + parser.add_argument("--project", default="bnt-orquestador-cognitivo-dev") + parser.add_argument("--location", default="us-central1") + parser.add_argument( + "--embedding-model", default="gemini-embedding-001", help="Embedding model name." + ) + parser.add_argument( + "--contents-dir", + default="gs://bnt_orquestador_cognitivo_gcs_configs_dev/blue-ivy/contents", + help="GCS URI of the contents directory.", + ) + args = parser.parse_args() + + search_index( + query=args.query, + endpoint_id=args.endpoint_id, + deployed_index_id=args.deployed_index_id, + project=args.project, + location=args.location, + embedding_model=args.embedding_model, + contents_dir=args.contents_dir, + top_k=args.top_k, + source=args.source, + ) + + +if __name__ == "__main__": + main()