From 9ccfbb44c27c1c3679d047c49f14b5f3b0840638 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Tue, 24 Feb 2026 03:50:34 +0000 Subject: [PATCH] code-shredding --- AGENTS.md | 1 + README.md | 31 +- apps/index-gen/README.md | 0 apps/index-gen/pyproject.toml | 34 -- apps/index-gen/src/index_gen/__init__.py | 2 - apps/index-gen/src/index_gen/cli.py | 68 --- apps/index-gen/src/index_gen/main.py | 238 -------- config.example.yaml | 38 +- packages/chunker/README.md | 0 packages/chunker/pyproject.toml | 23 - packages/chunker/src/chunker/py.typed | 0 packages/document-converter/.python-version | 1 - packages/document-converter/README.md | 0 packages/document-converter/pyproject.toml | 20 - .../src/document_converter/__init__.py | 2 - .../src/document_converter/base.py | 35 -- .../src/document_converter/markdown.py | 131 ----- .../src/document_converter/py.typed | 0 packages/embedder/.python-version | 1 - packages/embedder/README.md | 0 packages/embedder/pyproject.toml | 16 - packages/embedder/src/embedder/__init__.py | 0 packages/embedder/src/embedder/base.py | 79 --- packages/embedder/src/embedder/py.typed | 0 packages/embedder/src/embedder/vertex_ai.py | 77 --- packages/file-storage/.python-version | 1 - packages/file-storage/README.md | 0 packages/file-storage/pyproject.toml | 22 - .../file-storage/src/file_storage/__init__.py | 2 - .../file-storage/src/file_storage/base.py | 48 -- packages/file-storage/src/file_storage/cli.py | 89 --- .../src/file_storage/google_cloud.py | 138 ----- .../file-storage/src/file_storage/py.typed | 0 packages/llm/.python-version | 1 - packages/llm/README.md | 0 packages/llm/pyproject.toml | 18 - packages/llm/src/llm/__init__.py | 2 - packages/llm/src/llm/base.py | 128 ---- packages/llm/src/llm/py.typed | 0 packages/llm/src/llm/vertex_ai.py | 181 ------ packages/utils/README.md | 0 packages/utils/pyproject.toml | 17 - packages/utils/src/utils/__init__.py | 2 - .../utils/src/utils/normalize_filenames.py | 115 ---- 
packages/utils/src/utils/py.typed | 0 packages/vector-search/.python-version | 1 - packages/vector-search/README.md | 0 packages/vector-search/pyproject.toml | 29 - .../src/vector_search/__init__.py | 2 - .../vector-search/src/vector_search/base.py | 62 -- .../src/vector_search/cli/__init__.py | 10 - .../src/vector_search/cli/create.py | 91 --- .../src/vector_search/cli/delete.py | 38 -- .../src/vector_search/cli/generate.py | 91 --- .../src/vector_search/cli/query.py | 55 -- .../vector-search/src/vector_search/py.typed | 0 .../src/vector_search/vertex_ai.py | 255 -------- pyproject.toml | 70 +-- .../__init__.py | 0 .../knowledge_pipeline}/chunker/__init__.py | 0 .../chunker/base_chunker.py | 2 + .../chunker/contextual_chunker.py | 81 +-- .../chunker/llm_chunker.py | 1 - .../chunker/recursive_chunker.py | 0 src/knowledge_pipeline/cli.py | 20 + src/knowledge_pipeline/config.py | 100 ++++ src/knowledge_pipeline/pipeline.py | 210 +++++++ src/rag_eval/config.py | 121 ---- tests/__init__.py | 1 + tests/conftest.py | 89 +++ tests/test_pipeline.py | 553 ++++++++++++++++++ 71 files changed, 1026 insertions(+), 2417 deletions(-) create mode 100644 AGENTS.md delete mode 100644 apps/index-gen/README.md delete mode 100644 apps/index-gen/pyproject.toml delete mode 100644 apps/index-gen/src/index_gen/__init__.py delete mode 100644 apps/index-gen/src/index_gen/cli.py delete mode 100644 apps/index-gen/src/index_gen/main.py delete mode 100644 packages/chunker/README.md delete mode 100644 packages/chunker/pyproject.toml delete mode 100644 packages/chunker/src/chunker/py.typed delete mode 100644 packages/document-converter/.python-version delete mode 100644 packages/document-converter/README.md delete mode 100644 packages/document-converter/pyproject.toml delete mode 100644 packages/document-converter/src/document_converter/__init__.py delete mode 100644 packages/document-converter/src/document_converter/base.py delete mode 100644 
packages/document-converter/src/document_converter/markdown.py delete mode 100644 packages/document-converter/src/document_converter/py.typed delete mode 100644 packages/embedder/.python-version delete mode 100644 packages/embedder/README.md delete mode 100644 packages/embedder/pyproject.toml delete mode 100644 packages/embedder/src/embedder/__init__.py delete mode 100644 packages/embedder/src/embedder/base.py delete mode 100644 packages/embedder/src/embedder/py.typed delete mode 100644 packages/embedder/src/embedder/vertex_ai.py delete mode 100644 packages/file-storage/.python-version delete mode 100644 packages/file-storage/README.md delete mode 100644 packages/file-storage/pyproject.toml delete mode 100644 packages/file-storage/src/file_storage/__init__.py delete mode 100644 packages/file-storage/src/file_storage/base.py delete mode 100644 packages/file-storage/src/file_storage/cli.py delete mode 100644 packages/file-storage/src/file_storage/google_cloud.py delete mode 100644 packages/file-storage/src/file_storage/py.typed delete mode 100644 packages/llm/.python-version delete mode 100644 packages/llm/README.md delete mode 100644 packages/llm/pyproject.toml delete mode 100644 packages/llm/src/llm/__init__.py delete mode 100644 packages/llm/src/llm/base.py delete mode 100644 packages/llm/src/llm/py.typed delete mode 100644 packages/llm/src/llm/vertex_ai.py delete mode 100644 packages/utils/README.md delete mode 100644 packages/utils/pyproject.toml delete mode 100644 packages/utils/src/utils/__init__.py delete mode 100644 packages/utils/src/utils/normalize_filenames.py delete mode 100644 packages/utils/src/utils/py.typed delete mode 100644 packages/vector-search/.python-version delete mode 100644 packages/vector-search/README.md delete mode 100644 packages/vector-search/pyproject.toml delete mode 100644 packages/vector-search/src/vector_search/__init__.py delete mode 100644 packages/vector-search/src/vector_search/base.py delete mode 100644 
packages/vector-search/src/vector_search/cli/__init__.py delete mode 100644 packages/vector-search/src/vector_search/cli/create.py delete mode 100644 packages/vector-search/src/vector_search/cli/delete.py delete mode 100644 packages/vector-search/src/vector_search/cli/generate.py delete mode 100644 packages/vector-search/src/vector_search/cli/query.py delete mode 100644 packages/vector-search/src/vector_search/py.typed delete mode 100644 packages/vector-search/src/vector_search/vertex_ai.py rename src/{rag_eval => knowledge_pipeline}/__init__.py (100%) rename {packages/chunker/src => src/knowledge_pipeline}/chunker/__init__.py (100%) rename {packages/chunker/src => src/knowledge_pipeline}/chunker/base_chunker.py (98%) rename {packages/chunker/src => src/knowledge_pipeline}/chunker/contextual_chunker.py (55%) rename {packages/chunker/src => src/knowledge_pipeline}/chunker/llm_chunker.py (99%) rename {packages/chunker/src => src/knowledge_pipeline}/chunker/recursive_chunker.py (100%) create mode 100644 src/knowledge_pipeline/cli.py create mode 100644 src/knowledge_pipeline/config.py create mode 100644 src/knowledge_pipeline/pipeline.py delete mode 100644 src/rag_eval/config.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_pipeline.py diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..c9b0532 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +Use `uv` for project management diff --git a/README.md b/README.md index a0104b3..fb6bd1d 100644 --- a/README.md +++ b/README.md @@ -260,30 +260,18 @@ embeddings = embedder.generate_embeddings_batch(texts, batch_size=10) ### **Opción 4: Almacenar en GCS** ```python -from file_storage.google_cloud import GoogleCloudFileStorage +import gcsfs -storage = GoogleCloudFileStorage(bucket="mi-bucket") +fs = gcsfs.GCSFileSystem() # Subir archivo -storage.upload_file( - file_path="local_file.md", - destination_blob_name="chunks/documento_0.md", - 
content_type="text/markdown" -) +fs.put("local_file.md", "mi-bucket/chunks/documento_0.md") # Listar archivos -files = storage.list_files(path="chunks/") +files = fs.ls("mi-bucket/chunks/") # Descargar archivo -file_stream = storage.get_file_stream("chunks/documento_0.md") -content = file_stream.read().decode("utf-8") -``` - -**CLI:** -```bash -file-storage upload local_file.md chunks/documento_0.md -file-storage list chunks/ -file-storage download chunks/documento_0.md +content = fs.cat_file("mi-bucket/chunks/documento_0.md").decode("utf-8") ``` --- @@ -340,10 +328,10 @@ vector-search delete mi-indice ## 🔄 Flujo Completo de Ejemplo ```python +import gcsfs from pathlib import Path from chunker.contextual_chunker import ContextualChunker from embedder.vertex_ai import VertexAIEmbedder -from file_storage.google_cloud import GoogleCloudFileStorage from llm.vertex_ai import VertexAILLM # 1. Setup @@ -354,7 +342,7 @@ embedder = VertexAIEmbedder( project="mi-proyecto", location="us-central1" ) -storage = GoogleCloudFileStorage(bucket="mi-bucket") +fs = gcsfs.GCSFileSystem() # 2. 
Chunking documents = chunker.process_path(Path("documento.pdf")) @@ -368,10 +356,7 @@ for i, doc in enumerate(documents): embedding = embedder.generate_embedding(doc["page_content"]) # Guardar contenido en GCS - storage.upload_file( - file_path=f"temp_{chunk_id}.md", - destination_blob_name=f"contents/{chunk_id}.md" - ) + fs.put(f"temp_{chunk_id}.md", f"mi-bucket/contents/{chunk_id}.md") # Guardar vector (escribir a JSONL localmente, luego subir) print(f"Chunk {chunk_id}: {len(embedding)} dimensiones") diff --git a/apps/index-gen/README.md b/apps/index-gen/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/apps/index-gen/pyproject.toml b/apps/index-gen/pyproject.toml deleted file mode 100644 index 5795c70..0000000 --- a/apps/index-gen/pyproject.toml +++ /dev/null @@ -1,34 +0,0 @@ -[project] -name = "index-gen" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "chunker", - "document-converter", - "embedder", - "file-storage", - "llm", - "utils", - "vector-search", -] - -[project.scripts] -index-gen = "index_gen.cli:app" - -[build-system] -requires = ["uv_build>=0.8.12,<0.9.0"] -build-backend = "uv_build" - -[tool.uv.sources] -file-storage = { workspace = true } -vector-search = { workspace = true } -utils = { workspace = true } -embedder = { workspace = true } -chunker = { workspace = true } -document-converter = { workspace = true } -llm = { workspace = true } diff --git a/apps/index-gen/src/index_gen/__init__.py b/apps/index-gen/src/index_gen/__init__.py deleted file mode 100644 index 5b33691..0000000 --- a/apps/index-gen/src/index_gen/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def main() -> None: - print("Hello from index-gen!") diff --git a/apps/index-gen/src/index_gen/cli.py b/apps/index-gen/src/index_gen/cli.py deleted file mode 100644 index 4f7eee0..0000000 --- 
a/apps/index-gen/src/index_gen/cli.py +++ /dev/null @@ -1,68 +0,0 @@ -import logging -import tempfile -from pathlib import Path - -import typer - -from index_gen.main import ( - aggregate_vectors, - build_gcs_path, - create_vector_index, - gather_files, - process_file, -) -from rag_eval.config import settings - -app = typer.Typer() - - -@app.command() -def run_ingestion(): - """Main function for the CLI script.""" - logging.basicConfig(level=logging.INFO) - - agent_config = settings.agent - index_config = settings.index - - if not agent_config or not index_config: - raise ValueError("Agent or index configuration not found in config.yaml") - - # Gather files - files = gather_files(index_config.origin) - - # Build output paths - contents_output_dir = build_gcs_path(index_config.data, "/contents") - vectors_output_dir = build_gcs_path(index_config.data, "/vectors") - aggregated_vectors_gcs_path = build_gcs_path( - index_config.data, "/vectors/vectors.json" - ) - - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir_path = Path(temp_dir) - vector_artifact_paths = [] - - # Process files and create local artifacts - for i, file in enumerate(files): - artifact_path = temp_dir_path / f"vectors_{i}.jsonl" - vector_artifact_paths.append(artifact_path) - - process_file( - file, - agent_config.embedding_model, - contents_output_dir, - artifact_path, # Pass the local path - index_config.chunk_limit, - ) - - # Aggregate the local artifacts into one file in GCS - aggregate_vectors( - vector_artifacts=vector_artifact_paths, - output_gcs_path=aggregated_vectors_gcs_path, - ) - - # Create vector index - create_vector_index(vectors_output_dir) - - -if __name__ == "__main__": - app() \ No newline at end of file diff --git a/apps/index-gen/src/index_gen/main.py b/apps/index-gen/src/index_gen/main.py deleted file mode 100644 index dad6f26..0000000 --- a/apps/index-gen/src/index_gen/main.py +++ /dev/null @@ -1,238 +0,0 @@ -""" -This script defines a Kubeflow Pipeline (KFP) for 
ingesting and processing documents. - -The pipeline is designed to run on Vertex AI Pipelines and consists of the following steps: -1. **Gather Files**: Scans a GCS directory for PDF files to process. -2. **Process Files (in parallel)**: For each PDF file found, this step: - a. Converts the PDF to Markdown text. - b. Chunks the text if it's too long. - c. Generates a vector embedding for each chunk using a Vertex AI embedding model. - d. Saves the markdown content and the vector embedding to separate GCS output paths. -""" - -import json -import logging -import os -import tempfile -from pathlib import Path - -from rag_eval.config import settings - - -def build_gcs_path(base_path: str, suffix: str) -> str: - """Builds a GCS path by appending a suffix.""" - return f"{base_path}{suffix}" - - -def gather_files( - input_dir: str, -) -> list: - """Gathers all PDF file paths from a GCS directory.""" - from google.cloud import storage - - logging.getLogger().setLevel(logging.INFO) - - gcs_client = storage.Client() - bucket_name, prefix = input_dir.replace("gs://", "").split("/", 1) - bucket = gcs_client.bucket(bucket_name) - blob_list = bucket.list_blobs(prefix=prefix) - - pdf_files = [ - f"gs://{bucket_name}/{blob.name}" - for blob in blob_list - if blob.name.endswith(".pdf") - ] - logging.info(f"Found {len(pdf_files)} PDF files in {input_dir}") - return pdf_files - - -def process_file( - file_path: str, - model_name: str, - contents_output_dir: str, - vectors_output_file: Path, - chunk_limit: int, -): - """ - Processes a single PDF file: converts to markdown, chunks, and generates embeddings. - The vector embeddings are written to a local JSONL file. 
- """ - # Imports are inside the function as KFP serializes this function - from pathlib import Path - - from chunker.contextual_chunker import ContextualChunker - from document_converter.markdown import MarkdownConverter - from embedder.vertex_ai import VertexAIEmbedder - from google.cloud import storage - from llm.vertex_ai import VertexAILLM - from utils.normalize_filenames import normalize_string - - logging.getLogger().setLevel(logging.INFO) - - # Initialize converters and embedders - converter = MarkdownConverter() - embedder = VertexAIEmbedder(model_name=model_name, project=settings.project_id, location=settings.location) - llm = VertexAILLM(project=settings.project_id, location=settings.location) - chunker = ContextualChunker(llm_client=llm, max_chunk_size=chunk_limit) - gcs_client = storage.Client() - - file_id = normalize_string(Path(file_path).stem) - local_path = Path(f"/tmp/{Path(file_path).name}") - - with open(vectors_output_file, "w", encoding="utf-8") as f: - try: - # Download file from GCS - bucket_name, blob_name = file_path.replace("gs://", "").split("/", 1) - bucket = gcs_client.bucket(bucket_name) - blob = bucket.blob(blob_name) - blob.download_to_filename(local_path) - logging.info(f"Processing file: {file_path}") - - # Process the downloaded file - markdown_content = converter.process_file(local_path) - - def upload_to_gcs(bucket_name, blob_name, data): - bucket = gcs_client.bucket(bucket_name) - blob = bucket.blob(blob_name) - blob.upload_from_string(data, content_type="text/markdown; charset=utf-8") - - # Determine output bucket and paths for markdown - contents_bucket_name, contents_prefix = contents_output_dir.replace( - "gs://", "" - ).split("/", 1) - - # Extract source folder from file path - source_folder = Path(blob_name).parent.as_posix() if blob_name else "" - - if len(markdown_content) > chunk_limit: - chunks = chunker.process_text(markdown_content) - for i, chunk in enumerate(chunks): - chunk_id = f"{file_id}_{i}" - embedding = 
embedder.generate_embedding(chunk["page_content"]) - - # Upload markdown chunk - md_blob_name = f"{contents_prefix}/{chunk_id}.md" - upload_to_gcs( - contents_bucket_name, md_blob_name, chunk["page_content"] - ) - - # Write vector to local JSONL file with source folder - vector_data = { - "id": chunk_id, - "embedding": embedding, - "source_folder": source_folder - } - json_line = json.dumps(vector_data) - f.write(json_line + '\n') - else: - embedding = embedder.generate_embedding(markdown_content) - - # Upload markdown - md_blob_name = f"{contents_prefix}/{file_id}.md" - upload_to_gcs(contents_bucket_name, md_blob_name, markdown_content) - - # Write vector to local JSONL file with source folder - vector_data = { - "id": file_id, - "embedding": embedding, - "source_folder": source_folder - } - json_line = json.dumps(vector_data) - f.write(json_line + '\n') - - except Exception as e: - logging.error(f"Failed to process file {file_path}: {e}", exc_info=True) - raise - - finally: - # Clean up the downloaded file - if os.path.exists(local_path): - os.remove(local_path) - - - - - -def aggregate_vectors( - vector_artifacts: list, # This will be a list of paths to the artifact files - output_gcs_path: str, -): - """ - Aggregates multiple JSONL artifact files into a single JSONL file in GCS. 
- """ - from google.cloud import storage - - logging.getLogger().setLevel(logging.INFO) - - # Create a temporary file to aggregate all vector data - with tempfile.NamedTemporaryFile( - mode="w", delete=False, encoding="utf-8" - ) as temp_agg_file: - logging.info(f"Aggregating vectors into temporary file: {temp_agg_file.name}") - for artifact_path in vector_artifacts: - with open(artifact_path, "r", encoding="utf-8") as f: - # Each line is a complete JSON object - for line in f: - temp_agg_file.write(line) # line already includes newline - - temp_file_path = temp_agg_file.name - - logging.info("Uploading aggregated file to GCS...") - gcs_client = storage.Client() - bucket_name, blob_name = output_gcs_path.replace("gs://", "").split("/", 1) - bucket = gcs_client.bucket(bucket_name) - blob = bucket.blob(blob_name) - blob.upload_from_filename(temp_file_path, content_type="application/json; charset=utf-8") - - logging.info(f"Successfully uploaded aggregated vectors to {output_gcs_path}") - - # Clean up the temporary file - import os - - os.remove(temp_file_path) - - - -def create_vector_index( - vectors_dir: str, -): - """Creates and deploys a Vertex AI Vector Search Index.""" - from vector_search.vertex_ai import GoogleCloudVectorSearch - - from rag_eval.config import settings as config - - logging.getLogger().setLevel(logging.INFO) - - try: - index_config = config.index - - logging.info( - f"Initializing Vertex AI client for project '{config.project_id}' in '{config.location}'..." 
- ) - vector_search = GoogleCloudVectorSearch( - project_id=config.project_id, - location=config.location, - bucket=config.bucket, - index_name=index_config.name, - ) - - logging.info(f"Starting creation of index '{index_config.name}'...") - vector_search.create_index( - name=index_config.name, - content_path=vectors_dir, - dimensions=index_config.dimensions, - ) - logging.info(f"Index '{index_config.name}' created successfully.") - - logging.info("Deploying index to a new endpoint...") - vector_search.deploy_index( - index_name=index_config.name, machine_type=index_config.machine_type - ) - logging.info("Index deployed successfully!") - logging.info(f"Endpoint name: {vector_search.index_endpoint.display_name}") - logging.info( - f"Endpoint resource name: {vector_search.index_endpoint.resource_name}" - ) - except Exception as e: - logging.error(f"An error occurred during index creation or deployment: {e}", exc_info=True) - raise \ No newline at end of file diff --git a/config.example.yaml b/config.example.yaml index 37eb95f..3460cbe 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1,29 +1,15 @@ -# Configuración de Google Cloud Platform +# Google Cloud Platform project_id: "tu-proyecto-gcp" -location: "us-central1" # o us-east1, europe-west1, etc. 
-bucket: "tu-bucket-nombre" +location: "us-central1" -# Configuración del índice vectorial -index: - name: "mi-indice-rag" - dimensions: 768 # Para text-embedding-005 usa 768 - machine_type: "e2-standard-2" # Tipo de máquina para el endpoint - approximate_neighbors_count: 150 - distance_measure_type: "DOT_PRODUCT_DISTANCE" # O "COSINE_DISTANCE", "EUCLIDEAN_DISTANCE" +# Embedding model +agent_embedding_model: "text-embedding-005" -# Configuración de embeddings -embedder: - model_name: "text-embedding-005" - task: "RETRIEVAL_DOCUMENT" # O "RETRIEVAL_QUERY" para queries - -# Configuración de LLM para chunking -llm: - model: "gemini-2.0-flash" # O "gemini-1.5-pro", "gemini-1.5-flash" - -# Configuración de chunking -chunking: - strategy: "contextual" # "recursive", "contextual", "llm" - max_chunk_size: 800 - chunk_overlap: 200 # Solo para LLMChunker - merge_related: true # Solo para LLMChunker - extract_images: true # Solo para LLMChunker +# Vector index +index_name: "mi-indice-rag" +index_dimensions: 768 +index_machine_type: "e2-standard-16" +index_origin: "gs://tu-bucket/input/" +index_destination: "gs://tu-bucket/output/" +index_chunk_limit: 800 +index_distance_measure_type: "DOT_PRODUCT_DISTANCE" diff --git a/packages/chunker/README.md b/packages/chunker/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/chunker/pyproject.toml b/packages/chunker/pyproject.toml deleted file mode 100644 index c593c1b..0000000 --- a/packages/chunker/pyproject.toml +++ /dev/null @@ -1,23 +0,0 @@ -[project] -name = "chunker" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "chonkie>=1.1.2", - "pdf2image>=1.17.0", - "pypdf>=6.0.0", -] - -[project.scripts] -llm-chunker = "chunker.llm_chunker:app" -recursive-chunker = "chunker.recursive_chunker:app" -contextual-chunker = "chunker.contextual_chunker:app" - 
-[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] -build-backend = "uv_build" diff --git a/packages/chunker/src/chunker/py.typed b/packages/chunker/src/chunker/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/document-converter/.python-version b/packages/document-converter/.python-version deleted file mode 100644 index c8cfe39..0000000 --- a/packages/document-converter/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.10 diff --git a/packages/document-converter/README.md b/packages/document-converter/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/document-converter/pyproject.toml b/packages/document-converter/pyproject.toml deleted file mode 100644 index 467a270..0000000 --- a/packages/document-converter/pyproject.toml +++ /dev/null @@ -1,20 +0,0 @@ -[project] -name = "document-converter" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "markitdown[pdf]>=0.1.2", - "pypdf>=6.1.2", -] - -[project.scripts] -convert-md = "document_converter.markdown:app" - -[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] -build-backend = "uv_build" diff --git a/packages/document-converter/src/document_converter/__init__.py b/packages/document-converter/src/document_converter/__init__.py deleted file mode 100644 index 5f35dae..0000000 --- a/packages/document-converter/src/document_converter/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from document-converter!" 
diff --git a/packages/document-converter/src/document_converter/base.py b/packages/document-converter/src/document_converter/base.py deleted file mode 100644 index 5f66102..0000000 --- a/packages/document-converter/src/document_converter/base.py +++ /dev/null @@ -1,35 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List - - -class BaseConverter(ABC): - """ - Abstract base class for a remote file processor. - - This class defines the interface for listing and processing files from a remote source. - """ - - @abstractmethod - def process_file(self, file: str) -> str: - """ - Processes a single file from a remote source and returns the result. - - Args: - file: The path to the file to be processed from the remote source. - - Returns: - A string containing the processing result for the file. - """ - ... - - def process_files(self, files: List[str]) -> List[str]: - """ - Processes a list of files from a remote source and returns the results. - - Args: - files: A list of file paths to be processed from the remote source. - - Returns: - A list of strings containing the processing results for each file. 
- """ - return [self.process_file(file) for file in files] diff --git a/packages/document-converter/src/document_converter/markdown.py b/packages/document-converter/src/document_converter/markdown.py deleted file mode 100644 index a30075a..0000000 --- a/packages/document-converter/src/document_converter/markdown.py +++ /dev/null @@ -1,131 +0,0 @@ -from pathlib import Path -from typing import Annotated, BinaryIO, Union - -import typer -from markitdown import MarkItDown -from rich.console import Console -from rich.progress import Progress - -from .base import BaseConverter - - -class MarkdownConverter(BaseConverter): - """Converts PDF documents to Markdown format.""" - - def __init__(self) -> None: - """Initializes the MarkItDown converter.""" - self.markitdown = MarkItDown(enable_plugins=False) - - def process_file(self, file_stream: Union[str, Path, BinaryIO]) -> str: - """ - Processes a single file and returns the result as a markdown string. - - Args: - file_stream: A file path (string or Path) or a binary file stream. - - Returns: - The converted markdown content as a string. - """ - result = self.markitdown.convert(file_stream) - return result.text_content - - -# --- CLI Application --- - -app = typer.Typer() - - -@app.command() -def main( - input_path: Annotated[ - Path, - typer.Argument( - help="Path to the input PDF file or directory.", - exists=True, - file_okay=True, - dir_okay=True, - readable=True, - resolve_path=True, - ), - ], - output_path: Annotated[ - Path, - typer.Argument( - help="Path for the output Markdown file or directory.", - file_okay=True, - dir_okay=True, - writable=True, - resolve_path=True, - ), - ], -): - """ - Converts a PDF file or a directory of PDF files into Markdown. 
- """ - console = Console() - converter = MarkdownConverter() - - if input_path.is_dir(): - # --- Directory Processing --- - console.print(f"[bold green]Processing directory:[/bold green] {input_path}") - output_dir = output_path - - if output_dir.exists() and not output_dir.is_dir(): - console.print( - f"[bold red]Error:[/bold red] Input is a directory, but output path '{output_dir}' is an existing file." - ) - raise typer.Exit(code=1) - - pdf_files = sorted(list(input_path.rglob("*.pdf"))) - if not pdf_files: - console.print("[yellow]No PDF files found in the input directory.[/yellow]") - return - - console.print(f"Found {len(pdf_files)} PDF files to convert.") - output_dir.mkdir(parents=True, exist_ok=True) - - with Progress(console=console) as progress: - task = progress.add_task("[cyan]Converting...", total=len(pdf_files)) - for pdf_file in pdf_files: - relative_path = pdf_file.relative_to(input_path) - output_md_path = output_dir.joinpath(relative_path).with_suffix(".md") - output_md_path.parent.mkdir(parents=True, exist_ok=True) - - progress.update(task, description=f"Processing {pdf_file.name}") - try: - markdown_content = converter.process_file(pdf_file) - output_md_path.write_text(markdown_content, encoding="utf-8") - except Exception as e: - console.print( - f"\n[bold red]Failed to process {pdf_file.name}:[/bold red] {e}" - ) - progress.advance(task) - - console.print( - f"[bold green]Conversion complete.[/bold green] Output directory: {output_dir}" - ) - - elif input_path.is_file(): - # --- Single File Processing --- - console.print(f"[bold green]Processing file:[/bold green] {input_path.name}") - final_output_path = output_path - - # If output path is a directory, create a file inside it - if output_path.is_dir(): - final_output_path = output_path / input_path.with_suffix(".md").name - - final_output_path.parent.mkdir(parents=True, exist_ok=True) - - try: - markdown_content = converter.process_file(input_path) - 
final_output_path.write_text(markdown_content, encoding="utf-8") - console.print( - f"[bold green]Successfully converted file to:[/bold green] {final_output_path}" - ) - except Exception as e: - console.print(f"[bold red]Error processing file:[/bold red] {e}") - raise typer.Exit(code=1) - - -if __name__ == "__main__": - app() diff --git a/packages/document-converter/src/document_converter/py.typed b/packages/document-converter/src/document_converter/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/embedder/.python-version b/packages/embedder/.python-version deleted file mode 100644 index c8cfe39..0000000 --- a/packages/embedder/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.10 diff --git a/packages/embedder/README.md b/packages/embedder/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/embedder/pyproject.toml b/packages/embedder/pyproject.toml deleted file mode 100644 index 4099b91..0000000 --- a/packages/embedder/pyproject.toml +++ /dev/null @@ -1,16 +0,0 @@ -[project] -name = "embedder" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "google-cloud-aiplatform>=1.106.0", -] - -[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] -build-backend = "uv_build" diff --git a/packages/embedder/src/embedder/__init__.py b/packages/embedder/src/embedder/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/packages/embedder/src/embedder/base.py b/packages/embedder/src/embedder/base.py deleted file mode 100644 index 1513556..0000000 --- a/packages/embedder/src/embedder/base.py +++ /dev/null @@ -1,79 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List - -import numpy as np - - -class BaseEmbedder(ABC): - """Base class for all embedding models.""" - - @abstractmethod - def generate_embedding(self, text: str) -> List[float]: 
- """ - Generate embeddings for text. - - Args: - text: Single text string or list of texts - - Returns: - Single embedding vector or list of embedding vectors - """ - pass - - @abstractmethod - def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]: - """ - Generate embeddings for a batch of texts. - - Args: - texts: List of text strings - - Returns: - List of embedding vectors - """ - pass - - def preprocess_text( - self, - text: str, - *, - into_lowercase: bool = False, - normalize_whitespace: bool = True, - remove_punctuation: bool = False, - ) -> str: - """Preprocess text before embedding.""" - # Basic preprocessing - text = text.strip() - - if into_lowercase: - text = text.lower() - - if normalize_whitespace: - text = " ".join(text.split()) - - if remove_punctuation: - import string - - text = text.translate(str.maketrans("", "", string.punctuation)) - - return text - - def normalize_embedding(self, embedding: List[float]) -> List[float]: - """Normalize embedding vector to unit length.""" - norm = np.linalg.norm(embedding) - if norm > 0: - return (np.array(embedding) / norm).tolist() - return embedding - - @abstractmethod - async def async_generate_embedding(self, text: str) -> List[float]: - """ - Generate embeddings for text. 
- - Args: - text: Single text string or list of texts - - Returns: - Single embedding vector or list of embedding vectors - """ - pass diff --git a/packages/embedder/src/embedder/py.typed b/packages/embedder/src/embedder/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/embedder/src/embedder/vertex_ai.py b/packages/embedder/src/embedder/vertex_ai.py deleted file mode 100644 index b35c2ae..0000000 --- a/packages/embedder/src/embedder/vertex_ai.py +++ /dev/null @@ -1,77 +0,0 @@ -import logging -import time -from typing import List - -from google import genai -from google.genai import types -from tenacity import retry, stop_after_attempt, wait_exponential - -from .base import BaseEmbedder - -logger = logging.getLogger(__name__) - - -class VertexAIEmbedder(BaseEmbedder): - """Embedder using Vertex AI text embedding models.""" - - def __init__( - self, model_name: str, project: str, location: str, task: str = "RETRIEVAL_DOCUMENT" - ) -> None: - self.model_name = model_name - self.client = genai.Client( - vertexai=True, - project=project, - location=location, - ) - self.task = task - - # @retry( - # stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=30) - # ) - def generate_embedding(self, text: str) -> List[float]: - preprocessed_text = self.preprocess_text(text) - result = self.client.models.embed_content( - model=self.model_name, contents=preprocessed_text, config=types.EmbedContentConfig(task_type=self.task) - ) - return result.embeddings[0].values - - # @retry( - # stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=30) - # ) - def generate_embeddings_batch( - self, texts: List[str], batch_size: int = 10 - ) -> List[List[float]]: - """Generate embeddings for a batch of texts.""" - if not texts: - return [] - - # Preprocess texts - preprocessed_texts = [self.preprocess_text(text) for text in texts] - - # Process in batches if necessary - all_embeddings = [] - - for i in range(0, 
len(preprocessed_texts), batch_size): - batch = preprocessed_texts[i : i + batch_size] - - # Generate embeddings for batch - result = self.client.models.embed_content( - model=self.model_name, contents=batch, config=types.EmbedContentConfig(task_type=self.task) - ) - - # Extract values - batch_embeddings = [emb.values for emb in result.embeddings] - all_embeddings.extend(batch_embeddings) - - # Rate limiting - if i + batch_size < len(preprocessed_texts): - time.sleep(0.1) # Small delay between batches - - return all_embeddings - - async def async_generate_embedding(self, text: str) -> List[float]: - preprocessed_text = self.preprocess_text(text) - result = await self.client.aio.models.embed_content( - model=self.model_name, contents=preprocessed_text, config=types.EmbedContentConfig(task_type=self.task) - ) - return result.embeddings[0].values diff --git a/packages/file-storage/.python-version b/packages/file-storage/.python-version deleted file mode 100644 index c8cfe39..0000000 --- a/packages/file-storage/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.10 diff --git a/packages/file-storage/README.md b/packages/file-storage/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/file-storage/pyproject.toml b/packages/file-storage/pyproject.toml deleted file mode 100644 index d18dc2a..0000000 --- a/packages/file-storage/pyproject.toml +++ /dev/null @@ -1,22 +0,0 @@ -[project] -name = "file-storage" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "gcloud-aio-storage>=9.6.1", - "google-cloud-storage>=2.19.0", - "aiohttp>=3.10.11,<4", - "typer>=0.12.3", -] - -[project.scripts] -file-storage = "file_storage.cli:app" - -[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] -build-backend = "uv_build" diff --git a/packages/file-storage/src/file_storage/__init__.py 
b/packages/file-storage/src/file_storage/__init__.py deleted file mode 100644 index 2c8bd2b..0000000 --- a/packages/file-storage/src/file_storage/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from file-storage!" diff --git a/packages/file-storage/src/file_storage/base.py b/packages/file-storage/src/file_storage/base.py deleted file mode 100644 index fc17d59..0000000 --- a/packages/file-storage/src/file_storage/base.py +++ /dev/null @@ -1,48 +0,0 @@ -from abc import ABC, abstractmethod -from typing import BinaryIO, List, Optional - - -class BaseFileStorage(ABC): - """ - Abstract base class for a remote file processor. - - This class defines the interface for listing and processing files from a remote source. - """ - - @abstractmethod - def upload_file( - self, - file_path: str, - destination_blob_name: str, - content_type: Optional[str] = None, - ) -> None: - """ - Uploads a file to the remote source. - - Args: - file_path: The local path to the file to upload. - destination_blob_name: The name of the file in the remote source. - content_type: The content type of the file. - """ - ... - - @abstractmethod - def list_files(self, path: Optional[str] = None) -> List[str]: - """ - Lists files from a remote location. - - Args: - path: The path to a specific file or directory in the remote bucket. - If None, it recursively lists all files in the bucket. - - Returns: - A list of file paths. - """ - ... - - @abstractmethod - def get_file_stream(self, file_name: str) -> BinaryIO: - """ - Gets a file from the remote source and returns it as a file-like object. - """ - ... 
diff --git a/packages/file-storage/src/file_storage/cli.py b/packages/file-storage/src/file_storage/cli.py deleted file mode 100644 index 3f656ff..0000000 --- a/packages/file-storage/src/file_storage/cli.py +++ /dev/null @@ -1,89 +0,0 @@ -import os -from typing import Annotated - -import rich -import typer - -from rag_eval.config import settings - -from .google_cloud import GoogleCloudFileStorage - -app = typer.Typer() - - -def get_storage_client() -> GoogleCloudFileStorage: - return GoogleCloudFileStorage(bucket=settings.bucket) - - -@app.command("upload") -def upload( - file_path: str, - destination_blob_name: str, - content_type: Annotated[str, typer.Option()] = None, -): - """ - Uploads a file or directory to the remote source. - """ - storage_client = get_storage_client() - if os.path.isdir(file_path): - for root, _, files in os.walk(file_path): - for file in files: - local_file_path = os.path.join(root, file) - # preserve the directory structure and use forward slashes for blob name - dest_blob_name = os.path.join( - destination_blob_name, os.path.relpath(local_file_path, file_path) - ).replace(os.sep, "/") - storage_client.upload_file( - local_file_path, dest_blob_name, content_type - ) - rich.print( - f"[green]File {local_file_path} uploaded to {dest_blob_name}.[/green]" - ) - rich.print( - f"[bold green]Directory {file_path} uploaded to {destination_blob_name}.[/bold green]" - ) - else: - storage_client.upload_file(file_path, destination_blob_name, content_type) - rich.print( - f"[green]File {file_path} uploaded to {destination_blob_name}.[/green]" - ) - - -@app.command("list") -def list_items(path: Annotated[str, typer.Option()] = None): - """ - Obtain a list of all files at the given location inside the remote bucket - If path is none, recursively shows all files in the remote bucket. 
- """ - storage_client = get_storage_client() - files = storage_client.list_files(path) - for file in files: - rich.print(f"[blue]{file}[/blue]") - - -@app.command("download") -def download(file_name: str, destination_path: str): - """ - Gets a file from the remote source and returns it as a file-like object. - """ - storage_client = get_storage_client() - file_stream = storage_client.get_file_stream(file_name) - with open(destination_path, "wb") as f: - f.write(file_stream.read()) - rich.print(f"[green]File {file_name} downloaded to {destination_path}[/green]") - - -@app.command("delete") -def delete(path: str): - """ - Deletes all files at the given location inside the remote bucket. - If path is a single file, it will delete only that file. - If path is a directory, it will delete all files in that directory. - """ - storage_client = get_storage_client() - storage_client.delete_files(path) - rich.print(f"[bold red]Files at {path} deleted.[/bold red]") - - -if __name__ == "__main__": - app() diff --git a/packages/file-storage/src/file_storage/google_cloud.py b/packages/file-storage/src/file_storage/google_cloud.py deleted file mode 100644 index 01e06fc..0000000 --- a/packages/file-storage/src/file_storage/google_cloud.py +++ /dev/null @@ -1,138 +0,0 @@ -import asyncio -import io -import logging -from typing import BinaryIO, List, Optional - -import aiohttp -from gcloud.aio.storage import Storage -from google.cloud import storage - -from .base import BaseFileStorage - -logger = logging.getLogger(__name__) - - -class GoogleCloudFileStorage(BaseFileStorage): - def __init__(self, bucket: str) -> None: - self.bucket_name = bucket - - self.storage_client = storage.Client() - self.bucket_client = self.storage_client.bucket(self.bucket_name) - self._aio_session: aiohttp.ClientSession | None = None - self._aio_storage: Storage | None = None - self._cache: dict[str, bytes] = {} - - def upload_file( - self, - file_path: str, - destination_blob_name: str, - content_type: 
Optional[str] = None, - ) -> None: - """ - Uploads a file to the remote source. - - Args: - file_path: The local path to the file to upload. - destination_blob_name: The name of the file in the remote source. - content_type: The content type of the file. - """ - blob = self.bucket_client.blob(destination_blob_name) - blob.upload_from_filename( - file_path, - content_type=content_type, - if_generation_match=0, - ) - self._cache.pop(destination_blob_name, None) - - def list_files(self, path: Optional[str] = None) -> List[str]: - """ - Obtain a list of all files at the given location inside the remote bucket - If path is none, recursively shows all files in the remote bucket. - """ - blobs = self.storage_client.list_blobs(self.bucket_name, prefix=path) - return [blob.name for blob in blobs] - - def get_file_stream(self, file_name: str) -> BinaryIO: - """ - Gets a file from the remote source and returns it as a file-like object. - """ - if file_name not in self._cache: - blob = self.bucket_client.blob(file_name) - self._cache[file_name] = blob.download_as_bytes() - file_stream = io.BytesIO(self._cache[file_name]) - file_stream.name = file_name - return file_stream - - def _get_aio_session(self) -> aiohttp.ClientSession: - if self._aio_session is None or self._aio_session.closed: - connector = aiohttp.TCPConnector(limit=300, limit_per_host=50) - timeout = aiohttp.ClientTimeout(total=60) - self._aio_session = aiohttp.ClientSession( - timeout=timeout, connector=connector - ) - return self._aio_session - - def _get_aio_storage(self) -> Storage: - if self._aio_storage is None: - self._aio_storage = Storage(session=self._get_aio_session()) - return self._aio_storage - - async def async_get_file_stream( - self, file_name: str, max_retries: int = 3 - ) -> BinaryIO: - """ - Gets a file from the remote source asynchronously and returns it as a file-like object. - Retries on transient errors (429, 5xx, timeouts) with exponential backoff. 
- """ - if file_name in self._cache: - file_stream = io.BytesIO(self._cache[file_name]) - file_stream.name = file_name - return file_stream - - storage_client = self._get_aio_storage() - last_exception: Exception | None = None - - for attempt in range(max_retries): - try: - self._cache[file_name] = await storage_client.download( - self.bucket_name, file_name - ) - file_stream = io.BytesIO(self._cache[file_name]) - file_stream.name = file_name - return file_stream - except asyncio.TimeoutError as exc: - last_exception = exc - logger.warning( - "Timeout downloading gs://%s/%s (attempt %d/%d)", - self.bucket_name, file_name, attempt + 1, max_retries, - ) - except aiohttp.ClientResponseError as exc: - last_exception = exc - if exc.status == 429 or exc.status >= 500: - logger.warning( - "HTTP %d downloading gs://%s/%s (attempt %d/%d)", - exc.status, self.bucket_name, file_name, - attempt + 1, max_retries, - ) - else: - raise - - if attempt < max_retries - 1: - delay = 0.5 * (2 ** attempt) - await asyncio.sleep(delay) - - raise TimeoutError( - f"Failed to download gs://{self.bucket_name}/{file_name} " - f"after {max_retries} attempts" - ) from last_exception - - def delete_files(self, path: str) -> None: - """ - Deletes all files at the given location inside the remote bucket. - If path is a single file, it will delete only that file. - If path is a directory, it will delete all files in that directory. 
- """ - blobs = self.storage_client.list_blobs(self.bucket_name, prefix=path) - for blob in blobs: - blob.delete() - self._cache.pop(blob.name, None) diff --git a/packages/file-storage/src/file_storage/py.typed b/packages/file-storage/src/file_storage/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/llm/.python-version b/packages/llm/.python-version deleted file mode 100644 index c8cfe39..0000000 --- a/packages/llm/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.10 diff --git a/packages/llm/README.md b/packages/llm/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/llm/pyproject.toml b/packages/llm/pyproject.toml deleted file mode 100644 index 3abc91f..0000000 --- a/packages/llm/pyproject.toml +++ /dev/null @@ -1,18 +0,0 @@ -[project] -name = "llm" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "google-genai>=1.20.0", - "pydantic>=2.11.7", - "tenacity>=9.1.2", -] - -[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] -build-backend = "uv_build" diff --git a/packages/llm/src/llm/__init__.py b/packages/llm/src/llm/__init__.py deleted file mode 100644 index 350844e..0000000 --- a/packages/llm/src/llm/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from llm!" 
diff --git a/packages/llm/src/llm/base.py b/packages/llm/src/llm/base.py deleted file mode 100644 index c0e7a41..0000000 --- a/packages/llm/src/llm/base.py +++ /dev/null @@ -1,128 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any, Type, TypeVar - -from pydantic import BaseModel, field_validator - - -class ToolCall(BaseModel): - name: str - arguments: dict - -class Usage(BaseModel): - prompt_tokens: int | None = 0 - thought_tokens: int | None = 0 - response_tokens: int | None = 0 - - @field_validator("prompt_tokens", "thought_tokens", "response_tokens", mode="before") - @classmethod - def _validate_tokens(cls, v: int | None) -> int: - return v or 0 - - def __add__(self, other): - return Usage( - prompt_tokens=self.prompt_tokens + other.prompt_tokens, - thought_tokens=self.thought_tokens + other.thought_tokens, - response_tokens=self.response_tokens + other.response_tokens - ) - - def get_cost(self, name: str) -> int: - million = 1000000 - if name == "gemini-2.5-pro": - if self.prompt_tokens > 200000: - input_cost = self.prompt_tokens * (2.5/million) - output_cost = self.thought_tokens * (15/million) + self.response_tokens * (15/million) - else: - input_cost = self.prompt_tokens * (1.25/million) - output_cost = self.thought_tokens * (10/million) + self.response_tokens * (10/million) - return (input_cost + output_cost) * 18.65 - if name == "gemini-2.5-flash": - input_cost = self.prompt_tokens * (0.30/million) - output_cost = self.thought_tokens * (2.5/million) + self.response_tokens * (2.5/million) - return (input_cost + output_cost) * 18.65 - else: - raise Exception("Invalid model") - - -class Generation(BaseModel): - """A class to represent a single generation from a model. - - Attributes: - text: The generated text. - usage: A dictionary containing usage metadata. 
- """ - - text: str | None = None - tool_calls: list[ToolCall] | None = None - usage: Usage = Usage() - extra: dict = {} - - -T = TypeVar("T", bound=BaseModel) - - -class BaseLLM(ABC): - """An abstract base class for all LLMs.""" - - @abstractmethod - def generate( - self, - model: str, - prompt: Any, - tools: list | None = None, - system_prompt: str | None = None, - ) -> Generation: - """Generates text from a prompt. - - Args: - model: The model to use for generation. - prompt: The prompt to generate text from. - tools: An optional list of tools to use for generation. - system_prompt: An optional system prompt to guide the model's behavior. - - Returns: - A Generation object containing the generated text and usage metadata. - """ - ... - - @abstractmethod - def structured_generation( - self, - model: str, - prompt: Any, - response_model: Type[T], - tools: list | None = None, - ) -> T: - """Generates structured data from a prompt. - - Args: - model: The model to use for generation. - prompt: The prompt to generate text from. - response_model: The pydantic model to parse the response into. - tools: An optional list of tools to use for generation. - - Returns: - An instance of the provided pydantic model. - """ - ... - - @abstractmethod - async def async_generate( - self, - model: str, - prompt: Any, - tools: list | None = None, - system_prompt: str | None = None, - tool_mode: str = "AUTO", - ) -> Generation: - """Generates text from a prompt. - - Args: - model: The model to use for generation. - prompt: The prompt to generate text from. - tools: An optional list of tools to use for generation. - system_prompt: An optional system prompt to guide the model's behavior. - - Returns: - A Generation object containing the generated text and usage metadata. - """ - ... 
diff --git a/packages/llm/src/llm/py.typed b/packages/llm/src/llm/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/llm/src/llm/vertex_ai.py b/packages/llm/src/llm/vertex_ai.py deleted file mode 100644 index f3e91d7..0000000 --- a/packages/llm/src/llm/vertex_ai.py +++ /dev/null @@ -1,181 +0,0 @@ -import logging -from typing import Any, Type - -from google import genai -from google.genai import types -from tenacity import retry, stop_after_attempt, wait_exponential - -from rag_eval.config import settings - -from .base import BaseLLM, Generation, T, ToolCall, Usage - -logger = logging.getLogger(__name__) - - -class VertexAILLM(BaseLLM): - """A class for interacting with the Vertex AI API.""" - - def __init__( - self, project: str | None = None, location: str | None = None, thinking: int = 0 - ) -> None: - """Initializes the VertexAILLM client. - Args: - project: The Google Cloud project ID. - location: The Google Cloud location. - """ - self.client = genai.Client( - vertexai=True, - project=project or settings.project_id, - location=location or settings.location, - ) - self.thinking_budget = thinking - - # @retry( - # wait=wait_exponential(multiplier=1, min=2, max=60), - # stop=stop_after_attempt(3), - # reraise=True, - # ) - def generate( - self, - model: str, - prompt: Any, - tools: list = [], - system_prompt: str | None = None, - tool_mode: str = "AUTO", - ) -> Generation: - """Generates text using the specified model and prompt. - Args: - model: The name of the model to use for generation. - prompt: The prompt to use for generation. - tools: A list of tools to use for generation. - system_prompt: An optional system prompt to guide the model's behavior. - Returns: - A Generation object containing the generated text and usage metadata. 
- """ - logger.debug("Entering VertexAILLM.generate") - logger.debug(f"Model: {model}, Tool Mode: {tool_mode}") - logger.debug(f"System prompt: {system_prompt}") - logger.debug("Calling Vertex AI API: models.generate_content...") - response = self.client.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - tools=tools, - system_instruction=system_prompt, - thinking_config=genai.types.ThinkingConfig( - thinking_budget=self.thinking_budget - ), - tool_config=types.ToolConfig( - function_calling_config=types.FunctionCallingConfig( - mode=tool_mode - ) - ) - ), - ) - logger.debug("Received response from Vertex AI API.") - logger.debug(f"API Response: {response}") - - return self._create_generation(response) - - - # @retry( - # wait=wait_exponential(multiplier=1, min=2, max=60), - # stop=stop_after_attempt(3), - # reraise=True, - # ) - def structured_generation( - self, - model: str, - prompt: Any, - response_model: Type[T], - system_prompt: str | None = None, - tools: list | None = None, - ) -> T: - """Generates structured data from a prompt. - Args: - model: The model to use for generation. - prompt: The prompt to generate text from. - response_model: The pydantic model to parse the response into. - tools: An optional list of tools to use for generation. - Returns: - An instance of the provided pydantic model. 
- """ - config = genai.types.GenerateContentConfig( - response_mime_type="application/json", - response_schema=response_model, - system_instruction=system_prompt, - tools=tools, - ) - - response: genai.types.GenerateContentResponse = ( - self.client.models.generate_content( - model=model, contents=prompt, config=config - ) - ) - - return response_model.model_validate_json(response.text) - - # @retry( - # wait=wait_exponential(multiplier=1, min=2, max=60), - # stop=stop_after_attempt(3), - # reraise=True, - # ) - async def async_generate( - self, - model: str, - prompt: Any, - tools: list = [], - system_prompt: str | None = None, - tool_mode: str = "AUTO", - ) -> Generation: - response = await self.client.aio.models.generate_content( - model=model, - contents=prompt, - config=types.GenerateContentConfig( - tools=tools, - system_instruction=system_prompt, - thinking_config=genai.types.ThinkingConfig( - thinking_budget=self.thinking_budget - ), - tool_config=types.ToolConfig( - function_calling_config=types.FunctionCallingConfig( - mode=tool_mode - ) - ), - ), - ) - - return self._create_generation(response) - - - def _create_generation(self, response): - logger.debug("Creating Generation object from API response.") - m=response.usage_metadata - usage = Usage( - prompt_tokens=m.prompt_token_count, - thought_tokens=m.thoughts_token_count or 0, - response_tokens=m.candidates_token_count - ) - - logger.debug(f"{usage=}") - logger.debug(f"{response=}") - - candidate = response.candidates[0] - - tool_calls = [] - - for part in candidate.content.parts: - if fn := part.function_call: - tool_calls.append(ToolCall(name=fn.name, arguments=fn.args)) - - if len(tool_calls) > 0: - logger.debug(f"Found {len(tool_calls)} tool calls.") - return Generation( - tool_calls=tool_calls, - usage=usage, - extra={"original_content": candidate.content} - ) - - logger.debug("No tool calls found, returning text response.") - text = candidate.content.parts[0].text - return Generation(text=text, 
usage=usage) diff --git a/packages/utils/README.md b/packages/utils/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/utils/pyproject.toml b/packages/utils/pyproject.toml deleted file mode 100644 index fdad8b3..0000000 --- a/packages/utils/pyproject.toml +++ /dev/null @@ -1,17 +0,0 @@ -[project] -name = "utils" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [] - -[project.scripts] -normalize-filenames = "utils.normalize_filenames:app" - -[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] -build-backend = "uv_build" diff --git a/packages/utils/src/utils/__init__.py b/packages/utils/src/utils/__init__.py deleted file mode 100644 index eeedf57..0000000 --- a/packages/utils/src/utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from utils!" diff --git a/packages/utils/src/utils/normalize_filenames.py b/packages/utils/src/utils/normalize_filenames.py deleted file mode 100644 index 22bfacc..0000000 --- a/packages/utils/src/utils/normalize_filenames.py +++ /dev/null @@ -1,115 +0,0 @@ -"""Normalize filenames in a directory.""" - -import pathlib -import re -import unicodedata - -import typer -from rich.console import Console -from rich.panel import Panel -from rich.table import Table - -app = typer.Typer() - - -def normalize_string(s: str) -> str: - """Normalizes a string to be a valid filename.""" - # 1. Decompose Unicode characters into base characters and diacritics - nfkd_form = unicodedata.normalize("NFKD", s) - # 2. Keep only the base characters (non-diacritics) - only_ascii = "".join([c for c in nfkd_form if not unicodedata.combining(c)]) - # 3. To lowercase - only_ascii = only_ascii.lower() - # 4. Replace spaces with underscores - only_ascii = re.sub(r"\s+", "_", only_ascii) - # 5. 
Remove any characters that are not alphanumeric, underscores, dots, or hyphens - only_ascii = re.sub(r"[^a-z0-9_.-]", "", only_ascii) - return only_ascii - - -def truncate_string(s: str) -> str: - """given a string with /, return a string with only the text after the last /""" - return pathlib.Path(s).name - - -def remove_extension(s: str) -> str: - """Given a string, if it has a extension like .pdf, remove it and return the new string""" - return str(pathlib.Path(s).with_suffix("")) - - -def remove_duplicate_vowels(s: str) -> str: - """Removes consecutive duplicate vowels (a, e, i, o, u) from a string.""" - return re.sub(r"([aeiou])\1+", r"\1", s, flags=re.IGNORECASE) - - -@app.callback(invoke_without_command=True) -def normalize_filenames( - directory: str = typer.Argument( - ..., help="The path to the directory containing files to normalize." - ), -): - """Normalizes all filenames in a directory.""" - console = Console() - console.print( - Panel( - f"Normalizing filenames in directory: [bold cyan]{directory}[/bold cyan]", - title="[bold green]Filename Normalizer[/bold green]", - expand=False, - ) - ) - - source_path = pathlib.Path(directory) - if not source_path.is_dir(): - console.print(f"[bold red]Error: Directory not found at {directory}[/bold red]") - raise typer.Exit(code=1) - - files_to_rename = [p for p in source_path.rglob("*") if p.is_file()] - - if not files_to_rename: - console.print( - f"[bold yellow]No files found in {directory} to normalize.[/bold yellow]" - ) - return - - table = Table(title="File Renaming Summary") - table.add_column("Original Name", style="cyan", no_wrap=True) - table.add_column("New Name", style="magenta", no_wrap=True) - table.add_column("Status", style="green") - - for file_path in files_to_rename: - original_name = file_path.name - file_stem = file_path.stem - file_suffix = file_path.suffix - - normalized_stem = normalize_string(file_stem) - new_name = f"{normalized_stem}{file_suffix}" - - if new_name == original_name: - 
table.add_row( - original_name, new_name, "[yellow]Skipped (No change)[/yellow]" - ) - continue - - new_path = file_path.with_name(new_name) - - # Handle potential name collisions - counter = 1 - while new_path.exists(): - new_name = f"{normalized_stem}_{counter}{file_suffix}" - new_path = file_path.with_name(new_name) - counter += 1 - - try: - file_path.rename(new_path) - table.add_row(original_name, new_name, "[green]Renamed[/green]") - except OSError as e: - table.add_row(original_name, new_name, f"[bold red]Error: {e}[/bold red]") - - console.print(table) - console.print( - Panel( - f"[bold]Normalization complete.[/bold] Processed [bold blue]{len(files_to_rename)}[/bold blue] files.", - title="[bold green]Complete[/bold green]", - expand=False, - ) - ) diff --git a/packages/utils/src/utils/py.typed b/packages/utils/src/utils/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/vector-search/.python-version b/packages/vector-search/.python-version deleted file mode 100644 index c8cfe39..0000000 --- a/packages/vector-search/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.10 diff --git a/packages/vector-search/README.md b/packages/vector-search/README.md deleted file mode 100644 index e69de29..0000000 diff --git a/packages/vector-search/pyproject.toml b/packages/vector-search/pyproject.toml deleted file mode 100644 index 68963e6..0000000 --- a/packages/vector-search/pyproject.toml +++ /dev/null @@ -1,29 +0,0 @@ -[project] -name = "vector-search" -version = "0.1.0" -description = "Add your description here" -readme = "README.md" -authors = [ - { name = "Anibal Angulo", email = "a8065384@banorte.com" } -] -requires-python = ">=3.12" -dependencies = [ - "embedder", - "file-storage", - "google-cloud-aiplatform>=1.106.0", - "aiohttp>=3.10.11,<4", - "gcloud-aio-auth>=5.3.0", - "google-auth==2.29.0", - "typer>=0.16.1", -] - -[project.scripts] -vector-search = "vector_search.cli:app" - -[build-system] -requires = ["uv_build>=0.8.3,<0.9.0"] 
-build-backend = "uv_build" - -[tool.uv.sources] -file-storage = { workspace = true } -embedder = { workspace = true } diff --git a/packages/vector-search/src/vector_search/__init__.py b/packages/vector-search/src/vector_search/__init__.py deleted file mode 100644 index 9d2ad75..0000000 --- a/packages/vector-search/src/vector_search/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -def hello() -> str: - return "Hello from vector-search!" diff --git a/packages/vector-search/src/vector_search/base.py b/packages/vector-search/src/vector_search/base.py deleted file mode 100644 index 4ba16ba..0000000 --- a/packages/vector-search/src/vector_search/base.py +++ /dev/null @@ -1,62 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List, TypedDict - - -class SearchResult(TypedDict): - id: str - distance: float - content: str - - -class BaseVectorSearch(ABC): - """ - Abstract base class for a vector search provider. - - This class defines the standard interface for creating a vector search index - and running queries against it. - """ - - @abstractmethod - def create_index(self, name: str, content_path: str, **kwargs) -> None: - """ - Creates a new vector search index and populates it with the provided content. - - Args: - name: The desired name for the new index. - content_path: The local file system path to the data that will be used to - populate the index. This is expected to be a JSON file - containing a list of objects, each with an 'id', 'name', - and 'embedding' key. - **kwargs: Additional provider-specific arguments for index creation. - """ - ... - - @abstractmethod - def update_index(self, index_name: str, content_path: str, **kwargs) -> None: - """ - Updates an existing vector search index with new content. - - Args: - index_name: The name of the index to update. - content_path: The local file system path to the data that will be used to - populate the index. - **kwargs: Additional provider-specific arguments for index update. - """ - ... 
- - @abstractmethod - def run_query( - self, index: str, query: List[float], limit: int - ) -> List[SearchResult]: - """ - Runs a similarity search query against the index. - - Args: - query: The embedding vector to use for the search query. - limit: The maximum number of nearest neighbors to return. - - Returns: - A list of dictionaries, where each dictionary represents a matched item - and contains at least the item's 'id' and the search 'distance'. - """ - ... diff --git a/packages/vector-search/src/vector_search/cli/__init__.py b/packages/vector-search/src/vector_search/cli/__init__.py deleted file mode 100644 index c6c06af..0000000 --- a/packages/vector-search/src/vector_search/cli/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from typer import Typer - -from .create import app as create_callback -from .delete import app as delete_callback -from .query import app as query_callback - -app = Typer() -app.add_typer(create_callback, name="create") -app.add_typer(delete_callback, name="delete") -app.add_typer(query_callback, name="query") diff --git a/packages/vector-search/src/vector_search/cli/create.py b/packages/vector-search/src/vector_search/cli/create.py deleted file mode 100644 index a893892..0000000 --- a/packages/vector-search/src/vector_search/cli/create.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Create and deploy a Vertex AI Vector Search index.""" - -from typing import Annotated - -import typer -from rich.console import Console - -from rag_eval.config import settings as config -from vector_search.vertex_ai import GoogleCloudVectorSearch - -app = typer.Typer() - - -@app.callback(invoke_without_command=True) -def create( - path: Annotated[ - str, - typer.Option( - "--path", - "-p", - help="The GCS URI (gs://...) 
to the directory containing your embedding JSON file(s).", - ), - ], - agent_name: Annotated[ - str, - typer.Option( - "--agent", - "-a", - help="The name of the agent to create the index for.", - ), - ], -): - """Create and deploy a Vertex AI Vector Search index for a specific agent.""" - console = Console() - - try: - console.print( - f"[bold green]Looking up configuration for agent '{agent_name}'...[/bold green]" - ) - agent_config = config.agents.get(agent_name) - if not agent_config: - console.print( - f"[bold red]Agent '{agent_name}' not found in settings.[/bold red]" - ) - raise typer.Exit(code=1) - - if not agent_config.index: - console.print( - f"[bold red]Index configuration not found for agent '{agent_name}'.[/bold red]" - ) - raise typer.Exit(code=1) - - index_config = agent_config.index - - console.print( - f"[bold green]Initializing Vertex AI client for project '{config.project_id}' in '{config.location}'...[/bold green]" - ) - vector_search = GoogleCloudVectorSearch( - project_id=config.project_id, - location=config.location, - bucket=config.bucket, - index_name=index_config.name, - ) - - console.print( - f"[bold green]Starting creation of index '{index_config.name}'...[/bold green]" - ) - console.print("This may take a while.") - vector_search.create_index( - name=index_config.name, - content_path=f"gs://{config.bucket}/{path}", - dimensions=index_config.dimensions, - ) - console.print( - f"[bold green]Index '{index_config.name}' created successfully.[/bold green]" - ) - - console.print("[bold green]Deploying index to a new endpoint...[/bold green]") - console.print("This will also take some time.") - vector_search.deploy_index( - index_name=index_config.name, machine_type=index_config.machine_type - ) - console.print("[bold green]Index deployed successfully![/bold green]") - console.print(f"Endpoint name: {vector_search.index_endpoint.display_name}") - console.print( - f"Endpoint resource name: {vector_search.index_endpoint.resource_name}" - ) - - 
except Exception as e: - console.print(f"[bold red]An error occurred: {e}[/bold red]") - raise typer.Exit(code=1) diff --git a/packages/vector-search/src/vector_search/cli/delete.py b/packages/vector-search/src/vector_search/cli/delete.py deleted file mode 100644 index 59f3fab..0000000 --- a/packages/vector-search/src/vector_search/cli/delete.py +++ /dev/null @@ -1,38 +0,0 @@ -"""Delete a vector index or endpoint.""" - -import typer -from rich.console import Console - -from rag_eval.config import settings as config -from vector_search.vertex_ai import GoogleCloudVectorSearch - -app = typer.Typer() - - -@app.callback(invoke_without_command=True) -def delete( - id: str = typer.Argument(..., help="The ID of the index or endpoint to delete."), - endpoint: bool = typer.Option( - False, "--endpoint", help="Delete an endpoint instead of an index." - ), -): - """Delete a vector index or endpoint.""" - console = Console() - vector_search = GoogleCloudVectorSearch( - project_id=config.project_id, location=config.location, bucket=config.bucket - ) - - try: - if endpoint: - console.print(f"[bold red]Deleting endpoint {id}...[/bold red]") - vector_search.delete_index_endpoint(id) - console.print( - f"[bold green]Endpoint {id} deleted successfully.[/bold green]" - ) - else: - console.print(f"[bold red]Deleting index {id}...[/bold red]") - vector_search.delete_index(id) - console.print(f"[bold green]Index {id} deleted successfully.[/bold green]") - except Exception as e: - console.print(f"[bold red]An error occurred: {e}[/bold red]") - raise typer.Exit(code=1) diff --git a/packages/vector-search/src/vector_search/cli/generate.py b/packages/vector-search/src/vector_search/cli/generate.py deleted file mode 100644 index dad63cd..0000000 --- a/packages/vector-search/src/vector_search/cli/generate.py +++ /dev/null @@ -1,91 +0,0 @@ -"""Generate embeddings for documents and save them to a JSON file.""" - -import json -from pathlib import Path - -import typer -from embedder.vertex_ai 
import VertexAIEmbedder -from file_storage.google_cloud import GoogleCloudFileStorage -from rich.console import Console -from rich.progress import Progress - -from rag_eval.config import Settings - -app = typer.Typer() - - -@app.callback(invoke_without_command=True) -def generate( - path: str = typer.Argument(..., help="The path to the markdown files."), - output_file: str = typer.Option( - ..., - "--output-file", - "-o", - help="The local path to save the output JSON file.", - ), - batch_size: int = typer.Option( - 10, - "--batch-size", - "-b", - help="The batch size for processing files.", - ), - jsonl: bool = typer.Option( - False, - "--jsonl", - help="Output in JSONL format instead of JSON.", - ), -): - """Generate embeddings for documents and save them to a JSON file.""" - config = Settings() - console = Console() - - console.print("[bold green]Starting vector generation...[/bold green]") - - try: - storage = GoogleCloudFileStorage(bucket=config.bucket) - embedder = VertexAIEmbedder(model_name=config.embedding_model) - - remote_files = storage.list_files(path=path) - results = [] - - with Progress(console=console) as progress: - task = progress.add_task( - "[cyan]Generating embeddings...", total=len(remote_files) - ) - - for i in range(0, len(remote_files), batch_size): - batch_files = remote_files[i : i + batch_size] - batch_contents = [] - - for remote_file in batch_files: - file_stream = storage.get_file_stream(remote_file) - batch_contents.append( - file_stream.read().decode("utf-8-sig", errors="replace") - ) - - batch_embeddings = embedder.generate_embeddings_batch(batch_contents) - - for j, remote_file in enumerate(batch_files): - results.append( - {"id": remote_file, "embedding": batch_embeddings[j]} - ) - progress.update(task, advance=1) - - except Exception as e: - console.print( - f"[bold red]An error occurred during vector generation: {e}[/bold red]" - ) - raise typer.Exit(code=1) - - output_path = Path(output_file) - 
output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, "w") as f: - if jsonl: - for record in results: - f.write(json.dumps(record) + "\n") - else: - json.dump(results, f, indent=2) - - console.print( - f"[bold green]Embedding generation complete. {len(results)} vectors saved to '{output_path.resolve()}'[/bold green]" - ) diff --git a/packages/vector-search/src/vector_search/cli/query.py b/packages/vector-search/src/vector_search/cli/query.py deleted file mode 100644 index 0e8893b..0000000 --- a/packages/vector-search/src/vector_search/cli/query.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Query the vector search index.""" - -import typer -from embedder.vertex_ai import VertexAIEmbedder -from rich.console import Console -from rich.table import Table -from typer import Argument, Option - -from rag_eval.config import settings as config -from vector_search.vertex_ai import GoogleCloudVectorSearch - -app = typer.Typer() - - -@app.callback(invoke_without_command=True) -def query( - query: str = Argument(..., help="The text query to search for."), - limit: int = Option(5, "--limit", "-l", help="The number of results to return."), -): - """Queries the vector search index.""" - console = Console() - - try: - console.print("[bold green]Initializing clients...[/bold green]") - embedder = VertexAIEmbedder(model_name=config.embedding_model) - vector_search = GoogleCloudVectorSearch( - project_id=config.project_id, location=config.location, bucket=config.bucket - ) - - console.print("[bold green]Loading index endpoint...[/bold green]") - vector_search.load_index_endpoint(config.index.endpoint) - - console.print("[bold green]Generating embedding for query...[/bold green]") - query_embedding = embedder.generate_embedding(query) - - console.print("[bold green]Running search query...[/bold green]") - search_results = vector_search.run_query( - deployed_index_id=config.index.deployment, - query=query_embedding, - limit=limit, - ) - - table = Table(title="Search 
Results") - table.add_column("ID", justify="left", style="cyan") - table.add_column("Distance", justify="left", style="magenta") - table.add_column("Content", justify="left", style="green") - - for result in search_results: - table.add_row(result["id"], str(result["distance"]), result["content"]) - - console.print(table) - - except Exception as e: - console.print(f"[bold red]An error occurred: {e}[/bold red]") - raise typer.Exit(code=1) diff --git a/packages/vector-search/src/vector_search/py.typed b/packages/vector-search/src/vector_search/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/packages/vector-search/src/vector_search/vertex_ai.py b/packages/vector-search/src/vector_search/vertex_ai.py deleted file mode 100644 index 2721820..0000000 --- a/packages/vector-search/src/vector_search/vertex_ai.py +++ /dev/null @@ -1,255 +0,0 @@ -import asyncio -from typing import List -from uuid import uuid4 - -import aiohttp -import google.auth -import google.auth.transport.requests -from file_storage.google_cloud import GoogleCloudFileStorage -from gcloud.aio.auth import Token -from google.cloud import aiplatform - -from .base import BaseVectorSearch, SearchResult - - -class GoogleCloudVectorSearch(BaseVectorSearch): - """ - A vector search provider that uses Google Cloud's Vertex AI Vector Search. - """ - - def __init__( - self, project_id: str, location: str, bucket: str, index_name: str = None - ): - """ - Initializes the GoogleCloudVectorSearch client. - - Args: - project_id: The Google Cloud project ID. - location: The Google Cloud location (e.g., 'us-central1'). - bucket: The GCS bucket to use for file storage. - index_name: The name of the index. If None, it will be taken from settings. 
- """ - aiplatform.init(project=project_id, location=location) - self.project_id = project_id - self.location = location - self.storage = GoogleCloudFileStorage(bucket=bucket) - self.index_name = index_name - self._credentials = None - self._aio_session: aiohttp.ClientSession | None = None - self._async_token: Token | None = None - - def _get_auth_headers(self) -> dict: - if self._credentials is None: - self._credentials, _ = google.auth.default( - scopes=["https://www.googleapis.com/auth/cloud-platform"] - ) - if not self._credentials.token or self._credentials.expired: - self._credentials.refresh(google.auth.transport.requests.Request()) - return { - "Authorization": f"Bearer {self._credentials.token}", - "Content-Type": "application/json", - } - - async def _async_get_auth_headers(self) -> dict: - if self._async_token is None: - self._async_token = Token( - session=self._get_aio_session(), - scopes=["https://www.googleapis.com/auth/cloud-platform"], - ) - access_token = await self._async_token.get() - return { - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/json", - } - - def _get_aio_session(self) -> aiohttp.ClientSession: - if self._aio_session is None or self._aio_session.closed: - connector = aiohttp.TCPConnector(limit=300, limit_per_host=50) - timeout = aiohttp.ClientTimeout(total=60) - self._aio_session = aiohttp.ClientSession( - timeout=timeout, connector=connector - ) - return self._aio_session - - def create_index( - self, - name: str, - content_path: str, - dimensions: int, - approximate_neighbors_count: int = 150, - distance_measure_type: str = "DOT_PRODUCT_DISTANCE", - **kwargs, - ) -> None: - """ - Creates a new Vertex AI Vector Search index. - - Args: - name: The display name for the new index. - content_path: The GCS URI to the JSON file containing the embeddings. - dimensions: The number of dimensions in the embedding vectors. - approximate_neighbors_count: The number of neighbors to find for each vector. 
- distance_measure_type: The distance measure to use (e.g., 'DOT_PRODUCT_DISTANCE'). - """ - index = aiplatform.MatchingEngineIndex.create_tree_ah_index( - display_name=name, - contents_delta_uri=content_path, - dimensions=dimensions, - approximate_neighbors_count=approximate_neighbors_count, - distance_measure_type=distance_measure_type, - leaf_node_embedding_count=1000, - leaf_nodes_to_search_percent=10, - ) - self.index = index - - def update_index(self, index_name: str, content_path: str, **kwargs) -> None: - """ - Updates an existing Vertex AI Vector Search index. - - Args: - index_name: The resource name of the index to update. - content_path: The GCS URI to the JSON file containing the new embeddings. - """ - index = aiplatform.MatchingEngineIndex(index_name=index_name) - index.update_embeddings( - contents_delta_uri=content_path, - ) - self.index = index - - def deploy_index( - self, index_name: str, machine_type: str = "e2-standard-2" - ) -> None: - """ - Deploys a Vertex AI Vector Search index to an endpoint. - - Args: - index_name: The name of the index to deploy. - machine_type: The type of machine to use for the endpoint. - """ - index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create( - display_name=f"{index_name}-endpoint", - public_endpoint_enabled=True, - ) - index_endpoint.deploy_index( - index=self.index, - deployed_index_id=f"{index_name.replace('-', '_')}_deployed_{uuid4().hex}", - machine_type=machine_type, - ) - self.index_endpoint = index_endpoint - - def load_index_endpoint(self, endpoint_name: str) -> None: - """ - Loads an existing Vertex AI Vector Search index endpoint. - - Args: - endpoint_name: The resource name of the index endpoint. - """ - self.index_endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_name) - if not self.index_endpoint.public_endpoint_domain_name: - raise ValueError( - "The index endpoint does not have a public endpoint. " - "Please ensure that the endpoint is configured for public access." 
- ) - - def run_query( - self, deployed_index_id: str, query: List[float], limit: int - ) -> List[SearchResult]: - """ - Runs a similarity search query against the deployed index. - - Args: - deployed_index_id: The ID of the deployed index. - query: The embedding vector to use for the search query. - limit: The maximum number of nearest neighbors to return. - - Returns: - A list of dictionaries representing the matched items. - """ - response = self.index_endpoint.find_neighbors( - deployed_index_id=deployed_index_id, queries=[query], num_neighbors=limit - ) - results = [] - for neighbor in response[0]: - file_path = self.index_name + "/contents/" + neighbor.id + ".md" - content = self.storage.get_file_stream(file_path).read().decode("utf-8") - results.append( - {"id": neighbor.id, "distance": neighbor.distance, "content": content} - ) - return results - - async def async_run_query( - self, deployed_index_id: str, query: List[float], limit: int - ) -> List[SearchResult]: - """ - Runs a non-blocking similarity search query against the deployed index - using the REST API directly with an async HTTP client. - - Args: - deployed_index_id: The ID of the deployed index. - query: The embedding vector to use for the search query. - limit: The maximum number of nearest neighbors to return. - - Returns: - A list of dictionaries representing the matched items. 
- """ - domain = self.index_endpoint.public_endpoint_domain_name - endpoint_id = self.index_endpoint.name.split("/")[-1] - url = ( - f"https://{domain}/v1/projects/{self.project_id}" - f"/locations/{self.location}" - f"/indexEndpoints/{endpoint_id}:findNeighbors" - ) - payload = { - "deployed_index_id": deployed_index_id, - "queries": [ - { - "datapoint": {"feature_vector": query}, - "neighbor_count": limit, - } - ], - } - - headers = await self._async_get_auth_headers() - session = self._get_aio_session() - async with session.post(url, json=payload, headers=headers) as response: - response.raise_for_status() - data = await response.json() - - neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) - content_tasks = [] - for neighbor in neighbors: - datapoint_id = neighbor["datapoint"]["datapointId"] - file_path = f"{self.index_name}/contents/{datapoint_id}.md" - content_tasks.append(self.storage.async_get_file_stream(file_path)) - - file_streams = await asyncio.gather(*content_tasks) - results: List[SearchResult] = [] - for neighbor, stream in zip(neighbors, file_streams): - results.append( - { - "id": neighbor["datapoint"]["datapointId"], - "distance": neighbor["distance"], - "content": stream.read().decode("utf-8"), - } - ) - return results - - def delete_index(self, index_name: str) -> None: - """ - Deletes a Vertex AI Vector Search index. - - Args: - index_name: The resource name of the index. - """ - index = aiplatform.MatchingEngineIndex(index_name) - index.delete() - - def delete_index_endpoint(self, index_endpoint_name: str) -> None: - """ - Deletes a Vertex AI Vector Search index endpoint. - - Args: - index_endpoint_name: The resource name of the index endpoint. 
- """ - index_endpoint = aiplatform.MatchingEngineIndexEndpoint(index_endpoint_name) - index_endpoint.undeploy_all() - index_endpoint.delete(force=True) diff --git a/pyproject.toml b/pyproject.toml index f289a4b..2552707 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,91 +1,57 @@ [project] -name = "rag-pipeline" +name = "knowledge-pipeline" version = "0.1.0" description = "RAG Pipeline for document chunking, embedding, and vector search" readme = "README.md" requires-python = ">=3.12" authors = [ - { name = "Pipeline Team" } + { name = "Anibal Angulo", email = "A8065384@banorte.com" } ] - dependencies = [ # Core dependencies - "google-genai>=1.45.0", "google-cloud-aiplatform>=1.106.0", "google-cloud-storage>=2.19.0", - "google-auth>=2.29.0", "pydantic>=2.11.7", "pydantic-settings[yaml]>=2.10.1", - "python-dotenv>=1.0.0", - # Chunking "chonkie>=1.1.2", "tiktoken>=0.7.0", "langchain>=0.3.0", "langchain-core>=0.3.0", - # Document processing "markitdown[pdf]>=0.1.2", "pypdf>=6.1.2", "pdf2image>=1.17.0", - - # Storage & networking - "gcloud-aio-storage>=9.6.1", - "gcloud-aio-auth>=5.3.0", - "aiohttp>=3.10.11,<4", - # Utils - "tenacity>=9.1.2", "typer>=0.16.1", - - # Pipeline orchestration (optional) - "kfp>=2.15.2", + "pydantic-ai>=0.0.5", ] [project.scripts] -# Chunkers -llm-chunker = "chunker.llm_chunker:app" -recursive-chunker = "chunker.recursive_chunker:app" -contextual-chunker = "chunker.contextual_chunker:app" - -# Converters -convert-md = "document_converter.markdown:app" - -# Storage -file-storage = "file_storage.cli:app" - -# Vector Search -vector-search = "vector_search.cli:app" - -# Utils -normalize-filenames = "utils.normalize_filenames:app" +knowledge-pipeline = "knowledge_pipeline.cli:app" [build-system] requires = ["uv_build>=0.8.3,<0.9.0"] build-backend = "uv_build" -[tool.uv.workspace] -members = [ - "apps/*", - "packages/*", -] - -[tool.uv.sources] -chunker = { workspace = true } -document-converter = { workspace = true } -embedder = { 
workspace = true } -file-storage = { workspace = true } -llm = { workspace = true } -utils = { workspace = true } -vector-search = { workspace = true } -index-gen = { workspace = true } - [dependency-groups] dev = [ "pytest>=8.4.1", - "mypy>=1.17.1", "ruff>=0.12.10", + "ty>=0.0.18", ] [tool.ruff.lint] -extend-select = ["I", "F"] +select = ["I", "F"] + +[tool.pytest.ini_options] +addopts = [ + "--strict-markers", + "--tb=short", + "--disable-warnings", +] +markers = [ + "unit: Unit tests", + "integration: Integration tests", + "slow: Slow running tests", +] diff --git a/src/rag_eval/__init__.py b/src/knowledge_pipeline/__init__.py similarity index 100% rename from src/rag_eval/__init__.py rename to src/knowledge_pipeline/__init__.py diff --git a/packages/chunker/src/chunker/__init__.py b/src/knowledge_pipeline/chunker/__init__.py similarity index 100% rename from packages/chunker/src/chunker/__init__.py rename to src/knowledge_pipeline/chunker/__init__.py diff --git a/packages/chunker/src/chunker/base_chunker.py b/src/knowledge_pipeline/chunker/base_chunker.py similarity index 98% rename from packages/chunker/src/chunker/base_chunker.py rename to src/knowledge_pipeline/chunker/base_chunker.py index 4ec2eed..03e2fa8 100644 --- a/packages/chunker/src/chunker/base_chunker.py +++ b/src/knowledge_pipeline/chunker/base_chunker.py @@ -13,6 +13,8 @@ class Document(TypedDict): class BaseChunker(ABC): """Abstract base class for chunker implementations.""" + max_chunk_size: int + @abstractmethod def process_text(self, text: str) -> List[Document]: """ diff --git a/packages/chunker/src/chunker/contextual_chunker.py b/src/knowledge_pipeline/chunker/contextual_chunker.py similarity index 55% rename from packages/chunker/src/chunker/contextual_chunker.py rename to src/knowledge_pipeline/chunker/contextual_chunker.py index 6504f7d..6faa9df 100644 --- a/packages/chunker/src/chunker/contextual_chunker.py +++ b/src/knowledge_pipeline/chunker/contextual_chunker.py @@ -1,11 +1,3 @@ 
-import json -import os -from pathlib import Path -from typing import Annotated, List - -import typer -from llm.vertex_ai import VertexAILLM - from .base_chunker import BaseChunker, Document @@ -16,23 +8,13 @@ class ContextualChunker(BaseChunker): def __init__( self, - llm_client: VertexAILLM, + model: str = "google-vertex:gemini-2.0-flash", max_chunk_size: int = 800, - model: str = "gemini-2.0-flash", ): - """ - Initializes the ContextualChunker. - - Args: - max_chunk_size: The maximum length of a chunk in characters. - model: The name of the language model to use. - llm_client: An optional instance of a language model client. - """ self.max_chunk_size = max_chunk_size self.model = model - self.llm_client = llm_client - def _split_text(self, text: str) -> List[str]: + def _split_text(self, text: str) -> list[str]: """Splits text into evenly sized chunks of a maximum size, trying to respect sentence and paragraph boundaries.""" import math @@ -67,7 +49,7 @@ class ContextualChunker(BaseChunker): return chunks - def process_text(self, text: str) -> List[Document]: + def process_text(self, text: str) -> list[Document]: """ Processes a string of text into a list of context-aware Document chunks. """ @@ -75,7 +57,7 @@ class ContextualChunker(BaseChunker): return [{"page_content": text, "metadata": {}}] chunks = self._split_text(text) - processed_chunks: List[Document] = [] + processed_chunks: list[Document] = [] for i, chunk_content in enumerate(chunks): prompt = f""" @@ -93,7 +75,14 @@ class ContextualChunker(BaseChunker): Genera un resumen conciso del "Documento Original" que proporcione el contexto necesario para entender el "Fragmento Actual". El resumen debe ser un solo párrafo en español. 
""" - summary = self.llm_client.generate(self.model, prompt).text + from pydantic_ai import ModelRequest + from pydantic_ai.direct import model_request_sync + + response = model_request_sync( + self.model, + [ModelRequest.user_text_prompt(prompt)], + ) + summary = next(p.content for p in response.parts if p.part_kind == "text") contextualized_chunk = ( f"> **Contexto del documento original:**\n> {summary}\n\n---\n\n" + chunk_content @@ -107,49 +96,3 @@ class ContextualChunker(BaseChunker): ) return processed_chunks - - -app = typer.Typer() - - -@app.command() -def main( - input_file_path: Annotated[ - str, typer.Argument(help="Path to the input text file.") - ], - output_dir: Annotated[ - str, typer.Argument(help="Directory to save the output file.") - ], - max_chunk_size: Annotated[ - int, typer.Option(help="Maximum chunk size in characters.") - ] = 800, - model: Annotated[ - str, typer.Option(help="Model to use for the processing") - ] = "gemini-2.0-flash", -): - """ - Processes a text file using ContextualChunker and saves the output to a JSONL file. 
- """ - print(f"Starting to process {input_file_path}...") - - chunker = ContextualChunker(max_chunk_size=max_chunk_size, model=model) - documents = chunker.process_path(Path(input_file_path)) - - print(f"Successfully created {len(documents)} chunks.") - - if not os.path.exists(output_dir): - os.makedirs(output_dir) - print(f"Created output directory: {output_dir}") - - output_file_path = os.path.join(output_dir, "chunked_documents.jsonl") - - with open(output_file_path, "w", encoding="utf-8") as f: - for doc in documents: - doc["metadata"]["source_file"] = os.path.basename(input_file_path) - f.write(json.dumps(doc, ensure_ascii=False) + "\n") - - print(f"Successfully saved {len(documents)} chunks to {output_file_path}") - - -if __name__ == "__main__": - app() diff --git a/packages/chunker/src/chunker/llm_chunker.py b/src/knowledge_pipeline/chunker/llm_chunker.py similarity index 99% rename from packages/chunker/src/chunker/llm_chunker.py rename to src/knowledge_pipeline/chunker/llm_chunker.py index 2b44474..37f1d93 100644 --- a/packages/chunker/src/chunker/llm_chunker.py +++ b/src/knowledge_pipeline/chunker/llm_chunker.py @@ -13,7 +13,6 @@ from langchain_core.documents import Document as LangchainDocument from llm.vertex_ai import VertexAILLM from pdf2image import convert_from_path from pypdf import PdfReader - from rag_eval.config import Settings from .base_chunker import BaseChunker, Document diff --git a/packages/chunker/src/chunker/recursive_chunker.py b/src/knowledge_pipeline/chunker/recursive_chunker.py similarity index 100% rename from packages/chunker/src/chunker/recursive_chunker.py rename to src/knowledge_pipeline/chunker/recursive_chunker.py diff --git a/src/knowledge_pipeline/cli.py b/src/knowledge_pipeline/cli.py new file mode 100644 index 0000000..daea397 --- /dev/null +++ b/src/knowledge_pipeline/cli.py @@ -0,0 +1,20 @@ +import logging + +import typer + +from .config import Settings +from .pipeline import run_pipeline + +app = typer.Typer() + + 
+@app.command() +def run_ingestion(): + """Main function for the CLI script.""" + logging.basicConfig(level=logging.INFO) + settings = Settings.model_validate({}) + run_pipeline(settings) + + +if __name__ == "__main__": + app() diff --git a/src/knowledge_pipeline/config.py b/src/knowledge_pipeline/config.py new file mode 100644 index 0000000..c1ce7ef --- /dev/null +++ b/src/knowledge_pipeline/config.py @@ -0,0 +1,100 @@ +import os +from functools import cached_property + +from google.cloud.aiplatform.matching_engine.matching_engine_index_config import ( + DistanceMeasureType, +) +from pydantic_settings import ( + BaseSettings, + PydanticBaseSettingsSource, + SettingsConfigDict, + YamlConfigSettingsSource, +) + +CONFIG_FILE_PATH = os.getenv("CONFIG_YAML", "config.yaml") + + +class Settings(BaseSettings): + project_id: str + location: str + + agent_embedding_model: str + + index_name: str + index_dimensions: int + index_machine_type: str = "e2-standard-16" + index_origin: str + index_destination: str + index_chunk_limit: int + index_distance_measure_type: DistanceMeasureType = ( + DistanceMeasureType.DOT_PRODUCT_DISTANCE + ) + index_approximate_neighbors_count: int = 150 + index_leaf_node_embedding_count: int = 1000 + index_leaf_nodes_to_search_percent: int = 10 + index_public_endpoint_enabled: bool = True + + model_config = SettingsConfigDict(yaml_file=CONFIG_FILE_PATH) + + def model_post_init(self, _): + from google.cloud import aiplatform + + aiplatform.init(project=self.project_id, location=self.location) + + @property + def index_deployment(self) -> str: + return self.index_name.replace("-", "_") + "_deployed" + + @property + def index_data(self) -> str: + return self.index_destination + self.index_name + + @property + def index_contents_dir(self) -> str: + return f"{self.index_data}/contents" + + @property + def index_vectors_dir(self) -> str: + return f"{self.index_data}/vectors" + + @property + def index_vectors_jsonl_path(self) -> str: + return 
f"{self.index_vectors_dir}/vectors.json" + + @cached_property + def gcs_client(self): + from google.cloud import storage + + return storage.Client() + + @cached_property + def converter(self): + from markitdown import MarkItDown + + return MarkItDown(enable_plugins=False) + + @cached_property + def embedder(self): + from pydantic_ai import Embedder + + return Embedder(f"google-vertex:{self.agent_embedding_model}") + + @cached_property + def chunker(self): + from .chunker.contextual_chunker import ContextualChunker + + return ContextualChunker(max_chunk_size=self.index_chunk_limit) + + @classmethod + def settings_customise_sources( + cls, + settings_cls: type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> tuple[PydanticBaseSettingsSource, ...]: + return ( + env_settings, + YamlConfigSettingsSource(settings_cls), + ) diff --git a/src/knowledge_pipeline/pipeline.py b/src/knowledge_pipeline/pipeline.py new file mode 100644 index 0000000..8aa5da0 --- /dev/null +++ b/src/knowledge_pipeline/pipeline.py @@ -0,0 +1,210 @@ +import json +import logging +import re +import tempfile +import unicodedata +from collections.abc import Sequence +from pathlib import Path + +from google.cloud import aiplatform +from google.cloud.aiplatform.matching_engine.matching_engine_index_config import ( + DistanceMeasureType, +) +from google.cloud.storage import Client as StorageClient +from markitdown import MarkItDown +from pydantic_ai import Embedder + +from .chunker.base_chunker import BaseChunker, Document +from .config import Settings + +log = logging.getLogger(__name__) + + +def _parse_gcs_uri(uri: str) -> tuple[str, str]: + """Parse a 'gs://bucket/path' URI into (bucket_name, object_path).""" + bucket, _, path = uri.removeprefix("gs://").partition("/") + return bucket, path + + +def normalize_string(s: str) -> str: + 
"""Normalizes a string to be a valid filename.""" + nfkd_form = unicodedata.normalize("NFKD", s) + only_ascii = "".join([c for c in nfkd_form if not unicodedata.combining(c)]) + only_ascii = only_ascii.lower() + only_ascii = re.sub(r"\s+", "_", only_ascii) + only_ascii = re.sub(r"[^a-z0-9_.-]", "", only_ascii) + return only_ascii + + +def gather_pdfs(index_origin: str, gcs_client: StorageClient) -> list[str]: + """Lists all PDF file URIs in a GCS directory.""" + bucket, prefix = _parse_gcs_uri(index_origin) + blobs = gcs_client.bucket(bucket).list_blobs(prefix=prefix) + pdf_files = [ + f"gs://{bucket}/{blob.name}" for blob in blobs if blob.name.endswith(".pdf") + ] + log.info("Found %d PDF files in %s", len(pdf_files), index_origin) + return pdf_files + + +def split_into_chunks(text: str, file_id: str, chunker: BaseChunker) -> list[Document]: + """Splits text into chunks, or returns a single chunk if small enough.""" + if len(text) <= chunker.max_chunk_size: + return [{"page_content": text, "metadata": {"id": file_id}}] + + chunks = chunker.process_text(text) + for i, chunk in enumerate(chunks): + chunk["metadata"]["id"] = f"{file_id}_{i}" + return chunks + + +def upload_to_gcs( + chunks: list[Document], + vectors: list[dict], + index_contents_dir: str, + index_vectors_jsonl_path: str, + gcs_client: StorageClient, +) -> None: + """Uploads chunk contents and vectors to GCS.""" + bucket, prefix = _parse_gcs_uri(index_contents_dir) + gcs_bucket = gcs_client.bucket(bucket) + for chunk in chunks: + chunk_id = chunk["metadata"]["id"] + gcs_bucket.blob(f"{prefix}/{chunk_id}.md").upload_from_string( + chunk["page_content"], content_type="text/markdown; charset=utf-8" + ) + + vectors_jsonl = "\n".join(json.dumps(v) for v in vectors) + "\n" + bucket, obj_path = _parse_gcs_uri(index_vectors_jsonl_path) + gcs_client.bucket(bucket).blob(obj_path).upload_from_string( + vectors_jsonl, content_type="application/x-ndjson; charset=utf-8" + ) + log.info("Uploaded %d chunks and %d 
vectors to GCS", len(chunks), len(vectors)) + + +def build_vectors( + chunks: list[Document], + embeddings: Sequence[Sequence[float]], + source_folder: str, +) -> list[dict]: + """Builds vector records from chunks and their embeddings.""" + source = Path(source_folder).parts[0] if source_folder else "" + return [ + { + "id": chunk["metadata"]["id"], + "embedding": list(embedding), + "restricts": [{"namespace": "source", "allow": [source]}], + } + for chunk, embedding in zip(chunks, embeddings) + ] + + +def create_vector_index( + index_name: str, + index_vectors_dir: str, + index_dimensions: int, + index_distance_measure_type: DistanceMeasureType, + index_deployment: str, + index_machine_type: str, + approximate_neighbors_count: int, + leaf_node_embedding_count: int, + leaf_nodes_to_search_percent: int, + public_endpoint_enabled: bool, +): + """Creates and deploys a Vertex AI Vector Search Index.""" + log.info("Creating index '%s'...", index_name) + index = aiplatform.MatchingEngineIndex.create_tree_ah_index( + display_name=index_name, + contents_delta_uri=index_vectors_dir, + dimensions=index_dimensions, + approximate_neighbors_count=approximate_neighbors_count, + distance_measure_type=index_distance_measure_type, + leaf_node_embedding_count=leaf_node_embedding_count, + leaf_nodes_to_search_percent=leaf_nodes_to_search_percent, + ) + log.info("Index '%s' created successfully.", index_name) + + log.info("Deploying index to a new endpoint...") + endpoint = aiplatform.MatchingEngineIndexEndpoint.create( + display_name=f"{index_name}-endpoint", + public_endpoint_enabled=public_endpoint_enabled, + ) + endpoint.deploy_index( + index=index, + deployed_index_id=index_deployment, + machine_type=index_machine_type, + sync=False, + ) + log.info("Index deployed: %s", endpoint.display_name) + + +def process_file( + file_uri: str, + temp_dir: Path, + gcs_client: StorageClient, + converter: MarkItDown, + embedder: Embedder, + chunker: BaseChunker, +) -> tuple[list[Document], 
list[dict]]: + """Downloads a PDF from GCS, converts to markdown, chunks, and embeds.""" + bucket, obj_path = _parse_gcs_uri(file_uri) + local_path = temp_dir / Path(file_uri).name + gcs_client.bucket(bucket).blob(obj_path).download_to_filename(local_path) + + try: + markdown = converter.convert(local_path).text_content + file_id = normalize_string(Path(file_uri).stem) + source_folder = Path(obj_path).parent.as_posix() + + chunks = split_into_chunks(markdown, file_id, chunker) + texts = [c["page_content"] for c in chunks] + embeddings = embedder.embed_documents_sync(texts).embeddings + + vectors = build_vectors(chunks, embeddings, source_folder) + return chunks, vectors + finally: + if local_path.exists(): + local_path.unlink() + + +def run_pipeline(settings: Settings): + """Runs the full ingestion pipeline: gather → process → aggregate → index.""" + files = gather_pdfs(settings.index_origin, settings.gcs_client) + + all_chunks: list[Document] = [] + all_vectors: list[dict] = [] + + with tempfile.TemporaryDirectory() as temp_dir: + for file_uri in files: + log.info("Processing file: %s", file_uri) + chunks, vectors = process_file( + file_uri, + Path(temp_dir), + settings.gcs_client, + settings.converter, + settings.embedder, + settings.chunker, + ) + all_chunks.extend(chunks) + all_vectors.extend(vectors) + + upload_to_gcs( + all_chunks, + all_vectors, + settings.index_contents_dir, + settings.index_vectors_jsonl_path, + settings.gcs_client, + ) + + create_vector_index( + settings.index_name, + settings.index_vectors_dir, + settings.index_dimensions, + settings.index_distance_measure_type, + settings.index_deployment, + settings.index_machine_type, + settings.index_approximate_neighbors_count, + settings.index_leaf_node_embedding_count, + settings.index_leaf_nodes_to_search_percent, + settings.index_public_endpoint_enabled, + ) diff --git a/src/rag_eval/config.py b/src/rag_eval/config.py deleted file mode 100644 index 5be171b..0000000 --- a/src/rag_eval/config.py 
+++ /dev/null @@ -1,121 +0,0 @@ -import os - -from pydantic import BaseModel -from pydantic_settings import ( - BaseSettings, - PydanticBaseSettingsSource, - SettingsConfigDict, - YamlConfigSettingsSource, -) - -CONFIG_FILE_PATH = os.getenv("CONFIG_YAML", "config.yaml") - - -class IndexConfig(BaseModel): - name: str - endpoint: str - dimensions: int - machine_type: str = "e2-standard-16" - origin: str - destination: str - chunk_limit: int - - @property - def deployment(self) -> str: - return self.name.replace("-", "_") + "_deployed" - - @property - def data(self) -> str: - return self.destination + self.name - -class AgentConfig(BaseModel): - name: str - instructions: str - language_model: str - embedding_model: str - thinking: int - - -class BigQueryConfig(BaseModel): - dataset_id: str - project_id: str | None = None - table_ids: dict[str, str] - - -class Settings(BaseSettings): - project_id: str - location: str - service_account: str - - # Flattened fields from nested models - agent_name: str - agent_instructions: str - agent_language_model: str - agent_embedding_model: str - agent_thinking: int - - index_name: str - index_endpoint: str - index_dimensions: int - index_machine_type: str = "e2-standard-16" - index_origin: str - index_destination: str - index_chunk_limit: int - - bigquery_dataset_id: str - bigquery_project_id: str | None = None - bigquery_table_ids: dict[str, str] - - bucket: str - base_image: str - dialogflow_agent_id: str - processing_image: str - - model_config = SettingsConfigDict(yaml_file=CONFIG_FILE_PATH) - - @property - def agent(self) -> AgentConfig: - return AgentConfig( - name=self.agent_name, - instructions=self.agent_instructions, - language_model=self.agent_language_model, - embedding_model=self.agent_embedding_model, - thinking=self.agent_thinking, - ) - - @property - def index(self) -> IndexConfig: - return IndexConfig( - name=self.index_name, - endpoint=self.index_endpoint, - dimensions=self.index_dimensions, - 
machine_type=self.index_machine_type, - origin=self.index_origin, - destination=self.index_destination, - chunk_limit=self.index_chunk_limit, - ) - - @property - def bigquery(self) -> BigQueryConfig: - return BigQueryConfig( - dataset_id=self.bigquery_dataset_id, - project_id=self.bigquery_project_id, - table_ids=self.bigquery_table_ids, - ) - - @classmethod - def settings_customise_sources( - cls, - settings_cls: type[BaseSettings], - init_settings: PydanticBaseSettingsSource, - env_settings: PydanticBaseSettingsSource, - dotenv_settings: PydanticBaseSettingsSource, - file_secret_settings: PydanticBaseSettingsSource, - ) -> tuple[PydanticBaseSettingsSource, ...]: - return ( - env_settings, - YamlConfigSettingsSource(settings_cls), - ) - - -settings = Settings() \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d4839a6 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Tests package diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..91eb83b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,89 @@ +"""Shared pytest fixtures for knowledge_pipeline tests.""" + +from unittest.mock import Mock + +import pytest + +from knowledge_pipeline.chunker.base_chunker import BaseChunker, Document + + +@pytest.fixture +def mock_gcs_client(): + """Mock Google Cloud Storage client.""" + client = Mock() + bucket = Mock() + blob = Mock() + + client.bucket.return_value = bucket + bucket.blob.return_value = blob + bucket.list_blobs.return_value = [] + + return client + + +@pytest.fixture +def mock_chunker(): + """Mock BaseChunker implementation.""" + chunker = Mock(spec=BaseChunker) + chunker.max_chunk_size = 1000 + chunker.process_text.return_value = [ + {"page_content": "Test chunk content", "metadata": {"id": "test_chunk"}} + ] + return chunker + + +@pytest.fixture +def mock_embedder(): + """Mock pydantic_ai Embedder.""" + embedder = Mock() + embeddings_result = Mock() + 
embeddings_result.embeddings = [[0.1, 0.2, 0.3]] + embedder.embed_documents_sync.return_value = embeddings_result + return embedder + + +@pytest.fixture +def mock_converter(): + """Mock MarkItDown converter.""" + converter = Mock() + result = Mock() + result.text_content = "# Markdown Content\n\nTest content here." + converter.convert.return_value = result + return converter + + +@pytest.fixture +def sample_chunks() -> list[Document]: + """Sample document chunks for testing.""" + return [ + {"page_content": "First chunk content", "metadata": {"id": "doc_1_0"}}, + {"page_content": "Second chunk content", "metadata": {"id": "doc_1_1"}}, + {"page_content": "Third chunk content", "metadata": {"id": "doc_1_2"}}, + ] + + +@pytest.fixture +def sample_embeddings(): + """Sample embeddings for testing.""" + return [ + [0.1, 0.2, 0.3, 0.4, 0.5], + [0.6, 0.7, 0.8, 0.9, 1.0], + [0.2, 0.3, 0.4, 0.5, 0.6], + ] + + +@pytest.fixture +def sample_vectors(): + """Sample vector records for testing.""" + return [ + { + "id": "doc_1_0", + "embedding": [0.1, 0.2, 0.3], + "restricts": [{"namespace": "source", "allow": ["documents"]}], + }, + { + "id": "doc_1_1", + "embedding": [0.4, 0.5, 0.6], + "restricts": [{"namespace": "source", "allow": ["documents"]}], + }, + ] diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..cf642b8 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,553 @@ +import tempfile +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest +from google.cloud.aiplatform.matching_engine.matching_engine_index_config import ( + DistanceMeasureType, +) + +from knowledge_pipeline.chunker.base_chunker import BaseChunker +from knowledge_pipeline.pipeline import ( + _parse_gcs_uri, + build_vectors, + create_vector_index, + gather_pdfs, + normalize_string, + process_file, + run_pipeline, + split_into_chunks, + upload_to_gcs, +) + + +class TestParseGcsUri: + """Tests for _parse_gcs_uri function.""" + + def 
test_basic_gcs_uri(self): + bucket, path = _parse_gcs_uri("gs://my-bucket/path/to/file.pdf") + assert bucket == "my-bucket" + assert path == "path/to/file.pdf" + + def test_gcs_uri_with_nested_path(self): + bucket, path = _parse_gcs_uri("gs://test-bucket/deep/nested/path/file.txt") + assert bucket == "test-bucket" + assert path == "deep/nested/path/file.txt" + + def test_gcs_uri_bucket_only(self): + bucket, path = _parse_gcs_uri("gs://my-bucket/") + assert bucket == "my-bucket" + assert path == "" + + def test_gcs_uri_no_trailing_slash(self): + bucket, path = _parse_gcs_uri("gs://bucket-name") + assert bucket == "bucket-name" + assert path == "" + + +class TestNormalizeString: + """Tests for normalize_string function.""" + + def test_normalize_basic_string(self): + result = normalize_string("Hello World") + assert result == "hello_world" + + def test_normalize_special_characters(self): + result = normalize_string("File#Name@2024!.pdf") + assert result == "filename2024.pdf" + + def test_normalize_unicode(self): + result = normalize_string("Café Münchën") + assert result == "cafe_munchen" + + def test_normalize_multiple_spaces(self): + result = normalize_string("Multiple Spaces Here") + assert result == "multiple_spaces_here" + + def test_normalize_with_hyphens_and_periods(self): + result = normalize_string("valid-filename.2024") + assert result == "valid-filename.2024" + + def test_normalize_empty_string(self): + result = normalize_string("") + assert result == "" + + def test_normalize_only_special_chars(self): + result = normalize_string("@#$%^&*()") + assert result == "" + + +class TestGatherFiles: + """Tests for gather_files function.""" + + def test_gather_files_finds_pdfs(self): + mock_client = Mock() + mock_bucket = Mock() + mock_client.bucket.return_value = mock_bucket + + # Create mock blobs + mock_blob1 = Mock() + mock_blob1.name = "docs/file1.pdf" + mock_blob2 = Mock() + mock_blob2.name = "docs/file2.pdf" + mock_blob3 = Mock() + mock_blob3.name = 
"docs/readme.txt" + + mock_bucket.list_blobs.return_value = [mock_blob1, mock_blob2, mock_blob3] + + files = gather_pdfs("gs://my-bucket/docs", mock_client) + + assert len(files) == 2 + assert "gs://my-bucket/docs/file1.pdf" in files + assert "gs://my-bucket/docs/file2.pdf" in files + assert "gs://my-bucket/docs/readme.txt" not in files + + def test_gather_files_no_pdfs(self): + mock_client = Mock() + mock_bucket = Mock() + mock_client.bucket.return_value = mock_bucket + + mock_blob = Mock() + mock_blob.name = "docs/readme.txt" + mock_bucket.list_blobs.return_value = [mock_blob] + + files = gather_pdfs("gs://my-bucket/docs", mock_client) + + assert len(files) == 0 + + def test_gather_files_empty_bucket(self): + mock_client = Mock() + mock_bucket = Mock() + mock_client.bucket.return_value = mock_bucket + mock_bucket.list_blobs.return_value = [] + + files = gather_pdfs("gs://my-bucket/docs", mock_client) + + assert len(files) == 0 + + def test_gather_files_correct_prefix(self): + mock_client = Mock() + mock_bucket = Mock() + mock_client.bucket.return_value = mock_bucket + mock_bucket.list_blobs.return_value = [] + + gather_pdfs("gs://my-bucket/docs/subfolder", mock_client) + + mock_client.bucket.assert_called_once_with("my-bucket") + mock_bucket.list_blobs.assert_called_once_with(prefix="docs/subfolder") + + +class TestSplitIntoChunks: + """Tests for split_into_chunks function.""" + + def test_split_small_text_single_chunk(self): + mock_chunker = Mock(spec=BaseChunker) + mock_chunker.max_chunk_size = 1000 + + text = "Small text" + file_id = "test_file" + + chunks = split_into_chunks(text, file_id, mock_chunker) + + assert len(chunks) == 1 + assert chunks[0]["page_content"] == "Small text" + assert chunks[0]["metadata"]["id"] == "test_file" + mock_chunker.process_text.assert_not_called() + + def test_split_large_text_multiple_chunks(self): + mock_chunker = Mock(spec=BaseChunker) + mock_chunker.max_chunk_size = 10 + + # Create text larger than max_chunk_size + text = 
"This is a very long text that needs to be split into chunks" + file_id = "test_file" + + # Mock the chunker to return multiple chunks + mock_chunker.process_text.return_value = [ + {"page_content": "This is a very", "metadata": {}}, + {"page_content": "long text that", "metadata": {}}, + {"page_content": "needs to be split", "metadata": {}}, + ] + + chunks = split_into_chunks(text, file_id, mock_chunker) + + assert len(chunks) == 3 + assert chunks[0]["metadata"]["id"] == "test_file_0" + assert chunks[1]["metadata"]["id"] == "test_file_1" + assert chunks[2]["metadata"]["id"] == "test_file_2" + mock_chunker.process_text.assert_called_once_with(text) + + def test_split_exactly_max_size(self): + mock_chunker = Mock(spec=BaseChunker) + mock_chunker.max_chunk_size = 10 + + text = "0123456789" # Exactly 10 characters + file_id = "test_file" + + chunks = split_into_chunks(text, file_id, mock_chunker) + + assert len(chunks) == 1 + assert chunks[0]["page_content"] == text + mock_chunker.process_text.assert_not_called() + + +class TestUploadToGcs: + """Tests for upload_to_gcs function.""" + + def test_upload_single_chunk_and_vectors(self): + mock_client = Mock() + mock_bucket = Mock() + mock_blob = Mock() + + mock_client.bucket.return_value = mock_bucket + mock_bucket.blob.return_value = mock_blob + + chunks = [ + { + "page_content": "Test content", + "metadata": {"id": "chunk_1"}, + } + ] + vectors = [{"id": "chunk_1", "embedding": [0.1, 0.2]}] + + upload_to_gcs( + chunks, + vectors, + "gs://my-bucket/contents", + "gs://my-bucket/vectors/vectors.jsonl", + mock_client, + ) + + blob_calls = [call[0][0] for call in mock_bucket.blob.call_args_list] + assert "contents/chunk_1.md" in blob_calls + assert "vectors/vectors.jsonl" in blob_calls + + def test_upload_multiple_chunks(self): + mock_client = Mock() + mock_bucket = Mock() + mock_blob = Mock() + + mock_client.bucket.return_value = mock_bucket + mock_bucket.blob.return_value = mock_blob + + chunks = [ + {"page_content": 
"Content 1", "metadata": {"id": "chunk_1"}}, + {"page_content": "Content 2", "metadata": {"id": "chunk_2"}}, + {"page_content": "Content 3", "metadata": {"id": "chunk_3"}}, + ] + vectors = [{"id": "chunk_1", "embedding": [0.1]}] + + upload_to_gcs( + chunks, + vectors, + "gs://my-bucket/contents", + "gs://my-bucket/vectors/vectors.jsonl", + mock_client, + ) + + # 3 chunk blobs + 1 vectors blob + assert mock_bucket.blob.call_count == 4 + + blob_calls = [call[0][0] for call in mock_bucket.blob.call_args_list] + assert blob_calls == [ + "contents/chunk_1.md", + "contents/chunk_2.md", + "contents/chunk_3.md", + "vectors/vectors.jsonl", + ] + + +class TestBuildVectors: + """Tests for build_vectors function.""" + + def test_build_vectors_basic(self): + chunks = [ + {"metadata": {"id": "doc_1"}, "page_content": "content 1"}, + {"metadata": {"id": "doc_2"}, "page_content": "content 2"}, + ] + embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]] + source_folder = "documents/reports" + + vectors = build_vectors(chunks, embeddings, source_folder) + + assert len(vectors) == 2 + assert vectors[0]["id"] == "doc_1" + assert vectors[0]["embedding"] == [0.1, 0.2, 0.3] + assert vectors[0]["restricts"] == [ + {"namespace": "source", "allow": ["documents"]} + ] + assert vectors[1]["id"] == "doc_2" + assert vectors[1]["embedding"] == [0.4, 0.5, 0.6] + + def test_build_vectors_empty_source(self): + chunks = [{"metadata": {"id": "doc_1"}, "page_content": "content"}] + embeddings = [[0.1, 0.2]] + source_folder = "" + + vectors = build_vectors(chunks, embeddings, source_folder) + + assert len(vectors) == 1 + assert vectors[0]["restricts"] == [{"namespace": "source", "allow": [""]}] + + def test_build_vectors_nested_path(self): + chunks = [{"metadata": {"id": "doc_1"}, "page_content": "content"}] + embeddings = [[0.1]] + source_folder = "a/b/c/d" + + vectors = build_vectors(chunks, embeddings, source_folder) + + assert vectors[0]["restricts"] == [{"namespace": "source", "allow": ["a"]}] + + 
class TestCreateVectorIndex:
    """Tests for create_vector_index function."""

    @patch("knowledge_pipeline.pipeline.aiplatform.MatchingEngineIndexEndpoint")
    @patch("knowledge_pipeline.pipeline.aiplatform.MatchingEngineIndex")
    def test_create_vector_index(self, mock_index_class, mock_endpoint_class):
        mock_index = Mock()
        mock_endpoint = Mock()

        mock_index_class.create_tree_ah_index.return_value = mock_index
        mock_endpoint_class.create.return_value = mock_endpoint

        # BUG FIX: create_vector_index declares approximate_neighbors_count,
        # leaf_node_embedding_count, leaf_nodes_to_search_percent and
        # public_endpoint_enabled as required parameters; the original call
        # omitted them and raised TypeError before any assertion ran.
        # Pass them explicitly with the values asserted below.
        create_vector_index(
            index_name="test-index",
            index_vectors_dir="gs://bucket/vectors",
            index_dimensions=768,
            index_distance_measure_type=DistanceMeasureType.DOT_PRODUCT_DISTANCE,
            index_deployment="test_index_deployed",
            index_machine_type="e2-standard-16",
            approximate_neighbors_count=150,
            leaf_node_embedding_count=1000,
            leaf_nodes_to_search_percent=10,
            public_endpoint_enabled=True,
        )

        mock_index_class.create_tree_ah_index.assert_called_once_with(
            display_name="test-index",
            contents_delta_uri="gs://bucket/vectors",
            dimensions=768,
            approximate_neighbors_count=150,
            distance_measure_type=DistanceMeasureType.DOT_PRODUCT_DISTANCE,
            leaf_node_embedding_count=1000,
            leaf_nodes_to_search_percent=10,
        )

        mock_endpoint_class.create.assert_called_once_with(
            display_name="test-index-endpoint",
            public_endpoint_enabled=True,
        )

        mock_endpoint.deploy_index.assert_called_once_with(
            index=mock_index,
            deployed_index_id="test_index_deployed",
            machine_type="e2-standard-16",
            sync=False,
        )


class TestProcessFile:
    """Tests for process_file function."""

    def test_process_file_success(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Mock dependencies
            mock_client = Mock()
            mock_bucket = Mock()
            mock_blob = Mock()
            mock_client.bucket.return_value = mock_bucket
            mock_bucket.blob.return_value = mock_blob

            mock_converter = Mock()
            mock_result = Mock()
            mock_result.text_content = "Converted markdown content"
            mock_converter.convert.return_value = mock_result

            mock_embedder = Mock()
            mock_embeddings_result = Mock()
            mock_embeddings_result.embeddings = [[0.1, 0.2, 0.3]]
            mock_embedder.embed_documents_sync.return_value = mock_embeddings_result

            mock_chunker = Mock(spec=BaseChunker)
            mock_chunker.max_chunk_size = 1000

            file_uri = "gs://my-bucket/docs/test-file.pdf"

            chunks, vectors = process_file(
                file_uri,
                temp_path,
                mock_client,
                mock_converter,
                mock_embedder,
                mock_chunker,
            )

            # Verify download was called
            mock_client.bucket.assert_called_with("my-bucket")
            mock_bucket.blob.assert_called_with("docs/test-file.pdf")
            assert mock_blob.download_to_filename.called

            # Verify converter was called
            assert mock_converter.convert.called

            # Verify embedder was called
            mock_embedder.embed_documents_sync.assert_called_once()

            # Verify results: text fits under max_chunk_size, so one chunk.
            assert len(chunks) == 1
            assert chunks[0]["page_content"] == "Converted markdown content"
            assert len(vectors) == 1
            assert vectors[0]["embedding"] == [0.1, 0.2, 0.3]

    def test_process_file_cleans_up_temp_file(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            mock_client = Mock()
            mock_bucket = Mock()
            mock_blob = Mock()
            mock_client.bucket.return_value = mock_bucket
            mock_bucket.blob.return_value = mock_blob

            mock_converter = Mock()
            mock_converter.convert.side_effect = Exception("Conversion failed")

            mock_embedder = Mock()
            mock_chunker = Mock(spec=BaseChunker)

            file_uri = "gs://my-bucket/docs/test.pdf"

            # This should raise an exception but still clean up
            with pytest.raises(Exception, match="Conversion failed"):
                process_file(
                    file_uri,
                    temp_path,
                    mock_client,
                    mock_converter,
                    mock_embedder,
                    mock_chunker,
                )

            # File should be cleaned up even after exception
            temp_file = temp_path / "test.pdf"
            assert not temp_file.exists()


class TestRunPipeline:
    """Tests for run_pipeline function."""

    @patch("knowledge_pipeline.pipeline.create_vector_index")
    @patch("knowledge_pipeline.pipeline.upload_to_gcs")
    @patch("knowledge_pipeline.pipeline.process_file")
    @patch("knowledge_pipeline.pipeline.gather_pdfs")
    def test_run_pipeline_integration(
        self,
        mock_gather,
        mock_process,
        mock_upload,
        mock_create_index,
    ):
        # Mock settings
        mock_settings = Mock()
        mock_settings.index_origin = "gs://bucket/input"
        mock_settings.index_contents_dir = "gs://bucket/contents"
        mock_settings.index_vectors_jsonl_path = "gs://bucket/vectors/vectors.jsonl"
        mock_settings.index_name = "test-index"
        mock_settings.index_vectors_dir = "gs://bucket/vectors"
        mock_settings.index_dimensions = 768
        mock_settings.index_distance_measure_type = (
            DistanceMeasureType.DOT_PRODUCT_DISTANCE
        )
        mock_settings.index_deployment = "test_index_deployed"
        mock_settings.index_machine_type = "e2-standard-16"

        mock_gcs_client = Mock()
        mock_bucket = Mock()
        mock_blob = Mock()
        mock_gcs_client.bucket.return_value = mock_bucket
        mock_bucket.blob.return_value = mock_blob
        mock_settings.gcs_client = mock_gcs_client

        mock_settings.converter = Mock()
        mock_settings.embedder = Mock()
        mock_settings.chunker = Mock()

        # Mock gather_pdfs to return test files
        mock_gather.return_value = ["gs://bucket/input/file1.pdf"]

        # Mock process_file to return chunks and vectors
        mock_chunks = [{"page_content": "content", "metadata": {"id": "chunk_1"}}]
        mock_vectors = [
            {
                "id": "chunk_1",
                "embedding": [0.1, 0.2],
                "restricts": [{"namespace": "source", "allow": ["input"]}],
            }
        ]
        mock_process.return_value = (mock_chunks, mock_vectors)

        run_pipeline(mock_settings)

        # Verify all steps were called
        mock_gather.assert_called_once_with("gs://bucket/input", mock_gcs_client)
        mock_process.assert_called_once()
        mock_upload.assert_called_once_with(
            mock_chunks,
            mock_vectors,
            "gs://bucket/contents",
            "gs://bucket/vectors/vectors.jsonl",
            mock_gcs_client,
        )
        mock_create_index.assert_called_once()

    @patch("knowledge_pipeline.pipeline.create_vector_index")
    @patch("knowledge_pipeline.pipeline.upload_to_gcs")
    @patch("knowledge_pipeline.pipeline.process_file")
    @patch("knowledge_pipeline.pipeline.gather_pdfs")
    def test_run_pipeline_multiple_files(
        self,
        mock_gather,
        mock_process,
        mock_upload,
        mock_create_index,
    ):
        mock_settings = Mock()
        mock_settings.index_origin = "gs://bucket/input"
        mock_settings.index_contents_dir = "gs://bucket/contents"
        mock_settings.index_vectors_jsonl_path = "gs://bucket/vectors/vectors.jsonl"
        mock_settings.index_name = "test-index"
        mock_settings.index_vectors_dir = "gs://bucket/vectors"
        mock_settings.index_dimensions = 768
        mock_settings.index_distance_measure_type = (
            DistanceMeasureType.DOT_PRODUCT_DISTANCE
        )
        mock_settings.index_deployment = "test_index_deployed"
        mock_settings.index_machine_type = "e2-standard-16"

        mock_gcs_client = Mock()
        mock_bucket = Mock()
        mock_blob = Mock()
        mock_gcs_client.bucket.return_value = mock_bucket
        mock_bucket.blob.return_value = mock_blob
        mock_settings.gcs_client = mock_gcs_client

        mock_settings.converter = Mock()
        mock_settings.embedder = Mock()
        mock_settings.chunker = Mock()

        # Return multiple files
        mock_gather.return_value = [
            "gs://bucket/input/file1.pdf",
            "gs://bucket/input/file2.pdf",
        ]

        mock_process.return_value = (
            [{"page_content": "content", "metadata": {"id": "chunk_1"}}],
            [{"id": "chunk_1", "embedding": [0.1], "restricts": []}],
        )

        run_pipeline(mock_settings)

        # Verify process_file was called for each file
        assert mock_process.call_count == 2
        # Upload is called once with all accumulated chunks and vectors
        mock_upload.assert_called_once()