Bug solucionado de Qdrant y subida a de datos extraidos a Redis con referencia al documento

This commit is contained in:
Sebastian
2025-11-07 23:30:10 +00:00
parent c9a63e129d
commit 70f2a42502
15 changed files with 1392 additions and 29 deletions

View File

@@ -41,6 +41,13 @@ class Settings(BaseSettings):
AZURE_OPENAI_EMBEDDING_MODEL: str = "text-embedding-3-large"
AZURE_OPENAI_EMBEDDING_DEPLOYMENT: str = "text-embedding-3-large"
# Rate limiting para embeddings (ajustar según tier de Azure OpenAI)
# S0 tier: batch_size=16, delay=1.0 es seguro
# Tier superior: batch_size=100, delay=0.1
EMBEDDING_BATCH_SIZE: int = 16
EMBEDDING_DELAY_BETWEEN_BATCHES: float = 1.0
EMBEDDING_MAX_RETRIES: int = 5
# Google Cloud / Vertex AI configuración
GOOGLE_APPLICATION_CREDENTIALS: str
GOOGLE_CLOUD_PROJECT: str

View File

@@ -11,6 +11,7 @@ from .routers.agent import router as agent_router
from .routers.chunking import router as chunking_router
from .routers.chunking_landingai import router as chunking_landingai_router
from .routers.dataroom import router as dataroom_router
from .routers.extracted_data import router as extracted_data_router
from .routers.files import router as files_router
from .routers.schemas import router as schemas_router
from .routers.vectors import router as vectors_router
@@ -123,6 +124,9 @@ app.include_router(schemas_router)
# Chunking LandingAI router (nuevo)
app.include_router(chunking_landingai_router)
# Extracted data router (nuevo)
app.include_router(extracted_data_router)
app.include_router(dataroom_router, prefix="/api/v1")
app.include_router(agent_router)

View File

@@ -0,0 +1,68 @@
"""
Modelo Redis-OM para almacenar datos extraídos de documentos.
Permite búsqueda rápida de datos estructurados sin necesidad de búsqueda vectorial.
"""
from datetime import datetime
from typing import Optional, Dict, Any
from redis_om import HashModel, Field, Migrator
import json
class ExtractedDocument(HashModel):
"""
Modelo para guardar datos extraídos de documentos en Redis.
Uso:
1. Cuando se procesa un PDF con schema y se extraen datos
2. Los chunks van a Qdrant (para RAG)
3. Los datos extraídos van a Redis (para búsqueda estructurada)
Ventajas:
- Búsqueda rápida por file_name, tema, collection_name
- Acceso directo a datos extraídos sin búsqueda vectorial
- Permite filtros y agregaciones
"""
# Identificadores
file_name: str = Field(index=True)
tema: str = Field(index=True)
collection_name: str = Field(index=True)
# Datos extraídos (JSON serializado)
# Redis-OM HashModel no soporta Dict directamente, usamos str y serializamos
extracted_data_json: str
# Metadata
extraction_timestamp: str # ISO format
class Meta:
database = None # Se configura en runtime
global_key_prefix = "extracted_doc"
model_key_prefix = "doc"
def set_extracted_data(self, data: Dict[str, Any]) -> None:
"""Helper para serializar datos extraídos a JSON"""
self.extracted_data_json = json.dumps(data, ensure_ascii=False, indent=2)
def get_extracted_data(self) -> Dict[str, Any]:
"""Helper para deserializar datos extraídos desde JSON"""
return json.loads(self.extracted_data_json)
@classmethod
def find_by_file(cls, file_name: str):
"""Busca todos los documentos extraídos de un archivo"""
return cls.find(cls.file_name == file_name).all()
@classmethod
def find_by_tema(cls, tema: str):
"""Busca todos los documentos extraídos de un tema"""
return cls.find(cls.tema == tema).all()
@classmethod
def find_by_collection(cls, collection_name: str):
"""Busca todos los documentos en una colección"""
return cls.find(cls.collection_name == collection_name).all()
# Ejecutar migración para crear índices en Redis
Migrator().run()

View File

@@ -58,7 +58,7 @@ class CustomSchema(BaseModel):
schema_id: Optional[str] = Field(None, description="ID único del schema (generado automáticamente si no se provee)")
schema_name: str = Field(..., description="Nombre descriptivo del schema", min_length=1, max_length=100)
description: str = Field(..., description="Descripción de qué extrae este schema", min_length=1, max_length=500)
fields: List[SchemaField] = Field(..., description="Lista de campos a extraer", min_items=1, max_items=50)
fields: List[SchemaField] = Field(..., description="Lista de campos a extraer", min_items=1, max_items=200)
# Metadata
created_at: Optional[str] = Field(None, description="Timestamp de creación ISO")

View File

@@ -14,6 +14,7 @@ from pydantic import BaseModel, Field
from ..repositories.schema_repository import get_schema_repository
from ..services.chunking_service import get_chunking_service
from ..services.landingai_service import get_landingai_service
from ..services.extracted_data_service import get_extracted_data_service
from ..utils.chunking.token_manager import TokenManager
logger = logging.getLogger(__name__)
@@ -105,11 +106,12 @@ async def process_with_landingai(request: ProcessLandingAIRequest):
logger.info(f"Tema: {request.tema}")
logger.info(f"Modo: {request.mode}")
logger.info(f"Colección: {request.collection_name}")
logger.info(f"Schema ID recibido: '{request.schema_id}' (tipo: {type(request.schema_id).__name__})")
# 1. Validar schema si es modo extract
custom_schema = None
if request.mode == "extract":
if not request.schema_id:
if not request.schema_id or request.schema_id.strip() == "":
raise HTTPException(
status_code=400,
detail="schema_id es requerido cuando mode='extract'",
@@ -224,6 +226,22 @@ async def process_with_landingai(request: ProcessLandingAIRequest):
status_code=500, detail=f"Error subiendo a Qdrant: {str(e)}"
)
# 8. Guardar datos extraídos en Redis (si existe extracted_data)
if result.get("extracted_data") and result["extracted_data"].get("extraction"):
try:
logger.info("\n[6/6] Guardando datos extraídos en Redis...")
extracted_data_service = get_extracted_data_service()
await extracted_data_service.save_extracted_data(
file_name=request.file_name,
tema=request.tema,
collection_name=request.collection_name,
extracted_data=result["extracted_data"]["extraction"]
)
except Exception as e:
# No fallar si Redis falla, solo logear
logger.warning(f"⚠️ No se pudieron guardar datos en Redis (no crítico): {e}")
# Tiempo total
processing_time = time.time() - start_time

View File

@@ -0,0 +1,141 @@
"""
Router para consultar datos extraídos almacenados en Redis.
"""
import logging
from typing import List, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from ..services.extracted_data_service import get_extracted_data_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/extracted-data", tags=["extracted-data"])
class ExtractedDataResponse(BaseModel):
"""Response con datos extraídos de un documento"""
pk: str
file_name: str
tema: str
collection_name: str
extracted_data: dict
extraction_timestamp: str
class ExtractedDataListResponse(BaseModel):
"""Response con lista de datos extraídos"""
total: int
documents: List[ExtractedDataResponse]
@router.get("/by-file/{file_name}", response_model=ExtractedDataListResponse)
async def get_by_file(file_name: str):
"""
Obtiene todos los datos extraídos de un archivo específico.
Args:
file_name: Nombre del archivo
Returns:
Lista de documentos con datos extraídos
"""
try:
service = get_extracted_data_service()
docs = await service.get_by_file(file_name)
documents = [
ExtractedDataResponse(
pk=doc.pk,
file_name=doc.file_name,
tema=doc.tema,
collection_name=doc.collection_name,
extracted_data=doc.get_extracted_data(),
extraction_timestamp=doc.extraction_timestamp
)
for doc in docs
]
return ExtractedDataListResponse(
total=len(documents),
documents=documents
)
except Exception as e:
logger.error(f"Error obteniendo datos extraídos por archivo: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/by-tema/{tema}", response_model=ExtractedDataListResponse)
async def get_by_tema(tema: str):
"""
Obtiene todos los datos extraídos de un tema específico.
Args:
tema: Nombre del tema
Returns:
Lista de documentos con datos extraídos
"""
try:
service = get_extracted_data_service()
docs = await service.get_by_tema(tema)
documents = [
ExtractedDataResponse(
pk=doc.pk,
file_name=doc.file_name,
tema=doc.tema,
collection_name=doc.collection_name,
extracted_data=doc.get_extracted_data(),
extraction_timestamp=doc.extraction_timestamp
)
for doc in docs
]
return ExtractedDataListResponse(
total=len(documents),
documents=documents
)
except Exception as e:
logger.error(f"Error obteniendo datos extraídos por tema: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/by-collection/{collection_name}", response_model=ExtractedDataListResponse)
async def get_by_collection(collection_name: str):
"""
Obtiene todos los datos extraídos de una colección específica.
Args:
collection_name: Nombre de la colección
Returns:
Lista de documentos con datos extraídos
"""
try:
service = get_extracted_data_service()
docs = await service.get_by_collection(collection_name)
documents = [
ExtractedDataResponse(
pk=doc.pk,
file_name=doc.file_name,
tema=doc.tema,
collection_name=doc.collection_name,
extracted_data=doc.get_extracted_data(),
extraction_timestamp=doc.extraction_timestamp
)
for doc in docs
]
return ExtractedDataListResponse(
total=len(documents),
documents=documents
)
except Exception as e:
logger.error(f"Error obteniendo datos extraídos por colección: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -66,6 +66,8 @@ class ChunkingService:
"""
Descarga un PDF desde Azure Blob Storage.
NOTA: Todos los blobs se guardan en minúsculas en Azure.
Args:
file_name: Nombre del archivo
tema: Tema/carpeta del archivo
@@ -77,8 +79,9 @@ class ChunkingService:
Exception: Si hay error descargando el archivo
"""
try:
blob_path = f"{tema}/{file_name}"
logger.info(f"Descargando PDF: {blob_path}")
# Convertir a minúsculas ya que todos los blobs están en minúsculas
blob_path = f"{tema.lower()}/{file_name.lower()}"
logger.info(f"Descargando PDF: {blob_path} (tema original: {tema}, file original: {file_name})")
blob_client = self.blob_service.get_blob_client(
container=self.container_name,

View File

@@ -1,10 +1,12 @@
"""
Servicio de embeddings usando Azure OpenAI.
Genera embeddings para chunks de texto usando text-embedding-3-large (3072 dimensiones).
Incluye manejo de rate limits con retry exponencial y delays entre batches.
"""
import asyncio
import logging
from typing import List
from openai import AzureOpenAI
from openai import AzureOpenAI, RateLimitError
from ..core.config import settings
logger = logging.getLogger(__name__)
@@ -63,46 +65,89 @@ class EmbeddingService:
async def generate_embeddings_batch(
self,
texts: List[str],
batch_size: int = 100
batch_size: int | None = None,
delay_between_batches: float | None = None,
max_retries: int | None = None
) -> List[List[float]]:
"""
Genera embeddings para múltiples textos en lotes.
Genera embeddings para múltiples textos en lotes con manejo de rate limits.
Args:
texts: Lista de textos para generar embeddings
batch_size: Tamaño del lote para procesamiento (default: 100)
batch_size: Tamaño del lote (None = usar configuración de settings)
delay_between_batches: Segundos de espera entre batches (None = usar configuración)
max_retries: Número máximo de reintentos (None = usar configuración)
Returns:
Lista de vectores de embeddings
Raises:
Exception: Si hay error al generar los embeddings
Exception: Si hay error al generar los embeddings después de todos los reintentos
"""
# Usar configuración de settings si no se proporciona
batch_size = batch_size or settings.EMBEDDING_BATCH_SIZE
delay_between_batches = delay_between_batches or settings.EMBEDDING_DELAY_BETWEEN_BATCHES
max_retries = max_retries or settings.EMBEDDING_MAX_RETRIES
try:
embeddings = []
total_batches = (len(texts) - 1) // batch_size + 1
logger.info(f"Iniciando generación de embeddings: {len(texts)} textos en {total_batches} batches")
logger.info(f"Configuración: batch_size={batch_size}, delay={delay_between_batches}s, max_retries={max_retries}")
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
logger.info(f"Procesando lote {i//batch_size + 1}/{(len(texts)-1)//batch_size + 1}")
batch_num = i // batch_size + 1
response = self.client.embeddings.create(
input=batch,
model=self.model
)
logger.info(f"📊 Procesando batch {batch_num}/{total_batches} ({len(batch)} textos)...")
batch_embeddings = [item.embedding for item in response.data]
# Validar dimensiones
for idx, emb in enumerate(batch_embeddings):
if len(emb) != self.embedding_dimension:
raise ValueError(
f"Dimensión incorrecta en índice {i + idx}: "
f"esperada {self.embedding_dimension}, obtenida {len(emb)}"
# Retry con exponential backoff
retry_count = 0
while retry_count <= max_retries:
try:
response = self.client.embeddings.create(
input=batch,
model=self.model
)
embeddings.extend(batch_embeddings)
batch_embeddings = [item.embedding for item in response.data]
logger.info(f"Generados {len(embeddings)} embeddings exitosamente")
# Validar dimensiones
for idx, emb in enumerate(batch_embeddings):
if len(emb) != self.embedding_dimension:
raise ValueError(
f"Dimensión incorrecta en índice {i + idx}: "
f"esperada {self.embedding_dimension}, obtenida {len(emb)}"
)
embeddings.extend(batch_embeddings)
logger.info(f"✓ Batch {batch_num}/{total_batches} completado exitosamente")
break # Éxito, salir del retry loop
except RateLimitError as e:
retry_count += 1
if retry_count > max_retries:
logger.error(f"❌ Rate limit excedido después de {max_retries} reintentos")
raise
# Exponential backoff: 2^retry_count segundos
wait_time = 2 ** retry_count
logger.warning(
f"⚠️ Rate limit alcanzado en batch {batch_num}/{total_batches}. "
f"Reintento {retry_count}/{max_retries} en {wait_time}s..."
)
await asyncio.sleep(wait_time)
except Exception as e:
logger.error(f"❌ Error en batch {batch_num}/{total_batches}: {e}")
raise
# Delay entre batches para respetar rate limit (excepto en el último)
if i + batch_size < len(texts):
await asyncio.sleep(delay_between_batches)
logger.info(f"✅ Embeddings generados exitosamente: {len(embeddings)} vectores de {self.embedding_dimension}D")
return embeddings
except Exception as e:

View File

@@ -0,0 +1,131 @@
"""
Servicio para manejar el almacenamiento de datos extraídos en Redis.
"""
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional
from ..models.extracted_data import ExtractedDocument
logger = logging.getLogger(__name__)
class ExtractedDataService:
"""Servicio para guardar y recuperar datos extraídos de documentos"""
async def save_extracted_data(
self,
file_name: str,
tema: str,
collection_name: str,
extracted_data: Dict[str, Any]
) -> ExtractedDocument:
"""
Guarda datos extraídos de un documento en Redis.
Args:
file_name: Nombre del archivo
tema: Tema del documento
collection_name: Colección de Qdrant
extracted_data: Datos extraídos (dict)
Returns:
ExtractedDocument guardado
"""
try:
# Crear instancia del modelo
doc = ExtractedDocument(
file_name=file_name,
tema=tema,
collection_name=collection_name,
extracted_data_json="", # Se setea después
extraction_timestamp=datetime.utcnow().isoformat()
)
# Serializar datos extraídos
doc.set_extracted_data(extracted_data)
# Guardar en Redis
doc.save()
logger.info(
f"💾 Datos extraídos guardados en Redis: {file_name} "
f"({len(extracted_data)} campos)"
)
return doc
except Exception as e:
logger.error(f"Error guardando datos extraídos en Redis: {e}")
raise
async def get_by_file(self, file_name: str) -> List[ExtractedDocument]:
"""
Obtiene todos los documentos extraídos de un archivo.
Args:
file_name: Nombre del archivo
Returns:
Lista de ExtractedDocument
"""
try:
docs = ExtractedDocument.find_by_file(file_name)
logger.info(f"Encontrados {len(docs)} documentos extraídos para {file_name}")
return docs
except Exception as e:
logger.error(f"Error buscando documentos por archivo: {e}")
return []
async def get_by_tema(self, tema: str) -> List[ExtractedDocument]:
"""
Obtiene todos los documentos extraídos de un tema.
Args:
tema: Tema a buscar
Returns:
Lista de ExtractedDocument
"""
try:
docs = ExtractedDocument.find_by_tema(tema)
logger.info(f"Encontrados {len(docs)} documentos extraídos para tema {tema}")
return docs
except Exception as e:
logger.error(f"Error buscando documentos por tema: {e}")
return []
async def get_by_collection(self, collection_name: str) -> List[ExtractedDocument]:
"""
Obtiene todos los documentos de una colección.
Args:
collection_name: Nombre de la colección
Returns:
Lista de ExtractedDocument
"""
try:
docs = ExtractedDocument.find_by_collection(collection_name)
logger.info(f"Encontrados {len(docs)} documentos en colección {collection_name}")
return docs
except Exception as e:
logger.error(f"Error buscando documentos por colección: {e}")
return []
# Instancia global singleton
_extracted_data_service: Optional[ExtractedDataService] = None
def get_extracted_data_service() -> ExtractedDataService:
"""
Obtiene la instancia singleton del servicio.
Returns:
Instancia de ExtractedDataService
"""
global _extracted_data_service
if _extracted_data_service is None:
_extracted_data_service = ExtractedDataService()
return _extracted_data_service