"""Conversation manager service for orchestrating user conversations."""

import logging
import re
from datetime import UTC, datetime, timedelta
from uuid import uuid4

from capa_de_integracion.config import Settings
from capa_de_integracion.models import (
    ConversationEntry,
    ConversationRequest,
    ConversationSession,
    DetectIntentResponse,
    QueryResult,
)
from capa_de_integracion.models.notification import NotificationSession
from capa_de_integracion.services.rag import RAGServiceBase

from .dlp_service import DLPService
from .firestore_service import FirestoreService
from .quick_reply_content import QuickReplyContentService
from .redis_service import RedisService

logger = logging.getLogger(__name__)


class ConversationManagerService:
    """Central orchestrator for managing user conversations.

    For each incoming request the service obfuscates the message through the
    DLP service, resolves the user's session (Redis, then Firestore, then a
    newly created one), answers either from the quick-reply content of the
    screen stored in the session or from the RAG service, and persists the
    exchange to Firestore and Redis.
    """

    SESSION_RESET_THRESHOLD_MINUTES = 30
    # Maximum age of ``session.last_modified`` for which a stored
    # ``pantalla_contexto`` is still honoured.
    SCREEN_CONTEXT_TIMEOUT_MINUTES = 10
    CONV_HISTORY_PARAM = "conversation_history"
    HISTORY_PARAM = "historial"

    # Upper bound, in UTF-8 bytes, of the history text handed to the RAG service.
    MAX_HISTORY_BYTES = 50 * 1024
    # Trailing ``{...}`` artifact stripped from assistant messages. ``.`` does
    # not match newlines here, so only a brace span that sits on the last line
    # of the text is removed.
    _TRAILING_JSON_RE = re.compile(r"\s*\{.*\}\s*$")

    def __init__(
        self,
        settings: Settings,
        rag_service: RAGServiceBase,
        redis_service: RedisService,
        firestore_service: FirestoreService,
        dlp_service: DLPService,
    ) -> None:
        """Initialize conversation manager.

        Args:
            settings: Application settings; also used to build the
                quick-reply content service.
            rag_service: Service queried for assistant answers.
            redis_service: Cache for sessions and notification sessions.
            firestore_service: Persistent store for sessions, entries and
                notifications.
            dlp_service: Service used to obfuscate the incoming message.

        """
        self.settings = settings
        self.rag_service = rag_service
        self.redis_service = redis_service
        self.firestore_service = firestore_service
        self.dlp_service = dlp_service
        self.quick_reply_service = QuickReplyContentService(settings)

        logger.info("ConversationManagerService initialized successfully")

    async def manage_conversation(
        self,
        request: ConversationRequest,
    ) -> DetectIntentResponse:
        """Manage conversation flow and return response.

        Args:
            request: External conversation request from client. Its
                ``mensaje`` is replaced in place by the obfuscated text.

        Returns:
            Response built from a matching quick reply when the session has
            a fresh ``pantalla_contexto``; otherwise a response carrying the
            RAG service answer.

        Raises:
            Exception: Any error raised while handling the request is logged
                and re-raised unchanged.

        """
        try:
            # Step 1: DLP obfuscation
            request.mensaje = await self.dlp_service.get_obfuscated_string(
                request.mensaje,
                self.settings.dlp_template_complete_flow,
            )
            telefono = request.usuario.telefono

            # Step 2: Fetch session in Redis -> Firestore -> Create new session
            session = await self._get_or_create_session(telefono)

            # Step 3: Check for pantallaContexto in existing session
            if session.pantalla_contexto:
                # Check if pantallaContexto is stale (10 minutes)
                if self._is_pantalla_context_valid(session.last_modified):
                    logger.info(
                        "Detected 'pantallaContexto' in session: %s. "
                        "Delegating to QuickReplies flow.",
                        session.pantalla_contexto,
                    )
                    response = await self._manage_quick_reply_conversation(
                        request,
                        session.pantalla_contexto,
                    )
                    if response:
                        response_text = (
                            response.query_result.response_text
                            if response.query_result
                            else ""
                        ) or ""
                        await self._persist_exchange(
                            session,
                            request,
                            response_text,
                            "CONVERSACION",
                        )
                        return response
                    # No quick reply matched: fall through to the standard flow.
                else:
                    logger.info(
                        "Detected STALE 'pantallaContexto'. "
                        "Ignoring and proceeding with normal flow.",
                    )

            # Step 4: Continue with standard conversation flow
            nickname = request.usuario.nickname

            # Step 4a: Load conversation history from Firestore
            entries = await self.firestore_service.get_entries(
                session.session_id,
                limit=self.settings.conversation_context_message_limit,
            )
            logger.info("Loaded %s conversation entries from Firestore", len(entries))

            # Step 4b: Retrieve active notifications for this user
            notifications = await self._get_active_notifications(telefono)
            logger.info("Retrieved %s active notifications", len(notifications))

            # Step 4c: Prepare context for RAG service
            messages = await self._prepare_rag_messages(
                session=session,
                entries=entries,
                notifications=notifications,
                user_message=request.mensaje,
                nickname=nickname or "Usuario",
            )

            # Step 4d: Query RAG service
            logger.info("Sending query to RAG service")
            assistant_response = await self.rag_service.query(messages)
            logger.info(
                "Received response from RAG service: %s...",
                assistant_response[:100],
            )

            # Step 4e: Save both messages and refresh the session
            await self._persist_exchange(
                session,
                request,
                assistant_response,
                "LLM",
            )

            # Step 4f: Mark notifications as processed if any were included
            if notifications:
                await self._mark_notifications_as_processed(telefono)
                logger.info("Marked %s notifications as processed", len(notifications))

            # Step 4g: Return response object
            return DetectIntentResponse(
                responseId=str(uuid4()),
                queryResult=QueryResult(
                    responseText=assistant_response,
                    parameters=None,
                ),
                quick_replies=None,
            )

        except Exception:
            logger.exception("Error managing conversation")
            raise

    async def _get_or_create_session(self, telefono: str) -> ConversationSession:
        """Return the session for a phone number, creating one if needed.

        Looks in Redis first, then Firestore. When neither has a session, a
        new one is created in Firestore and stored in Redis.

        Args:
            telefono: User phone number

        Returns:
            Existing or newly created session

        """
        logger.info(
            "Primary Check (Redis): Looking up session for phone: %s",
            telefono,
        )
        session = await self.redis_service.get_session(telefono)
        if not session:
            session = await self.firestore_service.get_session_by_phone(telefono)
        if not session:
            session_id = str(uuid4())
            user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
            session = await self.firestore_service.create_session(
                session_id,
                user_id,
                telefono,
            )
            await self.redis_service.save_session(session)
        return session

    async def _persist_exchange(
        self,
        session: ConversationSession,
        request: ConversationRequest,
        response_text: str,
        assistant_entry_type: str,
    ) -> None:
        """Save one user/assistant exchange and refresh the session.

        Writes the user entry and then the assistant entry to Firestore,
        updates ``last_message`` / ``last_modified`` on the session and saves
        the session to Firestore and Redis, in that order.

        Args:
            session: Session the exchange belongs to; mutated in place.
            request: Request whose ``mensaje`` is stored as the user entry.
            response_text: Text stored as the assistant entry and as the
                session's last message.
            assistant_entry_type: ``type`` value of the assistant entry.

        """
        # ``canal`` is read defensively because the request may not carry it.
        canal = getattr(request, "canal", None)

        user_entry = ConversationEntry(
            entity="user",
            type="CONVERSACION",
            timestamp=datetime.now(UTC),
            text=request.mensaje,
            parameters=None,
            canal=canal,
        )
        await self.firestore_service.save_entry(session.session_id, user_entry)
        logger.info("Saved user message to Firestore")

        assistant_entry = ConversationEntry(
            entity="assistant",
            type=assistant_entry_type,
            timestamp=datetime.now(UTC),
            text=response_text,
            parameters=None,
            canal=canal,
        )
        await self.firestore_service.save_entry(session.session_id, assistant_entry)
        logger.info("Saved assistant response to Firestore")

        session.last_message = response_text
        session.last_modified = datetime.now(UTC)
        await self.firestore_service.save_session(session)
        await self.redis_service.save_session(session)
        logger.info("Updated session in Firestore and Redis")

    @staticmethod
    def _as_utc(moment: datetime) -> datetime:
        """Return *moment* as a timezone-aware datetime.

        Naive values are assumed to already be expressed in UTC (assumption
        to confirm against the stores that produce them); aware values are
        returned unchanged.
        """
        if moment.tzinfo is None:
            return moment.replace(tzinfo=UTC)
        return moment

    def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
        """Check if pantallaContexto is still valid (not stale).

        Args:
            last_modified: Time the session was last modified.

        Returns:
            True when less than ``SCREEN_CONTEXT_TIMEOUT_MINUTES`` have
            elapsed since ``last_modified``; False otherwise or when no
            timestamp is available.

        """
        if last_modified is None:
            # Without a timestamp freshness cannot be established.
            return False
        time_diff = datetime.now(UTC) - self._as_utc(last_modified)
        return time_diff < timedelta(minutes=self.SCREEN_CONTEXT_TIMEOUT_MINUTES)

    async def _manage_quick_reply_conversation(
        self,
        request: ConversationRequest,
        screen_id: str,
    ) -> DetectIntentResponse | None:
        """Handle conversation within Quick Replies context.

        Args:
            request: Conversation request holding the user message.
            screen_id: Identifier of the screen whose quick replies are loaded.

        Returns:
            A response carrying the answer of the first question whose title
            equals the user message (case-insensitive, surrounding whitespace
            ignored), or None when the screen has no questions or nothing
            matches, so the caller can continue with the normal flow.

        """
        quick_reply_screen = await self.quick_reply_service.get_quick_replies(screen_id)

        # If no questions available, delegate to normal conversation flow
        if not quick_reply_screen.preguntas:
            logger.warning("No quick replies found for screen: %s.", screen_id)
            return None

        # Match user message to a quick reply question
        user_message_lower = request.mensaje.lower().strip()
        matched_answer = None
        for pregunta in quick_reply_screen.preguntas:
            # Simple matching: check if question title matches user message
            if pregunta.titulo.lower().strip() == user_message_lower:
                matched_answer = pregunta.respuesta
                logger.info("Matched quick reply: %s", pregunta.titulo)
                break

        # No usable answer: delegate to the normal flow instead of returning a
        # response whose text is empty.
        if not matched_answer:
            logger.warning(
                "No matching quick reply found for message: '%s'.",
                request.mensaje,
            )
            return None

        # Create response with the matched quick reply answer
        return DetectIntentResponse(
            responseId=str(uuid4()),
            queryResult=QueryResult(responseText=matched_answer, parameters=None),
            quick_replies=quick_reply_screen,
        )

    async def _get_active_notifications(self, telefono: str) -> list:
        """Retrieve active notifications for a user from Redis or Firestore.

        Args:
            telefono: User phone number

        Returns:
            Notifications whose ``status`` is ``"active"``; an empty list when
            there are none or when retrieval fails (the error is logged).

        """
        try:
            # Try Redis first
            notification_session = await self.redis_service.get_notification_session(
                telefono,
            )

            # If not in Redis, try Firestore
            if not notification_session:
                # Firestore uses phone as document ID for notifications
                doc_ref = self.firestore_service.db.collection(
                    self.firestore_service.notifications_collection,
                ).document(telefono)
                doc = await doc_ref.get()
                if doc.exists:
                    data = doc.to_dict()
                    notification_session = NotificationSession.model_validate(data)

            # Filter for active notifications only
            if notification_session and notification_session.notificaciones:
                active_notifications = [
                    notif
                    for notif in notification_session.notificaciones
                    if notif.status == "active"
                ]
            else:
                active_notifications = []

        except Exception:
            # Best effort: notifications are optional context for the answer.
            logger.exception("Error retrieving notifications for %s", telefono)
            return []
        else:
            return active_notifications

    async def _prepare_rag_messages(
        self,
        session: ConversationSession,
        entries: list[ConversationEntry],
        notifications: list,
        user_message: str,
        nickname: str,
    ) -> list[dict[str, str]]:
        """Prepare messages in OpenAI format for RAG service.

        Args:
            session: Current conversation session
            entries: Conversation history entries
            notifications: Active notifications
            user_message: Current user message
            nickname: User's nickname

        Returns:
            List of messages in OpenAI format
            [{"role": "...", "content": "..."}]: optional system messages for
            history and notifications, a system message with the user
            context, then the user message.

        """
        messages = []

        # Add system message with conversation history if available
        if entries:
            conversation_context = self._format_conversation_history(session, entries)
            if conversation_context:
                messages.append(
                    {
                        "role": "system",
                        "content": (
                            f"Historial de conversación:\n"
                            f"{conversation_context}"
                        ),
                    },
                )

        # Add system message with notifications if available
        if notifications:
            # Blank notification texts are skipped.
            notifications_text = "\n".join(
                n.texto for n in notifications if n.texto and n.texto.strip()
            )
            if notifications_text:
                messages.append(
                    {
                        "role": "system",
                        "content": (
                            f"Notificaciones pendientes para el usuario:\n"
                            f"{notifications_text}"
                        ),
                    },
                )

        # Add system message with user context
        user_context = f"Usuario: {nickname}" if nickname else "Usuario anónimo"
        messages.append({"role": "system", "content": user_context})

        # Add current user message
        messages.append({"role": "user", "content": user_message})

        return messages

    async def _mark_notifications_as_processed(self, telefono: str) -> None:
        """Mark all notifications for a user as processed.

        Failures are logged and not propagated.

        Args:
            telefono: User phone number

        """
        try:
            # Update status in Firestore
            await self.firestore_service.update_notification_status(
                telefono,
                "processed",
            )

            # Drop the cached notification session from Redis
            await self.redis_service.delete_notification_session(telefono)

            logger.info("Marked notifications as processed for %s", telefono)

        except Exception:
            logger.exception(
                "Error marking notifications as processed for %s",
                telefono,
            )

    def _format_conversation_history(
        self,
        session: ConversationSession,  # noqa: ARG002
        entries: list[ConversationEntry],
    ) -> str:
        """Format conversation history with business rule limits.

        Applies limits:
        - Date: ``settings.conversation_context_days_limit`` days maximum
        - Count: ``settings.conversation_context_message_limit`` messages
          maximum (the most recent ones are kept)
        - Size: ``MAX_HISTORY_BYTES`` maximum

        Entries without a timestamp are dropped. Naive timestamps are
        treated as UTC so they can be compared with the cutoff.

        Args:
            session: Conversation session (currently unused)
            entries: List of conversation entries

        Returns:
            Formatted conversation text, oldest message first

        """
        if not entries:
            return ""

        # Filter by date
        cutoff_date = datetime.now(UTC) - timedelta(
            days=self.settings.conversation_context_days_limit,
        )
        dated_entries = [
            (self._as_utc(entry.timestamp), entry)
            for entry in entries
            if entry.timestamp
        ]
        recent_entries = [pair for pair in dated_entries if pair[0] >= cutoff_date]

        # Sort by timestamp (oldest first) and keep the newest ones.
        recent_entries.sort(key=lambda pair: pair[0])
        # NOTE(review): a limit of 0 makes this slice ``[-0:]`` and therefore
        # keeps every entry — confirm whether 0 is meant as "no limit".
        limited_entries = [
            entry
            for _, entry in recent_entries[
                -self.settings.conversation_context_message_limit :
            ]
        ]

        # Format with size truncation
        return self._format_entries_with_size_limit(limited_entries)

    def _format_entries_with_size_limit(self, entries: list[ConversationEntry]) -> str:
        """Format entries within the ``MAX_HISTORY_BYTES`` size limit.

        Walks from the newest entry to the oldest and stops at the first
        entry that would push the UTF-8 size over the limit, so the most
        recent messages are the ones kept.

        Args:
            entries: List of conversation entries, oldest first

        Returns:
            Formatted text, one entry per line, oldest kept entry first

        """
        if not entries:
            return ""

        kept_lines = []
        current_size = 0

        for entry in reversed(entries):
            message_line = self._format_entry(entry) + "\n"
            message_bytes = len(message_line.encode("utf-8"))

            if current_size + message_bytes > self.MAX_HISTORY_BYTES:
                break

            kept_lines.append(message_line)
            current_size += message_bytes

        # Lines were collected newest-first; restore chronological order.
        kept_lines.reverse()
        return "".join(kept_lines).strip()

    def _format_entry(self, entry: ConversationEntry) -> str:
        """Format a single conversation entry.

        Args:
            entry: Conversation entry

        Returns:
            Formatted string (e.g., "User: hello", "Assistant: hi there").
            Any entity other than ``"user"`` gets the assistant prefix.

        """
        # Map entity to prefix
        prefix = "User: " if entry.entity == "user" else "Assistant: "

        # A missing text is rendered as an empty message.
        content = entry.text or ""
        if entry.entity == "assistant":
            # Remove trailing JSON artifacts like {...}
            content = self._TRAILING_JSON_RE.sub("", content).strip()

        return prefix + content