diff --git a/pyproject.toml b/pyproject.toml index 4bcd026..ff867fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,7 +73,7 @@ filterwarnings = [ ] env = [ - "FIRESTORE_EMULATOR_HOST=[::1]:8911", + "FIRESTORE_EMULATOR_HOST=[::1]:8469", "GCP_PROJECT_ID=test-project", "GCP_LOCATION=us-central1", "GCP_FIRESTORE_DATABASE_ID=(default)", diff --git a/src/capa_de_integracion/routers/notification.py b/src/capa_de_integracion/routers/notification.py index 319cfd3..bb92972 100644 --- a/src/capa_de_integracion/routers/notification.py +++ b/src/capa_de_integracion/routers/notification.py @@ -39,7 +39,7 @@ async def process_notification( notification_manager: Notification manager service instance Returns: - None (204 No Content) + None (200 OK with empty body) Raises: HTTPException: 400 if validation fails, 500 for internal errors diff --git a/src/capa_de_integracion/services/conversation.py b/src/capa_de_integracion/services/conversation.py index 007fec4..009cdf8 100644 --- a/src/capa_de_integracion/services/conversation.py +++ b/src/capa_de_integracion/services/conversation.py @@ -22,6 +22,8 @@ from capa_de_integracion.services.storage.redis import RedisService logger = logging.getLogger(__name__) +MSG_EMPTY_MESSAGE = "Message cannot be empty" + class ConversationManagerService: """Central orchestrator for managing user conversations.""" @@ -49,6 +51,19 @@ class ConversationManagerService: logger.info("ConversationManagerService initialized successfully") + def _validate_message(self, mensaje: str) -> None: + """Validate message is not empty. + + Args: + mensaje: Message text to validate + + Raises: + ValueError: If message is empty or whitespace + + """ + if not mensaje or not mensaje.strip(): + raise ValueError(MSG_EMPTY_MESSAGE) + async def manage_conversation( self, request: ConversationRequest, @@ -56,10 +71,11 @@ class ConversationManagerService: """Manage conversation flow and return response. Orchestrates: - 1. Security (DLP obfuscation) - 2. Session management - 3. Quick reply path (if applicable) - 4. Standard RAG path (fallback) + 1. Validation + 2. Security (DLP obfuscation) + 3. Session management + 4. Quick reply path (if applicable) + 5. Standard RAG path (fallback) Args: request: External conversation request from client @@ -69,7 +85,10 @@ class ConversationManagerService: """ try: - # Step 1: Apply DLP security + # Step 1: Validate message is not empty + self._validate_message(request.mensaje) + + # Step 2: Apply DLP security obfuscated_message = await self.dlp_service.get_obfuscated_string( request.mensaje, self.settings.dlp_template_complete_flow, @@ -77,15 +96,15 @@ class ConversationManagerService: request.mensaje = obfuscated_message telefono = request.usuario.telefono - # Step 2: Obtain or create session + # Step 3: Obtain or create session session = await self._obtain_or_create_session(telefono) - # Step 3: Try quick reply path first + # Step 4: Try quick reply path first response = await self._handle_quick_reply_path(request, session) if response: return response - # Step 4: Fall through to standard conversation path + # Step 5: Fall through to standard conversation path return await self._handle_standard_conversation(request, session) except Exception: @@ -273,12 +292,25 @@ class ConversationManagerService: telefono, ) - # Load conversation history from Firestore - entries = await self.firestore_service.get_entries( - session.session_id, - limit=self.settings.conversation_context_message_limit, - ) - logger.info("Loaded %s conversation entries from Firestore", len(entries)) + # Load conversation history only if session is older than threshold + # (optimization: new/recent sessions don't need history context) + session_age = datetime.now(UTC) - session.created_at + if session_age > timedelta(minutes=self.SESSION_RESET_THRESHOLD_MINUTES): + entries = await self.firestore_service.get_entries( + session.session_id, + limit=self.settings.conversation_context_message_limit, + ) + logger.info( + "Session is %s minutes old. Loaded %s conversation entries.", + session_age.total_seconds() / 60, + len(entries), + ) + else: + entries = [] + logger.info( + "Session is only %s minutes old. Skipping history load.", + session_age.total_seconds() / 60, + ) # Retrieve active notifications for this user notifications = await self._get_active_notifications(telefono) @@ -370,12 +402,13 @@ class ConversationManagerService: logger.info("Matched quick reply: %s", pregunta.titulo) break - # If no match, use first question as default or delegate to normal flow + # If no match, delegate to normal flow if not matched_answer: logger.warning( - "No matching quick reply found for message: '%s'.", + "No matching quick reply found for message: '%s'. Falling back to RAG.", request.mensaje, ) + return None # Create response with the matched quick reply answer return DetectIntentResponse( diff --git a/src/capa_de_integracion/services/notifications.py b/src/capa_de_integracion/services/notifications.py index 6bbee00..d3652fe 100644 --- a/src/capa_de_integracion/services/notifications.py +++ b/src/capa_de_integracion/services/notifications.py @@ -17,6 +17,9 @@ logger = logging.getLogger(__name__) PREFIX_PO_PARAM = "notification_po_" +# Keep references to background tasks to prevent garbage collection +_background_tasks: set[asyncio.Task] = set() + class NotificationManagerService: """Manages notification processing and integration with conversations. @@ -127,6 +130,8 @@ class NotificationManagerService: ) # Fire and forget - don't await - _task = asyncio.create_task(save_notification_to_firestore()) + task = asyncio.create_task(save_notification_to_firestore()) # Store reference to prevent premature garbage collection - del _task + _background_tasks.add(task) + # Remove from set when done to prevent memory leak + task.add_done_callback(_background_tasks.discard) diff --git a/src/capa_de_integracion/services/quick_reply/session.py b/src/capa_de_integracion/services/quick_reply/session.py index fa6c1e9..7a8c01c 100644 --- a/src/capa_de_integracion/services/quick_reply/session.py +++ b/src/capa_de_integracion/services/quick_reply/session.py @@ -87,8 +87,11 @@ class QuickReplySessionService: """ self._validate_phone(telefono) - # Get or create session - session = await self.firestore_service.get_session_by_phone(telefono) + # Get or create session (check Redis first for consistency) + session = await self.redis_service.get_session(telefono) + if not session: + session = await self.firestore_service.get_session_by_phone(telefono) + if session: session_id = session.session_id await self.firestore_service.update_pantalla_contexto( diff --git a/tests/services/test_conversation_service.py b/tests/services/test_conversation_service.py index 3337d7d..b20a5c4 100644 --- a/tests/services/test_conversation_service.py +++ b/tests/services/test_conversation_service.py @@ -519,14 +519,25 @@ class TestStandardConversation: sample_request: ConversationRequest, sample_session: ConversationSession, ) -> None: - """Test standard conversation loads history.""" + """Test standard conversation loads history for old sessions.""" + # Make session older than 30 minutes to trigger history loading + old_session = ConversationSession( + session_id=sample_session.session_id, + user_id=sample_session.user_id, + telefono=sample_session.telefono, + created_at=datetime.now(UTC) - timedelta(minutes=45), + last_modified=sample_session.last_modified, + last_message=sample_session.last_message, + pantalla_contexto=sample_session.pantalla_contexto, + ) + await conversation_service._handle_standard_conversation( request=sample_request, - session=sample_session, + session=old_session, ) mock_firestore.get_entries.assert_awaited_once_with( - sample_session.session_id, + old_session.session_id, limit=60, ) diff --git a/tests/services/test_quick_reply_session.py b/tests/services/test_quick_reply_session.py index cea8172..10a272e 100644 --- a/tests/services/test_quick_reply_session.py +++ b/tests/services/test_quick_reply_session.py @@ -17,6 +17,7 @@ from capa_de_integracion.services.storage.redis import RedisService def mock_redis(): """Create mock Redis service.""" redis = Mock(spec=RedisService) + redis.get_session = AsyncMock() redis.save_session = AsyncMock() return redis @@ -67,6 +68,7 @@ async def test_validate_phone_whitespace(service): async def test_start_session_new_user(service, mock_firestore, mock_redis, mock_content): """Test starting a quick reply session for a new user.""" # Setup mocks + mock_redis.get_session.return_value = None # No session in Redis mock_firestore.get_session_by_phone.return_value = None # No existing session # Mock create_session to return a session with the ID that was passed in @@ -100,6 +102,7 @@ async def test_start_session_new_user(service, mock_firestore, mock_redis, mock_ assert result.session_id is not None # Session ID should be generated assert result.quick_replies.header == "Home Screen" + mock_redis.get_session.assert_called_once_with("555-1234") mock_firestore.get_session_by_phone.assert_called_once_with("555-1234") mock_firestore.create_session.assert_called_once() @@ -123,6 +126,7 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis, telefono="555-1234", pantalla_contexto="old_screen", ) + mock_redis.get_session.return_value = None # Not in Redis cache mock_firestore.get_session_by_phone.return_value = test_session test_quick_replies = QuickReplyScreen( @@ -145,6 +149,7 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis, assert result.session_id == test_session_id assert result.quick_replies.header == "Payments Screen" + mock_redis.get_session.assert_called_once_with("555-1234") mock_firestore.get_session_by_phone.assert_called_once_with("555-1234") mock_firestore.update_pantalla_contexto.assert_called_once_with( test_session_id,