diff --git a/src/capa_de_integracion/dependencies.py b/src/capa_de_integracion/dependencies.py index 02fa6b3..637d9ad 100644 --- a/src/capa_de_integracion/dependencies.py +++ b/src/capa_de_integracion/dependencies.py @@ -11,14 +11,12 @@ from .services.firestore_service import FirestoreService from .services.rag_service import RAGService - - - @lru_cache(maxsize=1) def get_redis_service() -> RedisService: """Get Redis service instance.""" return RedisService(settings) + @lru_cache(maxsize=1) def get_firestore_service() -> FirestoreService: """Get Firestore service instance.""" @@ -30,11 +28,13 @@ def get_dlp_service() -> DLPService: """Get DLP service instance.""" return DLPService(settings) + @lru_cache(maxsize=1) def get_quick_reply_content_service() -> QuickReplyContentService: """Get quick reply content service instance.""" return QuickReplyContentService(settings) + @lru_cache(maxsize=1) def get_notification_manager() -> NotificationManagerService: """Get notification manager instance.""" @@ -45,11 +45,13 @@ def get_notification_manager() -> NotificationManagerService: dlp_service=get_dlp_service(), ) + @lru_cache(maxsize=1) def get_rag_service() -> RAGService: """Get RAG service instance.""" return RAGService(settings) + @lru_cache(maxsize=1) def get_conversation_manager() -> ConversationManagerService: """Get conversation manager instance.""" @@ -60,3 +62,38 @@ def get_conversation_manager() -> ConversationManagerService: dlp_service=get_dlp_service(), rag_service=get_rag_service(), ) + + +# Lifecycle management functions + + +def init_services(settings) -> None: + """Initialize services (placeholder for compatibility).""" + # Services are lazy-loaded via lru_cache, no explicit init needed + pass + + +async def startup_services() -> None: + """Connect to external services on startup.""" + # Connect to Redis + redis = get_redis_service() + await redis.connect() + + +async def shutdown_services() -> None: + """Close all service connections on shutdown.""" + # Close Redis + redis = get_redis_service() + await redis.close() + + # Close Firestore + firestore = get_firestore_service() + await firestore.close() + + # Close DLP + dlp = get_dlp_service() + await dlp.close() + + # Close RAG + rag = get_rag_service() + await rag.close() diff --git a/src/capa_de_integracion/main.py b/src/capa_de_integracion/main.py index 85f8977..ade1974 100644 --- a/src/capa_de_integracion/main.py +++ b/src/capa_de_integracion/main.py @@ -57,6 +57,7 @@ app.include_router(conversation_router) app.include_router(notification_router) app.include_router(quick_replies_router) + @app.get("/health") async def health_check(): """Health check endpoint.""" diff --git a/src/capa_de_integracion/models/__init__.py b/src/capa_de_integracion/models/__init__.py index 999acb8..1661a8d 100644 --- a/src/capa_de_integracion/models/__init__.py +++ b/src/capa_de_integracion/models/__init__.py @@ -1,17 +1,12 @@ """Data models module.""" from .conversation import ( - ConversationSessionDTO, - ConversationEntryDTO, - ConversationMessageDTO, - ExternalConvRequestDTO, - DetectIntentRequestDTO, - DetectIntentResponseDTO, - QueryInputDTO, - TextInputDTO, - EventInputDTO, - QueryParamsDTO, - QueryResultDTO, + User, + ConversationSession, + ConversationEntry, + ConversationRequest, + DetectIntentResponse, + QueryResult, ) from .notification import ( ExternalNotificationRequest, @@ -21,17 +16,12 @@ from .notification import ( __all__ = [ # Conversation - "ConversationSessionDTO", - "ConversationEntryDTO", - "ConversationMessageDTO", - "ExternalConvRequestDTO", - "DetectIntentRequestDTO", - "DetectIntentResponseDTO", - "QueryInputDTO", - "TextInputDTO", - "EventInputDTO", - "QueryParamsDTO", - "QueryResultDTO", + "User", + "ConversationSession", + "ConversationEntry", + "ConversationRequest", + "DetectIntentResponse", + "QueryResult", # Notification "ExternalNotificationRequest", "NotificationSession", diff --git a/src/capa_de_integracion/models/conversation.py b/src/capa_de_integracion/models/conversation.py index a8a8b63..a2dbfde 100644 --- a/src/capa_de_integracion/models/conversation.py +++ b/src/capa_de_integracion/models/conversation.py @@ -1,60 +1,16 @@ from datetime import datetime from typing import Any, Literal -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, Field - -class UsuarioDTO(BaseModel): +class User(BaseModel): """User information.""" telefono: str = Field(..., min_length=1) nickname: str | None = None -class TextInputDTO(BaseModel): - """Text input for queries.""" - - text: str - - -class EventInputDTO(BaseModel): - """Event input for queries.""" - - event: str - - -class QueryParamsDTO(BaseModel): - """Query parameters for Dialogflow requests.""" - - parameters: dict[str, Any] | None = None - - -class QueryInputDTO(BaseModel): - """Query input combining text or event.""" - - text: TextInputDTO | None = None - event: EventInputDTO | None = None - language_code: str = "es" - - @field_validator("text", "event") - @classmethod - def check_at_least_one(cls, v, info): - """Ensure either text or event is provided.""" - if info.field_name == "event" and v is None: - # Check if text was provided - if not info.data.get("text"): - raise ValueError("Either text or event must be provided") - return v - - -class DetectIntentRequestDTO(BaseModel): - """Dialogflow detect intent request.""" - - query_input: QueryInputDTO - query_params: QueryParamsDTO | None = None - - -class QueryResultDTO(BaseModel): +class QueryResult(BaseModel): """Query result from Dialogflow.""" responseText: str | None = Field(None, alias="responseText") @@ -63,43 +19,31 @@ class QueryResultDTO(BaseModel): model_config = {"populate_by_name": True} -class DetectIntentResponseDTO(BaseModel): +class DetectIntentResponse(BaseModel): """Dialogflow detect intent response.""" responseId: str | None = Field(None, alias="responseId") - queryResult: QueryResultDTO | None = Field(None, alias="queryResult") - quick_replies: Any | None = None # QuickReplyDTO from quick_replies module + queryResult: QueryResult | None = Field(None, alias="queryResult") + quick_replies: Any | None = None # QuickReplyScreen from quick_replies module model_config = {"populate_by_name": True} -class ExternalConvRequestDTO(BaseModel): +class ConversationRequest(BaseModel): """External conversation request from client.""" mensaje: str = Field(..., alias="mensaje") - usuario: UsuarioDTO = Field(..., alias="usuario") + usuario: User = Field(..., alias="usuario") canal: str = Field(..., alias="canal") pantalla_contexto: str | None = Field(None, alias="pantallaContexto") model_config = {"populate_by_name": True} -class ConversationMessageDTO(BaseModel): - """Single conversation message.""" - - type: str = Field(..., alias="type") # Maps to MessageType - timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp") - text: str = Field(..., alias="text") - parameters: dict[str, Any] | None = Field(None, alias="parameters") - canal: str | None = Field(None, alias="canal") - - model_config = {"populate_by_name": True} - - -class ConversationEntryDTO(BaseModel): +class ConversationEntry(BaseModel): """Single conversation entry.""" - entity: Literal['user', 'assistant'] + entity: Literal["user", "assistant"] type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM" timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp") text: str = Field(..., alias="text") @@ -109,7 +53,7 @@ class ConversationEntryDTO(BaseModel): model_config = {"populate_by_name": True} -class ConversationSessionDTO(BaseModel): +class ConversationSession(BaseModel): """Conversation session metadata.""" sessionId: str = Field(..., alias="sessionId") @@ -130,7 +74,7 @@ class ConversationSessionDTO(BaseModel): telefono: str, pantalla_contexto: str | None = None, last_message: str | None = None, - ) -> "ConversationSessionDTO": + ) -> "ConversationSession": """Create a new conversation session.""" now = datetime.now() return cls( @@ -142,15 +86,3 @@ class ConversationSessionDTO(BaseModel): pantallaContexto=pantalla_contexto, lastMessage=last_message, ) - - def with_last_message(self, last_message: str) -> "ConversationSessionDTO": - """Create copy with updated last message.""" - return self.model_copy( - update={"lastMessage": last_message, "lastModified": datetime.now()} - ) - - def with_pantalla_contexto( - self, pantalla_contexto: str - ) -> "ConversationSessionDTO": - """Create copy with updated pantalla contexto.""" - return self.model_copy(update={"pantallaContexto": pantalla_contexto}) diff --git a/src/capa_de_integracion/routers/conversation.py b/src/capa_de_integracion/routers/conversation.py index ab41d56..4883621 100644 --- a/src/capa_de_integracion/routers/conversation.py +++ b/src/capa_de_integracion/routers/conversation.py @@ -1,7 +1,7 @@ import logging from fastapi import APIRouter, Depends, HTTPException -from ..models import ExternalConvRequestDTO, DetectIntentResponseDTO +from ..models import ConversationRequest, DetectIntentResponse from ..services import ConversationManagerService from ..dependencies import get_conversation_manager @@ -11,13 +11,13 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"]) -@router.post("/detect-intent", response_model=DetectIntentResponseDTO) +@router.post("/detect-intent", response_model=DetectIntentResponse) async def detect_intent( - request: ExternalConvRequestDTO, + request: ConversationRequest, conversation_manager: ConversationManagerService = Depends( get_conversation_manager ), -) -> DetectIntentResponseDTO: +) -> DetectIntentResponse: """ Detect user intent and manage conversation. diff --git a/src/capa_de_integracion/routers/quick_replies.py b/src/capa_de_integracion/routers/quick_replies.py index 3d12092..8794e35 100644 --- a/src/capa_de_integracion/routers/quick_replies.py +++ b/src/capa_de_integracion/routers/quick_replies.py @@ -7,22 +7,29 @@ from ..models.quick_replies import QuickReplyScreen from ..services.quick_reply_content import QuickReplyContentService from ..services.redis_service import RedisService from ..services.firestore_service import FirestoreService -from ..dependencies import get_redis_service, get_firestore_service, get_quick_reply_content_service +from ..dependencies import ( + get_redis_service, + get_firestore_service, + get_quick_reply_content_service, +) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"]) + class QuickReplyUser(BaseModel): telefono: str nombre: str + class QuickReplyScreenRequest(BaseModel): usuario: QuickReplyUser pantallaContexto: str model_config = {"populate_by_name": True} + class QuickReplyScreenResponse(BaseModel): responseId: str quick_replies: QuickReplyScreen @@ -33,7 +40,9 @@ async def start_quick_reply_session( request: QuickReplyScreenRequest, redis_service: RedisService = Depends(get_redis_service), firestore_service: FirestoreService = Depends(get_firestore_service), - quick_reply_content_service: QuickReplyContentService = Depends(get_quick_reply_content_service) + quick_reply_content_service: QuickReplyContentService = Depends( + get_quick_reply_content_service + ), ) -> QuickReplyScreenResponse: """ Start a quick reply FAQ session for a specific screen. @@ -56,21 +65,30 @@ async def start_quick_reply_session( session = await firestore_service.get_session_by_phone(telefono) if session: session_id = session.sessionId - await firestore_service.update_pantalla_contexto(session_id, pantalla_contexto) + await firestore_service.update_pantalla_contexto( + session_id, pantalla_contexto + ) session.pantallaContexto = pantalla_contexto else: session_id = str(uuid4()) user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" - session = await firestore_service.create_session(session_id, user_id, telefono, pantalla_contexto) - + session = await firestore_service.create_session( + session_id, user_id, telefono, pantalla_contexto + ) # Cache session await redis_service.save_session(session) - logger.info(f"Created quick reply session {session_id} for screen: {pantalla_contexto}") + logger.info( + f"Created quick reply session {session_id} for screen: {pantalla_contexto}" + ) # Load quick replies - quick_replies = await quick_reply_content_service.get_quick_replies(pantalla_contexto) - return QuickReplyScreenResponse(responseId=session_id, quick_replies=quick_replies) + quick_replies = await quick_reply_content_service.get_quick_replies( + pantalla_contexto + ) + return QuickReplyScreenResponse( + responseId=session_id, quick_replies=quick_replies + ) except ValueError as e: logger.error(f"Validation error: {e}", exc_info=True) diff --git a/src/capa_de_integracion/services/__init__.py b/src/capa_de_integracion/services/__init__.py index b6ef73f..4953fab 100644 --- a/src/capa_de_integracion/services/__init__.py +++ b/src/capa_de_integracion/services/__init__.py @@ -4,13 +4,10 @@ from .conversation_manager import ConversationManagerService from .notification_manager import NotificationManagerService from .dlp_service import DLPService from .quick_reply_content import QuickReplyContentService -from .mappers import NotificationContextMapper, ConversationContextMapper __all__ = [ "QuickReplyContentService", "ConversationManagerService", "NotificationManagerService", "DLPService", - "NotificationContextMapper", - "ConversationContextMapper", ] diff --git a/src/capa_de_integracion/services/conversation_manager.py b/src/capa_de_integracion/services/conversation_manager.py index b7b3dcc..aa7894e 100644 --- a/src/capa_de_integracion/services/conversation_manager.py +++ b/src/capa_de_integracion/services/conversation_manager.py @@ -1,24 +1,19 @@ import logging -import uuid +from uuid import uuid4 from datetime import datetime, timedelta -from typing import Any from ..config import Settings from ..models import ( - ExternalConvRequestDTO, - DetectIntentRequestDTO, - DetectIntentResponseDTO, - ConversationSessionDTO, - ConversationEntryDTO, - QueryInputDTO, - TextInputDTO, - QueryParamsDTO, + ConversationRequest, + DetectIntentResponse, + QueryResult, + ConversationSession, + ConversationEntry, ) from .redis_service import RedisService from .firestore_service import FirestoreService from .dlp_service import DLPService from .rag_service import RAGService -from .mappers import NotificationContextMapper, ConversationContextMapper from .quick_reply_content import QuickReplyContentService @@ -26,22 +21,7 @@ logger = logging.getLogger(__name__) class ConversationManagerService: - """ - Central orchestrator for managing user conversations. - - Integrates Data Loss Prevention (DLP), message classification, routing based - on session context (pantallaContexto for quick replies), and hybrid AI logic - for notification-driven conversations. - - Routes traffic based on session context: - 1. If 'pantallaContexto' is present (not stale), delegates to QuickRepliesManagerService - 2. Otherwise, uses MessageEntryFilter (Gemini) to classify message: - a) CONVERSATION: Standard Dialogflow flow with conversation history - b) NOTIFICATION: Uses NotificationContextResolver (Gemini) to answer or delegate to Dialogflow - - All conversation turns are persisted using reactive write-back pattern: - Redis first (fast), then async to Firestore (persistent). - """ + """Central orchestrator for managing user conversations.""" SESSION_RESET_THRESHOLD_MINUTES = 30 SCREEN_CONTEXT_TIMEOUT_MINUTES = 10 @@ -62,32 +42,16 @@ class ConversationManagerService: self.redis_service = redis_service self.firestore_service = firestore_service self.dlp_service = dlp_service - - # Initialize mappers - self.notification_mapper = NotificationContextMapper() - self.conversation_mapper = ConversationContextMapper( - message_limit=settings.conversation_context_message_limit, - days_limit=settings.conversation_context_days_limit, - ) - - # Quick reply service self.quick_reply_service = QuickReplyContentService(settings) logger.info("ConversationManagerService initialized successfully") async def manage_conversation( - self, request: ExternalConvRequestDTO - ) -> DetectIntentResponseDTO: + self, request: ConversationRequest + ) -> DetectIntentResponse: """ Main entry point for managing conversations. - Flow: - 1. Obfuscate message with DLP - 2. Check for pantallaContexto (quick replies mode) - 3. If no pantallaContexto, continue with standard flow - 4. Classify message (CONVERSATION vs NOTIFICATION) - 5. Route to appropriate handler - Args: request: External conversation request from client @@ -101,743 +65,421 @@ class ConversationManagerService: self.settings.dlp_template_complete_flow, ) request.mensaje = obfuscated_message + telefono = request.usuario.telefono + + # Step 2. Fetch session in Redis -> Firestore -> Create new session + session = await self.redis_service.get_session(telefono) + if not session: + session = await self.firestore_service.get_session_by_phone(telefono) + if not session: + session_id = str(uuid4()) + user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" + session = await self.firestore_service.create_session( + session_id, user_id, telefono + ) + await self.redis_service.save_session(session) # Step 2: Check for pantallaContexto in existing session - existing_session = await self.redis_service.get_session(request.usuario.telefono) - - if existing_session and existing_session.pantallaContexto: + if session.pantallaContexto: # Check if pantallaContexto is stale (10 minutes) - if self._is_pantalla_context_valid(existing_session): + if self._is_pantalla_context_valid(session.lastModified): logger.info( - f"Detected 'pantallaContexto' in session: {existing_session.pantallaContexto}. " + f"Detected 'pantallaContexto' in session: {session.pantallaContexto}. " f"Delegating to QuickReplies flow." ) - return await self._manage_quick_reply_conversation(request, existing_session) + response = await self._manage_quick_reply_conversation( + request, session.pantallaContexto + ) + if response: + # Save user message to Firestore + user_entry = ConversationEntry( + entity="user", + type="CONVERSACION", + timestamp=datetime.now(), + text=request.mensaje, + parameters=None, + canal=getattr(request, "canal", None), + ) + await self.firestore_service.save_entry( + session.sessionId, user_entry + ) + + # Save quick reply response to Firestore + response_text = ( + response.queryResult.responseText + if response.queryResult + else "" + ) or "" + assistant_entry = ConversationEntry( + entity="assistant", + type="CONVERSACION", + timestamp=datetime.now(), + text=response_text, + parameters=None, + canal=getattr(request, "canal", None), + ) + await self.firestore_service.save_entry( + session.sessionId, assistant_entry + ) + + # Update session with last message and timestamp + session.lastMessage = response_text + session.lastModified = datetime.now() + await self.firestore_service.save_session(session) + await self.redis_service.save_session(session) + + return response else: logger.info( "Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow." ) # Step 3: Continue with standard conversation flow - return await self._continue_managing_conversation(request) + nickname = request.usuario.nickname + + logger.info( + f"Primary Check (Redis): Looking up session for phone: {telefono}" + ) + + # Step 3a: Load conversation history from Firestore + entries = await self.firestore_service.get_entries( + session.sessionId, + limit=self.settings.conversation_context_message_limit, + ) + logger.info(f"Loaded {len(entries)} conversation entries from Firestore") + + # Step 3b: Retrieve active notifications for this user + notifications = await self._get_active_notifications(telefono) + logger.info(f"Retrieved {len(notifications)} active notifications") + + # Step 3c: Prepare context for RAG service + messages = await self._prepare_rag_messages( + session=session, + entries=entries, + notifications=notifications, + user_message=request.mensaje, + nickname=nickname or "Usuario", + ) + + # Step 3d: Query RAG service + logger.info("Sending query to RAG service") + assistant_response = await self.rag_service.query(messages) + logger.info( + f"Received response from RAG service: {assistant_response[:100]}..." + ) + + # Step 3e: Save user message to Firestore + user_entry = ConversationEntry( + entity="user", + type="CONVERSACION", + timestamp=datetime.now(), + text=request.mensaje, + parameters=None, + canal=getattr(request, "canal", None), + ) + await self.firestore_service.save_entry(session.sessionId, user_entry) + logger.info("Saved user message to Firestore") + + # Step 3f: Save assistant response to Firestore + assistant_entry = ConversationEntry( + entity="assistant", + type="LLM", + timestamp=datetime.now(), + text=assistant_response, + parameters=None, + canal=getattr(request, "canal", None), + ) + await self.firestore_service.save_entry(session.sessionId, assistant_entry) + logger.info("Saved assistant response to Firestore") + + # Step 3g: Update session with last message and timestamp + session.lastMessage = assistant_response + session.lastModified = datetime.now() + await self.firestore_service.save_session(session) + await self.redis_service.save_session(session) + logger.info("Updated session in Firestore and Redis") + + # Step 3h: Mark notifications as processed if any were included + if notifications: + await self._mark_notifications_as_processed(telefono) + logger.info(f"Marked {len(notifications)} notifications as processed") + + # Step 3i: Return response object + response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult( + responseText=assistant_response, + parameters=None, + ), + quick_replies=None, + ) + + return response except Exception as e: logger.error(f"Error managing conversation: {str(e)}", exc_info=True) raise - def _is_pantalla_context_valid(self, session: ConversationSessionDTO) -> bool: + def _is_pantalla_context_valid(self, last_modified: datetime) -> bool: """Check if pantallaContexto is still valid (not stale).""" - if not session.lastModified: - return False - - time_diff = datetime.now() - session.lastModified + time_diff = datetime.now() - last_modified return time_diff < timedelta(minutes=self.SCREEN_CONTEXT_TIMEOUT_MINUTES) async def _manage_quick_reply_conversation( self, - request: ExternalConvRequestDTO, - session: ConversationSessionDTO, - ) -> DetectIntentResponseDTO: - """ - Handle conversation within Quick Replies context. + request: ConversationRequest, + screen_id: str, + ) -> DetectIntentResponse | None: + """Handle conversation within Quick Replies context.""" + quick_reply_screen = await self.quick_reply_service.get_quick_replies(screen_id) - User is in a quick reply screen, treat their message as a FAQ query. + # If no questions available, delegate to normal conversation flow + if not quick_reply_screen.preguntas: + logger.warning(f"No quick replies found for screen: {screen_id}.") + return None - Args: - request: External request - session: Existing session with pantallaContexto + # Match user message to a quick reply question + user_message_lower = request.mensaje.lower().strip() + matched_answer = None - Returns: - Dialogflow response - """ - # Build Dialogflow request with pantallaContexto - dialogflow_request = self._build_dialogflow_request( - request, session, request.mensaje - ) - - # Add pantallaContexto to parameters - if ( - dialogflow_request.query_params - and dialogflow_request.query_params.parameters - ): - dialogflow_request.query_params.parameters["pantalla_contexto"] = ( - session.pantallaContexto - ) - - # Call Dialogflow - response = await self.dialogflow_client.detect_intent( - session.sessionId, dialogflow_request - ) - - # Persist conversation turn - await self._persist_conversation_turn(session, request.mensaje, response) - - return response - - async def _continue_managing_conversation( - self, request: ExternalConvRequestDTO - ) -> DetectIntentResponseDTO: - """ - Continue with standard conversation flow. - - Steps: - 1. Get or create session - 2. Check for active notifications - 3. Classify message (CONVERSATION vs NOTIFICATION) - 4. Route to appropriate handler - - Args: - request: External conversation request - - Returns: - Dialogflow response - """ - telefono = request.usuario.telefono - nickname = ( - request.usuario.nickname if hasattr(request.usuario, "nickname") else None - ) - - if not telefono or not telefono.strip(): - raise ValueError("Phone number is required to manage conversation sessions") - - logger.info(f"Primary Check (Redis): Looking up session for phone: {telefono}") - - # Get session from Redis - session = await self.redis_service.get_session(telefono) - - if session: - return await self._handle_message_classification(request, session) - else: - # No session in Redis, check Firestore - logger.info( - "No session found in Redis. Performing full lookup to Firestore." - ) - return await self._full_lookup_and_process(request, telefono, nickname) - - async def _handle_message_classification( - self, - request: ExternalConvRequestDTO, - session: ConversationSessionDTO, - ) -> DetectIntentResponseDTO: - """ - Classify message using MessageEntryFilter and route accordingly. - - Checks for active notifications and uses Gemini to determine if the - user's message is about the notification or general conversation. - - Args: - request: External request - session: Existing conversation session - - Returns: - Dialogflow response - """ - telefono = request.usuario.telefono - user_message = request.mensaje - - # Get active notification for this phone - notification_id = await self.redis_service.get_notification_id_for_phone( - telefono - ) - - if not notification_id: - # No notification, proceed with standard conversation - return await self._proceed_with_conversation(request, session) - - # Get notification session - notification_session = await self.redis_service.get_notification_session( - notification_id - ) - - if not notification_session or not notification_session.notificaciones: - return await self._proceed_with_conversation(request, session) - - # Find most recent active notification - active_notification = None - for notif in sorted( - notification_session.notificaciones, - key=lambda n: n.timestampCreacion, - reverse=True, - ): - if notif.status == "active": - active_notification = notif + for pregunta in quick_reply_screen.preguntas: + # Simple matching: check if question title matches user message + if pregunta.titulo.lower().strip() == user_message_lower: + matched_answer = pregunta.respuesta + logger.info(f"Matched quick reply: {pregunta.titulo}") break - if not active_notification: - return await self._proceed_with_conversation(request, session) + # If no match, use first question as default or delegate to normal flow + if not matched_answer: + logger.warning( + f"No matching quick reply found for message: '{request.mensaje}'." + ) - # Get conversation history from Redis (fast in-memory cache) - messages_data = await self.redis_service.get_messages(session.sessionId) - # Convert message dicts to ConversationEntryDTO objects - conversation_entries = [ - ConversationEntryDTO.model_validate(msg) for msg in messages_data - ] - conversation_history = self.conversation_mapper.to_text_from_entries( - conversation_entries - ) - if not conversation_history: - conversation_history = "" - - # Classify message using MessageEntryFilter (Gemini) - notification_text = self.notification_mapper.to_text(active_notification) - classification = await self.message_filter.classify_message( - query_input_text=user_message, - notifications_json=notification_text, - conversation_json=conversation_history, + # Create response with the matched quick reply answer + response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText=matched_answer, parameters=None), + quick_replies=quick_reply_screen, ) - logger.info(f"Message classified as: {classification}") - - if classification == self.message_filter.CATEGORY_NOTIFICATION: - # Route to notification conversation flow - return await self._start_notification_conversation( - request, active_notification, session, conversation_entries - ) - else: - # Route to standard conversation flow - return await self._proceed_with_conversation(request, session) - - async def _proceed_with_conversation( - self, - request: ExternalConvRequestDTO, - session: ConversationSessionDTO, - ) -> DetectIntentResponseDTO: - """ - Proceed with standard Dialogflow conversation. - - Checks session age: - - If < 30 minutes: Continue with existing session - - If >= 30 minutes: Create new session and inject conversation history - - Args: - request: External request - session: Existing session - - Returns: - Dialogflow response - """ - datetime.now() - - # Check session age - if self._is_session_valid(session): - logger.info( - f"Recent Session Found: Session {session.sessionId} is within " - f"the {self.SESSION_RESET_THRESHOLD_MINUTES}-minute threshold. " - f"Proceeding to Dialogflow." - ) - return await self._process_dialogflow_request( - session, request, is_new_session=False - ) - else: - # Session expired, create new session with history injection - logger.info( - f"Old Session Found: Session {session.sessionId} is older than " - f"the {self.SESSION_RESET_THRESHOLD_MINUTES}-minute threshold." - ) - - # Create new session - new_session_id = SessionIdGenerator.generate() - telefono = request.usuario.telefono - nickname = ( - request.usuario.nickname - if hasattr(request.usuario, "nickname") - else None - ) - user_id = nickname or telefono - - new_session = ConversationSessionDTO.create( - session_id=new_session_id, - user_id=user_id, - telefono=telefono, - ) - - logger.info( - f"Creating new session {new_session_id} from old session " - f"{session.sessionId} due to timeout." - ) - - # Get conversation history from old session - old_entries = await self.firestore_service.get_entries( - session.sessionId, - limit=self.settings.conversation_context_message_limit, - ) - - # Apply limits (30 days / 60 messages / 50KB) - conversation_history = self.conversation_mapper.to_text_with_limits( - session, old_entries - ) - - # Build request with history parameter - dialogflow_request = self._build_dialogflow_request( - request, new_session, request.mensaje - ) - if ( - dialogflow_request.query_params - and dialogflow_request.query_params.parameters - ): - dialogflow_request.query_params.parameters[self.CONV_HISTORY_PARAM] = ( - conversation_history - ) - - return await self._process_dialogflow_request( - new_session, - request, - is_new_session=True, - dialogflow_request=dialogflow_request, - ) - - async def _start_notification_conversation( - self, - request: ExternalConvRequestDTO, - notification: Any, - session: ConversationSessionDTO, - conversation_entries: list[ConversationEntryDTO], - ) -> DetectIntentResponseDTO: - """ - Start notification-driven conversation. - - Uses NotificationContextResolver (Gemini) to determine if the question - can be answered directly from notification metadata or should be - delegated to Dialogflow. - - Args: - request: External request - notification: Active notification - session: Conversation session - conversation_entries: Recent conversation history - - Returns: - Dialogflow response - """ - user_message = request.mensaje - telefono = request.usuario.telefono - - # Prepare context for NotificationContextResolver - self.notification_mapper.to_text(notification) - notification_json = self.notification_mapper.to_json(notification) - conversation_history = self.conversation_mapper.to_text_from_entries( - conversation_entries - ) - - # Convert notification parameters to metadata string - # Filter to only include parameters starting with "notification_po_" - metadata = "" - if notification.parametros: - import json - - filtered_params = { - key: value - for key, value in notification.parametros.items() - if key.startswith("notification_po_") - } - metadata = json.dumps(filtered_params, ensure_ascii=False) - - # Resolve context using Gemini - resolution = await self.notification_context_resolver.resolve_context( - query_input_text=user_message, - notifications_json=notification_json, - conversation_json=conversation_history, - metadata=metadata, - user_id=session.userId, - session_id=session.sessionId, - user_phone_number=telefono, - ) - - if resolution == self.notification_context_resolver.CATEGORY_DIALOGFLOW: - # Delegate to Dialogflow - logger.info( - "NotificationContextResolver returned DIALOGFLOW. Sending to Dialogflow." - ) - - dialogflow_request = self._build_dialogflow_request( - request, session, user_message - ) - - # Check if session is older than 30 minutes - from datetime import datetime, timedelta - - time_diff = datetime.now() - session.lastModified - if time_diff >= timedelta(minutes=self.SESSION_RESET_THRESHOLD_MINUTES): - # Session is old, inject conversation history - logger.info( - f"Session is older than {self.SESSION_RESET_THRESHOLD_MINUTES} minutes. " - "Injecting conversation history." - ) - # Get conversation history with limits - firestore_entries = await self.firestore_service.get_entries( - session.sessionId - ) - conversation_history = self.conversation_mapper.to_text_with_limits( - session, firestore_entries - ) - if ( - dialogflow_request.query_params - and dialogflow_request.query_params.parameters - ): - dialogflow_request.query_params.parameters[ - self.CONV_HISTORY_PARAM - ] = conversation_history - - # Always add notification parameters - if ( - notification.parametros - and dialogflow_request.query_params - and dialogflow_request.query_params.parameters - ): - dialogflow_request.query_params.parameters.update( - notification.parametros - ) - - response = await self.dialogflow_client.detect_intent( - session.sessionId, dialogflow_request - ) - - await self._persist_conversation_turn(session, user_message, response) - return response - else: - # LLM provided direct answer - logger.info( - "NotificationContextResolver provided direct answer. Storing in Redis." - ) - - # Store LLM response in Redis with UUID - llm_uuid = str(uuid.uuid4()) - await self.llm_response_tuner.set_value(llm_uuid, resolution) - - # Send LLM_RESPONSE_PROCESSED event to Dialogflow - event_params = {"uuid": llm_uuid} - - response = await self.dialogflow_client.detect_intent_event( - session_id=session.sessionId, - event_name="LLM_RESPONSE_PROCESSED", - parameters=event_params, - language_code=self.settings.dialogflow_default_language, - ) - - # Persist LLM turn - await self._persist_llm_turn(session, user_message, resolution) - - return response - - async def _full_lookup_and_process( - self, - request: ExternalConvRequestDTO, - telefono: str, - nickname: str | None, - ) -> DetectIntentResponseDTO: - """ - Perform full lookup from Firestore and process conversation. - - Called when session is not found in Redis. - - Args: - request: External request - telefono: User phone number - nickname: User nickname - - Returns: - Dialogflow response - """ - # Try Firestore (by phone number) - session = await self.firestore_service.get_session_by_phone(telefono) - - if session: - # Get conversation history - old_entries = await self.firestore_service.get_entries( - session.sessionId, - limit=self.settings.conversation_context_message_limit, - ) - - # Create new session with history injection - new_session_id = SessionIdGenerator.generate() - user_id = nickname or telefono - - new_session = ConversationSessionDTO.create( - session_id=new_session_id, - user_id=user_id, - telefono=telefono, - ) - - logger.info(f"Creating new session {new_session_id} after full lookup.") - - # Apply history limits - conversation_history = self.conversation_mapper.to_text_with_limits( - session, old_entries - ) - - # Build request with history - dialogflow_request = self._build_dialogflow_request( - request, new_session, request.mensaje - ) - if ( - dialogflow_request.query_params - and dialogflow_request.query_params.parameters - ): - dialogflow_request.query_params.parameters[self.CONV_HISTORY_PARAM] = ( - conversation_history - ) - - return await self._process_dialogflow_request( - new_session, - request, - is_new_session=True, - dialogflow_request=dialogflow_request, - ) - else: - # No session found, create brand new session - logger.info( - f"No existing session found for {telefono}. Creating new session." - ) - return await self._create_new_session_and_process( - request, telefono, nickname - ) - - async def _create_new_session_and_process( - self, - request: ExternalConvRequestDTO, - telefono: str, - nickname: str | None, - ) -> DetectIntentResponseDTO: - """Create brand new session and process request.""" - session_id = SessionIdGenerator.generate() - user_id = nickname or telefono - - session = ConversationSessionDTO.create( - session_id=session_id, - user_id=user_id, - telefono=telefono, - ) - - # Save to Redis and Firestore - await self.redis_service.save_session(session) - await self.firestore_service.save_session(session) - - logger.info(f"Created new session: {session_id} for phone: {telefono}") - - return await self._process_dialogflow_request( - session, request, is_new_session=True - ) - - async def _process_dialogflow_request( - self, - session: ConversationSessionDTO, - request: ExternalConvRequestDTO, - is_new_session: bool, - dialogflow_request: DetectIntentRequestDTO | None = None, - ) -> DetectIntentResponseDTO: - """ - Process Dialogflow request and persist conversation turn. - - Args: - session: Conversation session - request: External request - is_new_session: Whether this is a new session - dialogflow_request: Pre-built Dialogflow request (optional) - - Returns: - Dialogflow response - """ - # Build request if not provided - if not dialogflow_request: - dialogflow_request = self._build_dialogflow_request( - request, session, request.mensaje - ) - - # Call Dialogflow - response = await self.dialogflow_client.detect_intent( - session.sessionId, dialogflow_request - ) - - # Persist conversation turn - await self._persist_conversation_turn(session, request.mensaje, response) - - logger.info( - f"Successfully processed conversation for session: {session.sessionId}" - ) return response - def _is_session_valid(self, session: ConversationSessionDTO) -> bool: - """Check if session is within 30-minute threshold.""" - if not session.lastModified: - return False + async def _get_active_notifications(self, telefono: str) -> list: + """Retrieve active notifications for a user from Redis or Firestore. - time_diff = datetime.now() - session.lastModified - return time_diff < timedelta(minutes=self.SESSION_RESET_THRESHOLD_MINUTES) + Args: + telefono: User phone number - def _build_dialogflow_request( - self, - external_request: ExternalConvRequestDTO, - session: ConversationSessionDTO, - message: str, - ) -> DetectIntentRequestDTO: - """Build Dialogflow detect intent request.""" - # Build query input - query_input = QueryInputDTO( - text=TextInputDTO(text=message), - language_code=self.settings.dialogflow_default_language, - ) - - # Build query parameters with session context - parameters = { - "telefono": session.telefono, - "usuario_id": session.userId, - } - - # Add pantalla_contexto if present - if session.pantallaContexto: - parameters["pantalla_contexto"] = session.pantallaContexto - - query_params = QueryParamsDTO(parameters=parameters) - - return DetectIntentRequestDTO( - query_input=query_input, - query_params=query_params, - ) - - async def _persist_conversation_turn( - self, - session: ConversationSessionDTO, - user_message: str, - response: DetectIntentResponseDTO, - ) -> None: - """ - Persist conversation turn using reactive write-back pattern. - Saves to Redis first, then async to Firestore. + Returns: + List of active Notification objects """ try: - # Update session with last message - updated_session = ConversationSessionDTO( - **session.model_dump(), - lastMessage=user_message, - lastModified=datetime.now(), + # Try Redis first + notification_session = await self.redis_service.get_notification_session( + telefono ) - # Create conversation entry - response_text = "" - parameters = None + # If not in Redis, try Firestore + if not notification_session: + # Firestore uses phone as document ID for notifications + from ..models.notification import NotificationSession - if response.queryResult: - response_text = response.queryResult.responseText or "" - parameters = response.queryResult.parameters + doc_ref = self.firestore_service.db.collection( + self.firestore_service.notifications_collection + ).document(telefono) + doc = await doc_ref.get() - user_entry = ConversationEntryDTO( - entity="USUARIO", - type="CONVERSACION", - timestamp=datetime.now(), - text=user_message, - parameters=None, - ) + if doc.exists: + data = doc.to_dict() + notification_session = NotificationSession.model_validate(data) - agent_entry = ConversationEntryDTO( - entity="AGENTE", - type="CONVERSACION", - timestamp=datetime.now(), - text=response_text, - parameters=parameters, - ) + # Filter for active notifications only + if notification_session and notification_session.notificaciones: + active_notifications = [ + notif + for notif in notification_session.notificaciones + if notif.status == "active" + ] + return active_notifications - # Save to Redis (fast, blocking) - await self.redis_service.save_session(updated_session) - await self.redis_service.save_message(session.sessionId, user_entry) - await self.redis_service.save_message(session.sessionId, agent_entry) - - # Save to Firestore (persistent, non-blocking write-back) - import asyncio - - async def save_to_firestore(): - try: - await self.firestore_service.save_session(updated_session) - await self.firestore_service.save_entry( - session.sessionId, user_entry - ) - await self.firestore_service.save_entry( - session.sessionId, agent_entry - ) - logger.debug( - f"Asynchronously (Write-Back): Entry successfully saved to Firestore for session: {session.sessionId}" - ) - except Exception as fs_error: - logger.error( - f"Asynchronously (Write-Back): Failed to save to Firestore for session {session.sessionId}: {str(fs_error)}", - exc_info=True, - ) - - # Fire and forget - don't await - asyncio.create_task(save_to_firestore()) - - logger.debug(f"Entry saved to Redis for session: {session.sessionId}") + return [] except Exception as e: logger.error( - f"Error persisting conversation turn for session {session.sessionId}: {str(e)}", + f"Error retrieving notifications for {telefono}: {str(e)}", exc_info=True, ) - # Don't fail the request if persistence fails + return [] - async def _persist_llm_turn( + async def _prepare_rag_messages( self, - session: ConversationSessionDTO, + session: ConversationSession, + entries: list[ConversationEntry], + notifications: list, user_message: str, - llm_response: str, - ) -> None: - """Persist LLM-generated conversation turn.""" + nickname: str, + ) -> list[dict[str, str]]: + """Prepare messages in OpenAI format for RAG service. + + Args: + session: Current conversation session + entries: Conversation history entries + notifications: Active notifications + user_message: Current user message + nickname: User's nickname + + Returns: + List of messages in OpenAI format [{"role": "...", "content": "..."}] + """ + messages = [] + + # Add system message with conversation history if available + if entries: + conversation_context = self._format_conversation_history(session, entries) + if conversation_context: + messages.append( + { + "role": "system", + "content": f"Historial de conversación:\n{conversation_context}", + } + ) + + # Add system message with notifications if available + if notifications: + # Simple Pythonic join - no mapper needed! + notifications_text = "\n".join( + n.texto for n in notifications if n.texto and n.texto.strip() + ) + if notifications_text: + messages.append( + { + "role": "system", + "content": f"Notificaciones pendientes para el usuario:\n{notifications_text}", + } + ) + + # Add system message with user context + user_context = f"Usuario: {nickname}" if nickname else "Usuario anónimo" + messages.append({"role": "system", "content": user_context}) + + # Add current user message + messages.append({"role": "user", "content": user_message}) + + return messages + + async def _mark_notifications_as_processed(self, telefono: str) -> None: + """Mark all notifications for a user as processed. + + Args: + telefono: User phone number + """ try: - # Update session - updated_session = ConversationSessionDTO( - **session.model_dump(), - lastMessage=user_message, - lastModified=datetime.now(), + # Update status in Firestore + await self.firestore_service.update_notification_status( + telefono, "processed" ) - user_entry = ConversationEntryDTO( - entity="USUARIO", - type="CONVERSACION", - timestamp=datetime.now(), - text=user_message, - parameters=None, - ) + # Update or delete from Redis + await self.redis_service.delete_notification_session(telefono) - llm_entry = ConversationEntryDTO( - entity="LLM", - type="LLM", - timestamp=datetime.now(), - text=llm_response, - parameters=None, - ) - - # Save to Redis (fast, blocking) - await self.redis_service.save_session(updated_session) - await self.redis_service.save_message(session.sessionId, user_entry) - await self.redis_service.save_message(session.sessionId, llm_entry) - - # Save to Firestore (persistent, non-blocking write-back) - import asyncio - - async def save_to_firestore(): - try: - await self.firestore_service.save_session(updated_session) - await self.firestore_service.save_entry( - session.sessionId, user_entry - ) - await self.firestore_service.save_entry( - session.sessionId, llm_entry - ) - logger.debug( - f"Asynchronously (Write-Back): LLM entry successfully saved to Firestore for session: {session.sessionId}" - ) - except Exception as fs_error: - logger.error( - f"Asynchronously (Write-Back): Failed to save LLM entry to Firestore for session {session.sessionId}: {str(fs_error)}", - exc_info=True, - ) - - # Fire and forget - don't await - asyncio.create_task(save_to_firestore()) - - logger.debug(f"LLM entry saved to Redis for session: {session.sessionId}") + logger.info(f"Marked notifications as processed for {telefono}") except Exception as e: logger.error( - f"Error persisting LLM turn for session {session.sessionId}: {str(e)}", + f"Error marking notifications as processed for {telefono}: {str(e)}", exc_info=True, ) + + def _format_conversation_history( + self, + session: ConversationSession, + entries: list[ConversationEntry], + ) -> str: + """Format conversation history with business rule limits. + + Applies limits: + - Date: 30 days maximum + - Count: 60 messages maximum + - Size: 50KB maximum + + Args: + session: Conversation session + entries: List of conversation entries + + Returns: + Formatted conversation text + """ + if not entries: + return "" + + # Filter by date (30 days) + cutoff_date = datetime.now() - timedelta( + days=self.settings.conversation_context_days_limit + ) + recent_entries = [ + e for e in entries if e.timestamp and e.timestamp >= cutoff_date + ] + + # Sort by timestamp (oldest first) and limit count + recent_entries.sort(key=lambda e: e.timestamp) + limited_entries = recent_entries[ + -self.settings.conversation_context_message_limit : + ] + + # Format with size truncation (50KB) + return self._format_entries_with_size_limit(limited_entries) + + def _format_entries_with_size_limit(self, entries: list[ConversationEntry]) -> str: + """Format entries with 50KB size limit. + + Builds from newest to oldest, stopping at size limit. + + Args: + entries: List of conversation entries + + Returns: + Formatted text, truncated if necessary + """ + if not entries: + return "" + + MAX_BYTES = 50 * 1024 # 50KB + formatted_messages = [self._format_entry(entry) for entry in entries] + + # Build from newest to oldest + text_block = [] + current_size = 0 + + for message in reversed(formatted_messages): + message_line = message + "\n" + message_bytes = len(message_line.encode("utf-8")) + + if current_size + message_bytes > MAX_BYTES: + break + + text_block.insert(0, message_line) + current_size += message_bytes + + return "".join(text_block).strip() + + def _format_entry(self, entry: ConversationEntry) -> str: + """Format a single conversation entry. + + Args: + entry: Conversation entry + + Returns: + Formatted string (e.g., "User: hello", "Assistant: hi there") + """ + # Map entity to prefix (fixed bug from Java port!) + prefix = "User: " if entry.entity == "user" else "Assistant: " + + # Clean content if needed + content = entry.text + if entry.entity == "assistant": + # Remove trailing JSON artifacts like {...} + import re + + content = re.sub(r"\s*\{.*\}\s*$", "", content).strip() + + return prefix + content diff --git a/src/capa_de_integracion/services/firestore_service.py b/src/capa_de_integracion/services/firestore_service.py index b20935f..925a1ea 100644 --- a/src/capa_de_integracion/services/firestore_service.py +++ b/src/capa_de_integracion/services/firestore_service.py @@ -3,7 +3,7 @@ from datetime import datetime from google.cloud import firestore from ..config import Settings -from ..models import ConversationSessionDTO, ConversationEntryDTO +from ..models import ConversationSession, ConversationEntry from ..models.notification import Notification @@ -40,7 +40,7 @@ class FirestoreService: """Get Firestore document reference for session.""" return self.db.collection(self.conversations_collection).document(session_id) - async def get_session(self, session_id: str) -> ConversationSessionDTO | None: + async def get_session(self, session_id: str) -> ConversationSession | None: """Retrieve conversation session from Firestore by session ID.""" try: doc_ref = self._session_ref(session_id) @@ -51,7 +51,7 @@ class FirestoreService: return None data = doc.to_dict() - session = ConversationSessionDTO.model_validate(data) + session = ConversationSession.model_validate(data) logger.debug(f"Retrieved session from Firestore: {session_id}") return session @@ -61,9 +61,7 @@ class FirestoreService: ) return None - async def get_session_by_phone( - self, telefono: str - ) -> ConversationSessionDTO | None: + async def get_session_by_phone(self, telefono: str) -> ConversationSession | None: """ Retrieve most recent conversation session from Firestore by phone number. @@ -84,7 +82,7 @@ class FirestoreService: docs = query.stream() async for doc in docs: data = doc.to_dict() - session = ConversationSessionDTO.model_validate(data) + session = ConversationSession.model_validate(data) logger.debug( f"Retrieved session from Firestore for phone {telefono}: {session.sessionId}" ) @@ -99,7 +97,7 @@ class FirestoreService: ) return None - async def save_session(self, session: ConversationSessionDTO) -> bool: + async def save_session(self, session: ConversationSession) -> bool: """Save conversation session to Firestore.""" try: doc_ref = self._session_ref(session.sessionId) @@ -121,7 +119,7 @@ class FirestoreService: telefono: str, pantalla_contexto: str | None = None, last_message: str | None = None, - ) -> ConversationSessionDTO: + ) -> ConversationSession: """Create and save a new conversation session to Firestore. Args: @@ -137,7 +135,7 @@ class FirestoreService: Raises: Exception: If session creation or save fails """ - session = ConversationSessionDTO.create( + session = ConversationSession.create( session_id=session_id, user_id=user_id, telefono=telefono, @@ -152,7 +150,7 @@ class FirestoreService: logger.info(f"Created new session in Firestore: {session_id}") return session - async def save_entry(self, session_id: str, entry: ConversationEntryDTO) -> bool: + async def save_entry(self, session_id: str, entry: ConversationEntry) -> bool: """Save conversation entry to Firestore subcollection.""" try: doc_ref = self._session_ref(session_id) @@ -175,7 +173,7 @@ class FirestoreService: async def get_entries( self, session_id: str, limit: int = 10 - ) -> list[ConversationEntryDTO]: + ) -> list[ConversationEntry]: """Retrieve recent conversation entries from Firestore.""" try: doc_ref = self._session_ref(session_id) @@ -191,7 +189,7 @@ class FirestoreService: async for doc in docs: entry_data = doc.to_dict() - entry = ConversationEntryDTO.model_validate(entry_data) + entry = ConversationEntry.model_validate(entry_data) entries.append(entry) # Reverse to get chronological order diff --git a/src/capa_de_integracion/services/mappers.py b/src/capa_de_integracion/services/mappers.py deleted file mode 100644 index c63d406..0000000 --- a/src/capa_de_integracion/services/mappers.py +++ /dev/null @@ -1,229 +0,0 @@ -""" -Copyright 2025 Google. This software is provided as-is, without warranty or -representation for any use or purpose. Your use of it is subject to your -agreement with Google. - -Mappers for converting DTOs to text format for Gemini API. -""" - -import json -import logging -from datetime import datetime, timedelta - -from ..models import ( - ConversationSessionDTO, - ConversationEntryDTO, -) -from ..models.notification import Notification - - -logger = logging.getLogger(__name__) - - -class NotificationContextMapper: - """Maps notifications to text format for Gemini classification.""" - - @staticmethod - def to_text(notification: Notification) -> str: - """ - Convert a notification to text format. - - Args: - notification: Notification DTO - - Returns: - Notification text - """ - if not notification or not notification.texto: - return "" - return notification.texto - - @staticmethod - def to_text_multiple(notifications: list[Notification]) -> str: - """ - Convert multiple notifications to text format. - - Args: - notifications: List of notification DTOs - - Returns: - Notifications joined by newlines - """ - if not notifications: - return "" - - texts = [n.texto for n in notifications if n.texto and n.texto.strip()] - return "\n".join(texts) - - @staticmethod - def to_json(notification: Notification) -> str: - """ - Convert notification to JSON string for Gemini. - - Args: - notification: Notification DTO - - Returns: - JSON representation - """ - if not notification: - return "{}" - - data = { - "texto": notification.texto, - "parametros": notification.parametros or {}, - "timestamp": notification.timestampCreacion.isoformat(), - } - return json.dumps(data, ensure_ascii=False) - - -class ConversationContextMapper: - """Maps conversation history to text format for Gemini.""" - - # Business rules for conversation history limits - MESSAGE_LIMIT = 60 # Maximum 60 messages - DAYS_LIMIT = 30 # Maximum 30 days - MAX_HISTORY_BYTES = 50 * 1024 # 50 KB maximum size - - NOTIFICATION_TEXT_PARAM = "notification_text" - - def __init__(self, message_limit: int = 60, days_limit: int = 30): - """ - Initialize conversation context mapper. - - Args: - message_limit: Maximum number of messages to include - days_limit: Maximum age of messages in days - """ - self.message_limit = message_limit - self.days_limit = days_limit - - def to_text_from_entries(self, entries: list[ConversationEntryDTO]) -> str: - """ - Convert conversation entries to text format. - - Args: - entries: List of conversation entries - - Returns: - Formatted conversation history - """ - if not entries: - return "" - - formatted = [self._format_entry(entry) for entry in entries] - return "\n".join(formatted) - - def to_text_with_limits( - self, - session: ConversationSessionDTO, - entries: list[ConversationEntryDTO], - ) -> str: - """ - Convert conversation to text with business rule limits applied. - - Applies: - - Days limit (30 days) - - Message limit (60 messages) - - Size limit (50 KB) - - Args: - session: Conversation session - entries: List of conversation entries - - Returns: - Formatted conversation history with limits applied - """ - if not entries: - return "" - - # Filter by date (30 days) - cutoff_date = datetime.now() - timedelta(days=self.days_limit) - recent_entries = [ - e for e in entries if e.timestamp and e.timestamp >= cutoff_date - ] - - # Sort by timestamp (oldest first) and limit count - recent_entries.sort(key=lambda e: e.timestamp) - limited_entries = recent_entries[-self.message_limit :] - - # Apply size truncation (50 KB) - return self._to_text_with_truncation(limited_entries) - - def _to_text_with_truncation(self, entries: list[ConversationEntryDTO]) -> str: - """ - Format entries with size truncation (50 KB max). - - Args: - entries: List of conversation entries - - Returns: - Formatted text, truncated if necessary - """ - if not entries: - return "" - - # Format all messages - formatted_messages = [self._format_entry(entry) for entry in entries] - - # Build from newest to oldest, stopping at 50KB - text_block = [] - current_size = 0 - - # Iterate from newest to oldest - for message in reversed(formatted_messages): - message_line = message + "\n" - message_bytes = len(message_line.encode("utf-8")) - - if current_size + message_bytes > self.MAX_HISTORY_BYTES: - break - - text_block.insert(0, message_line) - current_size += message_bytes - - return "".join(text_block).strip() - - def _format_entry(self, entry: ConversationEntryDTO) -> str: - """ - Format a single conversation entry. - - Args: - entry: Conversation entry - - Returns: - Formatted string (e.g., "User: hello", "Agent: hi there") - """ - prefix = "User: " - content = entry.text - - # Determine prefix based on entity - if entry.entity == "AGENTE": - prefix = "Agent: " - # Clean JSON artifacts from agent messages - content = self._clean_agent_message(content) - elif entry.entity == "SISTEMA": - prefix = "System: " - # Check if this is a notification in parameters - if entry.parameters and self.NOTIFICATION_TEXT_PARAM in entry.parameters: - param_text = entry.parameters[self.NOTIFICATION_TEXT_PARAM] - if param_text and str(param_text).strip(): - content = str(param_text) - elif entry.entity == "LLM": - prefix = "System: " - - return prefix + content - - def _clean_agent_message(self, message: str) -> str: - """ - Clean agent message by removing JSON artifacts at the end. - - Args: - message: Original message - - Returns: - Cleaned message - """ - # Remove trailing {...} patterns - import re - - return re.sub(r"\s*\{.*\}\s*$", "", message).strip() diff --git a/src/capa_de_integracion/services/quick_reply_content.py b/src/capa_de_integracion/services/quick_reply_content.py index 6ae1bfb..85168fd 100644 --- a/src/capa_de_integracion/services/quick_reply_content.py +++ b/src/capa_de_integracion/services/quick_reply_content.py @@ -53,7 +53,9 @@ class QuickReplyContentService: try: if not file_path.exists(): logger.warning(f"Quick reply file not found: {file_path}") - raise ValueError(f"Quick reply file not found for screen_id: {screen_id}") + raise ValueError( + f"Quick reply file not found for screen_id: {screen_id}" + ) with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) @@ -84,10 +86,14 @@ class QuickReplyContentService: except json.JSONDecodeError as e: logger.error(f"Error parsing JSON file {file_path}: {e}", exc_info=True) - raise ValueError(f"Invalid JSON format in quick reply file for screen_id: {screen_id}") from e + raise ValueError( + f"Invalid JSON format in quick reply file for screen_id: {screen_id}" + ) from e except Exception as e: logger.error( f"Error loading quick replies for screen {screen_id}: {e}", exc_info=True, ) - raise ValueError(f"Error loading quick replies for screen_id: {screen_id}") from e + raise ValueError( + f"Error loading quick replies for screen_id: {screen_id}" + ) from e diff --git a/src/capa_de_integracion/services/rag_service.py b/src/capa_de_integracion/services/rag_service.py index 795480b..0d2184d 100644 --- a/src/capa_de_integracion/services/rag_service.py +++ b/src/capa_de_integracion/services/rag_service.py @@ -121,7 +121,9 @@ class RAGService: logger.error(f"Request error calling RAG endpoint: {str(e)}") raise except Exception as e: - logger.error(f"Unexpected error calling RAG endpoint: {str(e)}", exc_info=True) + logger.error( + f"Unexpected error calling RAG endpoint: {str(e)}", exc_info=True + ) raise async def close(self): diff --git a/src/capa_de_integracion/services/redis_service.py b/src/capa_de_integracion/services/redis_service.py index a08f1af..e50b38c 100644 --- a/src/capa_de_integracion/services/redis_service.py +++ b/src/capa_de_integracion/services/redis_service.py @@ -4,7 +4,7 @@ from datetime import datetime from redis.asyncio import Redis from ..config import Settings -from ..models import ConversationSessionDTO +from ..models import ConversationSession from ..models.notification import NotificationSession, Notification @@ -48,9 +48,7 @@ class RedisService: """Generate Redis key for phone-to-session mapping.""" return f"conversation:phone:{phone}" - async def get_session( - self, session_id_or_phone: str - ) -> ConversationSessionDTO | None: + async def get_session(self, session_id_or_phone: str) -> ConversationSession | None: """ Retrieve conversation session from Redis by session ID or phone number. @@ -84,14 +82,14 @@ class RedisService: try: session_dict = json.loads(data) - session = ConversationSessionDTO.model_validate(session_dict) + session = ConversationSession.model_validate(session_dict) logger.debug(f"Retrieved session from Redis: {session_id}") return session except Exception as e: logger.error(f"Error deserializing session {session_id}: {str(e)}") return None - async def save_session(self, session: ConversationSessionDTO) -> bool: + async def save_session(self, session: ConversationSession) -> bool: """ Save conversation session to Redis with TTL. @@ -156,7 +154,7 @@ class RedisService: Args: session_id: The session ID - message: ConversationMessageDTO or ConversationEntryDTO + message: ConversationEntry Returns: True if successful, False otherwise @@ -285,9 +283,7 @@ class RedisService: # Save to Redis await self._cache_notification_session(updated_session) - async def _cache_notification_session( - self, session: NotificationSession - ) -> bool: + async def _cache_notification_session(self, session: NotificationSession) -> bool: """Cache notification session in Redis.""" if not self.redis: raise RuntimeError("Redis client not connected")