""" LandingAI Service - Servicio independiente Maneja toda la interacción con LandingAI ADE API. Usa parse() para extracción de chunks y extract() para datos estructurados. """ import logging import tempfile from pathlib import Path from typing import Dict, Any, List, Optional from langchain_core.documents import Document from ..models.schema_models import CustomSchema from ..services.schema_builder_service import SchemaBuilderService logger = logging.getLogger(__name__) class LandingAIService: """ Servicio para procesamiento de PDFs con LandingAI. Flujo: 1. Parse PDF → obtener chunks estructurados + markdown 2. Extract (opcional) → extraer datos según schema personalizado 3. Process chunks → filtrar, enriquecer, controlar tokens 4. Return Documents → listos para embeddings y Qdrant """ def __init__(self, api_key: str, environment: str = "production"): """ Inicializa el servicio LandingAI. Args: api_key: API key de LandingAI environment: "production" o "eu" Raises: ImportError: Si landingai-ade no está instalado """ try: from landingai_ade import LandingAIADE self.client = LandingAIADE( apikey=api_key, environment=environment, timeout=480.0, # 8 minutos para PDFs grandes max_retries=2 ) self.schema_builder = SchemaBuilderService() logger.info(f"LandingAIService inicializado (environment: {environment})") except ImportError: logger.error("landingai-ade no está instalado") raise ImportError( "Se requiere landingai-ade. Instalar con: pip install landingai-ade" ) def process_pdf( self, pdf_bytes: bytes, file_name: str, custom_schema: Optional[CustomSchema] = None, include_chunk_types: Optional[List[str]] = None, model: str = "dpt-2-latest" ) -> Dict[str, Any]: """ Procesa un PDF con LandingAI (modo rápido o con extracción). Args: pdf_bytes: Contenido del PDF en bytes file_name: Nombre del archivo custom_schema: Schema personalizado para extract (None = modo rápido) include_chunk_types: Tipos de chunks a incluir ["text", "table", "figure"] model: Modelo de LandingAI a usar Returns: Dict con: - chunks: List[Document] listos para embeddings - parse_metadata: Metadata del parse (páginas, duración, etc.) - extracted_data: Datos extraídos (si usó schema) - file_name: Nombre del archivo Raises: Exception: Si hay error en parse o extract """ logger.info(f"=== Procesando PDF con LandingAI: {file_name} ===") logger.info(f" Modo: {'Extracción' if custom_schema else 'Rápido'}") logger.info(f" Tipos incluidos: {include_chunk_types or 'todos'}") # 1. Parse PDF parse_result = self._parse_pdf(pdf_bytes, file_name, model) # 2. Extract (si hay schema) extracted_data = None if custom_schema: logger.info(f" Extrayendo datos con schema: {custom_schema.schema_name}") extracted_data = self._extract_data( parse_result["markdown"], custom_schema ) # 3. Procesar chunks documents = self._process_chunks( parse_result, extracted_data, file_name, include_chunk_types ) logger.info(f"=== Procesamiento completado: {len(documents)} chunks ===") return { "chunks": documents, "parse_metadata": parse_result["metadata"], "extracted_data": extracted_data, "file_name": file_name } def _parse_pdf( self, pdf_bytes: bytes, file_name: str, model: str ) -> Dict[str, Any]: """ Parse PDF con LandingAI. Args: pdf_bytes: Contenido del PDF file_name: Nombre del archivo model: Modelo de LandingAI Returns: Dict con chunks, markdown, grounding y metadata """ logger.info(f" Parseando PDF con modelo {model}...") # LandingAI requiere Path, crear archivo temporal with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp: tmp.write(pdf_bytes) tmp_path = Path(tmp.name) try: # Parse con LandingAI response = self.client.parse(document=tmp_path, model=model) # Procesar respuesta chunks_data = [] for chunk in response.chunks: # Obtener grounding info del chunk grounding_info = {} if hasattr(response, 'grounding') and hasattr(response.grounding, chunk.id): ground = getattr(response.grounding, chunk.id) grounding_info = { "bbox": ground.bbox if hasattr(ground, 'bbox') else None, "page": ground.page if hasattr(ground, 'page') else 1 } page_num = grounding_info.get("page", 1) if grounding_info else 1 chunks_data.append({ "id": chunk.id, "content": chunk.markdown, "type": chunk.type, "grounding": grounding_info, "page": page_num }) # Obtener metadata metadata_dict = {} if hasattr(response, 'metadata'): metadata_dict = { "page_count": getattr(response.metadata, 'page_count', None), "duration_ms": getattr(response.metadata, 'duration_ms', None), "version": getattr(response.metadata, 'version', None) } logger.info( f" Parse completado: {len(chunks_data)} chunks, " f"{metadata_dict.get('page_count', 'N/A')} páginas" ) return { "chunks": chunks_data, "markdown": response.markdown, "grounding": response.grounding, "metadata": metadata_dict } finally: # Limpiar archivo temporal tmp_path.unlink(missing_ok=True) def _extract_data( self, markdown: str, custom_schema: CustomSchema ) -> Optional[Dict[str, Any]]: """ Extrae datos estructurados del markdown usando schema personalizado. Args: markdown: Markdown completo del documento custom_schema: Schema personalizado Returns: Dict con extraction, extraction_metadata y schema_used None si hay error """ try: # 1. Construir Pydantic schema pydantic_schema = self.schema_builder.build_pydantic_schema(custom_schema) # 2. Convertir a JSON schema json_schema = self.schema_builder.to_json_schema(pydantic_schema) # 3. Crear archivo temporal con markdown with tempfile.NamedTemporaryFile( mode='w', suffix=".md", delete=False, encoding='utf-8' ) as tmp: tmp.write(markdown) tmp_path = Path(tmp.name) try: # 4. Extract con LandingAI response = self.client.extract( schema=json_schema, markdown=tmp_path ) logger.info(f" Extracción completada: {len(response.extraction)} campos") return { "extraction": response.extraction, "extraction_metadata": response.extraction_metadata, "schema_used": custom_schema.schema_id } finally: tmp_path.unlink(missing_ok=True) except Exception as e: logger.error(f"Error en extract: {e}") return None def _process_chunks( self, parse_result: Dict[str, Any], extracted_data: Optional[Dict[str, Any]], file_name: str, include_chunk_types: Optional[List[str]] ) -> List[Document]: """ Convierte chunks de LandingAI a Documents de LangChain con metadata rica. Args: parse_result: Resultado del parse extracted_data: Datos extraídos (opcional) file_name: Nombre del archivo include_chunk_types: Tipos a incluir Returns: Lista de Documents listos para embeddings """ documents = [] filtered_count = 0 for chunk in parse_result["chunks"]: # Filtrar por tipo si se especificó if include_chunk_types and chunk["type"] not in include_chunk_types: filtered_count += 1 continue # Construir metadata rica metadata = { "file_name": file_name, "page": chunk["page"], "chunk_id": chunk["id"], "chunk_type": chunk["type"], "bbox": chunk["grounding"].get("bbox"), # Metadata del documento "document_metadata": { "page_count": parse_result["metadata"].get("page_count"), "processing_duration_ms": parse_result["metadata"].get("duration_ms"), "landingai_version": parse_result["metadata"].get("version"), } } # Agregar datos extraídos si existen if extracted_data: metadata["extracted_data"] = extracted_data["extraction"] metadata["extraction_metadata"] = extracted_data["extraction_metadata"] metadata["schema_used"] = extracted_data["schema_used"] # Crear Document doc = Document( page_content=chunk["content"], metadata=metadata ) documents.append(doc) if filtered_count > 0: logger.info(f" Filtrados {filtered_count} chunks por tipo") logger.info(f" Generados {len(documents)} documents") return documents # Singleton factory _landingai_service: Optional[LandingAIService] = None def get_landingai_service() -> LandingAIService: """ Factory para obtener instancia singleton del servicio. Returns: Instancia única de LandingAIService Raises: RuntimeError: Si la configuración no está disponible """ global _landingai_service if _landingai_service is None: try: from ..core.config import settings api_key = settings.LANDINGAI_API_KEY if not api_key: raise ValueError("LANDINGAI_API_KEY no está configurada") environment = getattr(settings, 'LANDINGAI_ENVIRONMENT', 'production') _landingai_service = LandingAIService( api_key=api_key, environment=environment ) logger.info("LandingAIService singleton inicializado") except Exception as e: logger.error(f"Error inicializando LandingAIService: {e}") raise RuntimeError(f"No se pudo inicializar LandingAIService: {str(e)}") return _landingai_service