Primera version de chunkeo completo crud

This commit is contained in:
Sebastian
2025-11-05 19:18:11 +00:00
parent df2c184814
commit 7c6e8c4858
36 changed files with 6242 additions and 5 deletions

View File

@@ -0,0 +1,18 @@
"""
Utilidades de chunking para procesamiento de PDFs.
Refactorización modular del pipeline de chunking_token.py
"""
from .gemini_client import GeminiClient, get_gemini_client
from .token_manager import TokenManager
from .chunk_processor import OptimizedChunkProcessor
from .pdf_extractor import OptimizedPDFExtractor
from .pipeline import process_pdf_with_token_control
__all__ = [
"GeminiClient",
"get_gemini_client",
"TokenManager",
"OptimizedChunkProcessor",
"OptimizedPDFExtractor",
"process_pdf_with_token_control",
]

View File

@@ -0,0 +1,258 @@
"""
Procesador optimizado de chunks con soporte para LLM (Gemini).
Permite merge inteligente y mejora de chunks usando IA.
"""
import logging
import time
import hashlib
from typing import List, Optional
from langchain_core.documents import Document
from .token_manager import TokenManager
from .gemini_client import GeminiClient
logger = logging.getLogger(__name__)
class OptimizedChunkProcessor:
"""Procesador de chunks con optimización mediante LLM"""
def __init__(
self,
max_tokens: int = 1000,
target_tokens: int = 800,
chunks_per_batch: int = 5,
gemini_client: Optional[GeminiClient] = None,
model_name: str = "gpt-3.5-turbo",
custom_instructions: str = ""
):
"""
Inicializa el procesador de chunks.
Args:
max_tokens: Límite máximo de tokens por chunk
target_tokens: Tokens objetivo para chunks optimizados
chunks_per_batch: Chunks a procesar por lote
gemini_client: Cliente de Gemini para procesamiento (opcional)
model_name: Modelo para cálculo de tokens
custom_instructions: Instrucciones adicionales para el prompt de optimización
"""
self.client = gemini_client
self.chunks_per_batch = chunks_per_batch
self.max_tokens = max_tokens
self.target_tokens = target_tokens
self.token_manager = TokenManager(model_name)
self.custom_instructions = custom_instructions
# Caché para evitar reprocesamiento
self._merge_cache = {}
self._enhance_cache = {}
def _get_cache_key(self, text: str) -> str:
"""Genera una clave de caché para el texto"""
combined = text + self.custom_instructions
return hashlib.md5(combined.encode()).hexdigest()[:16]
def should_merge_chunks(self, chunk1: str, chunk2: str) -> bool:
"""
Determina si dos chunks deben unirse basándose en continuidad semántica.
Args:
chunk1: Primer chunk
chunk2: Segundo chunk
Returns:
True si los chunks deben unirse
"""
cache_key = f"{self._get_cache_key(chunk1)}_{self._get_cache_key(chunk2)}"
if cache_key in self._merge_cache:
return self._merge_cache[cache_key]
try:
combined_text = f"{chunk1}\n\n{chunk2}"
combined_tokens = self.token_manager.count_tokens(combined_text)
if combined_tokens > self.max_tokens:
self._merge_cache[cache_key] = False
return False
if self.client:
base_prompt = f"""Analiza estos dos fragmentos de texto y determina si deben unirse.
LÍMITES ESTRICTOS:
- Tokens combinados: {combined_tokens}/{self.max_tokens}
- Solo unir si hay continuidad semántica clara
Criterios de unión:
1. El primer fragmento termina abruptamente
2. El segundo fragmento continúa la misma idea/concepto
3. La unión mejora la coherencia del contenido
4. Exceder {self.max_tokens} tokens, SOLAMENTE si es necesario para mantener el contexto
Responde SOLO 'SI' o 'NO'.
Fragmento 1 ({self.token_manager.count_tokens(chunk1)} tokens):
{chunk1[:500]}...
Fragmento 2 ({self.token_manager.count_tokens(chunk2)} tokens):
{chunk2[:500]}..."""
response = self.client.generate_content(base_prompt)
result = response.strip().upper() == 'SI'
self._merge_cache[cache_key] = result
return result
# Heurística simple si no hay cliente LLM
result = (
chunk1.rstrip().endswith(('.', '!', '?')) == False and
combined_tokens <= self.target_tokens
)
self._merge_cache[cache_key] = result
return result
except Exception as e:
logger.error(f"Error analizando chunks para merge: {e}")
self._merge_cache[cache_key] = False
return False
def enhance_chunk(self, chunk_text: str) -> str:
"""
Mejora un chunk usando LLM o truncamiento.
Args:
chunk_text: Texto del chunk a mejorar
Returns:
Texto del chunk mejorado
"""
cache_key = self._get_cache_key(chunk_text)
if cache_key in self._enhance_cache:
return self._enhance_cache[cache_key]
current_tokens = self.token_manager.count_tokens(chunk_text)
try:
if self.client and current_tokens < self.max_tokens:
base_prompt = f"""Optimiza este texto siguiendo estas reglas ESTRICTAS:
LÍMITES DE TOKENS:
- Actual: {current_tokens} tokens
- Máximo permitido: {self.max_tokens} tokens
- Objetivo: {self.target_tokens} tokens
REGLAS FUNDAMENTALES:
NO exceder {self.max_tokens} tokens bajo ninguna circunstancia
Mantener TODA la información esencial y metadatos
NO cambiar términos técnicos o palabras clave
Asegurar oraciones completas y coherentes
Optimizar claridad y estructura sin añadir contenido
SOLO devuelve el texto no agregues conclusiones NUNCA
Si el texto está cerca del límite, NO expandir. Solo mejorar estructura."""
if self.custom_instructions.strip():
base_prompt += f"\n\nINSTRUCCIONES ADICIONALES:\n{self.custom_instructions}"
base_prompt += f"\n\nTexto a optimizar:\n{chunk_text}"
response = self.client.generate_content(base_prompt)
enhanced_text = response.strip()
enhanced_tokens = self.token_manager.count_tokens(enhanced_text)
if enhanced_tokens > self.max_tokens:
logger.warning(
f"Texto optimizado excede límite ({enhanced_tokens} > {self.max_tokens}), truncando"
)
enhanced_text = self.token_manager.truncate_to_tokens(enhanced_text, self.max_tokens)
self._enhance_cache[cache_key] = enhanced_text
return enhanced_text
else:
# Sin LLM o ya en límite, solo truncar si es necesario
if current_tokens > self.max_tokens:
truncated = self.token_manager.truncate_to_tokens(chunk_text, self.max_tokens)
self._enhance_cache[cache_key] = truncated
return truncated
self._enhance_cache[cache_key] = chunk_text
return chunk_text
except Exception as e:
logger.error(f"Error procesando chunk: {e}")
if current_tokens > self.max_tokens:
truncated = self.token_manager.truncate_to_tokens(chunk_text, self.max_tokens)
self._enhance_cache[cache_key] = truncated
return truncated
self._enhance_cache[cache_key] = chunk_text
return chunk_text
def process_chunks_batch(
self,
chunks: List[Document],
merge_related: bool = False
) -> List[Document]:
"""
Procesa un lote de chunks, aplicando merge y mejoras.
Args:
chunks: Lista de documentos a procesar
merge_related: Si True, intenta unir chunks relacionados
Returns:
Lista de documentos procesados
"""
processed_chunks = []
total_chunks = len(chunks)
logger.info(f"Procesando {total_chunks} chunks en lotes de {self.chunks_per_batch}")
if self.custom_instructions:
logger.info(f"Con instrucciones personalizadas: {self.custom_instructions[:100]}...")
i = 0
while i < len(chunks):
batch_start = time.time()
current_chunk = chunks[i]
merged_content = current_chunk.page_content
original_tokens = self.token_manager.count_tokens(merged_content)
# Intentar merge si está habilitado
if merge_related and i < len(chunks) - 1:
merge_count = 0
while (
i + merge_count < len(chunks) - 1 and
self.should_merge_chunks(
merged_content,
chunks[i + merge_count + 1].page_content
)
):
merge_count += 1
merged_content += "\n\n" + chunks[i + merge_count].page_content
logger.info(f" Uniendo chunk {i + 1} con chunk {i + merge_count + 1}")
i += merge_count
logger.info(f"\nProcesando chunk {i + 1}/{total_chunks}")
logger.info(f" Tokens originales: {original_tokens}")
# Mejorar chunk
enhanced_content = self.enhance_chunk(merged_content)
final_tokens = self.token_manager.count_tokens(enhanced_content)
processed_chunks.append(Document(
page_content=enhanced_content,
metadata={
**current_chunk.metadata,
}
))
logger.info(f" Tokens finales: {final_tokens}")
logger.info(f" Tiempo de procesamiento: {time.time() - batch_start:.2f}s")
i += 1
if i % self.chunks_per_batch == 0 and i < len(chunks):
logger.info(f"\nCompletados {i}/{total_chunks} chunks")
time.sleep(0.1)
return processed_chunks

View File

@@ -0,0 +1,91 @@
"""
Cliente para interactuar con Gemini (Google Vertex AI).
Usado para procesamiento inteligente de chunks con LLM.
"""
import logging
import os
import google.oauth2.service_account as sa
import vertexai.generative_models as gm
import vertexai
from ...core.config import settings
logger = logging.getLogger(__name__)
class GeminiClient:
"""Cliente para generar contenido usando Gemini via Vertex AI"""
def __init__(
self,
account_file: str | None = None,
project: str | None = None,
model: str | None = None
) -> None:
"""
Inicializa el cliente de Gemini.
Args:
account_file: Ruta al archivo de credenciales de servicio (default: desde settings)
project: ID del proyecto de Google Cloud (default: desde settings)
model: Modelo de Gemini a usar (default: desde settings)
"""
# Usar configuración de settings si no se proporciona
account_file = account_file or settings.GOOGLE_APPLICATION_CREDENTIALS
project = project or settings.GOOGLE_CLOUD_PROJECT
model = model or settings.GEMINI_MODEL
try:
# Cargar credenciales desde archivo
credentials = sa.Credentials.from_service_account_file(account_file)
# Inicializar Vertex AI
vertexai.init(
project=project,
credentials=credentials,
location=settings.GOOGLE_CLOUD_LOCATION
)
# Inicializar modelo
self.model = gm.GenerativeModel(model)
logger.info(f"GeminiClient inicializado con modelo {model}")
except Exception as e:
logger.error(f"Error inicializando GeminiClient: {e}")
raise
def generate_content(self, prompt: str) -> str:
"""
Genera contenido usando Gemini.
Args:
prompt: Prompt para el modelo
Returns:
Texto generado por el modelo
Raises:
Exception: Si hay error en la generación
"""
try:
response = self.model.generate_content(prompt)
return response.text
except Exception as e:
logger.error(f"Error en Gemini: {e}")
return ""
# Instancia global singleton
_gemini_client: GeminiClient | None = None
def get_gemini_client() -> GeminiClient:
"""
Obtiene la instancia singleton del cliente de Gemini.
Returns:
Instancia de GeminiClient
"""
global _gemini_client
if _gemini_client is None:
_gemini_client = GeminiClient()
return _gemini_client

View File

@@ -0,0 +1,299 @@
"""
Extractor optimizado de PDFs con soporte para BytesIO y procesamiento paralelo.
Adaptado para trabajar con Azure Blob Storage sin archivos temporales.
"""
import logging
import os
import time
import hashlib
from typing import List, Optional, Dict, BinaryIO
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor
from langchain_core.documents import Document
from pypdf import PdfReader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pdf2image import convert_from_bytes
from .token_manager import TokenManager
from .chunk_processor import OptimizedChunkProcessor
from .gemini_client import GeminiClient
logger = logging.getLogger(__name__)
class OptimizedPDFExtractor:
"""Extractor optimizado de PDFs con soporte para BytesIO"""
def __init__(
self,
max_tokens: int = 1000,
target_tokens: int = 800,
gemini_client: Optional[GeminiClient] = None,
custom_instructions: str = "",
extract_images: bool = False, # Por defecto deshabilitado según requerimientos
max_workers: int = 4
):
"""
Inicializa el extractor de PDFs.
Args:
max_tokens: Límite máximo de tokens por chunk
target_tokens: Tokens objetivo para chunks
gemini_client: Cliente de Gemini (opcional)
custom_instructions: Instrucciones adicionales para optimización
extract_images: Si True, extrae páginas con formato especial como imágenes
max_workers: Número máximo de workers para procesamiento paralelo
"""
self.client = gemini_client
self.max_workers = max_workers
self.token_manager = TokenManager()
self.custom_instructions = custom_instructions
self.extract_images = extract_images
self._format_cache = {}
self.chunk_processor = OptimizedChunkProcessor(
max_tokens=max_tokens,
target_tokens=target_tokens,
gemini_client=gemini_client,
custom_instructions=custom_instructions
)
def detect_special_format_batch(self, chunks: List[Document]) -> Dict[int, bool]:
"""
Detecta chunks con formatos especiales (tablas, diagramas, etc.) en lote.
Args:
chunks: Lista de chunks a analizar
Returns:
Diccionario con índices de chunks y si tienen formato especial
"""
results = {}
chunks_to_process = []
for i, chunk in enumerate(chunks):
cache_key = hashlib.md5(chunk.page_content.encode()).hexdigest()[:16]
if cache_key in self._format_cache:
results[i] = self._format_cache[cache_key]
else:
chunks_to_process.append((i, chunk, cache_key))
if not chunks_to_process:
return results
logger.info(f"Analizando {len(chunks_to_process)} chunks para formatos especiales...")
if self.client and len(chunks_to_process) > 1:
with ThreadPoolExecutor(max_workers=min(self.max_workers, len(chunks_to_process))) as executor:
futures = {
executor.submit(self._detect_single_format, chunk): (i, cache_key)
for i, chunk, cache_key in chunks_to_process
}
for future in futures:
i, cache_key = futures[future]
try:
result = future.result()
results[i] = result
self._format_cache[cache_key] = result
except Exception as e:
logger.error(f"Error procesando chunk {i}: {e}")
results[i] = False
self._format_cache[cache_key] = False
else:
for i, chunk, cache_key in chunks_to_process:
result = self._detect_single_format(chunk)
results[i] = result
self._format_cache[cache_key] = result
return results
def _detect_single_format(self, chunk: Document) -> bool:
"""Detecta formato especial en un chunk individual."""
if not self.client:
content = chunk.page_content
table_indicators = ['', '', '', '', '', '', '|', '+', '-']
has_table_chars = any(char in content for char in table_indicators)
has_multiple_columns = content.count('\t') > 10 or content.count(' ') > 20
return has_table_chars or has_multiple_columns
try:
prompt = f"""¿Contiene este texto tablas estructuradas, diagramas ASCII, o elementos que requieren formato especial?
Responde SOLO 'SI' o 'NO'.
Texto:
{chunk.page_content[:1000]}"""
response = self.client.generate_content(prompt)
return response.strip().upper() == 'SI'
except Exception as e:
logger.error(f"Error detectando formato: {e}")
return False
def process_pdf_from_bytes(
self,
pdf_bytes: bytes,
file_name: str,
chunk_size: int = 1000,
chunk_overlap: int = 200,
merge_related: bool = True
) -> List[Document]:
"""
Procesa un PDF desde bytes (BytesIO).
Args:
pdf_bytes: Contenido del PDF en bytes
file_name: Nombre del archivo PDF
chunk_size: Tamaño del chunk
chunk_overlap: Solapamiento entre chunks
merge_related: Si True, intenta unir chunks relacionados
Returns:
Lista de documentos procesados
"""
overall_start = time.time()
logger.info(f"\n=== Iniciando procesamiento optimizado de PDF: {file_name} ===")
logger.info(f"Configuración:")
logger.info(f" - Tokens máximos por chunk: {self.chunk_processor.max_tokens}")
logger.info(f" - Tokens objetivo: {self.chunk_processor.target_tokens}")
logger.info(f" - Chunk size: {chunk_size}")
logger.info(f" - Chunk overlap: {chunk_overlap}")
logger.info(f" - Merge relacionados: {merge_related}")
logger.info(f" - Extraer imágenes: {'' if self.extract_images else ''}")
if self.custom_instructions:
logger.info(f" - Instrucciones personalizadas: {self.custom_instructions[:100]}...")
logger.info(f"\n1. Creando chunks del PDF...")
chunks = self._create_optimized_chunks_from_bytes(
pdf_bytes,
file_name,
chunk_size,
chunk_overlap
)
logger.info(f" Total chunks creados: {len(chunks)}")
# Nota: La extracción de imágenes desde bytes no se implementa por ahora
# ya que extract_images está deshabilitado por defecto según requerimientos
if self.extract_images:
logger.warning("Extracción de imágenes desde bytes no implementada aún")
logger.info(f"\n2. Procesando y optimizando chunks...")
processed_chunks = self.chunk_processor.process_chunks_batch(chunks, merge_related)
total_time = time.time() - overall_start
if processed_chunks:
avg_tokens = sum(
self.token_manager.count_tokens(chunk.page_content)
for chunk in processed_chunks
) / len(processed_chunks)
else:
avg_tokens = 0
logger.info(f"\n=== Procesamiento completado ===")
logger.info(f" Tiempo total: {total_time:.2f}s")
logger.info(f" Chunks procesados: {len(processed_chunks)}")
logger.info(f" Tokens promedio por chunk: {avg_tokens:.1f}")
if self.custom_instructions:
logger.info(f" Custom instructions aplicadas: ✅")
return processed_chunks
def _create_optimized_chunks_from_bytes(
self,
pdf_bytes: bytes,
file_name: str,
chunk_size: int,
chunk_overlap: int
) -> List[Document]:
"""
Crea chunks optimizados desde bytes del PDF.
Args:
pdf_bytes: Contenido del PDF en bytes
file_name: Nombre del archivo
chunk_size: Tamaño del chunk
chunk_overlap: Solapamiento entre chunks
Returns:
Lista de documentos con chunks
"""
logger.info(f" Leyendo PDF desde bytes: {file_name}")
# Crear BytesIO para pypdf
pdf_buffer = BytesIO(pdf_bytes)
pdf = PdfReader(pdf_buffer)
chunks = []
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=self.token_manager.count_tokens,
separators=["\n\n", "\n", ". ", " ", ""]
)
# Extraer todo el texto concatenado con tracking de páginas
full_text = ""
page_boundaries = [] # Lista de (char_position, page_num)
for page_num, page in enumerate(pdf.pages, 1):
text = page.extract_text()
if text.strip():
page_start = len(full_text)
full_text += text
# Agregar separador entre páginas (excepto después de la última)
if page_num < len(pdf.pages):
full_text += "\n\n"
page_end = len(full_text)
page_boundaries.append((page_start, page_end, page_num))
if not full_text.strip():
return []
# Dividir el texto completo (esto permite overlap entre páginas)
text_chunks = text_splitter.split_text(full_text)
logger.info(f" Total de chunks generados por splitter: {len(text_chunks)}")
if len(text_chunks) >= 2:
# Verificar overlap entre primer y segundo chunk
chunk0_end = text_chunks[0][-100:] if len(text_chunks[0]) > 100 else text_chunks[0]
chunk1_start = text_chunks[1][:100] if len(text_chunks[1]) > 100 else text_chunks[1]
logger.info(f" Chunk 0 termina con: ...{chunk0_end}")
logger.info(f" Chunk 1 empieza con: {chunk1_start}...")
# Asignar página a cada chunk basándonos en su posición en el texto original
chunks = []
current_search_pos = 0
for chunk_text in text_chunks:
# Buscar donde aparece este chunk en el texto completo
chunk_pos = full_text.find(chunk_text, current_search_pos)
if chunk_pos == -1:
# Si no lo encontramos, usar la última posición conocida
chunk_pos = current_search_pos
# Determinar la página basándonos en la posición del inicio del chunk
chunk_page = 1
for start, end, page_num in page_boundaries:
if chunk_pos >= start and chunk_pos < end:
chunk_page = page_num
break
elif chunk_pos >= end:
# El chunk está después de esta página, continuar buscando
chunk_page = page_num # Guardar la última página vista
chunks.append(Document(
page_content=chunk_text,
metadata={
"page": chunk_page,
"file_name": file_name,
}
))
# Actualizar posición de búsqueda para el siguiente chunk
current_search_pos = chunk_pos + len(chunk_text)
return chunks

View File

@@ -0,0 +1,65 @@
"""
Pipeline principal para procesar PDFs con control de tokens.
Función de alto nivel que orquesta el proceso completo de chunking.
"""
import logging
from typing import List, Optional
from langchain_core.documents import Document
from .pdf_extractor import OptimizedPDFExtractor
from .gemini_client import GeminiClient
logger = logging.getLogger(__name__)
def process_pdf_with_token_control(
pdf_bytes: bytes,
file_name: str,
max_tokens: int = 950,
target_tokens: int = 800,
chunk_size: int = 1000,
chunk_overlap: int = 200,
merge_related: bool = True,
gemini_client: Optional[GeminiClient] = None,
custom_instructions: str = "",
extract_images: bool = False
) -> List[Document]:
"""
Función principal para procesar PDFs con control completo de tokens.
Args:
pdf_bytes: Contenido del PDF en bytes
file_name: Nombre del archivo PDF
max_tokens: Límite máximo de tokens por chunk
target_tokens: Tokens objetivo para optimización
chunk_size: Tamaño base de chunks
chunk_overlap: Solapamiento entre chunks
merge_related: Si unir chunks relacionados
gemini_client: Cliente de Gemini (opcional, para LLM processing)
custom_instructions: Instrucciones adicionales para optimización
extract_images: Si True, extrae páginas con formato especial como imágenes
Returns:
Lista de documentos procesados con metadata simple (page, file_name)
"""
logger.info(f"Iniciando pipeline de chunking para {file_name}")
extractor = OptimizedPDFExtractor(
max_tokens=max_tokens,
target_tokens=target_tokens,
gemini_client=gemini_client,
custom_instructions=custom_instructions,
extract_images=extract_images,
max_workers=4
)
chunks = extractor.process_pdf_from_bytes(
pdf_bytes=pdf_bytes,
file_name=file_name,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
merge_related=merge_related
)
logger.info(f"Pipeline completado: {len(chunks)} chunks generados")
return chunks

View File

@@ -0,0 +1,72 @@
"""
Gestor de tokens para contar y truncar texto basado en modelos de tokenización.
"""
import logging
import tiktoken
logger = logging.getLogger(__name__)
class TokenManager:
"""Gestor para contar y truncar tokens usando tiktoken"""
def __init__(self, model_name: str = "gpt-3.5-turbo"):
"""
Inicializa el gestor de tokens.
Args:
model_name: Nombre del modelo para la codificación de tokens
"""
try:
self.encoding = tiktoken.encoding_for_model(model_name)
except KeyError:
logger.warning(
f"Modelo {model_name} no encontrado, usando codificación por defecto cl100k_base"
)
self.encoding = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
"""
Cuenta el número de tokens en un texto.
Args:
text: Texto a analizar
Returns:
Número de tokens
"""
return len(self.encoding.encode(text))
def truncate_to_tokens(
self,
text: str,
max_tokens: int,
preserve_sentences: bool = True
) -> str:
"""
Trunca texto a un número máximo de tokens.
Args:
text: Texto a truncar
max_tokens: Número máximo de tokens
preserve_sentences: Si True, intenta mantener oraciones completas
Returns:
Texto truncado
"""
tokens = self.encoding.encode(text)
if len(tokens) <= max_tokens:
return text
truncated_tokens = tokens[:max_tokens]
truncated_text = self.encoding.decode(truncated_tokens)
if preserve_sentences:
# Intentar cortar en el último punto
last_period = truncated_text.rfind('.')
# Solo cortar si el punto está en el último 30% del texto
if last_period > len(truncated_text) * 0.7:
return truncated_text[:last_period + 1]
return truncated_text