From 35d5a65b1733eef35759ca73eb16a2cbd9be7c8b Mon Sep 17 00:00:00 2001 From: A8072846 Date: Sun, 22 Feb 2026 15:25:27 +0000 Subject: [PATCH] First commit --- .gitignore | 55 ++ .python-version | 1 + 00_START_HERE.md | 158 +++++ QUICKSTART.md | 97 +++ README.md | 503 +++++++++++++++ RESUMEN.txt | 65 ++ STRUCTURE.md | 171 ++++++ apps/index-gen/README.md | 0 apps/index-gen/pyproject.toml | 34 ++ apps/index-gen/src/index_gen/__init__.py | 2 + apps/index-gen/src/index_gen/cli.py | 68 +++ apps/index-gen/src/index_gen/main.py | 238 ++++++++ config.example.yaml | 29 + packages/chunker/README.md | 0 packages/chunker/pyproject.toml | 23 + packages/chunker/src/chunker/__init__.py | 2 + packages/chunker/src/chunker/base_chunker.py | 66 ++ .../chunker/src/chunker/contextual_chunker.py | 155 +++++ packages/chunker/src/chunker/llm_chunker.py | 577 ++++++++++++++++++ packages/chunker/src/chunker/py.typed | 0 .../chunker/src/chunker/recursive_chunker.py | 80 +++ packages/document-converter/.python-version | 1 + packages/document-converter/README.md | 0 packages/document-converter/pyproject.toml | 20 + .../src/document_converter/__init__.py | 2 + .../src/document_converter/base.py | 35 ++ .../src/document_converter/markdown.py | 131 ++++ .../src/document_converter/py.typed | 0 packages/embedder/.python-version | 1 + packages/embedder/README.md | 0 packages/embedder/pyproject.toml | 16 + packages/embedder/src/embedder/__init__.py | 0 packages/embedder/src/embedder/base.py | 79 +++ packages/embedder/src/embedder/py.typed | 0 packages/embedder/src/embedder/vertex_ai.py | 77 +++ packages/file-storage/.python-version | 1 + packages/file-storage/README.md | 0 packages/file-storage/pyproject.toml | 22 + .../file-storage/src/file_storage/__init__.py | 2 + .../file-storage/src/file_storage/base.py | 48 ++ packages/file-storage/src/file_storage/cli.py | 89 +++ .../src/file_storage/google_cloud.py | 138 +++++ .../file-storage/src/file_storage/py.typed | 0 packages/llm/.python-version | 1 + 
packages/llm/README.md | 0 packages/llm/pyproject.toml | 18 + packages/llm/src/llm/__init__.py | 2 + packages/llm/src/llm/base.py | 128 ++++ packages/llm/src/llm/py.typed | 0 packages/llm/src/llm/vertex_ai.py | 181 ++++++ packages/utils/README.md | 0 packages/utils/pyproject.toml | 17 + packages/utils/src/utils/__init__.py | 2 + .../utils/src/utils/normalize_filenames.py | 115 ++++ packages/utils/src/utils/py.typed | 0 packages/vector-search/.python-version | 1 + packages/vector-search/README.md | 0 packages/vector-search/pyproject.toml | 29 + .../src/vector_search/__init__.py | 2 + .../vector-search/src/vector_search/base.py | 62 ++ .../src/vector_search/cli/__init__.py | 10 + .../src/vector_search/cli/create.py | 91 +++ .../src/vector_search/cli/delete.py | 38 ++ .../src/vector_search/cli/generate.py | 91 +++ .../src/vector_search/cli/query.py | 55 ++ .../vector-search/src/vector_search/py.typed | 0 .../src/vector_search/vertex_ai.py | 255 ++++++++ pyproject.toml | 91 +++ src/rag_eval/__init__.py | 2 + src/rag_eval/config.py | 121 ++++ 70 files changed, 4298 insertions(+) create mode 100644 .gitignore create mode 100644 .python-version create mode 100644 00_START_HERE.md create mode 100644 QUICKSTART.md create mode 100644 README.md create mode 100644 RESUMEN.txt create mode 100644 STRUCTURE.md create mode 100644 apps/index-gen/README.md create mode 100644 apps/index-gen/pyproject.toml create mode 100644 apps/index-gen/src/index_gen/__init__.py create mode 100644 apps/index-gen/src/index_gen/cli.py create mode 100644 apps/index-gen/src/index_gen/main.py create mode 100644 config.example.yaml create mode 100644 packages/chunker/README.md create mode 100644 packages/chunker/pyproject.toml create mode 100644 packages/chunker/src/chunker/__init__.py create mode 100644 packages/chunker/src/chunker/base_chunker.py create mode 100644 packages/chunker/src/chunker/contextual_chunker.py create mode 100644 packages/chunker/src/chunker/llm_chunker.py create mode 100644 
packages/chunker/src/chunker/py.typed create mode 100644 packages/chunker/src/chunker/recursive_chunker.py create mode 100644 packages/document-converter/.python-version create mode 100644 packages/document-converter/README.md create mode 100644 packages/document-converter/pyproject.toml create mode 100644 packages/document-converter/src/document_converter/__init__.py create mode 100644 packages/document-converter/src/document_converter/base.py create mode 100644 packages/document-converter/src/document_converter/markdown.py create mode 100644 packages/document-converter/src/document_converter/py.typed create mode 100644 packages/embedder/.python-version create mode 100644 packages/embedder/README.md create mode 100644 packages/embedder/pyproject.toml create mode 100644 packages/embedder/src/embedder/__init__.py create mode 100644 packages/embedder/src/embedder/base.py create mode 100644 packages/embedder/src/embedder/py.typed create mode 100644 packages/embedder/src/embedder/vertex_ai.py create mode 100644 packages/file-storage/.python-version create mode 100644 packages/file-storage/README.md create mode 100644 packages/file-storage/pyproject.toml create mode 100644 packages/file-storage/src/file_storage/__init__.py create mode 100644 packages/file-storage/src/file_storage/base.py create mode 100644 packages/file-storage/src/file_storage/cli.py create mode 100644 packages/file-storage/src/file_storage/google_cloud.py create mode 100644 packages/file-storage/src/file_storage/py.typed create mode 100644 packages/llm/.python-version create mode 100644 packages/llm/README.md create mode 100644 packages/llm/pyproject.toml create mode 100644 packages/llm/src/llm/__init__.py create mode 100644 packages/llm/src/llm/base.py create mode 100644 packages/llm/src/llm/py.typed create mode 100644 packages/llm/src/llm/vertex_ai.py create mode 100644 packages/utils/README.md create mode 100644 packages/utils/pyproject.toml create mode 100644 packages/utils/src/utils/__init__.py 
create mode 100644 packages/utils/src/utils/normalize_filenames.py create mode 100644 packages/utils/src/utils/py.typed create mode 100644 packages/vector-search/.python-version create mode 100644 packages/vector-search/README.md create mode 100644 packages/vector-search/pyproject.toml create mode 100644 packages/vector-search/src/vector_search/__init__.py create mode 100644 packages/vector-search/src/vector_search/base.py create mode 100644 packages/vector-search/src/vector_search/cli/__init__.py create mode 100644 packages/vector-search/src/vector_search/cli/create.py create mode 100644 packages/vector-search/src/vector_search/cli/delete.py create mode 100644 packages/vector-search/src/vector_search/cli/generate.py create mode 100644 packages/vector-search/src/vector_search/cli/query.py create mode 100644 packages/vector-search/src/vector_search/py.typed create mode 100644 packages/vector-search/src/vector_search/vertex_ai.py create mode 100644 pyproject.toml create mode 100644 src/rag_eval/__init__.py create mode 100644 src/rag_eval/config.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5daf117 --- /dev/null +++ b/.gitignore @@ -0,0 +1,55 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# UV +.uv/ +uv.lock + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Config files with secrets +config.yaml +.env +*.key.json + +# Logs +*.log + +# Temporary files +temp_*.md +temp_*.jsonl +vectors.jsonl + +# Output directories +output/ +chunks/ diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/00_START_HERE.md b/00_START_HERE.md new file mode 100644 index 0000000..4d049a6 --- /dev/null +++ b/00_START_HERE.md @@ -0,0 +1,158 @@ +# 🚀 
START HERE - Pipeline RAG + +## ¿Qué hay en esta carpeta? + +Este proyecto contiene todo el código necesario para: + +1. ✂️ **Chunkear documentos** (dividir en fragmentos) +2. 🧠 **Generar embeddings** (vectorización) +3. 💾 **Almacenar en GCS** (Google Cloud Storage) +4. 🔍 **Crear índices vectoriales** (Vertex AI Vector Search) + +--- + +## 📁 Estructura Básica + +``` +pipeline/ +├── packages/ # 7 librerías reutilizables +│ ├── chunker/ # ⭐ Para dividir documentos +│ ├── embedder/ # ⭐ Para vectorizar texto +│ ├── file-storage/ # ⭐ Para guardar en GCS +│ └── vector-search/# ⭐ Para índices vectoriales +│ +├── apps/ +│ └── index-gen/ # ⭐ Pipeline completo KFP +│ +└── src/rag_eval/ # Configuración +``` + +--- + +## ⚡ Instalación Rápida + +```bash +# En tu Workbench de GCP: +cd ~/pipeline +uv sync +``` + +--- + +## 🎯 Uso Más Común + +### Opción 1: Chunking Contextual (Recomendado) + +```python +from chunker.contextual_chunker import ContextualChunker +from llm.vertex_ai import VertexAILLM +from pathlib import Path + +# Setup +llm = VertexAILLM(project="tu-proyecto", location="us-central1") +chunker = ContextualChunker(llm_client=llm, max_chunk_size=800) + +# Procesar +documents = chunker.process_path(Path("documento.txt")) +print(f"Creados {len(documents)} chunks") +``` + +### Opción 2: Pipeline Completo + +```python +from apps.index_gen.src.index_gen.main import ( + gather_files, + process_file, + aggregate_vectors, + create_vector_index +) + +# Procesar PDFs desde GCS +pdf_files = gather_files("gs://mi-bucket/pdfs/") + +for pdf in pdf_files: + process_file( + file_path=pdf, + model_name="text-embedding-005", + contents_output_dir="gs://mi-bucket/contents/", + vectors_output_file="vectors.jsonl", + chunk_limit=800 + ) +``` + +--- + +## 📚 Documentación + +| Archivo | Descripción | +|---------|-------------| +| **[QUICKSTART.md](QUICKSTART.md)** | ⭐ Inicio rápido con ejemplos | +| **[README.md](README.md)** | Documentación completa | +| **[STRUCTURE.md](STRUCTURE.md)** | 
Estructura detallada | +| **config.yaml** | Configuración de GCP | + +--- + +## 🔧 Configuración Necesaria + +Edita `config.yaml`: + +```yaml +project_id: "tu-proyecto-gcp" # ⚠️ CAMBIAR +location: "us-central1" +bucket: "tu-bucket-nombre" # ⚠️ CAMBIAR + +index: + name: "mi-indice-rag" + dimensions: 768 +``` + +--- + +## 💡 Estrategias de Chunking Disponibles + +1. **RecursiveChunker** - Simple y rápido +2. **ContextualChunker** - ⭐ Agrega contexto con LLM (recomendado) +3. **LLMChunker** - Avanzado: optimiza, fusiona, extrae imágenes + +--- + +## 📦 Dependencias Principales + +- `google-genai` - LLM y embeddings +- `google-cloud-aiplatform` - Vertex AI +- `google-cloud-storage` - GCS +- `chonkie` - Chunking recursivo +- `langchain` - Text splitting +- `tiktoken` - Token counting +- `pypdf` - PDF processing + +Total instaladas: ~30 packages + +--- + +## ❓ FAQ + +**P: ¿Qué chunker debo usar?** +R: `ContextualChunker` para producción (agrega contexto del documento) + +**P: ¿Cómo instalo en Workbench?** +R: `uv sync` (las credenciales de GCP ya están configuradas) + +**P: ¿Dónde está el código del pipeline completo?** +R: `apps/index-gen/src/index_gen/main.py` + +**P: ¿Cómo genero embeddings?** +R: Usa `embedder.vertex_ai.VertexAIEmbedder` + +--- + +## 🆘 Soporte + +- Ver ejemplos en [QUICKSTART.md](QUICKSTART.md) +- Ver API completa en [README.md](README.md) +- Ver estructura en [STRUCTURE.md](STRUCTURE.md) + +--- + +**Total**: 33 archivos Python | ~400KB | Listo para Workbench ✅ diff --git a/QUICKSTART.md b/QUICKSTART.md new file mode 100644 index 0000000..5cd15e4 --- /dev/null +++ b/QUICKSTART.md @@ -0,0 +1,97 @@ +# Quick Start - GCP Workbench + +## 📦 Instalación en Workbench + +```bash +# 1. Instalar dependencias del sistema (si es necesario) +sudo apt-get update +sudo apt-get install -y poppler-utils libcairo2-dev + +# 2. Instalar dependencias de Python +cd ~/pipeline +uv sync + +# 3. 
Configurar credenciales (ya deberían estar en Workbench) +# Las credenciales de Application Default Credentials ya están configuradas +``` + +## ⚙️ Configuración Mínima + +Edita `config.yaml`: + +```yaml +project_id: "tu-proyecto-gcp" +location: "us-central1" +bucket: "tu-bucket-gcs" + +index: + name: "mi-indice-vectorial" + dimensions: 768 + machine_type: "e2-standard-2" +``` + +## 🚀 Uso Rápido + +### 1. Chunking Simple +```python +from chunker.recursive_chunker import RecursiveChunker +from pathlib import Path + +chunker = RecursiveChunker() +docs = chunker.process_text("Tu texto aquí") +print(f"Chunks: {len(docs)}") +``` + +### 2. Chunking Contextual (Recomendado) +```python +from chunker.contextual_chunker import ContextualChunker +from llm.vertex_ai import VertexAILLM + +llm = VertexAILLM(project="tu-proyecto", location="us-central1") +chunker = ContextualChunker(llm_client=llm, max_chunk_size=800) +docs = chunker.process_path(Path("documento.txt")) +``` + +### 3. Generar Embeddings +```python +from embedder.vertex_ai import VertexAIEmbedder + +embedder = VertexAIEmbedder( + model_name="text-embedding-005", + project="tu-proyecto", + location="us-central1" +) +embedding = embedder.generate_embedding("texto") +``` + +### 4. Pipeline Completo +```python +from apps.index_gen.src.index_gen.main import process_file + +process_file( + file_path="gs://bucket/file.pdf", + model_name="text-embedding-005", + contents_output_dir="gs://bucket/contents/", + vectors_output_file="vectors.jsonl", + chunk_limit=800 +) +``` + +## 📚 Archivos Importantes + +- `README.md` - Documentación completa +- `STRUCTURE.md` - Estructura del proyecto +- `config.yaml` - Configuración de GCP +- `pyproject.toml` - Dependencias + +## 🔗 Componentes Principales + +1. **packages/chunker/** - Chunking (Recursive, Contextual, LLM) +2. **packages/embedder/** - Embeddings (Vertex AI) +3. **packages/file-storage/** - Storage (GCS) +4. **packages/vector-search/** - Vector Search (Vertex AI) +5. 
**apps/index-gen/** - Pipeline completo + +--- + +**Tamaño total**: ~400KB | **Archivos Python**: 33 diff --git a/README.md b/README.md new file mode 100644 index 0000000..a0104b3 --- /dev/null +++ b/README.md @@ -0,0 +1,503 @@ +# RAG Pipeline - Document Chunking & Vector Storage + +Este proyecto contiene todo el código necesario para procesar documentos (PDFs), dividirlos en chunks, generar embeddings vectoriales y almacenarlos en Google Cloud Storage + Vertex AI Vector Search. + +## 📁 Estructura del Proyecto + +``` +pipeline/ +├── packages/ # Librerías reutilizables +│ ├── chunker/ # ⭐ Estrategias de chunking +│ │ ├── base_chunker.py +│ │ ├── recursive_chunker.py +│ │ ├── contextual_chunker.py # Usado en producción +│ │ └── llm_chunker.py # Avanzado con optimización +│ ├── embedder/ # Generación de embeddings +│ │ └── vertex_ai.py +│ ├── file-storage/ # Storage en GCS +│ │ └── google_cloud.py +│ ├── vector-search/ # Índices vectoriales +│ │ └── vertex_ai.py +│ ├── llm/ # Cliente LLM +│ │ └── vertex_ai.py +│ ├── document-converter/ # PDF → Markdown +│ │ └── markdown.py +│ └── utils/ # Utilidades +├── apps/ +│ └── index-gen/ # ⭐ Pipeline principal +│ └── src/index_gen/ +│ └── main.py # Orquestador completo +├── src/ +│ └── rag_eval/ +│ └── config.py # Configuración centralizada +├── pyproject.toml # Dependencias del proyecto +└── config.yaml # Configuración de GCP +``` + +--- + +## 🚀 Instalación + +### 1. Prerrequisitos + +- **Python 3.12+** +- **uv** (gestor de paquetes) +- **Poppler** (para pdf2image): + ```bash + # Ubuntu/Debian + sudo apt-get update + sudo apt-get install -y poppler-utils libcairo2-dev + + # macOS + brew install poppler cairo + ``` + +### 2. Instalar dependencias + +```bash +cd /home/coder/sigma-chat/pipeline + +# Instalar todas las dependencias +uv sync + +# O instalar solo las necesarias (sin dev) +uv sync --no-dev +``` + +--- + +## ⚙️ Configuración + +### 1. 
Configurar credenciales de GCP + +```bash +# Autenticar con Google Cloud +gcloud auth application-default login + +# O usar service account key +export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json" +``` + +### 2. Configurar `config.yaml` + +Edita el archivo `config.yaml`: + +```yaml +project_id: "tu-proyecto-gcp" +location: "us-central1" +bucket: "tu-bucket-gcs" + +index: + name: "mi-indice-vectorial" + dimensions: 768 # Para text-embedding-005 + machine_type: "e2-standard-2" +``` + +--- + +## 📖 Uso + +### **Opción 1: Pipeline Completo (Kubeflow/Vertex AI)** + +El archivo [`apps/index-gen/src/index_gen/main.py`](apps/index-gen/src/index_gen/main.py) define un pipeline KFP completo: + +```python +from apps.index_gen.src.index_gen.main import ( + gather_files, + process_file, + aggregate_vectors, + create_vector_index +) + +# 1. Buscar PDFs en GCS +pdf_files = gather_files("gs://mi-bucket/pdfs/") + +# 2. Procesar cada archivo +for pdf_file in pdf_files: + process_file( + file_path=pdf_file, + model_name="text-embedding-005", + contents_output_dir="gs://mi-bucket/contents/", + vectors_output_file="vectors.jsonl", + chunk_limit=800 + ) + +# 3. Agregar vectores +aggregate_vectors( + vector_artifacts=["vectors.jsonl"], + output_gcs_path="gs://mi-bucket/vectors/all_vectors.jsonl" +) + +# 4. 
Crear índice vectorial +create_vector_index( + vectors_dir="gs://mi-bucket/vectors/" +) +``` + +--- + +### **Opción 2: Usar Chunkers Individuales** + +#### **A) RecursiveChunker (Simple y Rápido)** + +```python +from chunker.recursive_chunker import RecursiveChunker +from pathlib import Path + +chunker = RecursiveChunker() +documents = chunker.process_path(Path("documento.txt")) + +# Resultado: +# [ +# {"page_content": "...", "metadata": {"chunk_index": 0}}, +# {"page_content": "...", "metadata": {"chunk_index": 1}}, +# ] +``` + +**CLI:** +```bash +recursive-chunker input.txt output_dir/ +``` + +--- + +#### **B) ContextualChunker (⭐ Recomendado para Producción)** + +Agrega contexto del documento original usando LLM: + +```python +from chunker.contextual_chunker import ContextualChunker +from llm.vertex_ai import VertexAILLM + +llm = VertexAILLM( + project="tu-proyecto", + location="us-central1" +) + +chunker = ContextualChunker( + llm_client=llm, + max_chunk_size=800, + model="gemini-2.0-flash" +) + +documents = chunker.process_path(Path("documento.txt")) + +# Resultado con contexto: +# [ +# { +# "page_content": "> **Contexto del documento original:**\n> [Resumen LLM]\n\n---\n\n[Contenido del chunk]", +# "metadata": {"chunk_index": 0} +# } +# ] +``` + +**CLI:** +```bash +contextual-chunker input.txt output_dir/ --max-chunk-size 800 --model gemini-2.0-flash +``` + +--- + +#### **C) LLMChunker (Avanzado)** + +Con optimización, fusión de chunks y extracción de imágenes: + +```python +from chunker.llm_chunker import LLMChunker +from llm.vertex_ai import VertexAILLM + +llm = VertexAILLM(project="tu-proyecto", location="us-central1") + +chunker = LLMChunker( + output_dir="output/", + model="gemini-2.0-flash", + max_tokens=1000, + target_tokens=800, + gemini_client=llm, + merge_related=True, + extract_images=True, + custom_instructions="Mantener términos técnicos en inglés" +) + +documents = chunker.process_path(Path("documento.pdf")) +``` + +**CLI:** +```bash 
+llm-chunker documento.pdf output_dir/ \ + --model gemini-2.0-flash \ + --max-tokens 1000 \ + --target-tokens 800 \ + --merge-related \ + --extract-images +``` + +--- + +### **Opción 3: Generar Embeddings** + +```python +from embedder.vertex_ai import VertexAIEmbedder + +embedder = VertexAIEmbedder( + model_name="text-embedding-005", + project="tu-proyecto", + location="us-central1" +) + +# Single embedding +embedding = embedder.generate_embedding("Texto de ejemplo") +# Returns: List[float] con 768 dimensiones + +# Batch embeddings +texts = ["Texto 1", "Texto 2", "Texto 3"] +embeddings = embedder.generate_embeddings_batch(texts, batch_size=10) +# Returns: List[List[float]] +``` + +--- + +### **Opción 4: Almacenar en GCS** + +```python +from file_storage.google_cloud import GoogleCloudFileStorage + +storage = GoogleCloudFileStorage(bucket="mi-bucket") + +# Subir archivo +storage.upload_file( + file_path="local_file.md", + destination_blob_name="chunks/documento_0.md", + content_type="text/markdown" +) + +# Listar archivos +files = storage.list_files(path="chunks/") + +# Descargar archivo +file_stream = storage.get_file_stream("chunks/documento_0.md") +content = file_stream.read().decode("utf-8") +``` + +**CLI:** +```bash +file-storage upload local_file.md chunks/documento_0.md +file-storage list chunks/ +file-storage download chunks/documento_0.md +``` + +--- + +### **Opción 5: Vector Search** + +```python +from vector_search.vertex_ai import GoogleCloudVectorSearch + +vector_search = GoogleCloudVectorSearch( + project_id="tu-proyecto", + location="us-central1", + bucket="mi-bucket", + index_name="mi-indice" +) + +# Crear índice +vector_search.create_index( + name="mi-indice", + content_path="gs://mi-bucket/vectors/all_vectors.jsonl", + dimensions=768 +) + +# Deploy índice +vector_search.deploy_index( + index_name="mi-indice", + machine_type="e2-standard-2" +) + +# Query +query_embedding = embedder.generate_embedding("¿Qué es RAG?") +results = 
vector_search.run_query( + deployed_index_id="mi_indice_deployed_xxxxx", + query=query_embedding, + limit=5 +) + +# Resultado: +# [ +# {"id": "documento_0", "distance": 0.85, "content": "RAG es..."}, +# {"id": "documento_1", "distance": 0.78, "content": "..."}, +# ] +``` + +**CLI:** +```bash +vector-search create mi-indice gs://bucket/vectors/ --dimensions 768 +vector-search query deployed_id "¿Qué es RAG?" --limit 5 +vector-search delete mi-indice +``` + +--- + +## 🔄 Flujo Completo de Ejemplo + +```python +from pathlib import Path +from chunker.contextual_chunker import ContextualChunker +from embedder.vertex_ai import VertexAIEmbedder +from file_storage.google_cloud import GoogleCloudFileStorage +from llm.vertex_ai import VertexAILLM + +# 1. Setup +llm = VertexAILLM(project="mi-proyecto", location="us-central1") +chunker = ContextualChunker(llm_client=llm, max_chunk_size=800) +embedder = VertexAIEmbedder( + model_name="text-embedding-005", + project="mi-proyecto", + location="us-central1" +) +storage = GoogleCloudFileStorage(bucket="mi-bucket") + +# 2. Chunking +documents = chunker.process_path(Path("documento.pdf")) +print(f"Creados {len(documents)} chunks") + +# 3. Generate embeddings y guardar +for i, doc in enumerate(documents): + chunk_id = f"doc_{i}" + + # Generar embedding + embedding = embedder.generate_embedding(doc["page_content"]) + + # Guardar contenido en GCS + storage.upload_file( + file_path=f"temp_{chunk_id}.md", + destination_blob_name=f"contents/{chunk_id}.md" + ) + + # Guardar vector (escribir a JSONL localmente, luego subir) + print(f"Chunk {chunk_id}: {len(embedding)} dimensiones") +``` + +--- + +## 📦 Packages Instalados + +Ver lista completa en [`pyproject.toml`](pyproject.toml). 
+ +**Principales:** +- `google-genai` - SDK GenAI para LLM y embeddings +- `google-cloud-aiplatform` - Vertex AI +- `google-cloud-storage` - GCS +- `chonkie` - Recursive chunking +- `langchain` - Text splitting avanzado +- `tiktoken` - Token counting +- `markitdown` - Document conversion +- `pypdf` - PDF processing +- `pdf2image` - PDF to image +- `kfp` - Kubeflow Pipelines + +--- + +## 🛠️ Scripts de CLI Disponibles + +Después de `uv sync`, puedes usar estos comandos: + +```bash +# Chunkers +recursive-chunker input.txt output/ +contextual-chunker input.txt output/ --max-chunk-size 800 +llm-chunker documento.pdf output/ --model gemini-2.0-flash + +# Document converter +convert-md documento.pdf + +# File storage +file-storage upload local.md remote/path.md +file-storage list remote/ +file-storage download remote/path.md + +# Vector search +vector-search create index-name gs://bucket/vectors/ --dimensions 768 +vector-search query deployed-id "query text" --limit 5 + +# Utils +normalize-filenames input_dir/ +``` + +--- + +## 📊 Arquitectura del Sistema + +``` +┌─────────────┐ +│ PDF File │ +└──────┬──────┘ + │ + ▼ +┌─────────────────────────────┐ +│ document-converter │ +│ (PDF → Markdown) │ +└──────┬──────────────────────┘ + │ + ▼ +┌─────────────────────────────┐ +│ chunker │ +│ (Markdown → Chunks) │ +│ - RecursiveChunker │ +│ - ContextualChunker ⭐ │ +│ - LLMChunker │ +└──────┬──────────────────────┘ + │ + ▼ +┌─────────────────────────────┐ +│ embedder │ +│ (Text → Vectors) │ +│ Vertex AI embeddings │ +└──────┬──────────────────────┘ + │ + ├─────────────────────────┐ + │ │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ +│ file-storage │ │ vector-search │ +│ GCS Storage │ │ Vertex AI │ +│ (.md files) │ │ Vector Index │ +└─────────────────┘ └─────────────────┘ +``` + +--- + +## 🐛 Troubleshooting + +### Error: "poppler not found" +```bash +sudo apt-get install -y poppler-utils +``` + +### Error: "Permission denied" en GCS +```bash +gcloud auth application-default login +# 
O configurar GOOGLE_APPLICATION_CREDENTIALS +``` + +### Error: "Module not found" +```bash +# Reinstalar dependencias +uv sync --reinstall +``` + +--- + +## 📝 Notas + +- **ContextualChunker** es el recomendado para producción (agrega contexto del documento) +- **LLMChunker** es más lento pero genera chunks óptimos (fusiona, optimiza tokens) +- **RecursiveChunker** es el más rápido para pruebas rápidas +- Los chunks se guardan como `.md` en GCS +- Los vectores se guardan en formato JSONL: `{"id": "...", "embedding": [...]}` +- El índice vectorial se crea en Vertex AI Vector Search + +--- + +## 📄 License + +Este código es parte del proyecto legacy-rag. diff --git a/RESUMEN.txt b/RESUMEN.txt new file mode 100644 index 0000000..f10c6b7 --- /dev/null +++ b/RESUMEN.txt @@ -0,0 +1,65 @@ +╔════════════════════════════════════════════════════════════════╗ +║ ✅ PROYECTO PIPELINE COPIADO EXITOSAMENTE ║ +╚════════════════════════════════════════════════════════════════╝ + +📁 UBICACIÓN: /home/coder/sigma-chat/pipeline + +📊 ESTADÍSTICAS: + • Tamaño total: ~400KB + • Archivos Python: 33 + • Packages: 7 + • Apps: 1 + • Archivos de documentación: 5 + +📦 PACKAGES INCLUIDOS: + ✅ chunker - 3 estrategias de chunking + ✅ embedder - Generación de embeddings (Vertex AI) + ✅ file-storage - Almacenamiento en GCS + ✅ vector-search - Índices vectoriales (Vertex AI) + ✅ llm - Cliente para Gemini/Vertex AI + ✅ document-converter - Conversión PDF → Markdown + ✅ utils - Utilidades varias + +🎯 APPS INCLUIDAS: + ✅ index-gen - Pipeline completo KFP + +📚 DOCUMENTACIÓN: + ✅ 00_START_HERE.md - Punto de inicio rápido + ✅ QUICKSTART.md - Guía rápida con ejemplos + ✅ README.md - Documentación completa + ✅ STRUCTURE.md - Estructura detallada + ✅ config.example.yaml - Plantilla de configuración + +⚙️ ARCHIVOS DE CONFIGURACIÓN: + ✅ pyproject.toml - Dependencias y scripts CLI + ✅ config.yaml - Configuración de GCP + ✅ .python-version - Python 3.12 + ✅ .gitignore - Exclusiones de git + +🚀 PRÓXIMOS PASOS: + +1. 
Subir a GCP Workbench: + • Comprimir: tar -czf pipeline.tar.gz pipeline/ + • Subir a Workbench + • Descomprimir: tar -xzf pipeline.tar.gz + +2. Instalar dependencias: + cd ~/pipeline + uv sync + +3. Configurar: + nano config.yaml + # Editar: project_id, location, bucket + +4. Probar: + echo "Texto de prueba" > test.txt + recursive-chunker test.txt output/ + +📖 LEER PRIMERO: + cat 00_START_HERE.md + +═══════════════════════════════════════════════════════════════ + +✨ TODO LISTO PARA USAR EN GCP WORKBENCH ✨ + +═══════════════════════════════════════════════════════════════ diff --git a/STRUCTURE.md b/STRUCTURE.md new file mode 100644 index 0000000..5db7221 --- /dev/null +++ b/STRUCTURE.md @@ -0,0 +1,171 @@ +# Estructura del Proyecto Pipeline + +## ✅ Carpetas y Archivos Copiados + +``` +pipeline/ +├── 📄 pyproject.toml # Configuración principal del proyecto +├── 📄 config.yaml # Configuración de GCP (del original) +├── 📄 config.example.yaml # Plantilla de configuración +├── 📄 .python-version # Python 3.12 +├── 📄 .gitignore # Archivos a ignorar +├── 📄 README.md # Documentación completa +│ +├── 📁 packages/ # Librerías reutilizables +│ ├── chunker/ # ⭐ CHUNKING +│ │ ├── pyproject.toml +│ │ └── src/chunker/ +│ │ ├── base_chunker.py +│ │ ├── recursive_chunker.py +│ │ ├── contextual_chunker.py +│ │ └── llm_chunker.py +│ │ +│ ├── embedder/ # ⭐ EMBEDDINGS +│ │ ├── pyproject.toml +│ │ └── src/embedder/ +│ │ ├── base.py +│ │ └── vertex_ai.py +│ │ +│ ├── file-storage/ # ⭐ ALMACENAMIENTO GCS +│ │ ├── pyproject.toml +│ │ └── src/file_storage/ +│ │ ├── base.py +│ │ ├── google_cloud.py +│ │ └── cli.py +│ │ +│ ├── vector-search/ # ⭐ ÍNDICE VECTORIAL +│ │ ├── pyproject.toml +│ │ └── src/vector_search/ +│ │ ├── base.py +│ │ ├── vertex_ai.py +│ │ └── cli/ +│ │ ├── create.py +│ │ ├── query.py +│ │ ├── delete.py +│ │ └── generate.py +│ │ +│ ├── llm/ # Cliente LLM +│ │ ├── pyproject.toml +│ │ └── src/llm/ +│ │ ├── base.py +│ │ └── vertex_ai.py +│ │ +│ ├── document-converter/ # Conversión 
PDF→Markdown +│ │ ├── pyproject.toml +│ │ └── src/document_converter/ +│ │ ├── base.py +│ │ └── markdown.py +│ │ +│ └── utils/ # Utilidades +│ ├── pyproject.toml +│ └── src/utils/ +│ └── normalize_filenames.py +│ +├── 📁 apps/ # Aplicaciones +│ └── index-gen/ # ⭐ PIPELINE PRINCIPAL +│ ├── pyproject.toml +│ └── src/index_gen/ +│ ├── cli.py +│ └── main.py # Pipeline KFP completo +│ +└── 📁 src/ # Código fuente principal + └── rag_eval/ + ├── __init__.py + └── config.py # Configuración centralizada +``` + +## 📊 Resumen de Componentes + +### Packages Core (7) +1. ✅ **chunker** - 3 estrategias de chunking (Recursive, Contextual, LLM) +2. ✅ **embedder** - Generación de embeddings con Vertex AI +3. ✅ **file-storage** - Almacenamiento en Google Cloud Storage +4. ✅ **vector-search** - Índices vectoriales en Vertex AI +5. ✅ **llm** - Cliente para modelos Gemini/Vertex AI +6. ✅ **document-converter** - Conversión de documentos +7. ✅ **utils** - Utilidades varias + +### Aplicaciones (1) +1. ✅ **index-gen** - Pipeline completo de procesamiento + +### Configuración (1) +1. 
✅ **rag_eval** - Configuración centralizada + +## 🔧 Archivos de Configuración + +- ✅ `pyproject.toml` - Dependencias y scripts CLI +- ✅ `config.yaml` - Configuración de GCP +- ✅ `config.example.yaml` - Plantilla +- ✅ `.python-version` - Versión de Python +- ✅ `.gitignore` - Archivos ignorados + +## 📝 Documentación + +- ✅ `README.md` - Documentación completa con ejemplos +- ✅ `STRUCTURE.md` - Este archivo + +## 🎯 Funcionalidades Disponibles + +### CLI Scripts +```bash +# Chunking +recursive-chunker input.txt output/ +contextual-chunker input.txt output/ --max-chunk-size 800 +llm-chunker documento.pdf output/ --model gemini-2.0-flash + +# Document conversion +convert-md documento.pdf + +# File storage +file-storage upload local.md remote/path.md +file-storage list remote/ +file-storage download remote/path.md + +# Vector search +vector-search create index-name gs://bucket/vectors/ --dimensions 768 +vector-search query deployed-id "query text" --limit 5 +vector-search delete index-name + +# Utils +normalize-filenames input_dir/ +``` + +### Python API +Todas las clases están disponibles para importación directa: + +```python +from chunker.contextual_chunker import ContextualChunker +from embedder.vertex_ai import VertexAIEmbedder +from file_storage.google_cloud import GoogleCloudFileStorage +from vector_search.vertex_ai import GoogleCloudVectorSearch +from llm.vertex_ai import VertexAILLM +``` + +## 🚀 Próximos Pasos + +1. **Instalar dependencias**: + ```bash + cd /home/coder/sigma-chat/pipeline + uv sync + ``` + +2. **Configurar GCP**: + - Editar `config.yaml` con tus credenciales + - Ejecutar `gcloud auth application-default login` + +3. **Probar chunking**: + ```bash + echo "Texto de prueba" > test.txt + recursive-chunker test.txt output/ + ``` + +4. 
**Ver documentación completa**: + ```bash + cat README.md + ``` + +--- + +**Total de archivos Python copiados**: ~30+ archivos +**Total de packages**: 8 (7 packages + 1 app) +**Listo para usar**: ✅ diff --git a/apps/index-gen/README.md b/apps/index-gen/README.md new file mode 100644 index 0000000..e69de29 diff --git a/apps/index-gen/pyproject.toml b/apps/index-gen/pyproject.toml new file mode 100644 index 0000000..5795c70 --- /dev/null +++ b/apps/index-gen/pyproject.toml @@ -0,0 +1,34 @@ +[project] +name = "index-gen" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +authors = [ + { name = "Anibal Angulo", email = "a8065384@banorte.com" } +] +requires-python = ">=3.12" +dependencies = [ + "chunker", + "document-converter", + "embedder", + "file-storage", + "llm", + "utils", + "vector-search", +] + +[project.scripts] +index-gen = "index_gen.cli:app" + +[build-system] +requires = ["uv_build>=0.8.12,<0.9.0"] +build-backend = "uv_build" + +[tool.uv.sources] +file-storage = { workspace = true } +vector-search = { workspace = true } +utils = { workspace = true } +embedder = { workspace = true } +chunker = { workspace = true } +document-converter = { workspace = true } +llm = { workspace = true } diff --git a/apps/index-gen/src/index_gen/__init__.py b/apps/index-gen/src/index_gen/__init__.py new file mode 100644 index 0000000..5b33691 --- /dev/null +++ b/apps/index-gen/src/index_gen/__init__.py @@ -0,0 +1,2 @@ +def main() -> None: + print("Hello from index-gen!") diff --git a/apps/index-gen/src/index_gen/cli.py b/apps/index-gen/src/index_gen/cli.py new file mode 100644 index 0000000..4f7eee0 --- /dev/null +++ b/apps/index-gen/src/index_gen/cli.py @@ -0,0 +1,68 @@ +import logging +import tempfile +from pathlib import Path + +import typer + +from index_gen.main import ( + aggregate_vectors, + build_gcs_path, + create_vector_index, + gather_files, + process_file, +) +from rag_eval.config import settings + +app = typer.Typer() + + 
@app.command()
def run_ingestion():
    """Run the full ingestion pipeline end to end.

    Steps:
      1. Read the ``agent`` and ``index`` sections from config.yaml.
      2. Gather source PDFs from the configured GCS origin.
      3. Convert, chunk and embed each file, writing markdown chunks to GCS
         and vectors to a local JSONL artifact per file.
      4. Aggregate the per-file artifacts into a single vectors file in GCS.
      5. Create and deploy the Vertex AI Vector Search index.

    Raises:
        ValueError: If the agent or index configuration is missing.
    """
    logging.basicConfig(level=logging.INFO)

    agent_config = settings.agent
    index_config = settings.index

    if not agent_config or not index_config:
        raise ValueError("Agent or index configuration not found in config.yaml")

    # Gather files
    files = gather_files(index_config.origin)

    # Build output paths
    contents_output_dir = build_gcs_path(index_config.data, "/contents")
    vectors_output_dir = build_gcs_path(index_config.data, "/vectors")
    aggregated_vectors_gcs_path = build_gcs_path(
        index_config.data, "/vectors/vectors.json"
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        vector_artifact_paths = []

        # Process files and create local artifacts
        for i, file in enumerate(files):
            artifact_path = temp_dir_path / f"vectors_{i}.jsonl"
            vector_artifact_paths.append(artifact_path)

            process_file(
                file,
                agent_config.embedding_model,
                contents_output_dir,
                artifact_path,  # Pass the local path
                index_config.chunk_limit,
            )

        # Aggregate the local artifacts into one file in GCS.
        # NOTE: must happen inside the TemporaryDirectory context, since the
        # artifact files are deleted when it exits.
        aggregate_vectors(
            vector_artifacts=vector_artifact_paths,
            output_gcs_path=aggregated_vectors_gcs_path,
        )

    # Create vector index from the aggregated vectors directory in GCS
    create_vector_index(vectors_output_dir)


if __name__ == "__main__":
    app()
def build_gcs_path(base_path: str, suffix: str) -> str:
    """Return *base_path* with *suffix* appended verbatim.

    No separator normalization is performed: callers pass suffixes that
    already start with "/" (e.g. "/contents", "/vectors").

    Args:
        base_path: The GCS base path, e.g. "gs://bucket/data".
        suffix: The path fragment to append.

    Returns:
        The concatenated GCS path.
    """
    return base_path + suffix
+ """ + # Imports are inside the function as KFP serializes this function + from pathlib import Path + + from chunker.contextual_chunker import ContextualChunker + from document_converter.markdown import MarkdownConverter + from embedder.vertex_ai import VertexAIEmbedder + from google.cloud import storage + from llm.vertex_ai import VertexAILLM + from utils.normalize_filenames import normalize_string + + logging.getLogger().setLevel(logging.INFO) + + # Initialize converters and embedders + converter = MarkdownConverter() + embedder = VertexAIEmbedder(model_name=model_name, project=settings.project_id, location=settings.location) + llm = VertexAILLM(project=settings.project_id, location=settings.location) + chunker = ContextualChunker(llm_client=llm, max_chunk_size=chunk_limit) + gcs_client = storage.Client() + + file_id = normalize_string(Path(file_path).stem) + local_path = Path(f"/tmp/{Path(file_path).name}") + + with open(vectors_output_file, "w", encoding="utf-8") as f: + try: + # Download file from GCS + bucket_name, blob_name = file_path.replace("gs://", "").split("/", 1) + bucket = gcs_client.bucket(bucket_name) + blob = bucket.blob(blob_name) + blob.download_to_filename(local_path) + logging.info(f"Processing file: {file_path}") + + # Process the downloaded file + markdown_content = converter.process_file(local_path) + + def upload_to_gcs(bucket_name, blob_name, data): + bucket = gcs_client.bucket(bucket_name) + blob = bucket.blob(blob_name) + blob.upload_from_string(data, content_type="text/markdown; charset=utf-8") + + # Determine output bucket and paths for markdown + contents_bucket_name, contents_prefix = contents_output_dir.replace( + "gs://", "" + ).split("/", 1) + + # Extract source folder from file path + source_folder = Path(blob_name).parent.as_posix() if blob_name else "" + + if len(markdown_content) > chunk_limit: + chunks = chunker.process_text(markdown_content) + for i, chunk in enumerate(chunks): + chunk_id = f"{file_id}_{i}" + embedding = 
def aggregate_vectors(
    vector_artifacts: list,  # This will be a list of paths to the artifact files
    output_gcs_path: str,
):
    """
    Aggregates multiple JSONL artifact files into a single JSONL file in GCS.

    Args:
        vector_artifacts: Local paths of the per-file JSONL vector artifacts.
        output_gcs_path: Destination GCS object,
            e.g. "gs://bucket/data/vectors/vectors.json".
    """
    from google.cloud import storage

    logging.getLogger().setLevel(logging.INFO)

    # Concatenate every artifact into one temporary file. delete=False is
    # required because the file is re-opened (uploaded) after the `with` block.
    with tempfile.NamedTemporaryFile(
        mode="w", delete=False, encoding="utf-8"
    ) as temp_agg_file:
        logging.info(f"Aggregating vectors into temporary file: {temp_agg_file.name}")
        for artifact_path in vector_artifacts:
            with open(artifact_path, "r", encoding="utf-8") as f:
                # Each line is a complete JSON object
                for line in f:
                    temp_agg_file.write(line)  # line already includes newline
        temp_file_path = temp_agg_file.name

    try:
        logging.info("Uploading aggregated file to GCS...")
        gcs_client = storage.Client()
        bucket_name, blob_name = output_gcs_path.replace("gs://", "").split("/", 1)
        bucket = gcs_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_filename(
            temp_file_path, content_type="application/json; charset=utf-8"
        )
        logging.info(f"Successfully uploaded aggregated vectors to {output_gcs_path}")
    finally:
        # BUG FIX: the temp file is created with delete=False, so it must be
        # removed even when the upload raises; previously a failed upload
        # leaked the file. `os` is already imported at module level.
        os.remove(temp_file_path)
+ ) + vector_search = GoogleCloudVectorSearch( + project_id=config.project_id, + location=config.location, + bucket=config.bucket, + index_name=index_config.name, + ) + + logging.info(f"Starting creation of index '{index_config.name}'...") + vector_search.create_index( + name=index_config.name, + content_path=vectors_dir, + dimensions=index_config.dimensions, + ) + logging.info(f"Index '{index_config.name}' created successfully.") + + logging.info("Deploying index to a new endpoint...") + vector_search.deploy_index( + index_name=index_config.name, machine_type=index_config.machine_type + ) + logging.info("Index deployed successfully!") + logging.info(f"Endpoint name: {vector_search.index_endpoint.display_name}") + logging.info( + f"Endpoint resource name: {vector_search.index_endpoint.resource_name}" + ) + except Exception as e: + logging.error(f"An error occurred during index creation or deployment: {e}", exc_info=True) + raise \ No newline at end of file diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..37eb95f --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,29 @@ +# Configuración de Google Cloud Platform +project_id: "tu-proyecto-gcp" +location: "us-central1" # o us-east1, europe-west1, etc. 
+bucket: "tu-bucket-nombre" + +# Configuración del índice vectorial +index: + name: "mi-indice-rag" + dimensions: 768 # Para text-embedding-005 usa 768 + machine_type: "e2-standard-2" # Tipo de máquina para el endpoint + approximate_neighbors_count: 150 + distance_measure_type: "DOT_PRODUCT_DISTANCE" # O "COSINE_DISTANCE", "EUCLIDEAN_DISTANCE" + +# Configuración de embeddings +embedder: + model_name: "text-embedding-005" + task: "RETRIEVAL_DOCUMENT" # O "RETRIEVAL_QUERY" para queries + +# Configuración de LLM para chunking +llm: + model: "gemini-2.0-flash" # O "gemini-1.5-pro", "gemini-1.5-flash" + +# Configuración de chunking +chunking: + strategy: "contextual" # "recursive", "contextual", "llm" + max_chunk_size: 800 + chunk_overlap: 200 # Solo para LLMChunker + merge_related: true # Solo para LLMChunker + extract_images: true # Solo para LLMChunker diff --git a/packages/chunker/README.md b/packages/chunker/README.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/chunker/pyproject.toml b/packages/chunker/pyproject.toml new file mode 100644 index 0000000..c593c1b --- /dev/null +++ b/packages/chunker/pyproject.toml @@ -0,0 +1,23 @@ +[project] +name = "chunker" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +authors = [ + { name = "Anibal Angulo", email = "a8065384@banorte.com" } +] +requires-python = ">=3.12" +dependencies = [ + "chonkie>=1.1.2", + "pdf2image>=1.17.0", + "pypdf>=6.0.0", +] + +[project.scripts] +llm-chunker = "chunker.llm_chunker:app" +recursive-chunker = "chunker.recursive_chunker:app" +contextual-chunker = "chunker.contextual_chunker:app" + +[build-system] +requires = ["uv_build>=0.8.3,<0.9.0"] +build-backend = "uv_build" diff --git a/packages/chunker/src/chunker/__init__.py b/packages/chunker/src/chunker/__init__.py new file mode 100644 index 0000000..b63575c --- /dev/null +++ b/packages/chunker/src/chunker/__init__.py @@ -0,0 +1,2 @@ +def hello() -> str: + return "Hello from chunker!" 
class Document(TypedDict):
    """A dictionary representing a processed document chunk."""

    page_content: str
    metadata: dict


class BaseChunker(ABC):
    """Abstract base class for chunker implementations."""

    @abstractmethod
    def process_text(self, text: str) -> List[Document]:
        """
        Processes a string of text into a list of Document chunks.

        Args:
            text: The input string to process.

        Returns:
            A list of Document objects.
        """
        ...

    def process_path(self, path: Path) -> List[Document]:
        """
        Reads a file from a Path object and processes its content.

        It attempts to read the file with UTF-8 encoding and falls back to
        latin-1 if a UnicodeDecodeError occurs.

        Args:
            path: The Path object pointing to the file.

        Returns:
            A list of Document objects from the file's content.
        """
        try:
            text = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            text = path.read_text(encoding="latin-1")
        return self.process_text(text)

    def process_bytes(self, b: bytes) -> List[Document]:
        """
        Decodes a byte string and processes its content.

        It first attempts to decode the bytes as UTF-8. If that fails,
        it falls back to latin-1.

        Args:
            b: The input byte string.

        Returns:
            A list of Document objects from the byte string's content.
        """
        try:
            text = b.decode("utf-8")
        except UnicodeDecodeError:
            # BUG FIX: fall back to latin-1 (as process_path does and as this
            # docstring promises). The previous fallback, "utf-8-sig", only
            # differs from UTF-8 by BOM handling, so it re-raised
            # UnicodeDecodeError for any genuinely non-UTF-8 input. latin-1
            # maps every byte value, so it never fails.
            text = b.decode("latin-1")
        return self.process_text(text)
", current_pos, end_pos) + if sentence_break != -1: + split_point = sentence_break + 1 + else: + split_point = end_pos + else: + split_point = end_pos + + chunks.append(text[current_pos:split_point]) + current_pos = split_point + + return chunks + + def process_text(self, text: str) -> List[Document]: + """ + Processes a string of text into a list of context-aware Document chunks. + """ + if len(text) <= self.max_chunk_size: + return [{"page_content": text, "metadata": {}}] + + chunks = self._split_text(text) + processed_chunks: List[Document] = [] + + for i, chunk_content in enumerate(chunks): + prompt = f""" + Documento Original: + --- + {text} + --- + + Fragmento Actual: + --- + {chunk_content} + --- + + Tarea: + Genera un resumen conciso del "Documento Original" que proporcione el contexto necesario para entender el "Fragmento Actual". El resumen debe ser un solo párrafo en español. + """ + + summary = self.llm_client.generate(self.model, prompt).text + contextualized_chunk = ( + f"> **Contexto del documento original:**\n> {summary}\n\n---\n\n" + + chunk_content + ) + + processed_chunks.append( + { + "page_content": contextualized_chunk, + "metadata": {"chunk_index": i}, + } + ) + + return processed_chunks + + +app = typer.Typer() + + +@app.command() +def main( + input_file_path: Annotated[ + str, typer.Argument(help="Path to the input text file.") + ], + output_dir: Annotated[ + str, typer.Argument(help="Directory to save the output file.") + ], + max_chunk_size: Annotated[ + int, typer.Option(help="Maximum chunk size in characters.") + ] = 800, + model: Annotated[ + str, typer.Option(help="Model to use for the processing") + ] = "gemini-2.0-flash", +): + """ + Processes a text file using ContextualChunker and saves the output to a JSONL file. 
+ """ + print(f"Starting to process {input_file_path}...") + + chunker = ContextualChunker(max_chunk_size=max_chunk_size, model=model) + documents = chunker.process_path(Path(input_file_path)) + + print(f"Successfully created {len(documents)} chunks.") + + if not os.path.exists(output_dir): + os.makedirs(output_dir) + print(f"Created output directory: {output_dir}") + + output_file_path = os.path.join(output_dir, "chunked_documents.jsonl") + + with open(output_file_path, "w", encoding="utf-8") as f: + for doc in documents: + doc["metadata"]["source_file"] = os.path.basename(input_file_path) + f.write(json.dumps(doc, ensure_ascii=False) + "\n") + + print(f"Successfully saved {len(documents)} chunks to {output_file_path}") + + +if __name__ == "__main__": + app() diff --git a/packages/chunker/src/chunker/llm_chunker.py b/packages/chunker/src/chunker/llm_chunker.py new file mode 100644 index 0000000..2b44474 --- /dev/null +++ b/packages/chunker/src/chunker/llm_chunker.py @@ -0,0 +1,577 @@ +import hashlib +import json +import os +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Annotated, List + +import tiktoken +import typer +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain_core.documents import Document as LangchainDocument +from llm.vertex_ai import VertexAILLM +from pdf2image import convert_from_path +from pypdf import PdfReader + +from rag_eval.config import Settings + +from .base_chunker import BaseChunker, Document + + +class TokenManager: + """Manages token counting and truncation.""" + + def __init__(self, model_name: str = "gpt-3.5-turbo"): + try: + self.encoding = tiktoken.encoding_for_model(model_name) + except KeyError: + self.encoding = tiktoken.get_encoding("cl100k_base") + + def count_tokens(self, text: str) -> int: + return len(self.encoding.encode(text)) + + def truncate_to_tokens( + self, text: str, max_tokens: int, preserve_sentences: bool = True + ) -> str: 
+ tokens = self.encoding.encode(text) + + if len(tokens) <= max_tokens: + return text + + truncated_tokens = tokens[:max_tokens] + truncated_text = self.encoding.decode(truncated_tokens) + + if preserve_sentences: + last_period = truncated_text.rfind(".") + if last_period > len(truncated_text) * 0.7: + return truncated_text[: last_period + 1] + + return truncated_text + + +class OptimizedChunkProcessor: + """Uses an LLM to merge and enhance text chunks.""" + + def __init__( + self, + model: str, + max_tokens: int = 1000, + target_tokens: int = 800, + chunks_per_batch: int = 5, + gemini_client: VertexAILLM | None = None, + model_name: str = "gpt-3.5-turbo", + custom_instructions: str = "", + ): + self.model = model + self.client = gemini_client + self.chunks_per_batch = chunks_per_batch + self.max_tokens = max_tokens + self.target_tokens = target_tokens + self.token_manager = TokenManager(model_name) + self.custom_instructions = custom_instructions + self._merge_cache = {} + self._enhance_cache = {} + + def _get_cache_key(self, text: str) -> str: + combined = text + self.custom_instructions + return hashlib.md5(combined.encode()).hexdigest()[:16] + + def should_merge_chunks(self, chunk1: str, chunk2: str) -> bool: + cache_key = f"{self._get_cache_key(chunk1)}_{self._get_cache_key(chunk2)}" + if cache_key in self._merge_cache: + return self._merge_cache[cache_key] + + try: + combined_text = f"{chunk1}\n\n{chunk2}" + combined_tokens = self.token_manager.count_tokens(combined_text) + + if combined_tokens > self.max_tokens: + self._merge_cache[cache_key] = False + return False + + if self.client: + base_prompt = f"""Analiza estos dos fragmentos de texto y determina si deben unirse. + + LÍMITES ESTRICTOS: + - Tokens combinados: {combined_tokens}/{self.max_tokens} + - Solo unir si hay continuidad semántica clara + + Criterios de unión: + 1. El primer fragmento termina abruptamente + 2. El segundo fragmento continúa la misma idea/concepto + 3. 
La unión mejora la coherencia del contenido + 4. Exceder {self.max_tokens} tokens, SOLAMENTE si es necesario para mantener el contexto""" + + base_prompt += f""" + + Responde SOLO 'SI' o 'NO'. + + Fragmento 1 ({self.token_manager.count_tokens(chunk1)} tokens): + {chunk1[:500]}... + + Fragmento 2 ({self.token_manager.count_tokens(chunk2)} tokens): + {chunk2[:500]}...""" + + response = self.client.generate(self.model, base_prompt).text + result = response.strip().upper() == "SI" + self._merge_cache[cache_key] = result + return result + + result = ( + not chunk1.rstrip().endswith((".", "!", "?")) + and combined_tokens <= self.target_tokens + ) + self._merge_cache[cache_key] = result + return result + + except Exception as e: + print(f"Error analizando chunks para merge: {e}") + self._merge_cache[cache_key] = False + return False + + def enhance_chunk(self, chunk_text: str) -> str: + cache_key = self._get_cache_key(chunk_text) + if cache_key in self._enhance_cache: + return self._enhance_cache[cache_key] + + current_tokens = self.token_manager.count_tokens(chunk_text) + + try: + if self.client and current_tokens < self.max_tokens: + base_prompt = f"""Optimiza este texto siguiendo estas reglas ESTRICTAS: + + LÍMITES DE TOKENS: + - Actual: {current_tokens} tokens + - Máximo permitido: {self.max_tokens} tokens + - Objetivo: {self.target_tokens} tokens + + REGLAS FUNDAMENTALES: + NO exceder {self.max_tokens} tokens bajo ninguna circunstancia + Mantener TODA la información esencial y metadatos + NO cambiar términos técnicos o palabras clave + Asegurar oraciones completas y coherentes + Optimizar claridad y estructura sin añadir contenido + SOLO devuelve el texto no agregues conclusiones NUNCA + + Si el texto está cerca del límite, NO expandir. 
Solo mejorar estructura.""" + + if self.custom_instructions.strip(): + base_prompt += ( + f"\n\nINSTRUCCIONES ADICIONALES:\n{self.custom_instructions}" + ) + + base_prompt += f"\n\nTexto a optimizar:\n{chunk_text}" + + response = self.client.generate(self.model, base_prompt).text + enhanced_text = response.strip() + + enhanced_tokens = self.token_manager.count_tokens(enhanced_text) + if enhanced_tokens > self.max_tokens: + print( + f"Advertencia: Texto optimizado excede límite ({enhanced_tokens} > {self.max_tokens})" + ) + enhanced_text = self.token_manager.truncate_to_tokens( + enhanced_text, self.max_tokens + ) + + self._enhance_cache[cache_key] = enhanced_text + return enhanced_text + else: + if current_tokens > self.max_tokens: + truncated = self.token_manager.truncate_to_tokens( + chunk_text, self.max_tokens + ) + self._enhance_cache[cache_key] = truncated + return truncated + + self._enhance_cache[cache_key] = chunk_text + return chunk_text + + except Exception as e: + print(f"Error procesando chunk: {e}") + if current_tokens > self.max_tokens: + truncated = self.token_manager.truncate_to_tokens( + chunk_text, self.max_tokens + ) + self._enhance_cache[cache_key] = truncated + return truncated + + self._enhance_cache[cache_key] = chunk_text + return chunk_text + + def process_chunks_batch( + self, chunks: List[LangchainDocument], merge_related: bool = False + ) -> List[LangchainDocument]: + processed_chunks = [] + total_chunks = len(chunks) + + print(f"Procesando {total_chunks} chunks en lotes de {self.chunks_per_batch}") + if self.custom_instructions: + print( + f"Con instrucciones personalizadas: {self.custom_instructions[:100]}..." 
+ ) + + i = 0 + while i < len(chunks): + batch_start = time.time() + current_chunk = chunks[i] + merged_content = current_chunk.page_content + original_tokens = self.token_manager.count_tokens(merged_content) + + if merge_related and i < len(chunks) - 1: + merge_count = 0 + while i + merge_count < len(chunks) - 1 and self.should_merge_chunks( + merged_content, chunks[i + merge_count + 1].page_content + ): + merge_count += 1 + merged_content += "\n\n" + chunks[i + merge_count].page_content + print(f" Uniendo chunk {i + 1} con chunk {i + merge_count + 1}") + + i += merge_count + + print(f"\nProcesando chunk {i + 1}/{total_chunks}") + print(f" Tokens originales: {original_tokens}") + + enhanced_content = self.enhance_chunk(merged_content) + final_tokens = self.token_manager.count_tokens(enhanced_content) + + processed_chunks.append( + LangchainDocument( + page_content=enhanced_content, + metadata={ + **current_chunk.metadata, + "final_tokens": final_tokens, + }, + ) + ) + + print(f" Tokens finales: {final_tokens}") + print(f" Tiempo de procesamiento: {time.time() - batch_start:.2f}s") + + i += 1 + + if i % self.chunks_per_batch == 0 and i < len(chunks): + print(f"\nCompletados {i}/{total_chunks} chunks") + time.sleep(0.1) + + return processed_chunks + + +class LLMChunker(BaseChunker): + """Implements a chunker that uses an LLM to optimize PDF and text content.""" + + def __init__( + self, + output_dir: str, + model: str, + max_tokens: int = 1000, + target_tokens: int = 800, + gemini_client: VertexAILLM | None = None, + custom_instructions: str = "", + extract_images: bool = True, + max_workers: int = 4, + chunk_size: int = 1000, + chunk_overlap: int = 200, + merge_related: bool = True, + ): + self.output_dir = output_dir + self.model = model + self.client = gemini_client + self.max_workers = max_workers + self.token_manager = TokenManager() + self.custom_instructions = custom_instructions + self.extract_images = extract_images + self.chunk_size = chunk_size + 
self.chunk_overlap = chunk_overlap + self.merge_related = merge_related + self._format_cache = {} + + self.chunk_processor = OptimizedChunkProcessor( + model=self.model, + max_tokens=max_tokens, + target_tokens=target_tokens, + gemini_client=gemini_client, + custom_instructions=custom_instructions, + ) + + def process_text(self, text: str) -> List[Document]: + """Processes raw text using the LLM optimizer.""" + print("\n=== Iniciando procesamiento de texto ===") + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + length_function=self.token_manager.count_tokens, + separators=["\n\n", "\n", ". ", " ", ""], + ) + # Create dummy LangchainDocuments for compatibility with process_chunks_batch + langchain_docs = text_splitter.create_documents([text]) + + processed_docs = self.chunk_processor.process_chunks_batch( + langchain_docs, self.merge_related + ) + + # Convert from LangchainDocument to our Document TypedDict + final_documents: List[Document] = [ + {"page_content": doc.page_content, "metadata": doc.metadata} + for doc in processed_docs + ] + print( + f"\n=== Procesamiento de texto completado: {len(final_documents)} chunks creados ===" + ) + return final_documents + + def process_path(self, path: Path) -> List[Document]: + """Processes a PDF file, extracts text and images, and optimizes chunks.""" + overall_start = time.time() + print(f"\n=== Iniciando procesamiento optimizado de PDF: {path.name} ===") + # ... (rest of the logic from process_pdf_optimized) + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + + print("\n1. Creando chunks del PDF...") + chunks = self._create_optimized_chunks( + str(path), self.chunk_size, self.chunk_overlap + ) + print(f" Total chunks creados: {len(chunks)}") + + pages_to_extract = set() + if self.extract_images: + print("\n2. 
Detectando formatos especiales...") + format_results = self.detect_special_format_batch(chunks) + for i, has_special_format in format_results.items(): + if has_special_format: + page_number = chunks[i].metadata.get("page") + if page_number: + pages_to_extract.add(page_number) + print(f" Páginas con formato especial: {sorted(pages_to_extract)}") + + if self.extract_images and pages_to_extract: + print(f"\n3. Extrayendo {len(pages_to_extract)} páginas como imágenes...") + self._extract_pages_parallel(str(path), self.output_dir, pages_to_extract) + + print("\n4. Procesando y optimizando chunks...") + processed_chunks = self.chunk_processor.process_chunks_batch( + chunks, self.merge_related + ) + + if self.extract_images: + final_chunks = self._add_image_references( + processed_chunks, pages_to_extract, str(path), self.output_dir + ) + else: + final_chunks = processed_chunks + + total_time = time.time() - overall_start + print(f"\n=== Procesamiento completado en {total_time:.2f}s ===") + + # Convert from LangchainDocument to our Document TypedDict + final_documents: List[Document] = [ + {"page_content": doc.page_content, "metadata": doc.metadata} + for doc in final_chunks + ] + return final_documents + + def detect_special_format_batch( + self, chunks: List[LangchainDocument] + ) -> dict[int, bool]: + results = {} + chunks_to_process = [] + for i, chunk in enumerate(chunks): + cache_key = hashlib.md5(chunk.page_content.encode()).hexdigest()[:16] + if cache_key in self._format_cache: + results[i] = self._format_cache[cache_key] + else: + chunks_to_process.append((i, chunk, cache_key)) + + if not chunks_to_process: + return results + + if self.client and len(chunks_to_process) > 1: + with ThreadPoolExecutor( + max_workers=min(self.max_workers, len(chunks_to_process)) + ) as executor: + futures = { + executor.submit(self._detect_single_format, chunk): (i, cache_key) + for i, chunk, cache_key in chunks_to_process + } + for future in futures: + i, cache_key = 
futures[future] + try: + result = future.result() + results[i] = result + self._format_cache[cache_key] = result + except Exception as e: + print(f"Error procesando chunk {i}: {e}") + results[i] = False + else: + for i, chunk, cache_key in chunks_to_process: + result = self._detect_single_format(chunk) + results[i] = result + self._format_cache[cache_key] = result + return results + + def _detect_single_format(self, chunk: LangchainDocument) -> bool: + if not self.client: + content = chunk.page_content + table_indicators = ["│", "├", "┼", "┤", "┬", "┴", "|", "+", "-"] + has_table_chars = any(char in content for char in table_indicators) + has_multiple_columns = content.count("\t") > 10 or content.count(" ") > 20 + return has_table_chars or has_multiple_columns + try: + prompt = f"""¿Contiene este texto tablas estructuradas, diagramas ASCII, o elementos que requieren formato especial? + + Responde SOLO 'SI' o 'NO'. + + Texto: + {chunk.page_content[:1000]}""" + response = self.client.generate(self.model, prompt).text + return response.strip().upper() == "SI" + except Exception as e: + print(f"Error detectando formato: {e}") + return False + + def _create_optimized_chunks( + self, pdf_path: str, chunk_size: int, chunk_overlap: int + ) -> List[LangchainDocument]: + pdf = PdfReader(pdf_path) + chunks = [] + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + length_function=self.token_manager.count_tokens, + separators=["\n\n", "\n", ". 
", " ", ""], + ) + for page_num, page in enumerate(pdf.pages, 1): + text = page.extract_text() + if text.strip(): + page_chunks = text_splitter.create_documents( + [text], + metadatas=[ + { + "page": page_num, + "file_name": os.path.basename(pdf_path), + } + ], + ) + chunks.extend(page_chunks) + return chunks + + def _extract_pages_parallel(self, pdf_path: str, output_dir: str, pages: set): + def extract_single_page(page_number): + try: + pdf_filename = os.path.basename(pdf_path) + image_path = os.path.join( + output_dir, f"{page_number}_{pdf_filename}.png" + ) + images = convert_from_path( + pdf_path, + first_page=page_number, + last_page=page_number, + dpi=150, + thread_count=1, + grayscale=False, + ) + if images: + images[0].save(image_path, "PNG", optimize=True) + except Exception as e: + print(f" Error extrayendo página {page_number}: {e}") + + with ThreadPoolExecutor( + max_workers=min(self.max_workers, len(pages)) + ) as executor: + futures = [executor.submit(extract_single_page, page) for page in pages] + for future in futures: + future.result() # Wait for completion + + def _add_image_references( + self, + chunks: List[LangchainDocument], + pages_to_extract: set, + pdf_path: str, + output_dir: str, + ) -> List[LangchainDocument]: + pdf_filename = os.path.basename(pdf_path) + for chunk in chunks: + page_number = chunk.metadata.get("page") + if page_number in pages_to_extract: + image_path = os.path.join( + output_dir, f"page_{page_number}_{pdf_filename}.png" + ) + if os.path.exists(image_path): + image_reference = ( + f"\n[IMAGEN DISPONIBLE - Página {page_number}: {image_path}]\n" + ) + chunk.page_content = image_reference + chunk.page_content + chunk.metadata["has_image"] = True + chunk.metadata["image_path"] = image_path + return chunks + + +app = typer.Typer() + + +@app.command() +def main( + pdf_path: Annotated[str, typer.Argument(help="Ruta al archivo PDF")], + output_dir: Annotated[ + str, typer.Argument(help="Directorio de salida para imágenes y 
chunks") + ], + model: Annotated[ + str, typer.Option(help="Modelo a usar para el procesamiento") + ] = "gemini-2.0-flash", + max_tokens: Annotated[ + int, typer.Option(help="Límite máximo de tokens por chunk") + ] = 950, + target_tokens: Annotated[ + int, typer.Option(help="Tokens objetivo para optimización") + ] = 800, + chunk_size: Annotated[int, typer.Option(help="Tamaño base de chunks")] = 1000, + chunk_overlap: Annotated[int, typer.Option(help="Solapamiento entre chunks")] = 200, + merge_related: Annotated[ + bool, typer.Option(help="Si unir chunks relacionados") + ] = True, + custom_instructions: Annotated[ + str, typer.Option(help="Instrucciones adicionales para optimización") + ] = "", + extract_images: Annotated[ + bool, + typer.Option(help="Si True, extrae páginas con formato especial como imágenes"), + ] = True, +): + """ + Función principal para procesar PDFs con control completo de tokens. + """ + settings = Settings() + llm = VertexAILLM( + project=settings.project_id, + location=settings.location, + ) + + chunker = LLMChunker( + output_dir=output_dir, + model=model, + max_tokens=max_tokens, + target_tokens=target_tokens, + gemini_client=llm, + custom_instructions=custom_instructions, + extract_images=extract_images, + max_workers=4, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + merge_related=merge_related, + ) + + documents = chunker.process_path(Path(pdf_path)) + print(f"Processed {len(documents)} documents.") + + output_file_path = os.path.join(output_dir, "chunked_documents.jsonl") + with open(output_file_path, "w", encoding="utf-8") as f: + for doc in documents: + f.write(json.dumps(doc, ensure_ascii=False) + "\n") + + print(f"Saved {len(documents)} documents to {output_file_path}") + + +if __name__ == "__main__": + app() diff --git a/packages/chunker/src/chunker/py.typed b/packages/chunker/src/chunker/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/packages/chunker/src/chunker/recursive_chunker.py 
class RecursiveChunker(BaseChunker):
    """Chunker backed by ``chonkie.RecursiveChunker``."""

    def __init__(self) -> None:
        """Create the underlying chonkie processor."""
        self.processor = chonkie.RecursiveChunker()

    def process_text(self, text: str) -> List[Document]:
        """
        Split *text* into Document chunks, tagging each with its position.

        Args:
            text: The input string to process.

        Returns:
            One Document per chunk produced by the processor, in order.
        """
        return [
            {
                "page_content": piece.text,
                "metadata": {"chunk_index": idx},
            }
            for idx, piece in enumerate(self.processor(text))
        ]
Prepare and save the output + if not os.path.exists(output_dir): + os.makedirs(output_dir) + print(f"Created output directory: {output_dir}") + + output_file_path = os.path.join(output_dir, "chunked_documents.jsonl") + + with open(output_file_path, "w", encoding="utf-8") as f: + for doc in documents: + # Add source file info to metadata before writing + doc["metadata"]["source_file"] = os.path.basename(input_file_path) + f.write(json.dumps(doc, ensure_ascii=False) + "\n") + + print(f"Successfully saved {len(documents)} chunks to {output_file_path}") + + +if __name__ == "__main__": + app() diff --git a/packages/document-converter/.python-version b/packages/document-converter/.python-version new file mode 100644 index 0000000..c8cfe39 --- /dev/null +++ b/packages/document-converter/.python-version @@ -0,0 +1 @@ +3.10 diff --git a/packages/document-converter/README.md b/packages/document-converter/README.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/document-converter/pyproject.toml b/packages/document-converter/pyproject.toml new file mode 100644 index 0000000..467a270 --- /dev/null +++ b/packages/document-converter/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "document-converter" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +authors = [ + { name = "Anibal Angulo", email = "a8065384@banorte.com" } +] +requires-python = ">=3.12" +dependencies = [ + "markitdown[pdf]>=0.1.2", + "pypdf>=6.1.2", +] + +[project.scripts] +convert-md = "document_converter.markdown:app" + +[build-system] +requires = ["uv_build>=0.8.3,<0.9.0"] +build-backend = "uv_build" diff --git a/packages/document-converter/src/document_converter/__init__.py b/packages/document-converter/src/document_converter/__init__.py new file mode 100644 index 0000000..5f35dae --- /dev/null +++ b/packages/document-converter/src/document_converter/__init__.py @@ -0,0 +1,2 @@ +def hello() -> str: + return "Hello from document-converter!" 
class BaseConverter(ABC):
    """
    Abstract base class for document converters.

    Defines the interface for converting individual files (or batches of
    files) into a textual representation.

    FIX: the previous docstrings described a "remote file processor" — they
    were copied from the file-storage package and did not match this class.
    """

    @abstractmethod
    def process_file(self, file: str) -> str:
        """
        Converts a single file and returns the result.

        Args:
            file: The path (or, in concrete subclasses, stream) of the file
                to convert.

        Returns:
            The converted content as a string.
        """
        ...

    def process_files(self, files: List[str]) -> List[str]:
        """
        Converts a list of files by calling ``process_file`` on each.

        Args:
            files: The file paths to convert.

        Returns:
            The converted content of each file, in input order.
        """
        return [self.process_file(file) for file in files]


class MarkdownConverter(BaseConverter):
    """Converts PDF documents to Markdown format using MarkItDown."""

    def __init__(self) -> None:
        """Initializes the MarkItDown converter (plugins disabled)."""
        self.markitdown = MarkItDown(enable_plugins=False)

    def process_file(self, file_stream: Union[str, Path, BinaryIO]) -> str:
        """
        Processes a single file and returns the result as a markdown string.

        Args:
            file_stream: A file path (string or Path) or a binary file stream.

        Returns:
            The converted markdown content as a string.
        """
        result = self.markitdown.convert(file_stream)
        return result.text_content
+ """ + console = Console() + converter = MarkdownConverter() + + if input_path.is_dir(): + # --- Directory Processing --- + console.print(f"[bold green]Processing directory:[/bold green] {input_path}") + output_dir = output_path + + if output_dir.exists() and not output_dir.is_dir(): + console.print( + f"[bold red]Error:[/bold red] Input is a directory, but output path '{output_dir}' is an existing file." + ) + raise typer.Exit(code=1) + + pdf_files = sorted(list(input_path.rglob("*.pdf"))) + if not pdf_files: + console.print("[yellow]No PDF files found in the input directory.[/yellow]") + return + + console.print(f"Found {len(pdf_files)} PDF files to convert.") + output_dir.mkdir(parents=True, exist_ok=True) + + with Progress(console=console) as progress: + task = progress.add_task("[cyan]Converting...", total=len(pdf_files)) + for pdf_file in pdf_files: + relative_path = pdf_file.relative_to(input_path) + output_md_path = output_dir.joinpath(relative_path).with_suffix(".md") + output_md_path.parent.mkdir(parents=True, exist_ok=True) + + progress.update(task, description=f"Processing {pdf_file.name}") + try: + markdown_content = converter.process_file(pdf_file) + output_md_path.write_text(markdown_content, encoding="utf-8") + except Exception as e: + console.print( + f"\n[bold red]Failed to process {pdf_file.name}:[/bold red] {e}" + ) + progress.advance(task) + + console.print( + f"[bold green]Conversion complete.[/bold green] Output directory: {output_dir}" + ) + + elif input_path.is_file(): + # --- Single File Processing --- + console.print(f"[bold green]Processing file:[/bold green] {input_path.name}") + final_output_path = output_path + + # If output path is a directory, create a file inside it + if output_path.is_dir(): + final_output_path = output_path / input_path.with_suffix(".md").name + + final_output_path.parent.mkdir(parents=True, exist_ok=True) + + try: + markdown_content = converter.process_file(input_path) + 
class BaseEmbedder(ABC):
    """Base class for all embedding models."""

    @abstractmethod
    def generate_embedding(self, text: str) -> List[float]:
        """
        Generate an embedding for a single text.

        FIX: the previous docstring claimed list input/output, but the
        signature accepts one string and returns one vector; batches go
        through ``generate_embeddings_batch``.

        Args:
            text: The text to embed.

        Returns:
            The embedding vector.
        """
        pass

    @abstractmethod
    def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a batch of texts.

        Args:
            texts: List of text strings.

        Returns:
            One embedding vector per input text, in order.
        """
        pass

    def preprocess_text(
        self,
        text: str,
        *,
        into_lowercase: bool = False,
        normalize_whitespace: bool = True,
        remove_punctuation: bool = False,
    ) -> str:
        """Apply basic, optional text cleanup before embedding."""
        text = text.strip()

        if into_lowercase:
            text = text.lower()

        if normalize_whitespace:
            # Collapse all runs of whitespace to single spaces.
            text = " ".join(text.split())

        if remove_punctuation:
            import string

            text = text.translate(str.maketrans("", "", string.punctuation))

        return text

    def normalize_embedding(self, embedding: List[float]) -> List[float]:
        """Scale the embedding to unit L2 norm (zero vectors are returned unchanged)."""
        norm = np.linalg.norm(embedding)
        if norm > 0:
            return (np.array(embedding) / norm).tolist()
        return embedding

    @abstractmethod
    async def async_generate_embedding(self, text: str) -> List[float]:
        """
        Asynchronously generate an embedding for a single text.

        Args:
            text: The text to embed.

        Returns:
            The embedding vector.
        """
        pass


class VertexAIEmbedder(BaseEmbedder):
    """Embedder using Vertex AI text embedding models."""

    def __init__(
        self,
        model_name: str,
        project: str,
        location: str,
        task: str = "RETRIEVAL_DOCUMENT",
    ) -> None:
        self.model_name = model_name
        self.client = genai.Client(
            vertexai=True,
            project=project,
            location=location,
        )
        self.task = task

    def generate_embedding(self, text: str) -> List[float]:
        """Embed one text via the Vertex AI ``embed_content`` API."""
        preprocessed_text = self.preprocess_text(text)
        result = self.client.models.embed_content(
            model=self.model_name,
            contents=preprocessed_text,
            config=types.EmbedContentConfig(task_type=self.task),
        )
        return result.embeddings[0].values

    def generate_embeddings_batch(
        self, texts: List[str], batch_size: int = 10
    ) -> List[List[float]]:
        """Generate embeddings for a batch of texts, *batch_size* per API call."""
        if not texts:
            return []

        preprocessed_texts = [self.preprocess_text(text) for text in texts]

        all_embeddings: List[List[float]] = []
        for i in range(0, len(preprocessed_texts), batch_size):
            batch = preprocessed_texts[i : i + batch_size]

            result = self.client.models.embed_content(
                model=self.model_name,
                contents=batch,
                config=types.EmbedContentConfig(task_type=self.task),
            )

            all_embeddings.extend(emb.values for emb in result.embeddings)

            # Crude client-side rate limiting between batches.
            if i + batch_size < len(preprocessed_texts):
                time.sleep(0.1)

        return all_embeddings

    async def async_generate_embedding(self, text: str) -> List[float]:
        """Async variant of ``generate_embedding``."""
        preprocessed_text = self.preprocess_text(text)
        result = await self.client.aio.models.embed_content(
            model=self.model_name,
            contents=preprocessed_text,
            config=types.EmbedContentConfig(task_type=self.task),
        )
        return result.embeddings[0].values
class BaseFileStorage(ABC):
    """
    Abstract base class for a remote file storage backend.

    Defines the interface for uploading, listing, and retrieving files
    from a remote source (e.g. a cloud bucket).
    """

    @abstractmethod
    def upload_file(
        self,
        file_path: str,
        destination_blob_name: str,
        content_type: Optional[str] = None,
    ) -> None:
        """
        Uploads a file to the remote source.

        Args:
            file_path: The local path to the file to upload.
            destination_blob_name: The name of the file in the remote source.
            content_type: The content type of the file.
        """
        ...

    @abstractmethod
    def list_files(self, path: Optional[str] = None) -> List[str]:
        """
        Lists files from a remote location.

        Args:
            path: The path to a specific file or directory in the remote bucket.
                If None, it recursively lists all files in the bucket.

        Returns:
            A list of file paths.
        """
        ...

    @abstractmethod
    def get_file_stream(self, file_name: str) -> BinaryIO:
        """
        Gets a file from the remote source and returns it as a file-like object.

        Args:
            file_name: The name of the file in the remote source.
        """
        ...
app = typer.Typer()


def get_storage_client() -> GoogleCloudFileStorage:
    # The bucket name comes from the shared rag_eval settings object.
    return GoogleCloudFileStorage(bucket=settings.bucket)


@app.command("upload")
def upload(
    file_path: str,
    destination_blob_name: str,
    # NOTE(review): annotated `str` but defaults to None — consider `str | None`.
    content_type: Annotated[str, typer.Option()] = None,
):
    """
    Uploads a file or directory to the remote source.
    """
    storage_client = get_storage_client()
    if os.path.isdir(file_path):
        # Walk the tree and upload every file, mirroring the local layout.
        for root, _, files in os.walk(file_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                # preserve the directory structure and use forward slashes for blob name
                dest_blob_name = os.path.join(
                    destination_blob_name, os.path.relpath(local_file_path, file_path)
                ).replace(os.sep, "/")
                storage_client.upload_file(
                    local_file_path, dest_blob_name, content_type
                )
                rich.print(
                    f"[green]File {local_file_path} uploaded to {dest_blob_name}.[/green]"
                )
        rich.print(
            f"[bold green]Directory {file_path} uploaded to {destination_blob_name}.[/bold green]"
        )
    else:
        storage_client.upload_file(file_path, destination_blob_name, content_type)
        rich.print(
            f"[green]File {file_path} uploaded to {destination_blob_name}.[/green]"
        )


@app.command("list")
def list_items(path: Annotated[str, typer.Option()] = None):
    """
    Obtain a list of all files at the given location inside the remote bucket
    If path is none, recursively shows all files in the remote bucket.
    """
    storage_client = get_storage_client()
    files = storage_client.list_files(path)
    for file in files:
        rich.print(f"[blue]{file}[/blue]")


@app.command("download")
def download(file_name: str, destination_path: str):
    """
    Gets a file from the remote source and returns it as a file-like object.
    """
    storage_client = get_storage_client()
    file_stream = storage_client.get_file_stream(file_name)
    with open(destination_path, "wb") as f:
        f.write(file_stream.read())
    rich.print(f"[green]File {file_name} downloaded to {destination_path}[/green]")


@app.command("delete")
def delete(path: str):
    """
    Deletes all files at the given location inside the remote bucket.
    If path is a single file, it will delete only that file.
    If path is a directory, it will delete all files in that directory.
    """
    storage_client = get_storage_client()
    storage_client.delete_files(path)
    rich.print(f"[bold red]Files at {path} deleted.[/bold red]")


if __name__ == "__main__":
    app()


class GoogleCloudFileStorage(BaseFileStorage):
    """Google Cloud Storage implementation of BaseFileStorage.

    Combines a synchronous google-cloud-storage client with a lazily-created
    asynchronous (gcloud-aio) client, and keeps an in-memory byte cache of
    downloaded objects.
    """

    def __init__(self, bucket: str) -> None:
        self.bucket_name = bucket

        self.storage_client = storage.Client()
        self.bucket_client = self.storage_client.bucket(self.bucket_name)
        # Async session/storage are created on first use (see _get_aio_*).
        self._aio_session: aiohttp.ClientSession | None = None
        self._aio_storage: Storage | None = None
        # NOTE(review): unbounded in-memory cache of downloaded file bytes.
        self._cache: dict[str, bytes] = {}

    def upload_file(
        self,
        file_path: str,
        destination_blob_name: str,
        content_type: Optional[str] = None,
    ) -> None:
        """
        Uploads a file to the remote source.

        Args:
            file_path: The local path to the file to upload.
            destination_blob_name: The name of the file in the remote source.
            content_type: The content type of the file.
        """
        blob = self.bucket_client.blob(destination_blob_name)
        blob.upload_from_filename(
            file_path,
            content_type=content_type,
            # NOTE(review): generation-match 0 means the upload only succeeds
            # when the object does not already exist; re-uploading an existing
            # blob will fail with HTTP 412 — confirm this is intended.
            if_generation_match=0,
        )
        # Invalidate any stale cached bytes for this name.
        self._cache.pop(destination_blob_name, None)

    def list_files(self, path: Optional[str] = None) -> List[str]:
        """
        Obtain a list of all files at the given location inside the remote bucket
        If path is none, recursively shows all files in the remote bucket.
        """
        blobs = self.storage_client.list_blobs(self.bucket_name, prefix=path)
        return [blob.name for blob in blobs]

    def get_file_stream(self, file_name: str) -> BinaryIO:
        """
        Gets a file from the remote source and returns it as a file-like object.
        """
        # Download once; later calls are served from the in-memory cache.
        if file_name not in self._cache:
            blob = self.bucket_client.blob(file_name)
            self._cache[file_name] = blob.download_as_bytes()
        file_stream = io.BytesIO(self._cache[file_name])
        # Attach a .name so consumers that expect file objects can read it.
        file_stream.name = file_name
        return file_stream

    def _get_aio_session(self) -> aiohttp.ClientSession:
        # Lazily (re)create the aiohttp session; a closed session is replaced.
        if self._aio_session is None or self._aio_session.closed:
            connector = aiohttp.TCPConnector(limit=300, limit_per_host=50)
            timeout = aiohttp.ClientTimeout(total=60)
            self._aio_session = aiohttp.ClientSession(
                timeout=timeout, connector=connector
            )
        return self._aio_session

    def _get_aio_storage(self) -> Storage:
        # Lazily create the gcloud-aio Storage client bound to our session.
        if self._aio_storage is None:
            self._aio_storage = Storage(session=self._get_aio_session())
        return self._aio_storage

    async def async_get_file_stream(
        self, file_name: str, max_retries: int = 3
    ) -> BinaryIO:
        """
        Gets a file from the remote source asynchronously and returns it as a file-like object.
        Retries on transient errors (429, 5xx, timeouts) with exponential backoff.
        """
        # Serve from cache when possible (shared with the sync path).
        if file_name in self._cache:
            file_stream = io.BytesIO(self._cache[file_name])
            file_stream.name = file_name
            return file_stream

        storage_client = self._get_aio_storage()
        last_exception: Exception | None = None

        for attempt in range(max_retries):
            try:
                self._cache[file_name] = await storage_client.download(
                    self.bucket_name, file_name
                )
                file_stream = io.BytesIO(self._cache[file_name])
                file_stream.name = file_name
                return file_stream
            except asyncio.TimeoutError as exc:
                # Transient: retry after backoff.
                last_exception = exc
                logger.warning(
                    "Timeout downloading gs://%s/%s (attempt %d/%d)",
                    self.bucket_name, file_name, attempt + 1, max_retries,
                )
            except aiohttp.ClientResponseError as exc:
                last_exception = exc
                if exc.status == 429 or exc.status >= 500:
                    # Rate-limited or server error: retry after backoff.
                    logger.warning(
                        "HTTP %d downloading gs://%s/%s (attempt %d/%d)",
                        exc.status, self.bucket_name, file_name,
                        attempt + 1, max_retries,
                    )
                else:
                    # Client errors (404, 403, ...) are not retryable.
                    raise

            if attempt < max_retries - 1:
                # Exponential backoff: 0.5s, 1s, 2s, ...
                delay = 0.5 * (2 ** attempt)
                await asyncio.sleep(delay)

        raise TimeoutError(
            f"Failed to download gs://{self.bucket_name}/{file_name} "
            f"after {max_retries} attempts"
        ) from last_exception

    def delete_files(self, path: str) -> None:
        """
        Deletes all files at the given location inside the remote bucket.
        If path is a single file, it will delete only that file.
        If path is a directory, it will delete all files in that directory.
        """
        blobs = self.storage_client.list_blobs(self.bucket_name, prefix=path)
        for blob in blobs:
            blob.delete()
            # Keep the local cache consistent with the bucket.
            self._cache.pop(blob.name, None)
class ToolCall(BaseModel):
    """A single tool/function call requested by the model."""

    name: str
    arguments: dict


class Usage(BaseModel):
    """Token accounting for one or more generations.

    ``None`` values reported by the API are coerced to 0 by the validator.
    """

    prompt_tokens: int | None = 0
    thought_tokens: int | None = 0
    response_tokens: int | None = 0

    @field_validator("prompt_tokens", "thought_tokens", "response_tokens", mode="before")
    @classmethod
    def _validate_tokens(cls, v: int | None) -> int:
        # Usage metadata may report None for absent counts; treat as 0.
        return v or 0

    def __add__(self, other: "Usage") -> "Usage":
        """Component-wise sum, so usages can be accumulated across calls."""
        return Usage(
            prompt_tokens=self.prompt_tokens + other.prompt_tokens,
            thought_tokens=self.thought_tokens + other.thought_tokens,
            response_tokens=self.response_tokens + other.response_tokens,
        )

    def get_cost(self, name: str) -> float:
        """Estimate the cost of this usage for the given model.

        Prices are USD per million tokens; the final multiplication by 18.65
        converts to another currency (presumably a USD->MXN rate — TODO
        confirm, and consider making it a named constant).

        FIX: the return annotation was ``int`` but the computation yields a
        float; the unknown-model case raised a bare ``Exception``.

        Raises:
            ValueError: If *name* is not a known model.
        """
        million = 1000000
        if name == "gemini-2.5-pro":
            # Long-context pricing tier applies above 200k prompt tokens.
            if self.prompt_tokens > 200000:
                input_cost = self.prompt_tokens * (2.5 / million)
                output_cost = self.thought_tokens * (15 / million) + self.response_tokens * (15 / million)
            else:
                input_cost = self.prompt_tokens * (1.25 / million)
                output_cost = self.thought_tokens * (10 / million) + self.response_tokens * (10 / million)
        elif name == "gemini-2.5-flash":
            input_cost = self.prompt_tokens * (0.30 / million)
            output_cost = self.thought_tokens * (2.5 / million) + self.response_tokens * (2.5 / million)
        else:
            raise ValueError(f"Invalid model: {name}")
        return (input_cost + output_cost) * 18.65


class Generation(BaseModel):
    """A class to represent a single generation from a model.

    Attributes:
        text: The generated text (None when the model returned tool calls).
        tool_calls: Tool calls requested by the model, if any.
        usage: Token usage metadata for this generation.
        extra: Provider-specific extras (pydantic copies mutable defaults
            per instance, so the shared-default pitfall does not apply).
    """

    text: str | None = None
    tool_calls: list[ToolCall] | None = None
    usage: Usage = Usage()
    extra: dict = {}


T = TypeVar("T", bound=BaseModel)


class BaseLLM(ABC):
    """An abstract base class for all LLMs."""

    @abstractmethod
    def generate(
        self,
        model: str,
        prompt: Any,
        tools: list | None = None,
        system_prompt: str | None = None,
        tool_mode: str = "AUTO",
    ) -> Generation:
        """Generates text from a prompt.

        FIX (consistency): ``tool_mode`` added to match ``async_generate``
        and the concrete implementations, which already accept it; the
        default preserves backward compatibility.

        Args:
            model: The model to use for generation.
            prompt: The prompt to generate text from.
            tools: An optional list of tools to use for generation.
            system_prompt: An optional system prompt to guide the model's behavior.
            tool_mode: The function-calling mode (e.g. "AUTO").

        Returns:
            A Generation object containing the generated text and usage metadata.
        """
        ...

    @abstractmethod
    def structured_generation(
        self,
        model: str,
        prompt: Any,
        response_model: Type[T],
        tools: list | None = None,
        system_prompt: str | None = None,
    ) -> T:
        """Generates structured data from a prompt.

        FIX (consistency): ``system_prompt`` added to match the concrete
        implementations; the default preserves backward compatibility.

        Args:
            model: The model to use for generation.
            prompt: The prompt to generate text from.
            response_model: The pydantic model to parse the response into.
            tools: An optional list of tools to use for generation.
            system_prompt: An optional system prompt to guide the model's behavior.

        Returns:
            An instance of the provided pydantic model.
        """
        ...

    @abstractmethod
    async def async_generate(
        self,
        model: str,
        prompt: Any,
        tools: list | None = None,
        system_prompt: str | None = None,
        tool_mode: str = "AUTO",
    ) -> Generation:
        """Asynchronously generates text from a prompt.

        Args:
            model: The model to use for generation.
            prompt: The prompt to generate text from.
            tools: An optional list of tools to use for generation.
            system_prompt: An optional system prompt to guide the model's behavior.
            tool_mode: The function-calling mode (e.g. "AUTO").

        Returns:
            A Generation object containing the generated text and usage metadata.
        """
        ...
diff --git a/packages/llm/src/llm/py.typed b/packages/llm/src/llm/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/packages/llm/src/llm/vertex_ai.py b/packages/llm/src/llm/vertex_ai.py new file mode 100644 index 0000000..f3e91d7 --- /dev/null +++ b/packages/llm/src/llm/vertex_ai.py @@ -0,0 +1,181 @@ +import logging +from typing import Any, Type + +from google import genai +from google.genai import types +from tenacity import retry, stop_after_attempt, wait_exponential + +from rag_eval.config import settings + +from .base import BaseLLM, Generation, T, ToolCall, Usage + +logger = logging.getLogger(__name__) + + +class VertexAILLM(BaseLLM): + """A class for interacting with the Vertex AI API.""" + + def __init__( + self, project: str | None = None, location: str | None = None, thinking: int = 0 + ) -> None: + """Initializes the VertexAILLM client. + Args: + project: The Google Cloud project ID. + location: The Google Cloud location. + """ + self.client = genai.Client( + vertexai=True, + project=project or settings.project_id, + location=location or settings.location, + ) + self.thinking_budget = thinking + + # @retry( + # wait=wait_exponential(multiplier=1, min=2, max=60), + # stop=stop_after_attempt(3), + # reraise=True, + # ) + def generate( + self, + model: str, + prompt: Any, + tools: list = [], + system_prompt: str | None = None, + tool_mode: str = "AUTO", + ) -> Generation: + """Generates text using the specified model and prompt. + Args: + model: The name of the model to use for generation. + prompt: The prompt to use for generation. + tools: A list of tools to use for generation. + system_prompt: An optional system prompt to guide the model's behavior. + Returns: + A Generation object containing the generated text and usage metadata. 
+ """ + logger.debug("Entering VertexAILLM.generate") + logger.debug(f"Model: {model}, Tool Mode: {tool_mode}") + logger.debug(f"System prompt: {system_prompt}") + logger.debug("Calling Vertex AI API: models.generate_content...") + response = self.client.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + tools=tools, + system_instruction=system_prompt, + thinking_config=genai.types.ThinkingConfig( + thinking_budget=self.thinking_budget + ), + tool_config=types.ToolConfig( + function_calling_config=types.FunctionCallingConfig( + mode=tool_mode + ) + ) + ), + ) + logger.debug("Received response from Vertex AI API.") + logger.debug(f"API Response: {response}") + + return self._create_generation(response) + + + # @retry( + # wait=wait_exponential(multiplier=1, min=2, max=60), + # stop=stop_after_attempt(3), + # reraise=True, + # ) + def structured_generation( + self, + model: str, + prompt: Any, + response_model: Type[T], + system_prompt: str | None = None, + tools: list | None = None, + ) -> T: + """Generates structured data from a prompt. + Args: + model: The model to use for generation. + prompt: The prompt to generate text from. + response_model: The pydantic model to parse the response into. + tools: An optional list of tools to use for generation. + Returns: + An instance of the provided pydantic model. 
+ """ + config = genai.types.GenerateContentConfig( + response_mime_type="application/json", + response_schema=response_model, + system_instruction=system_prompt, + tools=tools, + ) + + response: genai.types.GenerateContentResponse = ( + self.client.models.generate_content( + model=model, contents=prompt, config=config + ) + ) + + return response_model.model_validate_json(response.text) + + # @retry( + # wait=wait_exponential(multiplier=1, min=2, max=60), + # stop=stop_after_attempt(3), + # reraise=True, + # ) + async def async_generate( + self, + model: str, + prompt: Any, + tools: list = [], + system_prompt: str | None = None, + tool_mode: str = "AUTO", + ) -> Generation: + response = await self.client.aio.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + tools=tools, + system_instruction=system_prompt, + thinking_config=genai.types.ThinkingConfig( + thinking_budget=self.thinking_budget + ), + tool_config=types.ToolConfig( + function_calling_config=types.FunctionCallingConfig( + mode=tool_mode + ) + ), + ), + ) + + return self._create_generation(response) + + + def _create_generation(self, response): + logger.debug("Creating Generation object from API response.") + m=response.usage_metadata + usage = Usage( + prompt_tokens=m.prompt_token_count, + thought_tokens=m.thoughts_token_count or 0, + response_tokens=m.candidates_token_count + ) + + logger.debug(f"{usage=}") + logger.debug(f"{response=}") + + candidate = response.candidates[0] + + tool_calls = [] + + for part in candidate.content.parts: + if fn := part.function_call: + tool_calls.append(ToolCall(name=fn.name, arguments=fn.args)) + + if len(tool_calls) > 0: + logger.debug(f"Found {len(tool_calls)} tool calls.") + return Generation( + tool_calls=tool_calls, + usage=usage, + extra={"original_content": candidate.content} + ) + + logger.debug("No tool calls found, returning text response.") + text = candidate.content.parts[0].text + return Generation(text=text, 
usage=usage) diff --git a/packages/utils/README.md b/packages/utils/README.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/utils/pyproject.toml b/packages/utils/pyproject.toml new file mode 100644 index 0000000..fdad8b3 --- /dev/null +++ b/packages/utils/pyproject.toml @@ -0,0 +1,17 @@ +[project] +name = "utils" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +authors = [ + { name = "Anibal Angulo", email = "a8065384@banorte.com" } +] +requires-python = ">=3.12" +dependencies = [] + +[project.scripts] +normalize-filenames = "utils.normalize_filenames:app" + +[build-system] +requires = ["uv_build>=0.8.3,<0.9.0"] +build-backend = "uv_build" diff --git a/packages/utils/src/utils/__init__.py b/packages/utils/src/utils/__init__.py new file mode 100644 index 0000000..eeedf57 --- /dev/null +++ b/packages/utils/src/utils/__init__.py @@ -0,0 +1,2 @@ +def hello() -> str: + return "Hello from utils!" diff --git a/packages/utils/src/utils/normalize_filenames.py b/packages/utils/src/utils/normalize_filenames.py new file mode 100644 index 0000000..22bfacc --- /dev/null +++ b/packages/utils/src/utils/normalize_filenames.py @@ -0,0 +1,115 @@ +"""Normalize filenames in a directory.""" + +import pathlib +import re +import unicodedata + +import typer +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +app = typer.Typer() + + +def normalize_string(s: str) -> str: + """Normalizes a string to be a valid filename.""" + # 1. Decompose Unicode characters into base characters and diacritics + nfkd_form = unicodedata.normalize("NFKD", s) + # 2. Keep only the base characters (non-diacritics) + only_ascii = "".join([c for c in nfkd_form if not unicodedata.combining(c)]) + # 3. To lowercase + only_ascii = only_ascii.lower() + # 4. Replace spaces with underscores + only_ascii = re.sub(r"\s+", "_", only_ascii) + # 5. 
Remove any characters that are not alphanumeric, underscores, dots, or hyphens
    only_ascii = re.sub(r"[^a-z0-9_.-]", "", only_ascii)
    return only_ascii


def truncate_string(s: str) -> str:
    """given a string with /, return a string with only the text after the last /"""
    return pathlib.Path(s).name


def remove_extension(s: str) -> str:
    """Given a string, if it has a extension like .pdf, remove it and return the new string"""
    # NOTE: only strips the last suffix ("a.tar.gz" -> "a.tar").
    return str(pathlib.Path(s).with_suffix(""))


def remove_duplicate_vowels(s: str) -> str:
    """Removes consecutive duplicate vowels (a, e, i, o, u) from a string."""
    # IGNORECASE also makes the backreference match across case ("aA" -> "a").
    return re.sub(r"([aeiou])\1+", r"\1", s, flags=re.IGNORECASE)


@app.callback(invoke_without_command=True)
def normalize_filenames(
    directory: str = typer.Argument(
        ..., help="The path to the directory containing files to normalize."
    ),
):
    """Normalizes all filenames in a directory."""
    console = Console()
    console.print(
        Panel(
            f"Normalizing filenames in directory: [bold cyan]{directory}[/bold cyan]",
            title="[bold green]Filename Normalizer[/bold green]",
            expand=False,
        )
    )

    source_path = pathlib.Path(directory)
    if not source_path.is_dir():
        console.print(f"[bold red]Error: Directory not found at {directory}[/bold red]")
        raise typer.Exit(code=1)

    # Recurse into subdirectories; only regular files are renamed.
    files_to_rename = [p for p in source_path.rglob("*") if p.is_file()]

    if not files_to_rename:
        console.print(
            f"[bold yellow]No files found in {directory} to normalize.[/bold yellow]"
        )
        return

    table = Table(title="File Renaming Summary")
    table.add_column("Original Name", style="cyan", no_wrap=True)
    table.add_column("New Name", style="magenta", no_wrap=True)
    table.add_column("Status", style="green")

    for file_path in files_to_rename:
        original_name = file_path.name
        file_stem = file_path.stem
        file_suffix = file_path.suffix

        # Only the stem is normalized; the extension is kept as-is.
        normalized_stem = normalize_string(file_stem)
        new_name = f"{normalized_stem}{file_suffix}"

        if new_name == original_name:
            table.add_row(
                original_name, new_name, "[yellow]Skipped (No change)[/yellow]"
            )
            continue

        new_path = file_path.with_name(new_name)

        # Handle potential name collisions
        # (appends _1, _2, ... until a free name is found in the same directory)
        counter = 1
        while new_path.exists():
            new_name = f"{normalized_stem}_{counter}{file_suffix}"
            new_path = file_path.with_name(new_name)
            counter += 1

        try:
            file_path.rename(new_path)
            table.add_row(original_name, new_name, "[green]Renamed[/green]")
        except OSError as e:
            table.add_row(original_name, new_name, f"[bold red]Error: {e}[/bold red]")

    console.print(table)
    console.print(
        Panel(
            f"[bold]Normalization complete.[/bold] Processed [bold blue]{len(files_to_rename)}[/bold blue] files.",
            title="[bold green]Complete[/bold green]",
            expand=False,
        )
    )
diff --git a/packages/utils/src/utils/py.typed b/packages/utils/src/utils/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/packages/vector-search/.python-version b/packages/vector-search/.python-version
new file mode 100644
index 0000000..c8cfe39
--- /dev/null
+++ b/packages/vector-search/.python-version
@@ -0,0 +1 @@
3.10
diff --git a/packages/vector-search/README.md b/packages/vector-search/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/packages/vector-search/pyproject.toml b/packages/vector-search/pyproject.toml
new file mode 100644
index 0000000..68963e6
--- /dev/null
+++ b/packages/vector-search/pyproject.toml
@@ -0,0 +1,29 @@
[project]
name = "vector-search"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
    { name = "Anibal Angulo", email = "a8065384@banorte.com" }
]
requires-python = ">=3.12"
dependencies = [
    "embedder",
    "file-storage",
    "google-cloud-aiplatform>=1.106.0",
    "aiohttp>=3.10.11,<4",
    "gcloud-aio-auth>=5.3.0",
    "google-auth==2.29.0",
    "typer>=0.16.1",
]

[project.scripts]
vector-search = "vector_search.cli:app"

[build-system]
requires = ["uv_build>=0.8.3,<0.9.0"]
build-backend =
"uv_build" + +[tool.uv.sources] +file-storage = { workspace = true } +embedder = { workspace = true } diff --git a/packages/vector-search/src/vector_search/__init__.py b/packages/vector-search/src/vector_search/__init__.py new file mode 100644 index 0000000..9d2ad75 --- /dev/null +++ b/packages/vector-search/src/vector_search/__init__.py @@ -0,0 +1,2 @@ +def hello() -> str: + return "Hello from vector-search!" diff --git a/packages/vector-search/src/vector_search/base.py b/packages/vector-search/src/vector_search/base.py new file mode 100644 index 0000000..4ba16ba --- /dev/null +++ b/packages/vector-search/src/vector_search/base.py @@ -0,0 +1,62 @@ +from abc import ABC, abstractmethod +from typing import List, TypedDict + + +class SearchResult(TypedDict): + id: str + distance: float + content: str + + +class BaseVectorSearch(ABC): + """ + Abstract base class for a vector search provider. + + This class defines the standard interface for creating a vector search index + and running queries against it. + """ + + @abstractmethod + def create_index(self, name: str, content_path: str, **kwargs) -> None: + """ + Creates a new vector search index and populates it with the provided content. + + Args: + name: The desired name for the new index. + content_path: The local file system path to the data that will be used to + populate the index. This is expected to be a JSON file + containing a list of objects, each with an 'id', 'name', + and 'embedding' key. + **kwargs: Additional provider-specific arguments for index creation. + """ + ... + + @abstractmethod + def update_index(self, index_name: str, content_path: str, **kwargs) -> None: + """ + Updates an existing vector search index with new content. + + Args: + index_name: The name of the index to update. + content_path: The local file system path to the data that will be used to + populate the index. + **kwargs: Additional provider-specific arguments for index update. + """ + ... 
+ + @abstractmethod + def run_query( + self, index: str, query: List[float], limit: int + ) -> List[SearchResult]: + """ + Runs a similarity search query against the index. + + Args: + query: The embedding vector to use for the search query. + limit: The maximum number of nearest neighbors to return. + + Returns: + A list of dictionaries, where each dictionary represents a matched item + and contains at least the item's 'id' and the search 'distance'. + """ + ... diff --git a/packages/vector-search/src/vector_search/cli/__init__.py b/packages/vector-search/src/vector_search/cli/__init__.py new file mode 100644 index 0000000..c6c06af --- /dev/null +++ b/packages/vector-search/src/vector_search/cli/__init__.py @@ -0,0 +1,10 @@ +from typer import Typer + +from .create import app as create_callback +from .delete import app as delete_callback +from .query import app as query_callback + +app = Typer() +app.add_typer(create_callback, name="create") +app.add_typer(delete_callback, name="delete") +app.add_typer(query_callback, name="query") diff --git a/packages/vector-search/src/vector_search/cli/create.py b/packages/vector-search/src/vector_search/cli/create.py new file mode 100644 index 0000000..a893892 --- /dev/null +++ b/packages/vector-search/src/vector_search/cli/create.py @@ -0,0 +1,91 @@ +"""Create and deploy a Vertex AI Vector Search index.""" + +from typing import Annotated + +import typer +from rich.console import Console + +from rag_eval.config import settings as config +from vector_search.vertex_ai import GoogleCloudVectorSearch + +app = typer.Typer() + + +@app.callback(invoke_without_command=True) +def create( + path: Annotated[ + str, + typer.Option( + "--path", + "-p", + help="The GCS URI (gs://...) 
to the directory containing your embedding JSON file(s).",
        ),
    ],
    agent_name: Annotated[
        str,
        typer.Option(
            "--agent",
            "-a",
            help="The name of the agent to create the index for.",
        ),
    ],
):
    """Create and deploy a Vertex AI Vector Search index for a specific agent."""
    console = Console()

    try:
        console.print(
            f"[bold green]Looking up configuration for agent '{agent_name}'...[/bold green]"
        )
        # NOTE(review): `config.agents` is not a visible field on Settings in
        # src/rag_eval/config.py — confirm this attribute actually exists.
        agent_config = config.agents.get(agent_name)
        if not agent_config:
            console.print(
                f"[bold red]Agent '{agent_name}' not found in settings.[/bold red]"
            )
            raise typer.Exit(code=1)

        if not agent_config.index:
            console.print(
                f"[bold red]Index configuration not found for agent '{agent_name}'.[/bold red]"
            )
            raise typer.Exit(code=1)

        index_config = agent_config.index

        console.print(
            f"[bold green]Initializing Vertex AI client for project '{config.project_id}' in '{config.location}'...[/bold green]"
        )
        vector_search = GoogleCloudVectorSearch(
            project_id=config.project_id,
            location=config.location,
            bucket=config.bucket,
            index_name=index_config.name,
        )

        console.print(
            f"[bold green]Starting creation of index '{index_config.name}'...[/bold green]"
        )
        console.print("This may take a while.")
        # NOTE(review): the --path help text says to pass a full gs:// URI, but a
        # gs://{bucket}/ prefix is prepended here — passing a gs:// URI would
        # produce a doubled prefix. Confirm which contract is intended.
        vector_search.create_index(
            name=index_config.name,
            content_path=f"gs://{config.bucket}/{path}",
            dimensions=index_config.dimensions,
        )
        console.print(
            f"[bold green]Index '{index_config.name}' created successfully.[/bold green]"
        )

        console.print("[bold green]Deploying index to a new endpoint...[/bold green]")
        console.print("This will also take some time.")
        vector_search.deploy_index(
            index_name=index_config.name, machine_type=index_config.machine_type
        )
        console.print("[bold green]Index deployed successfully![/bold green]")
        console.print(f"Endpoint name: {vector_search.index_endpoint.display_name}")
        console.print(
            f"Endpoint resource name: {vector_search.index_endpoint.resource_name}"
        )

    except Exception as e:
        console.print(f"[bold red]An error occurred: {e}[/bold red]")
        raise typer.Exit(code=1)
diff --git a/packages/vector-search/src/vector_search/cli/delete.py b/packages/vector-search/src/vector_search/cli/delete.py
new file mode 100644
index 0000000..59f3fab
--- /dev/null
+++ b/packages/vector-search/src/vector_search/cli/delete.py
@@ -0,0 +1,38 @@
"""Delete a vector index or endpoint."""

import typer
from rich.console import Console

from rag_eval.config import settings as config
from vector_search.vertex_ai import GoogleCloudVectorSearch

app = typer.Typer()


@app.callback(invoke_without_command=True)
def delete(
    # `id` shadows the builtin; kept because the parameter name is part of the CLI.
    id: str = typer.Argument(..., help="The ID of the index or endpoint to delete."),
    endpoint: bool = typer.Option(
        False, "--endpoint", help="Delete an endpoint instead of an index."
    ),
):
    """Delete a vector index or endpoint."""
    console = Console()
    vector_search = GoogleCloudVectorSearch(
        project_id=config.project_id, location=config.location, bucket=config.bucket
    )

    try:
        if endpoint:
            console.print(f"[bold red]Deleting endpoint {id}...[/bold red]")
            vector_search.delete_index_endpoint(id)
            console.print(
                f"[bold green]Endpoint {id} deleted successfully.[/bold green]"
            )
        else:
            console.print(f"[bold red]Deleting index {id}...[/bold red]")
            vector_search.delete_index(id)
            console.print(f"[bold green]Index {id} deleted successfully.[/bold green]")
    except Exception as e:
        console.print(f"[bold red]An error occurred: {e}[/bold red]")
        raise typer.Exit(code=1)
diff --git a/packages/vector-search/src/vector_search/cli/generate.py b/packages/vector-search/src/vector_search/cli/generate.py
new file mode 100644
index 0000000..dad63cd
--- /dev/null
+++ b/packages/vector-search/src/vector_search/cli/generate.py
@@ -0,0 +1,91 @@
"""Generate embeddings for documents and save them to a JSON file."""

import json
from pathlib import Path

import typer
from embedder.vertex_ai import
VertexAIEmbedder
from file_storage.google_cloud import GoogleCloudFileStorage
from rich.console import Console
from rich.progress import Progress

from rag_eval.config import Settings

app = typer.Typer()


@app.callback(invoke_without_command=True)
def generate(
    path: str = typer.Argument(..., help="The path to the markdown files."),
    output_file: str = typer.Option(
        ...,
        "--output-file",
        "-o",
        help="The local path to save the output JSON file.",
    ),
    batch_size: int = typer.Option(
        10,
        "--batch-size",
        "-b",
        help="The batch size for processing files.",
    ),
    jsonl: bool = typer.Option(
        False,
        "--jsonl",
        help="Output in JSONL format instead of JSON.",
    ),
):
    """Generate embeddings for documents and save them to a JSON file."""
    # NOTE(review): other CLI modules import the shared `settings` instance;
    # this one constructs a fresh Settings() — confirm the divergence is intended.
    config = Settings()
    console = Console()

    console.print("[bold green]Starting vector generation...[/bold green]")

    try:
        storage = GoogleCloudFileStorage(bucket=config.bucket)
        embedder = VertexAIEmbedder(model_name=config.embedding_model)

        remote_files = storage.list_files(path=path)
        results = []

        with Progress(console=console) as progress:
            task = progress.add_task(
                "[cyan]Generating embeddings...", total=len(remote_files)
            )

            # Embed files in batches to limit memory use and API round-trips.
            for i in range(0, len(remote_files), batch_size):
                batch_files = remote_files[i : i + batch_size]
                batch_contents = []

                for remote_file in batch_files:
                    file_stream = storage.get_file_stream(remote_file)
                    # utf-8-sig drops a BOM if present; replace avoids decode crashes.
                    batch_contents.append(
                        file_stream.read().decode("utf-8-sig", errors="replace")
                    )

                batch_embeddings = embedder.generate_embeddings_batch(batch_contents)

                # NOTE(review): records carry only 'id' and 'embedding'; the
                # BaseVectorSearch.create_index docstring also mentions a 'name'
                # key — confirm which schema the index expects.
                for j, remote_file in enumerate(batch_files):
                    results.append(
                        {"id": remote_file, "embedding": batch_embeddings[j]}
                    )
                    progress.update(task, advance=1)

    except Exception as e:
        console.print(
            f"[bold red]An error occurred during vector generation: {e}[/bold red]"
        )
        raise typer.Exit(code=1)

    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        if jsonl:
            for record in results:
                f.write(json.dumps(record) + "\n")
        else:
            json.dump(results, f, indent=2)

    console.print(
        f"[bold green]Embedding generation complete. {len(results)} vectors saved to '{output_path.resolve()}'[/bold green]"
    )
diff --git a/packages/vector-search/src/vector_search/cli/query.py b/packages/vector-search/src/vector_search/cli/query.py
new file mode 100644
index 0000000..0e8893b
--- /dev/null
+++ b/packages/vector-search/src/vector_search/cli/query.py
@@ -0,0 +1,55 @@
"""Query the vector search index."""

import typer
from embedder.vertex_ai import VertexAIEmbedder
from rich.console import Console
from rich.table import Table
from typer import Argument, Option

from rag_eval.config import settings as config
from vector_search.vertex_ai import GoogleCloudVectorSearch

app = typer.Typer()


@app.callback(invoke_without_command=True)
def query(
    query: str = Argument(..., help="The text query to search for."),
    limit: int = Option(5, "--limit", "-l", help="The number of results to return."),
):
    """Queries the vector search index."""
    console = Console()

    try:
        console.print("[bold green]Initializing clients...[/bold green]")
        embedder = VertexAIEmbedder(model_name=config.embedding_model)
        vector_search = GoogleCloudVectorSearch(
            project_id=config.project_id, location=config.location, bucket=config.bucket
        )

        console.print("[bold green]Loading index endpoint...[/bold green]")
        vector_search.load_index_endpoint(config.index.endpoint)

        console.print("[bold green]Generating embedding for query...[/bold green]")
        query_embedding = embedder.generate_embedding(query)

        console.print("[bold green]Running search query...[/bold green]")
        search_results = vector_search.run_query(
            deployed_index_id=config.index.deployment,
            query=query_embedding,
            limit=limit,
        )

        table = Table(title="Search Results")
        table.add_column("ID",
justify="left", style="cyan") + table.add_column("Distance", justify="left", style="magenta") + table.add_column("Content", justify="left", style="green") + + for result in search_results: + table.add_row(result["id"], str(result["distance"]), result["content"]) + + console.print(table) + + except Exception as e: + console.print(f"[bold red]An error occurred: {e}[/bold red]") + raise typer.Exit(code=1) diff --git a/packages/vector-search/src/vector_search/py.typed b/packages/vector-search/src/vector_search/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/packages/vector-search/src/vector_search/vertex_ai.py b/packages/vector-search/src/vector_search/vertex_ai.py new file mode 100644 index 0000000..2721820 --- /dev/null +++ b/packages/vector-search/src/vector_search/vertex_ai.py @@ -0,0 +1,255 @@ +import asyncio +from typing import List +from uuid import uuid4 + +import aiohttp +import google.auth +import google.auth.transport.requests +from file_storage.google_cloud import GoogleCloudFileStorage +from gcloud.aio.auth import Token +from google.cloud import aiplatform + +from .base import BaseVectorSearch, SearchResult + + +class GoogleCloudVectorSearch(BaseVectorSearch): + """ + A vector search provider that uses Google Cloud's Vertex AI Vector Search. + """ + + def __init__( + self, project_id: str, location: str, bucket: str, index_name: str = None + ): + """ + Initializes the GoogleCloudVectorSearch client. + + Args: + project_id: The Google Cloud project ID. + location: The Google Cloud location (e.g., 'us-central1'). + bucket: The GCS bucket to use for file storage. + index_name: The name of the index. If None, it will be taken from settings. 
+ """ + aiplatform.init(project=project_id, location=location) + self.project_id = project_id + self.location = location + self.storage = GoogleCloudFileStorage(bucket=bucket) + self.index_name = index_name + self._credentials = None + self._aio_session: aiohttp.ClientSession | None = None + self._async_token: Token | None = None + + def _get_auth_headers(self) -> dict: + if self._credentials is None: + self._credentials, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) + if not self._credentials.token or self._credentials.expired: + self._credentials.refresh(google.auth.transport.requests.Request()) + return { + "Authorization": f"Bearer {self._credentials.token}", + "Content-Type": "application/json", + } + + async def _async_get_auth_headers(self) -> dict: + if self._async_token is None: + self._async_token = Token( + session=self._get_aio_session(), + scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + access_token = await self._async_token.get() + return { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + def _get_aio_session(self) -> aiohttp.ClientSession: + if self._aio_session is None or self._aio_session.closed: + connector = aiohttp.TCPConnector(limit=300, limit_per_host=50) + timeout = aiohttp.ClientTimeout(total=60) + self._aio_session = aiohttp.ClientSession( + timeout=timeout, connector=connector + ) + return self._aio_session + + def create_index( + self, + name: str, + content_path: str, + dimensions: int, + approximate_neighbors_count: int = 150, + distance_measure_type: str = "DOT_PRODUCT_DISTANCE", + **kwargs, + ) -> None: + """ + Creates a new Vertex AI Vector Search index. + + Args: + name: The display name for the new index. + content_path: The GCS URI to the JSON file containing the embeddings. + dimensions: The number of dimensions in the embedding vectors. + approximate_neighbors_count: The number of neighbors to find for each vector. 
+ distance_measure_type: The distance measure to use (e.g., 'DOT_PRODUCT_DISTANCE'). + """ + index = aiplatform.MatchingEngineIndex.create_tree_ah_index( + display_name=name, + contents_delta_uri=content_path, + dimensions=dimensions, + approximate_neighbors_count=approximate_neighbors_count, + distance_measure_type=distance_measure_type, + leaf_node_embedding_count=1000, + leaf_nodes_to_search_percent=10, + ) + self.index = index + + def update_index(self, index_name: str, content_path: str, **kwargs) -> None: + """ + Updates an existing Vertex AI Vector Search index. + + Args: + index_name: The resource name of the index to update. + content_path: The GCS URI to the JSON file containing the new embeddings. + """ + index = aiplatform.MatchingEngineIndex(index_name=index_name) + index.update_embeddings( + contents_delta_uri=content_path, + ) + self.index = index + + def deploy_index( + self, index_name: str, machine_type: str = "e2-standard-2" + ) -> None: + """ + Deploys a Vertex AI Vector Search index to an endpoint. + + Args: + index_name: The name of the index to deploy. + machine_type: The type of machine to use for the endpoint. + """ + index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create( + display_name=f"{index_name}-endpoint", + public_endpoint_enabled=True, + ) + index_endpoint.deploy_index( + index=self.index, + deployed_index_id=f"{index_name.replace('-', '_')}_deployed_{uuid4().hex}", + machine_type=machine_type, + ) + self.index_endpoint = index_endpoint + + def load_index_endpoint(self, endpoint_name: str) -> None: + """ + Loads an existing Vertex AI Vector Search index endpoint. + + Args: + endpoint_name: The resource name of the index endpoint. + """ + self.index_endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_name) + if not self.index_endpoint.public_endpoint_domain_name: + raise ValueError( + "The index endpoint does not have a public endpoint. " + "Please ensure that the endpoint is configured for public access." 
+ ) + + def run_query( + self, deployed_index_id: str, query: List[float], limit: int + ) -> List[SearchResult]: + """ + Runs a similarity search query against the deployed index. + + Args: + deployed_index_id: The ID of the deployed index. + query: The embedding vector to use for the search query. + limit: The maximum number of nearest neighbors to return. + + Returns: + A list of dictionaries representing the matched items. + """ + response = self.index_endpoint.find_neighbors( + deployed_index_id=deployed_index_id, queries=[query], num_neighbors=limit + ) + results = [] + for neighbor in response[0]: + file_path = self.index_name + "/contents/" + neighbor.id + ".md" + content = self.storage.get_file_stream(file_path).read().decode("utf-8") + results.append( + {"id": neighbor.id, "distance": neighbor.distance, "content": content} + ) + return results + + async def async_run_query( + self, deployed_index_id: str, query: List[float], limit: int + ) -> List[SearchResult]: + """ + Runs a non-blocking similarity search query against the deployed index + using the REST API directly with an async HTTP client. + + Args: + deployed_index_id: The ID of the deployed index. + query: The embedding vector to use for the search query. + limit: The maximum number of nearest neighbors to return. + + Returns: + A list of dictionaries representing the matched items. 
+ """ + domain = self.index_endpoint.public_endpoint_domain_name + endpoint_id = self.index_endpoint.name.split("/")[-1] + url = ( + f"https://{domain}/v1/projects/{self.project_id}" + f"/locations/{self.location}" + f"/indexEndpoints/{endpoint_id}:findNeighbors" + ) + payload = { + "deployed_index_id": deployed_index_id, + "queries": [ + { + "datapoint": {"feature_vector": query}, + "neighbor_count": limit, + } + ], + } + + headers = await self._async_get_auth_headers() + session = self._get_aio_session() + async with session.post(url, json=payload, headers=headers) as response: + response.raise_for_status() + data = await response.json() + + neighbors = data.get("nearestNeighbors", [{}])[0].get("neighbors", []) + content_tasks = [] + for neighbor in neighbors: + datapoint_id = neighbor["datapoint"]["datapointId"] + file_path = f"{self.index_name}/contents/{datapoint_id}.md" + content_tasks.append(self.storage.async_get_file_stream(file_path)) + + file_streams = await asyncio.gather(*content_tasks) + results: List[SearchResult] = [] + for neighbor, stream in zip(neighbors, file_streams): + results.append( + { + "id": neighbor["datapoint"]["datapointId"], + "distance": neighbor["distance"], + "content": stream.read().decode("utf-8"), + } + ) + return results + + def delete_index(self, index_name: str) -> None: + """ + Deletes a Vertex AI Vector Search index. + + Args: + index_name: The resource name of the index. + """ + index = aiplatform.MatchingEngineIndex(index_name) + index.delete() + + def delete_index_endpoint(self, index_endpoint_name: str) -> None: + """ + Deletes a Vertex AI Vector Search index endpoint. + + Args: + index_endpoint_name: The resource name of the index endpoint. 
+ """ + index_endpoint = aiplatform.MatchingEngineIndexEndpoint(index_endpoint_name) + index_endpoint.undeploy_all() + index_endpoint.delete(force=True) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f289a4b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,91 @@ +[project] +name = "rag-pipeline" +version = "0.1.0" +description = "RAG Pipeline for document chunking, embedding, and vector search" +readme = "README.md" +requires-python = ">=3.12" +authors = [ + { name = "Pipeline Team" } +] + +dependencies = [ + # Core dependencies + "google-genai>=1.45.0", + "google-cloud-aiplatform>=1.106.0", + "google-cloud-storage>=2.19.0", + "google-auth>=2.29.0", + "pydantic>=2.11.7", + "pydantic-settings[yaml]>=2.10.1", + "python-dotenv>=1.0.0", + + # Chunking + "chonkie>=1.1.2", + "tiktoken>=0.7.0", + "langchain>=0.3.0", + "langchain-core>=0.3.0", + + # Document processing + "markitdown[pdf]>=0.1.2", + "pypdf>=6.1.2", + "pdf2image>=1.17.0", + + # Storage & networking + "gcloud-aio-storage>=9.6.1", + "gcloud-aio-auth>=5.3.0", + "aiohttp>=3.10.11,<4", + + # Utils + "tenacity>=9.1.2", + "typer>=0.16.1", + + # Pipeline orchestration (optional) + "kfp>=2.15.2", +] + +[project.scripts] +# Chunkers +llm-chunker = "chunker.llm_chunker:app" +recursive-chunker = "chunker.recursive_chunker:app" +contextual-chunker = "chunker.contextual_chunker:app" + +# Converters +convert-md = "document_converter.markdown:app" + +# Storage +file-storage = "file_storage.cli:app" + +# Vector Search +vector-search = "vector_search.cli:app" + +# Utils +normalize-filenames = "utils.normalize_filenames:app" + +[build-system] +requires = ["uv_build>=0.8.3,<0.9.0"] +build-backend = "uv_build" + +[tool.uv.workspace] +members = [ + "apps/*", + "packages/*", +] + +[tool.uv.sources] +chunker = { workspace = true } +document-converter = { workspace = true } +embedder = { workspace = true } +file-storage = { workspace = true } +llm = { workspace = true } +utils = { workspace = true } 
vector-search = { workspace = true }
index-gen = { workspace = true }

[dependency-groups]
dev = [
    "pytest>=8.4.1",
    "mypy>=1.17.1",
    "ruff>=0.12.10",
]

[tool.ruff.lint]
extend-select = ["I", "F"]
diff --git a/src/rag_eval/__init__.py b/src/rag_eval/__init__.py
new file mode 100644
index 0000000..bd6b0a4
--- /dev/null
+++ b/src/rag_eval/__init__.py
@@ -0,0 +1,2 @@
def main() -> None:
    print("Hello from rag-eval!")
diff --git a/src/rag_eval/config.py b/src/rag_eval/config.py
new file mode 100644
index 0000000..5be171b
--- /dev/null
+++ b/src/rag_eval/config.py
@@ -0,0 +1,121 @@
import os

from pydantic import BaseModel
from pydantic_settings import (
    BaseSettings,
    PydanticBaseSettingsSource,
    SettingsConfigDict,
    YamlConfigSettingsSource,
)

# Path to the YAML settings file; overridable via the CONFIG_YAML env var.
CONFIG_FILE_PATH = os.getenv("CONFIG_YAML", "config.yaml")


class IndexConfig(BaseModel):
    # Vector Search index display name.
    name: str
    # Resource name of the index endpoint used for querying.
    endpoint: str
    dimensions: int
    machine_type: str = "e2-standard-16"
    origin: str
    destination: str
    chunk_limit: int

    @property
    def deployment(self) -> str:
        # NOTE(review): GoogleCloudVectorSearch.deploy_index appends a uuid
        # suffix to the deployed id; this derived value has no suffix — confirm
        # they are meant to match.
        return self.name.replace("-", "_") + "_deployed"

    @property
    def data(self) -> str:
        # NOTE(review): plain concatenation — if `destination` has no trailing
        # '/', the result fuses the two path segments. Confirm.
        return self.destination + self.name


class AgentConfig(BaseModel):
    name: str
    instructions: str
    language_model: str
    embedding_model: str
    # Thinking-token budget passed to the LLM (0 disables).
    thinking: int


class BigQueryConfig(BaseModel):
    dataset_id: str
    # Defaults to the ambient project when None.
    project_id: str | None = None
    # Logical table name -> BigQuery table id.
    table_ids: dict[str, str]


class Settings(BaseSettings):
    """Application settings loaded from env vars and the YAML config file."""

    project_id: str
    location: str
    service_account: str

    # Flattened fields from nested models
    agent_name: str
    agent_instructions: str
    agent_language_model: str
    agent_embedding_model: str
    agent_thinking: int

    index_name: str
    index_endpoint: str
    index_dimensions: int
    index_machine_type: str = "e2-standard-16"
    index_origin: str
    index_destination: str
    index_chunk_limit: int

    bigquery_dataset_id: str
    bigquery_project_id: str | None = None
    bigquery_table_ids: dict[str, str]

    bucket: str
    base_image: str
    dialogflow_agent_id: str
    processing_image: str

    model_config = SettingsConfigDict(yaml_file=CONFIG_FILE_PATH)

    @property
    def agent(self) -> AgentConfig:
        # Re-bundles the flattened agent_* fields into a typed view.
        return AgentConfig(
            name=self.agent_name,
            instructions=self.agent_instructions,
            language_model=self.agent_language_model,
            embedding_model=self.agent_embedding_model,
            thinking=self.agent_thinking,
        )

    @property
    def index(self) -> IndexConfig:
        # Re-bundles the flattened index_* fields into a typed view.
        return IndexConfig(
            name=self.index_name,
            endpoint=self.index_endpoint,
            dimensions=self.index_dimensions,
            machine_type=self.index_machine_type,
            origin=self.index_origin,
            destination=self.index_destination,
            chunk_limit=self.index_chunk_limit,
        )

    @property
    def bigquery(self) -> BigQueryConfig:
        # Re-bundles the flattened bigquery_* fields into a typed view.
        return BigQueryConfig(
            dataset_id=self.bigquery_dataset_id,
            project_id=self.bigquery_project_id,
            table_ids=self.bigquery_table_ids,
        )

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        # Only env vars and the YAML file are consulted (env wins); init args,
        # .env files, and secret files are deliberately dropped.
        return (
            env_settings,
            YamlConfigSettingsSource(settings_cls),
        )


# NOTE(review): instantiated at import time — importing this module raises a
# validation error if config.yaml / env vars are missing required fields.
settings = Settings()