"""Conversation manager service for orchestrating user conversations.""" import logging import re from datetime import UTC, datetime, timedelta from uuid import uuid4 from capa_de_integracion.config import Settings from capa_de_integracion.models import ( ConversationEntry, ConversationRequest, ConversationSession, DetectIntentResponse, QueryResult, ) from capa_de_integracion.models.notification import NotificationSession from capa_de_integracion.services.dlp import DLPService from capa_de_integracion.services.quick_reply.content import QuickReplyContentService from capa_de_integracion.services.rag import RAGServiceBase from capa_de_integracion.services.storage.firestore import FirestoreService from capa_de_integracion.services.storage.redis import RedisService logger = logging.getLogger(__name__) MSG_EMPTY_MESSAGE = "Message cannot be empty" class ConversationManagerService: """Central orchestrator for managing user conversations.""" SESSION_RESET_THRESHOLD_MINUTES = 30 SCREEN_CONTEXT_TIMEOUT_MINUTES = 10 CONV_HISTORY_PARAM = "conversation_history" HISTORY_PARAM = "historial" def __init__( self, settings: Settings, rag_service: RAGServiceBase, redis_service: RedisService, firestore_service: FirestoreService, dlp_service: DLPService, ) -> None: """Initialize conversation manager.""" self.settings = settings self.rag_service = rag_service self.redis_service = redis_service self.firestore_service = firestore_service self.dlp_service = dlp_service self.quick_reply_service = QuickReplyContentService(settings) logger.info("ConversationManagerService initialized successfully") def _validate_message(self, mensaje: str) -> None: """Validate message is not empty. Args: mensaje: Message text to validate Raises: ValueError: If message is empty or whitespace """ if not mensaje or not mensaje.strip(): raise ValueError(MSG_EMPTY_MESSAGE) async def manage_conversation( self, request: ConversationRequest, ) -> DetectIntentResponse: """Manage conversation flow and return response. Orchestrates: 1. Validation 2. Security (DLP obfuscation) 3. Session management 4. Quick reply path (if applicable) 5. Standard RAG path (fallback) Args: request: External conversation request from client Returns: Detect intent response from Dialogflow """ try: # Step 1: Validate message is not empty self._validate_message(request.mensaje) # Step 2: Apply DLP security obfuscated_message = await self.dlp_service.get_obfuscated_string( request.mensaje, self.settings.dlp_template_complete_flow, ) request.mensaje = obfuscated_message telefono = request.usuario.telefono # Step 3: Obtain or create session session = await self._obtain_or_create_session(telefono) # Step 4: Try quick reply path first response = await self._handle_quick_reply_path(request, session) if response: return response # Step 5: Fall through to standard conversation path return await self._handle_standard_conversation(request, session) except Exception: logger.exception("Error managing conversation") raise async def _obtain_or_create_session(self, telefono: str) -> ConversationSession: """Get existing session or create new one. Checks Redis → Firestore → Creates new session with auto-caching. Args: telefono: User phone number Returns: ConversationSession instance """ # Try Redis first session = await self.redis_service.get_session(telefono) if session: return session # Try Firestore if Redis miss session = await self.firestore_service.get_session_by_phone(telefono) if session: return session # Create new session if both miss session_id = str(uuid4()) user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" session = await self.firestore_service.create_session( session_id, user_id, telefono, ) # Auto-cache to Redis await self.redis_service.save_session(session) return session async def _save_conversation_turn( self, session_id: str, user_text: str, assistant_text: str, entry_type: str, canal: str | None = None, ) -> None: """Save user and assistant messages to Firestore. Args: session_id: Session identifier user_text: User message text assistant_text: Assistant response text entry_type: Type of conversation entry ("CONVERSACION" or "LLM") canal: Communication channel """ # Save user entry user_entry = ConversationEntry( entity="user", type=entry_type, timestamp=datetime.now(UTC), text=user_text, parameters=None, canal=canal, ) await self.firestore_service.save_entry(session_id, user_entry) # Save assistant entry assistant_entry = ConversationEntry( entity="assistant", type=entry_type, timestamp=datetime.now(UTC), text=assistant_text, parameters=None, canal=canal, ) await self.firestore_service.save_entry(session_id, assistant_entry) async def _update_session_after_turn( self, session: ConversationSession, last_message: str, ) -> None: """Update session metadata and sync to storage. Updates last_message, last_modified timestamp, and saves to both Firestore and Redis for dual-storage consistency. Args: session: Session to update (modified in place) last_message: Latest message text """ session.last_message = last_message session.last_modified = datetime.now(UTC) await self.firestore_service.save_session(session) await self.redis_service.save_session(session) async def _handle_quick_reply_path( self, request: ConversationRequest, session: ConversationSession, ) -> DetectIntentResponse | None: """Handle conversation when pantalla_contexto is active and valid. Args: request: User conversation request session: Current conversation session Returns: DetectIntentResponse if handled, None if fall through to standard path """ # Check if pantalla_contexto exists if not session.pantalla_contexto: return None # Check if pantalla_contexto is stale if not self._is_pantalla_context_valid(session.last_modified): logger.info( "Detected STALE 'pantallaContexto'. " "Ignoring and proceeding with normal flow.", ) return None logger.info( "Detected 'pantallaContexto' in session: %s. " "Delegating to QuickReplies flow.", session.pantalla_contexto, ) response = await self._manage_quick_reply_conversation( request, session.pantalla_contexto, ) if not response: return None # Extract response text response_text = ( response.query_result.response_text if response.query_result else "" ) or "" # Save conversation turn await self._save_conversation_turn( session_id=session.session_id, user_text=request.mensaje, assistant_text=response_text, entry_type="CONVERSACION", canal=getattr(request, "canal", None), ) # Update session await self._update_session_after_turn(session, response_text) return response async def _handle_standard_conversation( self, request: ConversationRequest, session: ConversationSession, ) -> DetectIntentResponse: """Handle standard RAG-based conversation flow. Loads history, notifications, queries RAG service, and persists results. Args: request: User conversation request session: Current conversation session Returns: DetectIntentResponse with RAG response """ telefono = request.usuario.telefono nickname = request.usuario.nickname logger.info( "Primary Check (Redis): Looking up session for phone: %s", telefono, ) # Load conversation history only if session is older than threshold # (optimization: new/recent sessions don't need history context) session_age = datetime.now(UTC) - session.created_at if session_age > timedelta(minutes=self.SESSION_RESET_THRESHOLD_MINUTES): entries = await self.firestore_service.get_entries( session.session_id, limit=self.settings.conversation_context_message_limit, ) logger.info( "Session is %s minutes old. Loaded %s conversation entries.", session_age.total_seconds() / 60, len(entries), ) else: entries = [] logger.info( "Session is only %s minutes old. Skipping history load.", session_age.total_seconds() / 60, ) # Retrieve active notifications for this user notifications = await self._get_active_notifications(telefono) logger.info("Retrieved %s active notifications", len(notifications)) # Prepare current user message messages = await self._prepare_rag_messages(request.mensaje) # Extract notification texts for RAG notification_texts = ( [n.texto for n in notifications if n.texto and n.texto.strip()] if notifications else None ) # Format conversation history for RAG conversation_history = ( self._format_conversation_history(session, entries) if entries else None ) # Query RAG service with separated fields logger.info("Sending query to RAG service") assistant_response = await self.rag_service.query( messages=messages, notifications=notification_texts, conversation_history=conversation_history, user_nickname=nickname or None, ) logger.info( "Received response from RAG service: %s...", assistant_response[:100], ) # Save conversation turn await self._save_conversation_turn( session_id=session.session_id, user_text=request.mensaje, assistant_text=assistant_response, entry_type="LLM", canal=getattr(request, "canal", None), ) logger.info("Saved user message and assistant response to Firestore") # Update session await self._update_session_after_turn(session, assistant_response) logger.info("Updated session in Firestore and Redis") # Mark notifications as processed if any were included if notifications: await self._mark_notifications_as_processed(telefono) logger.info("Marked %s notifications as processed", len(notifications)) # Return response object return DetectIntentResponse( responseId=str(uuid4()), queryResult=QueryResult( responseText=assistant_response, parameters=None, ), quick_replies=None, ) def _is_pantalla_context_valid(self, last_modified: datetime) -> bool: """Check if pantallaContexto is still valid (not stale).""" time_diff = datetime.now(UTC) - last_modified return time_diff < timedelta(minutes=self.SCREEN_CONTEXT_TIMEOUT_MINUTES) async def _manage_quick_reply_conversation( self, request: ConversationRequest, screen_id: str, ) -> DetectIntentResponse | None: """Handle conversation within Quick Replies context.""" quick_reply_screen = await self.quick_reply_service.get_quick_replies(screen_id) # If no questions available, delegate to normal conversation flow if not quick_reply_screen.preguntas: logger.warning("No quick replies found for screen: %s.", screen_id) return None # Match user message to a quick reply question user_message_lower = request.mensaje.lower().strip() matched_answer = None for pregunta in quick_reply_screen.preguntas: # Simple matching: check if question title matches user message if pregunta.titulo.lower().strip() == user_message_lower: matched_answer = pregunta.respuesta logger.info("Matched quick reply: %s", pregunta.titulo) break # If no match, delegate to normal flow if not matched_answer: logger.warning( "No matching quick reply found for message: '%s'. Falling back to RAG.", request.mensaje, ) return None # Create response with the matched quick reply answer return DetectIntentResponse( responseId=str(uuid4()), queryResult=QueryResult(responseText=matched_answer, parameters=None), quick_replies=quick_reply_screen, ) async def _get_active_notifications(self, telefono: str) -> list: """Retrieve active notifications for a user from Redis or Firestore. Args: telefono: User phone number Returns: List of active Notification objects """ try: # Try Redis first notification_session = await self.redis_service.get_notification_session( telefono, ) # If not in Redis, try Firestore if not notification_session: # Firestore uses phone as document ID for notifications doc_ref = self.firestore_service.db.collection( self.firestore_service.notifications_collection, ).document(telefono) doc = await doc_ref.get() if doc.exists: data = doc.to_dict() notification_session = NotificationSession.model_validate(data) # Filter for active notifications only if notification_session and notification_session.notificaciones: active_notifications = [ notif for notif in notification_session.notificaciones if notif.status == "active" ] else: active_notifications = [] except Exception: logger.exception("Error retrieving notifications for %s", telefono) return [] else: return active_notifications async def _prepare_rag_messages( self, user_message: str, ) -> list[dict[str, str]]: """Prepare current user message for RAG service. Args: user_message: Current user message Returns: List with single user message """ # Only include the current user message - no system messages return [{"role": "user", "content": user_message}] async def _mark_notifications_as_processed(self, telefono: str) -> None: """Mark all notifications for a user as processed. Args: telefono: User phone number """ try: # Update status in Firestore await self.firestore_service.update_notification_status( telefono, "processed", ) # Update or delete from Redis await self.redis_service.delete_notification_session(telefono) logger.info("Marked notifications as processed for %s", telefono) except Exception: logger.exception( "Error marking notifications as processed for %s", telefono, ) def _format_conversation_history( self, session: ConversationSession, # noqa: ARG002 entries: list[ConversationEntry], ) -> str: """Format conversation history with business rule limits. Applies limits: - Date: 30 days maximum - Count: 60 messages maximum - Size: 50KB maximum Args: session: Conversation session entries: List of conversation entries Returns: Formatted conversation text """ if not entries: return "" # Filter by date (30 days) cutoff_date = datetime.now(UTC) - timedelta( days=self.settings.conversation_context_days_limit, ) recent_entries = [ e for e in entries if e.timestamp and e.timestamp >= cutoff_date ] # Sort by timestamp (oldest first) and limit count recent_entries.sort(key=lambda e: e.timestamp) limited_entries = recent_entries[ -self.settings.conversation_context_message_limit : ] # Format with size truncation (50KB) return self._format_entries_with_size_limit(limited_entries) def _format_entries_with_size_limit(self, entries: list[ConversationEntry]) -> str: """Format entries with 50KB size limit. Builds from newest to oldest, stopping at size limit. Args: entries: List of conversation entries Returns: Formatted text, truncated if necessary """ if not entries: return "" max_bytes = 50 * 1024 # 50KB formatted_messages = [self._format_entry(entry) for entry in entries] # Build from newest to oldest text_block = [] current_size = 0 for message in reversed(formatted_messages): message_line = message + "\n" message_bytes = len(message_line.encode("utf-8")) if current_size + message_bytes > max_bytes: break text_block.insert(0, message_line) current_size += message_bytes return "".join(text_block).strip() def _format_entry(self, entry: ConversationEntry) -> str: """Format a single conversation entry. Args: entry: Conversation entry Returns: Formatted string (e.g., "User: hello", "Assistant: hi there") """ # Map entity to prefix (fixed bug from Java port!) prefix = "User: " if entry.entity == "user" else "Assistant: " # Clean content if needed content = entry.text if entry.entity == "assistant": # Remove trailing JSON artifacts like {...} content = re.sub(r"\s*\{.*\}\s*$", "", content).strip() return prefix + content