From dba41226538516c5a8fd518c43e2546c1dd06105 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Fri, 20 Feb 2026 06:59:31 +0000 Subject: [PATCH] Misc improvements --- pyproject.toml | 2 +- src/capa_de_integracion/config.py | 15 +- src/capa_de_integracion/dependencies.py | 5 +- .../models/conversation.py | 9 +- .../models/notification.py | 34 +- .../routers/conversation.py | 9 +- .../routers/notification.py | 11 +- .../routers/quick_replies.py | 7 +- src/capa_de_integracion/services/__init__.py | 10 +- ...onversation_manager.py => conversation.py} | 487 ++++++----- .../services/{dlp_service.py => dlp.py} | 1 - ...tification_manager.py => notifications.py} | 10 +- .../services/quick_reply/__init__.py | 9 + .../services/quick_reply/content.py | 161 ++++ .../session.py} | 14 +- .../services/quick_reply_content.py | 106 --- src/capa_de_integracion/services/rag/base.py | 32 +- src/capa_de_integracion/services/rag/echo.py | 13 +- src/capa_de_integracion/services/rag/http.py | 31 +- .../services/storage/__init__.py | 9 + .../firestore.py} | 18 +- .../{redis_service.py => storage/redis.py} | 3 +- tests/conftest.py | 3 +- tests/services/test_conversation_service.py | 775 ++++++++++++++++++ tests/services/test_dlp_service.py | 4 +- tests/services/test_firestore_service.py | 2 +- tests/services/test_notification_manager.py | 36 +- tests/services/test_quick_reply_content.py | 160 +++- tests/services/test_quick_reply_session.py | 166 ++++ tests/services/test_rag_services.py | 19 + tests/services/test_redis_service.py | 2 +- tests/test_dependencies.py | 20 +- tests/test_routers_simple.py | 81 ++ 33 files changed, 1844 insertions(+), 420 deletions(-) rename src/capa_de_integracion/services/{conversation_manager.py => conversation.py} (51%) rename src/capa_de_integracion/services/{dlp_service.py => dlp.py} (99%) rename src/capa_de_integracion/services/{notification_manager.py => notifications.py} (94%) create mode 100644 src/capa_de_integracion/services/quick_reply/__init__.py create mode 100644 src/capa_de_integracion/services/quick_reply/content.py rename src/capa_de_integracion/services/{quick_reply_session_service.py => quick_reply/session.py} (90%) delete mode 100644 src/capa_de_integracion/services/quick_reply_content.py create mode 100644 src/capa_de_integracion/services/storage/__init__.py rename src/capa_de_integracion/services/{firestore_service.py => storage/firestore.py} (97%) rename src/capa_de_integracion/services/{redis_service.py => storage/redis.py} (99%) create mode 100644 tests/services/test_conversation_service.py create mode 100644 tests/services/test_quick_reply_session.py diff --git a/pyproject.toml b/pyproject.toml index f5cfe53..4bcd026 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ exclude = ["tests", "scripts"] [tool.ruff.lint] select = ['ALL'] -ignore = ['D203', 'D213'] +ignore = ['D203', 'D213', 'COM812'] [tool.ty.src] include = ["src"] diff --git a/src/capa_de_integracion/config.py b/src/capa_de_integracion/config.py index 9d4fa6e..cb705e7 100644 --- a/src/capa_de_integracion/config.py +++ b/src/capa_de_integracion/config.py @@ -18,17 +18,20 @@ class Settings(BaseSettings): # RAG rag_endpoint_url: str rag_echo_enabled: bool = Field( - default=False, alias="RAG_ECHO_ENABLED", + default=False, + alias="RAG_ECHO_ENABLED", ) # Firestore firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID") firestore_host: str = Field( - default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST", + default="firestore.googleapis.com", + alias="GCP_FIRESTORE_HOST", ) firestore_port: int = Field(default=443, alias="GCP_FIRESTORE_PORT") firestore_importer_enabled: bool = Field( - default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE", + default=False, + alias="GCP_FIRESTORE_IMPORTER_ENABLE", ) # Redis @@ -41,10 +44,12 @@ class Settings(BaseSettings): # Conversation Context conversation_context_message_limit: int = Field( - default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT", + default=60, + alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT", ) conversation_context_days_limit: int = Field( - default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT", + default=30, + alias="CONVERSATION_CONTEXT_DAYS_LIMIT", ) # Logging diff --git a/src/capa_de_integracion/dependencies.py b/src/capa_de_integracion/dependencies.py index e1bd5a1..dce72c0 100644 --- a/src/capa_de_integracion/dependencies.py +++ b/src/capa_de_integracion/dependencies.py @@ -14,10 +14,9 @@ from .services import ( DLPService, NotificationManagerService, QuickReplyContentService, + QuickReplySessionService, ) -from .services.firestore_service import FirestoreService -from .services.quick_reply_session_service import QuickReplySessionService -from .services.redis_service import RedisService +from .services.storage import FirestoreService, RedisService @lru_cache(maxsize=1) diff --git a/src/capa_de_integracion/models/conversation.py b/src/capa_de_integracion/models/conversation.py index 1467858..78bb90c 100644 --- a/src/capa_de_integracion/models/conversation.py +++ b/src/capa_de_integracion/models/conversation.py @@ -51,7 +51,8 @@ class ConversationEntry(BaseModel): entity: Literal["user", "assistant"] type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM" timestamp: datetime = Field( - default_factory=lambda: datetime.now(UTC), alias="timestamp", + default_factory=lambda: datetime.now(UTC), + alias="timestamp", ) text: str = Field(..., alias="text") parameters: dict[str, Any] | None = Field(None, alias="parameters") @@ -67,10 +68,12 @@ class ConversationSession(BaseModel): user_id: str = Field(..., alias="userId") telefono: str = Field(..., alias="telefono") created_at: datetime = Field( - default_factory=lambda: datetime.now(UTC), alias="createdAt", + default_factory=lambda: datetime.now(UTC), + alias="createdAt", ) last_modified: datetime = Field( - default_factory=lambda: datetime.now(UTC), alias="lastModified", + default_factory=lambda: datetime.now(UTC), + alias="lastModified", ) last_message: str | None = Field(None, alias="lastMessage") pantalla_contexto: str | None = Field(None, alias="pantallaContexto") diff --git a/src/capa_de_integracion/models/notification.py b/src/capa_de_integracion/models/notification.py index 4bda0f4..e633c1e 100644 --- a/src/capa_de_integracion/models/notification.py +++ b/src/capa_de_integracion/models/notification.py @@ -13,7 +13,9 @@ class Notification(BaseModel): """ id_notificacion: str = Field( - ..., alias="idNotificacion", description="Unique notification ID", + ..., + alias="idNotificacion", + description="Unique notification ID", ) telefono: str = Field(..., alias="telefono", description="User phone number") timestamp_creacion: datetime = Field( @@ -38,7 +40,9 @@ class Notification(BaseModel): description="Session parameters for Dialogflow", ) status: str = Field( - default="active", alias="status", description="Notification status", + default="active", + alias="status", + description="Notification status", ) model_config = {"populate_by_name": True} @@ -69,16 +73,18 @@ class Notification(BaseModel): New Notification instance with current timestamp """ - return cls.model_validate({ - "idNotificacion": id_notificacion, - "telefono": telefono, - "timestampCreacion": datetime.now(UTC), - "texto": texto, - "nombreEventoDialogflow": nombre_evento_dialogflow, - "codigoIdiomaDialogflow": codigo_idioma_dialogflow, - "parametros": parametros or {}, - "status": status, - }) + return cls.model_validate( + { + "idNotificacion": id_notificacion, + "telefono": telefono, + "timestampCreacion": datetime.now(UTC), + "texto": texto, + "nombreEventoDialogflow": nombre_evento_dialogflow, + "codigoIdiomaDialogflow": codigo_idioma_dialogflow, + "parametros": parametros or {}, + "status": status, + } + ) class NotificationSession(BaseModel): @@ -111,7 +117,9 @@ class ExternalNotificationRequest(BaseModel): texto: str = Field(..., min_length=1) telefono: str = Field(..., alias="telefono", description="User phone number") parametros_ocultos: dict[str, Any] | None = Field( - None, alias="parametrosOcultos", description="Hidden parameters (metadata)", + None, + alias="parametrosOcultos", + description="Hidden parameters (metadata)", ) model_config = {"populate_by_name": True} diff --git a/src/capa_de_integracion/routers/conversation.py b/src/capa_de_integracion/routers/conversation.py index ab1f660..74a9924 100644 --- a/src/capa_de_integracion/routers/conversation.py +++ b/src/capa_de_integracion/routers/conversation.py @@ -17,9 +17,12 @@ router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"]) @router.post("/detect-intent") async def detect_intent( request: ConversationRequest, - conversation_manager: Annotated[ConversationManagerService, Depends( - get_conversation_manager, - )], + conversation_manager: Annotated[ + ConversationManagerService, + Depends( + get_conversation_manager, + ), + ], ) -> DetectIntentResponse: """Detect user intent and manage conversation. diff --git a/src/capa_de_integracion/routers/notification.py b/src/capa_de_integracion/routers/notification.py index 779c307..319cfd3 100644 --- a/src/capa_de_integracion/routers/notification.py +++ b/src/capa_de_integracion/routers/notification.py @@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException from capa_de_integracion.dependencies import get_notification_manager from capa_de_integracion.models.notification import ExternalNotificationRequest -from capa_de_integracion.services.notification_manager import NotificationManagerService +from capa_de_integracion.services import NotificationManagerService logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"]) @@ -16,9 +16,12 @@ router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"]) @router.post("/notification", status_code=200) async def process_notification( request: ExternalNotificationRequest, - notification_manager: Annotated[NotificationManagerService, Depends( - get_notification_manager, - )], + notification_manager: Annotated[ + NotificationManagerService, + Depends( + get_notification_manager, + ), + ], ) -> None: """Process push notification from external system. diff --git a/src/capa_de_integracion/routers/quick_replies.py b/src/capa_de_integracion/routers/quick_replies.py index 35fdfff..fc09700 100644 --- a/src/capa_de_integracion/routers/quick_replies.py +++ b/src/capa_de_integracion/routers/quick_replies.py @@ -10,9 +10,7 @@ from capa_de_integracion.dependencies import ( get_quick_reply_session_service, ) from capa_de_integracion.models.quick_replies import QuickReplyScreen -from capa_de_integracion.services.quick_reply_session_service import ( - QuickReplySessionService, -) +from capa_de_integracion.services import QuickReplySessionService logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"]) @@ -45,7 +43,8 @@ class QuickReplyScreenResponse(BaseModel): async def start_quick_reply_session( request: QuickReplyScreenRequest, quick_reply_session_service: Annotated[ - QuickReplySessionService, Depends(get_quick_reply_session_service), + QuickReplySessionService, + Depends(get_quick_reply_session_service), ], ) -> QuickReplyScreenResponse: """Start a quick reply FAQ session for a specific screen. diff --git a/src/capa_de_integracion/services/__init__.py b/src/capa_de_integracion/services/__init__.py index 41eae2b..e9fac13 100644 --- a/src/capa_de_integracion/services/__init__.py +++ b/src/capa_de_integracion/services/__init__.py @@ -1,10 +1,10 @@ """Services module.""" -from .conversation_manager import ConversationManagerService -from .dlp_service import DLPService -from .notification_manager import NotificationManagerService -from .quick_reply_content import QuickReplyContentService -from .quick_reply_session_service import QuickReplySessionService +from capa_de_integracion.services.conversation import ConversationManagerService +from capa_de_integracion.services.dlp import DLPService +from capa_de_integracion.services.notifications import NotificationManagerService +from capa_de_integracion.services.quick_reply.content import QuickReplyContentService +from capa_de_integracion.services.quick_reply.session import QuickReplySessionService __all__ = [ "ConversationManagerService", diff --git a/src/capa_de_integracion/services/conversation_manager.py b/src/capa_de_integracion/services/conversation.py similarity index 51% rename from src/capa_de_integracion/services/conversation_manager.py rename to src/capa_de_integracion/services/conversation.py index 7354326..007fec4 100644 --- a/src/capa_de_integracion/services/conversation_manager.py +++ b/src/capa_de_integracion/services/conversation.py @@ -14,12 +14,11 @@ from capa_de_integracion.models import ( QueryResult, ) from capa_de_integracion.models.notification import NotificationSession +from capa_de_integracion.services.dlp import DLPService +from capa_de_integracion.services.quick_reply.content import QuickReplyContentService from capa_de_integracion.services.rag import RAGServiceBase - -from .dlp_service import DLPService -from .firestore_service import FirestoreService -from .quick_reply_content import QuickReplyContentService -from .redis_service import RedisService +from capa_de_integracion.services.storage.firestore import FirestoreService +from capa_de_integracion.services.storage.redis import RedisService logger = logging.getLogger(__name__) @@ -50,11 +49,18 @@ class ConversationManagerService: logger.info("ConversationManagerService initialized successfully") - async def manage_conversation( # noqa: PLR0915 - self, request: ConversationRequest, + async def manage_conversation( + self, + request: ConversationRequest, ) -> DetectIntentResponse: """Manage conversation flow and return response. + Orchestrates: + 1. Security (DLP obfuscation) + 2. Session management + 3. Quick reply path (if applicable) + 4. Standard RAG path (fallback) + Args: request: External conversation request from client @@ -63,7 +69,7 @@ class ConversationManagerService: """ try: - # Step 1: DLP obfuscation + # Step 1: Apply DLP security obfuscated_message = await self.dlp_service.get_obfuscated_string( request.mensaje, self.settings.dlp_template_complete_flow, @@ -71,162 +77,270 @@ class ConversationManagerService: request.mensaje = obfuscated_message telefono = request.usuario.telefono - # Step 2. Fetch session in Redis -> Firestore -> Create new session - session = await self.redis_service.get_session(telefono) - if not session: - session = await self.firestore_service.get_session_by_phone(telefono) - if not session: - session_id = str(uuid4()) - user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" - session = await self.firestore_service.create_session( - session_id, user_id, telefono, - ) - await self.redis_service.save_session(session) + # Step 2: Obtain or create session + session = await self._obtain_or_create_session(telefono) - # Step 2: Check for pantallaContexto in existing session - if session.pantalla_contexto: - # Check if pantallaContexto is stale (10 minutes) - if self._is_pantalla_context_valid(session.last_modified): - logger.info( - "Detected 'pantallaContexto' in session: %s. " - "Delegating to QuickReplies flow.", - session.pantalla_contexto, - ) - response = await self._manage_quick_reply_conversation( - request, session.pantalla_contexto, - ) - if response: - # Save user message to Firestore - user_entry = ConversationEntry( - entity="user", - type="CONVERSACION", - timestamp=datetime.now(UTC), - text=request.mensaje, - parameters=None, - canal=getattr(request, "canal", None), - ) - await self.firestore_service.save_entry( - session.session_id, user_entry, - ) - - # Save quick reply response to Firestore - response_text = ( - response.query_result.response_text - if response.query_result - else "" - ) or "" - assistant_entry = ConversationEntry( - entity="assistant", - type="CONVERSACION", - timestamp=datetime.now(UTC), - text=response_text, - parameters=None, - canal=getattr(request, "canal", None), - ) - await self.firestore_service.save_entry( - session.session_id, assistant_entry, - ) - - # Update session with last message and timestamp - session.last_message = response_text - session.last_modified = datetime.now(UTC) - await self.firestore_service.save_session(session) - await self.redis_service.save_session(session) - - return response - else: - logger.info( - "Detected STALE 'pantallaContexto'. " - "Ignoring and proceeding with normal flow.", - ) - - # Step 3: Continue with standard conversation flow - nickname = request.usuario.nickname - - logger.info( - "Primary Check (Redis): Looking up session for phone: %s", - telefono, - ) - - # Step 3a: Load conversation history from Firestore - entries = await self.firestore_service.get_entries( - session.session_id, - limit=self.settings.conversation_context_message_limit, - ) - logger.info("Loaded %s conversation entries from Firestore", len(entries)) - - # Step 3b: Retrieve active notifications for this user - notifications = await self._get_active_notifications(telefono) - logger.info("Retrieved %s active notifications", len(notifications)) - - # Step 3c: Prepare context for RAG service - messages = await self._prepare_rag_messages( - session=session, - entries=entries, - notifications=notifications, - user_message=request.mensaje, - nickname=nickname or "Usuario", - ) - - # Step 3d: Query RAG service - logger.info("Sending query to RAG service") - assistant_response = await self.rag_service.query(messages) - logger.info( - "Received response from RAG service: %s...", - assistant_response[:100], - ) - - # Step 3e: Save user message to Firestore - user_entry = ConversationEntry( - entity="user", - type="CONVERSACION", - timestamp=datetime.now(UTC), - text=request.mensaje, - parameters=None, - canal=getattr(request, "canal", None), - ) - await self.firestore_service.save_entry(session.session_id, user_entry) - logger.info("Saved user message to Firestore") - - # Step 3f: Save assistant response to Firestore - assistant_entry = ConversationEntry( - entity="assistant", - type="LLM", - timestamp=datetime.now(UTC), - text=assistant_response, - parameters=None, - canal=getattr(request, "canal", None), - ) - await self.firestore_service.save_entry(session.session_id, assistant_entry) - logger.info("Saved assistant response to Firestore") - - # Step 3g: Update session with last message and timestamp - session.last_message = assistant_response - session.last_modified = datetime.now(UTC) - await self.firestore_service.save_session(session) - await self.redis_service.save_session(session) - logger.info("Updated session in Firestore and Redis") - - # Step 3h: Mark notifications as processed if any were included - if notifications: - await self._mark_notifications_as_processed(telefono) - logger.info("Marked %s notifications as processed", len(notifications)) - - # Step 3i: Return response object - return DetectIntentResponse( - responseId=str(uuid4()), - queryResult=QueryResult( - responseText=assistant_response, - parameters=None, - ), - quick_replies=None, - ) + # Step 3: Try quick reply path first + response = await self._handle_quick_reply_path(request, session) + if response: + return response + # Step 4: Fall through to standard conversation path + return await self._handle_standard_conversation(request, session) except Exception: logger.exception("Error managing conversation") raise + async def _obtain_or_create_session(self, telefono: str) -> ConversationSession: + """Get existing session or create new one. + + Checks Redis → Firestore → Creates new session with auto-caching. + + Args: + telefono: User phone number + + Returns: + ConversationSession instance + + """ + # Try Redis first + session = await self.redis_service.get_session(telefono) + if session: + return session + + # Try Firestore if Redis miss + session = await self.firestore_service.get_session_by_phone(telefono) + if session: + return session + + # Create new session if both miss + session_id = str(uuid4()) + user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" + session = await self.firestore_service.create_session( + session_id, + user_id, + telefono, + ) + + # Auto-cache to Redis + await self.redis_service.save_session(session) + + return session + + async def _save_conversation_turn( + self, + session_id: str, + user_text: str, + assistant_text: str, + entry_type: str, + canal: str | None = None, + ) -> None: + """Save user and assistant messages to Firestore. + + Args: + session_id: Session identifier + user_text: User message text + assistant_text: Assistant response text + entry_type: Type of conversation entry ("CONVERSACION" or "LLM") + canal: Communication channel + + """ + # Save user entry + user_entry = ConversationEntry( + entity="user", + type=entry_type, + timestamp=datetime.now(UTC), + text=user_text, + parameters=None, + canal=canal, + ) + await self.firestore_service.save_entry(session_id, user_entry) + + # Save assistant entry + assistant_entry = ConversationEntry( + entity="assistant", + type=entry_type, + timestamp=datetime.now(UTC), + text=assistant_text, + parameters=None, + canal=canal, + ) + await self.firestore_service.save_entry(session_id, assistant_entry) + + async def _update_session_after_turn( + self, + session: ConversationSession, + last_message: str, + ) -> None: + """Update session metadata and sync to storage. + + Updates last_message, last_modified timestamp, and saves to + both Firestore and Redis for dual-storage consistency. + + Args: + session: Session to update (modified in place) + last_message: Latest message text + + """ + session.last_message = last_message + session.last_modified = datetime.now(UTC) + await self.firestore_service.save_session(session) + await self.redis_service.save_session(session) + + async def _handle_quick_reply_path( + self, + request: ConversationRequest, + session: ConversationSession, + ) -> DetectIntentResponse | None: + """Handle conversation when pantalla_contexto is active and valid. + + Args: + request: User conversation request + session: Current conversation session + + Returns: + DetectIntentResponse if handled, None if fall through to standard path + + """ + # Check if pantalla_contexto exists + if not session.pantalla_contexto: + return None + + # Check if pantalla_contexto is stale + if not self._is_pantalla_context_valid(session.last_modified): + logger.info( + "Detected STALE 'pantallaContexto'. " + "Ignoring and proceeding with normal flow.", + ) + return None + + logger.info( + "Detected 'pantallaContexto' in session: %s. " + "Delegating to QuickReplies flow.", + session.pantalla_contexto, + ) + + response = await self._manage_quick_reply_conversation( + request, + session.pantalla_contexto, + ) + + if not response: + return None + + # Extract response text + response_text = ( + response.query_result.response_text if response.query_result else "" + ) or "" + + # Save conversation turn + await self._save_conversation_turn( + session_id=session.session_id, + user_text=request.mensaje, + assistant_text=response_text, + entry_type="CONVERSACION", + canal=getattr(request, "canal", None), + ) + + # Update session + await self._update_session_after_turn(session, response_text) + + return response + + async def _handle_standard_conversation( + self, + request: ConversationRequest, + session: ConversationSession, + ) -> DetectIntentResponse: + """Handle standard RAG-based conversation flow. + + Loads history, notifications, queries RAG service, and persists results. + + Args: + request: User conversation request + session: Current conversation session + + Returns: + DetectIntentResponse with RAG response + + """ + telefono = request.usuario.telefono + nickname = request.usuario.nickname + + logger.info( + "Primary Check (Redis): Looking up session for phone: %s", + telefono, + ) + + # Load conversation history from Firestore + entries = await self.firestore_service.get_entries( + session.session_id, + limit=self.settings.conversation_context_message_limit, + ) + logger.info("Loaded %s conversation entries from Firestore", len(entries)) + + # Retrieve active notifications for this user + notifications = await self._get_active_notifications(telefono) + logger.info("Retrieved %s active notifications", len(notifications)) + + # Prepare current user message + messages = await self._prepare_rag_messages(request.mensaje) + + # Extract notification texts for RAG + notification_texts = ( + [n.texto for n in notifications if n.texto and n.texto.strip()] + if notifications + else None + ) + + # Format conversation history for RAG + conversation_history = ( + self._format_conversation_history(session, entries) if entries else None + ) + + # Query RAG service with separated fields + logger.info("Sending query to RAG service") + assistant_response = await self.rag_service.query( + messages=messages, + notifications=notification_texts, + conversation_history=conversation_history, + user_nickname=nickname or None, + ) + logger.info( + "Received response from RAG service: %s...", + assistant_response[:100], + ) + + # Save conversation turn + await self._save_conversation_turn( + session_id=session.session_id, + user_text=request.mensaje, + assistant_text=assistant_response, + entry_type="LLM", + canal=getattr(request, "canal", None), + ) + logger.info("Saved user message and assistant response to Firestore") + + # Update session + await self._update_session_after_turn(session, assistant_response) + logger.info("Updated session in Firestore and Redis") + + # Mark notifications as processed if any were included + if notifications: + await self._mark_notifications_as_processed(telefono) + logger.info("Marked %s notifications as processed", len(notifications)) + + # Return response object + return DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult( + responseText=assistant_response, + parameters=None, + ), + quick_replies=None, + ) + def _is_pantalla_context_valid(self, last_modified: datetime) -> bool: """Check if pantallaContexto is still valid (not stale).""" time_diff = datetime.now(UTC) - last_modified @@ -270,7 +384,6 @@ class ConversationManagerService: quick_replies=quick_reply_screen, ) - async def _get_active_notifications(self, telefono: str) -> list: """Retrieve active notifications for a user from Redis or Firestore. @@ -317,66 +430,19 @@ class ConversationManagerService: async def _prepare_rag_messages( self, - session: ConversationSession, - entries: list[ConversationEntry], - notifications: list, user_message: str, - nickname: str, ) -> list[dict[str, str]]: - """Prepare messages in OpenAI format for RAG service. + """Prepare current user message for RAG service. Args: - session: Current conversation session - entries: Conversation history entries - notifications: Active notifications user_message: Current user message - nickname: User's nickname Returns: - List of messages in OpenAI format [{"role": "...", "content": "..."}] + List with single user message """ - messages = [] - - # Add system message with conversation history if available - if entries: - conversation_context = self._format_conversation_history(session, entries) - if conversation_context: - messages.append( - { - "role": "system", - "content": ( - f"Historial de conversación:\n" - f"{conversation_context}" - ), - }, - ) - - # Add system message with notifications if available - if notifications: - # Simple Pythonic join - no mapper needed! - notifications_text = "\n".join( - n.texto for n in notifications if n.texto and n.texto.strip() - ) - if notifications_text: - messages.append( - { - "role": "system", - "content": ( - f"Notificaciones pendientes para el usuario:\n" - f"{notifications_text}" - ), - }, - ) - - # Add system message with user context - user_context = f"Usuario: {nickname}" if nickname else "Usuario anónimo" - messages.append({"role": "system", "content": user_context}) - - # Add current user message - messages.append({"role": "user", "content": user_message}) - - return messages + # Only include the current user message - no system messages + return [{"role": "user", "content": user_message}] async def _mark_notifications_as_processed(self, telefono: str) -> None: """Mark all notifications for a user as processed. @@ -388,7 +454,8 @@ class ConversationManagerService: try: # Update status in Firestore await self.firestore_service.update_notification_status( - telefono, "processed", + telefono, + "processed", ) # Update or delete from Redis diff --git a/src/capa_de_integracion/services/dlp_service.py b/src/capa_de_integracion/services/dlp.py similarity index 99% rename from src/capa_de_integracion/services/dlp_service.py rename to src/capa_de_integracion/services/dlp.py index 27d8220..0f2a775 100644 --- a/src/capa_de_integracion/services/dlp_service.py +++ b/src/capa_de_integracion/services/dlp.py @@ -140,7 +140,6 @@ class DLPService: # Clean up consecutive DIRECCION tags return self._clean_direccion(text) - def _get_replacement(self, info_type: str, quote: str) -> str | None: """Get replacement text for a given info type. diff --git a/src/capa_de_integracion/services/notification_manager.py b/src/capa_de_integracion/services/notifications.py similarity index 94% rename from src/capa_de_integracion/services/notification_manager.py rename to src/capa_de_integracion/services/notifications.py index c571bc9..6bbee00 100644 --- a/src/capa_de_integracion/services/notification_manager.py +++ b/src/capa_de_integracion/services/notifications.py @@ -9,10 +9,9 @@ from capa_de_integracion.models.notification import ( ExternalNotificationRequest, Notification, ) - -from .dlp_service import DLPService -from .firestore_service import FirestoreService -from .redis_service import RedisService +from capa_de_integracion.services.dlp import DLPService +from capa_de_integracion.services.storage.firestore import FirestoreService +from capa_de_integracion.services.storage.redis import RedisService logger = logging.getLogger(__name__) @@ -53,7 +52,8 @@ class NotificationManagerService: logger.info("NotificationManagerService initialized") async def process_notification( - self, external_request: ExternalNotificationRequest, + self, + external_request: ExternalNotificationRequest, ) -> None: """Process a push notification from external system. diff --git a/src/capa_de_integracion/services/quick_reply/__init__.py b/src/capa_de_integracion/services/quick_reply/__init__.py new file mode 100644 index 0000000..cd9ce86 --- /dev/null +++ b/src/capa_de_integracion/services/quick_reply/__init__.py @@ -0,0 +1,9 @@ +"""Quick reply services.""" + +from capa_de_integracion.services.quick_reply.content import QuickReplyContentService +from capa_de_integracion.services.quick_reply.session import QuickReplySessionService + +__all__ = [ + "QuickReplyContentService", + "QuickReplySessionService", +] diff --git a/src/capa_de_integracion/services/quick_reply/content.py b/src/capa_de_integracion/services/quick_reply/content.py new file mode 100644 index 0000000..1a4173d --- /dev/null +++ b/src/capa_de_integracion/services/quick_reply/content.py @@ -0,0 +1,161 @@ +"""Quick reply content service for loading FAQ screens.""" + +import json +import logging +from pathlib import Path + +from capa_de_integracion.config import Settings +from capa_de_integracion.models.quick_replies import ( + QuickReplyQuestions, + QuickReplyScreen, +) + +logger = logging.getLogger(__name__) + + +class QuickReplyContentService: + """Service for loading quick reply screen content from JSON files.""" + + def __init__(self, settings: Settings) -> None: + """Initialize quick reply content service. + + Args: + settings: Application settings + + """ + self.settings = settings + self.quick_replies_path = settings.base_path / "quick_replies" + self._cache: dict[str, QuickReplyScreen] = {} + + logger.info( + "QuickReplyContentService initialized with path: %s", + self.quick_replies_path, + ) + + # Preload all quick reply files into memory + self._preload_cache() + + def _validate_file(self, file_path: Path, screen_id: str) -> None: + """Validate that the quick reply file exists.""" + if not file_path.exists(): + logger.warning("Quick reply file not found: %s", file_path) + msg = f"Quick reply file not found for screen_id: {screen_id}" + raise ValueError(msg) + + def _parse_quick_reply_data(self, data: dict) -> QuickReplyScreen: + """Parse JSON data into QuickReplyScreen model. + + Args: + data: JSON data dictionary + + Returns: + Parsed QuickReplyScreen object + + """ + preguntas_data = data.get("preguntas", []) + preguntas = [ + QuickReplyQuestions( + titulo=q.get("titulo", ""), + descripcion=q.get("descripcion"), + respuesta=q.get("respuesta", ""), + ) + for q in preguntas_data + ] + + return QuickReplyScreen( + header=data.get("header"), + body=data.get("body"), + button=data.get("button"), + header_section=data.get("header_section"), + preguntas=preguntas, + ) + + def _preload_cache(self) -> None: + """Preload all quick reply files into memory cache at startup. + + This method runs synchronously at initialization to load all + quick reply JSON files. Blocking here is acceptable since it + only happens once at startup. + + """ + if not self.quick_replies_path.exists(): + logger.warning( + "Quick replies directory not found: %s", + self.quick_replies_path, + ) + return + + loaded_count = 0 + failed_count = 0 + + for file_path in self.quick_replies_path.glob("*.json"): + screen_id = file_path.stem + try: + # Blocking I/O is OK at startup + content = file_path.read_text(encoding="utf-8") + data = json.loads(content) + quick_reply = self._parse_quick_reply_data(data) + + self._cache[screen_id] = quick_reply + loaded_count += 1 + + logger.debug( + "Cached %s quick replies for screen: %s", + len(quick_reply.preguntas), + screen_id, + ) + + except json.JSONDecodeError: + logger.exception("Invalid JSON in file: %s", file_path) + failed_count += 1 + except Exception: + logger.exception("Failed to load quick reply file: %s", file_path) + failed_count += 1 + + logger.info( + "Quick reply cache initialized: %s screens loaded, %s failed", + loaded_count, + failed_count, + ) + + async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen: + """Get quick reply screen content by ID from in-memory cache. + + This method is non-blocking as it retrieves data from the + in-memory cache populated at startup. + + Args: + screen_id: Screen identifier (e.g., "pagos", "home") + + Returns: + Quick reply screen data + + Raises: + ValueError: If the quick reply is not found in cache + + """ + if not screen_id or not screen_id.strip(): + logger.warning("screen_id is null or empty. Returning empty quick replies") + return QuickReplyScreen( + header="empty", + body=None, + button=None, + header_section=None, + preguntas=[], + ) + + # Non-blocking: just a dictionary lookup + quick_reply = self._cache.get(screen_id) + + if quick_reply is None: + logger.warning("Quick reply not found in cache for screen: %s", screen_id) + msg = f"Quick reply not found for screen_id: {screen_id}" + raise ValueError(msg) + + logger.info( + "Retrieved %s quick replies for screen: %s from cache", + len(quick_reply.preguntas), + screen_id, + ) + + return quick_reply diff --git a/src/capa_de_integracion/services/quick_reply_session_service.py b/src/capa_de_integracion/services/quick_reply/session.py similarity index 90% rename from src/capa_de_integracion/services/quick_reply_session_service.py rename to src/capa_de_integracion/services/quick_reply/session.py index 833de75..fa6c1e9 100644 --- a/src/capa_de_integracion/services/quick_reply_session_service.py +++ b/src/capa_de_integracion/services/quick_reply/session.py @@ -4,9 +4,9 @@ import logging from uuid import uuid4 from capa_de_integracion.models.quick_replies import QuickReplyScreen -from capa_de_integracion.services.firestore_service import FirestoreService -from capa_de_integracion.services.quick_reply_content import QuickReplyContentService -from capa_de_integracion.services.redis_service import RedisService +from capa_de_integracion.services.quick_reply.content import QuickReplyContentService +from capa_de_integracion.services.storage.firestore import FirestoreService +from capa_de_integracion.services.storage.redis import RedisService logger = logging.getLogger(__name__) @@ -92,14 +92,18 @@ class QuickReplySessionService: if session: session_id = session.session_id await self.firestore_service.update_pantalla_contexto( - session_id, pantalla_contexto, + session_id, + pantalla_contexto, ) session.pantalla_contexto = pantalla_contexto else: session_id = str(uuid4()) user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" session = await self.firestore_service.create_session( - session_id, user_id, telefono, pantalla_contexto, + session_id, + user_id, + telefono, + pantalla_contexto, ) # Cache session in Redis diff --git a/src/capa_de_integracion/services/quick_reply_content.py b/src/capa_de_integracion/services/quick_reply_content.py deleted file mode 100644 index 00bbccc..0000000 --- a/src/capa_de_integracion/services/quick_reply_content.py +++ /dev/null @@ -1,106 +0,0 @@ -"""Quick reply content service for loading FAQ screens.""" - -import json -import logging -from pathlib import Path - -from capa_de_integracion.config import Settings -from capa_de_integracion.models.quick_replies import ( - QuickReplyQuestions, - QuickReplyScreen, -) - -logger = logging.getLogger(__name__) - - -class QuickReplyContentService: - """Service for loading quick reply screen content from JSON files.""" - - def __init__(self, settings: Settings) -> None: - """Initialize quick reply content service. - - Args: - settings: Application settings - - """ - self.settings = settings - self.quick_replies_path = settings.base_path / "quick_replies" - - logger.info( - "QuickReplyContentService initialized with path: %s", - self.quick_replies_path, - ) - - def _validate_file(self, file_path: Path, screen_id: str) -> None: - """Validate that the quick reply file exists.""" - if not file_path.exists(): - logger.warning("Quick reply file not found: %s", file_path) - msg = f"Quick reply file not found for screen_id: {screen_id}" - raise ValueError(msg) - - async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen: - """Load quick reply screen content by ID. - - Args: - screen_id: Screen identifier (e.g., "pagos", "home") - - Returns: - Quick reply DTO - - Raises: - ValueError: If the quick reply file is not found - - """ - if not screen_id or not screen_id.strip(): - logger.warning("screen_id is null or empty. Returning empty quick replies") - return QuickReplyScreen( - header="empty", - body=None, - button=None, - header_section=None, - preguntas=[], - ) - - file_path = self.quick_replies_path / f"{screen_id}.json" - - try: - self._validate_file(file_path, screen_id) - - # Use Path.read_text() for async-friendly file reading - content = file_path.read_text(encoding="utf-8") - data = json.loads(content) - - # Parse questions - preguntas_data = data.get("preguntas", []) - preguntas = [ - QuickReplyQuestions( - titulo=q.get("titulo", ""), - descripcion=q.get("descripcion"), - respuesta=q.get("respuesta", ""), - ) - for q in preguntas_data - ] - - quick_reply = QuickReplyScreen( - header=data.get("header"), - body=data.get("body"), - button=data.get("button"), - header_section=data.get("header_section"), - preguntas=preguntas, - ) - - logger.info( - "Successfully loaded %s quick replies for screen: %s", - len(preguntas), - screen_id, - ) - except json.JSONDecodeError as e: - logger.exception("Error parsing JSON file %s", file_path) - msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}" - raise ValueError(msg) from e - except Exception as e: - logger.exception("Error loading quick replies for screen %s", screen_id) - msg = f"Error loading quick replies for screen_id: {screen_id}" - raise ValueError(msg) from e - else: - return quick_reply diff --git a/src/capa_de_integracion/services/rag/base.py b/src/capa_de_integracion/services/rag/base.py index 8cb62eb..97b6d8f 100644 --- a/src/capa_de_integracion/services/rag/base.py +++ b/src/capa_de_integracion/services/rag/base.py @@ -17,7 +17,22 @@ class Message(BaseModel): class RAGRequest(BaseModel): """Request model for RAG endpoint.""" - messages: list[Message] = Field(..., description="Conversation history") + messages: list[Message] = Field( + ..., + description="Current conversation messages (user and assistant only)", + ) + notifications: list[str] | None = Field( + default=None, + description="Active notifications for the user", + ) + conversation_history: str | None = Field( + default=None, + description="Formatted conversation history", + ) + user_nickname: str | None = Field( + default=None, + description="User's nickname or display name", + ) class RAGResponse(BaseModel): @@ -34,12 +49,21 @@ class RAGServiceBase(ABC): """ @abstractmethod - async def query(self, messages: list[dict[str, str]]) -> str: - """Send conversation history to RAG endpoint and get response. + async def query( + self, + messages: list[dict[str, str]], + notifications: list[str] | None = None, + conversation_history: str | None = None, + user_nickname: str | None = None, + ) -> str: + """Send conversation to RAG endpoint and get response. Args: - messages: OpenAI-style conversation history + messages: Current conversation messages (user/assistant only) e.g., [{"role": "user", "content": "Hello"}, ...] + notifications: Active notifications for the user (optional) + conversation_history: Formatted conversation history (optional) + user_nickname: User's nickname or display name (optional) Returns: Response string from RAG endpoint diff --git a/src/capa_de_integracion/services/rag/echo.py b/src/capa_de_integracion/services/rag/echo.py index 0991556..83d8a18 100644 --- a/src/capa_de_integracion/services/rag/echo.py +++ b/src/capa_de_integracion/services/rag/echo.py @@ -28,12 +28,21 @@ class EchoRAGService(RAGServiceBase): self.prefix = prefix logger.info("EchoRAGService initialized with prefix: %r", prefix) - async def query(self, messages: list[dict[str, str]]) -> str: + async def query( + self, + messages: list[dict[str, str]], + notifications: list[str] | None = None, # noqa: ARG002 + conversation_history: str | None = None, # noqa: ARG002 + user_nickname: str | None = None, # noqa: ARG002 + ) -> str: """Echo back the last user message with a prefix. Args: - messages: OpenAI-style conversation history + messages: Current conversation messages (user/assistant only) e.g., [{"role": "user", "content": "Hello"}, ...] + notifications: Active notifications for the user (optional, ignored) + conversation_history: Formatted conversation history (optional, ignored) + user_nickname: User's nickname or display name (optional, ignored) Returns: The last user message content with prefix diff --git a/src/capa_de_integracion/services/rag/http.py b/src/capa_de_integracion/services/rag/http.py index 376c085..ba4f74a 100644 --- a/src/capa_de_integracion/services/rag/http.py +++ b/src/capa_de_integracion/services/rag/http.py @@ -61,12 +61,21 @@ class HTTPRAGService(RAGServiceBase): timeout, ) - async def query(self, messages: list[dict[str, str]]) -> str: - """Send conversation history to RAG endpoint and get response. + async def query( + self, + messages: list[dict[str, str]], + notifications: list[str] | None = None, + conversation_history: str | None = None, + user_nickname: str | None = None, + ) -> str: + """Send conversation to RAG endpoint and get response. Args: - messages: OpenAI-style conversation history + messages: Current conversation messages (user/assistant only) e.g., [{"role": "user", "content": "Hello"}, ...] + notifications: Active notifications for the user (optional) + conversation_history: Formatted conversation history (optional) + user_nickname: User's nickname or display name (optional) Returns: Response string from RAG endpoint @@ -79,10 +88,22 @@ class HTTPRAGService(RAGServiceBase): try: # Validate and construct request message_objects = [Message(**msg) for msg in messages] - request = RAGRequest(messages=message_objects) + request = RAGRequest( + messages=message_objects, + notifications=notifications, + conversation_history=conversation_history, + user_nickname=user_nickname, + ) # Make async HTTP POST request - logger.debug("Sending RAG request with %s messages", len(messages)) + logger.debug( + "Sending RAG request with %s messages, %s notifications, " + "history: %s, user: %s", + len(messages), + len(notifications) if notifications else 0, + "yes" if conversation_history else "no", + user_nickname or "anonymous", + ) response = await self._client.post( self.endpoint_url, diff --git a/src/capa_de_integracion/services/storage/__init__.py b/src/capa_de_integracion/services/storage/__init__.py new file mode 100644 index 0000000..c423407 --- /dev/null +++ b/src/capa_de_integracion/services/storage/__init__.py @@ -0,0 +1,9 @@ +"""Storage services.""" + +from capa_de_integracion.services.storage.firestore import FirestoreService +from capa_de_integracion.services.storage.redis import RedisService + +__all__ = [ + "FirestoreService", + "RedisService", +] diff --git a/src/capa_de_integracion/services/firestore_service.py b/src/capa_de_integracion/services/storage/firestore.py similarity index 97% rename from src/capa_de_integracion/services/firestore_service.py rename to src/capa_de_integracion/services/storage/firestore.py index 960a76e..70737ea 100644 --- a/src/capa_de_integracion/services/firestore_service.py +++ b/src/capa_de_integracion/services/storage/firestore.py @@ -183,7 +183,9 @@ class FirestoreService: return True async def get_entries( - self, session_id: str, limit: int = 10, + self, + session_id: str, + limit: int = 10, ) -> list[ConversationEntry]: """Retrieve recent conversation entries from Firestore.""" try: @@ -192,7 +194,8 @@ class FirestoreService: # Get entries ordered by timestamp descending query = entries_ref.order_by( - "timestamp", direction=firestore.Query.DESCENDING, + "timestamp", + direction=firestore.Query.DESCENDING, ).limit(limit) docs = query.stream() @@ -206,7 +209,9 @@ class FirestoreService: # Reverse to get chronological order entries.reverse() logger.debug( - "Retrieved %s entries for session: %s", len(entries), session_id, + "Retrieved %s entries for session: %s", + len(entries), + session_id, ) except Exception: logger.exception( @@ -240,7 +245,9 @@ class FirestoreService: return True async def update_pantalla_contexto( - self, session_id: str, pantalla_contexto: str | None, + self, + session_id: str, + pantalla_contexto: str | None, ) -> bool: """Update the pantallaContexto field for a conversation session. @@ -286,7 +293,8 @@ class FirestoreService: # ====== Notification Methods ====== def _notification_ref( - self, notification_id: str, + self, + notification_id: str, ) -> firestore.AsyncDocumentReference: """Get Firestore document reference for notification.""" return self.db.collection(self.notifications_collection).document( diff --git a/src/capa_de_integracion/services/redis_service.py b/src/capa_de_integracion/services/storage/redis.py similarity index 99% rename from src/capa_de_integracion/services/redis_service.py rename to src/capa_de_integracion/services/storage/redis.py index 5d6f828..3868cf9 100644 --- a/src/capa_de_integracion/services/redis_service.py +++ b/src/capa_de_integracion/services/storage/redis.py @@ -329,7 +329,8 @@ class RedisService: return True async def get_notification_session( - self, session_id: str, + self, + session_id: str, ) -> NotificationSession | None: """Retrieve notification session from Redis.""" if not self.redis: diff --git a/tests/conftest.py b/tests/conftest.py index c8039ed..a6c8295 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,8 +10,7 @@ import pytest_asyncio from fakeredis import aioredis as fakeredis from capa_de_integracion.config import Settings -from capa_de_integracion.services.firestore_service import FirestoreService -from capa_de_integracion.services.redis_service import RedisService +from capa_de_integracion.services.storage import FirestoreService, RedisService # Configure pytest-asyncio pytest_plugins = ("pytest_asyncio",) diff --git a/tests/services/test_conversation_service.py b/tests/services/test_conversation_service.py new file mode 100644 index 0000000..3337d7d --- /dev/null +++ b/tests/services/test_conversation_service.py @@ -0,0 +1,775 @@ +"""Unit tests for ConversationManagerService.""" + +from datetime import UTC, datetime, timedelta +from typing import Literal +from unittest.mock import AsyncMock, Mock, patch +from uuid import uuid4 + +import pytest + +from capa_de_integracion.config import Settings +from capa_de_integracion.models import ( + ConversationEntry, + ConversationRequest, + ConversationSession, + DetectIntentResponse, + QueryResult, + User, +) +from capa_de_integracion.services.conversation import ConversationManagerService +from capa_de_integracion.services.dlp import DLPService +from capa_de_integracion.services.rag import RAGServiceBase +from capa_de_integracion.services.storage.firestore import FirestoreService +from capa_de_integracion.services.storage.redis import RedisService + + +@pytest.fixture +def mock_settings() -> Settings: + """Create mock settings.""" + settings = Mock(spec=Settings) + settings.dlp_template_complete_flow = "test_template" + settings.conversation_context_message_limit = 60 + settings.conversation_context_days_limit = 30 + return settings + + +@pytest.fixture +def mock_dlp() -> DLPService: + """Create mock DLP service.""" + dlp = Mock(spec=DLPService) + dlp.get_obfuscated_string = AsyncMock(return_value="obfuscated message") + return dlp + + +@pytest.fixture +def mock_rag() -> RAGServiceBase: + """Create mock RAG service.""" + rag = Mock(spec=RAGServiceBase) + rag.query = AsyncMock(return_value="RAG response") + return rag + + +@pytest.fixture +def mock_redis() -> RedisService: + """Create mock Redis service.""" + redis = Mock(spec=RedisService) + redis.get_session = AsyncMock(return_value=None) + redis.save_session = AsyncMock() + redis.get_notification_session = AsyncMock(return_value=None) + redis.delete_notification_session = AsyncMock() + return redis + + +@pytest.fixture +def mock_firestore() -> FirestoreService: + """Create mock Firestore service.""" + firestore = Mock(spec=FirestoreService) + firestore.get_session_by_phone = AsyncMock(return_value=None) + firestore.create_session = AsyncMock() + firestore.save_session = AsyncMock() + firestore.save_entry = AsyncMock() + firestore.get_entries = AsyncMock(return_value=[]) + firestore.update_notification_status = AsyncMock() + # Mock db.collection for notifications + mock_doc = Mock() + mock_doc.exists = False + mock_doc_ref = Mock() + mock_doc_ref.get = AsyncMock(return_value=mock_doc) + mock_collection = Mock() + mock_collection.document = Mock(return_value=mock_doc_ref) + firestore.db = Mock() + firestore.db.collection = Mock(return_value=mock_collection) + firestore.notifications_collection = "notifications" + return firestore + + +@pytest.fixture +def conversation_service( + mock_settings: Settings, + mock_rag: RAGServiceBase, + mock_redis: RedisService, + mock_firestore: FirestoreService, + mock_dlp: DLPService, +) -> ConversationManagerService: + """Create conversation service with mocked dependencies.""" + with patch( + "capa_de_integracion.services.conversation.QuickReplyContentService" + ): + service = ConversationManagerService( + settings=mock_settings, + rag_service=mock_rag, + redis_service=mock_redis, + firestore_service=mock_firestore, + dlp_service=mock_dlp, + ) + return service + + +@pytest.fixture +def sample_session() -> ConversationSession: + """Create a sample conversation session.""" + return ConversationSession( + session_id="test_session_123", + user_id="user_by_phone_1234567890", + telefono="1234567890", + last_modified=datetime.now(UTC), + last_message="Hello", + pantalla_contexto=None, + ) + + +@pytest.fixture +def sample_request() -> ConversationRequest: + """Create a sample conversation request.""" + return ConversationRequest( + mensaje="Hello, I need help", + usuario=User( + telefono="1234567890", + nickname="TestUser", + ), + canal="whatsapp", + ) + + +# ============================================================================ +# Test Session Management +# ============================================================================ + + +class TestSessionManagement: + """Tests for session management methods.""" + + @pytest.mark.asyncio + async def test_obtain_session_from_redis( + self, + conversation_service: ConversationManagerService, + mock_redis: RedisService, + sample_session: ConversationSession, + ) -> None: + """Test obtaining session from Redis.""" + mock_redis.get_session = AsyncMock(return_value=sample_session) + + result = await conversation_service._obtain_or_create_session("1234567890") + + assert result == sample_session + mock_redis.get_session.assert_awaited_once_with("1234567890") + + @pytest.mark.asyncio + async def test_obtain_session_from_firestore_when_redis_miss( + self, + conversation_service: ConversationManagerService, + mock_redis: RedisService, + mock_firestore: FirestoreService, + sample_session: ConversationSession, + ) -> None: + """Test obtaining session from Firestore when Redis misses.""" + mock_redis.get_session = AsyncMock(return_value=None) + mock_firestore.get_session_by_phone = AsyncMock(return_value=sample_session) + + result = await conversation_service._obtain_or_create_session("1234567890") + + assert result == sample_session + mock_redis.get_session.assert_awaited_once() + mock_firestore.get_session_by_phone.assert_awaited_once_with("1234567890") + + @pytest.mark.asyncio + async def test_create_new_session_when_both_miss( + self, + conversation_service: ConversationManagerService, + mock_redis: RedisService, + mock_firestore: FirestoreService, + sample_session: ConversationSession, + ) -> None: + """Test creating new session when both Redis and Firestore miss.""" + mock_redis.get_session = AsyncMock(return_value=None) + mock_firestore.get_session_by_phone = AsyncMock(return_value=None) + mock_firestore.create_session = AsyncMock(return_value=sample_session) + + result = await conversation_service._obtain_or_create_session("1234567890") + + assert result == sample_session + mock_firestore.create_session.assert_awaited_once() + # Verify the session was auto-cached to Redis + mock_redis.save_session.assert_awaited_once_with(sample_session) + + @pytest.mark.asyncio + async def test_session_auto_cached_to_redis( + self, + conversation_service: ConversationManagerService, + mock_redis: RedisService, + mock_firestore: FirestoreService, + sample_session: ConversationSession, + ) -> None: + """Test that newly created session is auto-cached to Redis.""" + mock_redis.get_session = AsyncMock(return_value=None) + mock_firestore.get_session_by_phone = AsyncMock(return_value=None) + mock_firestore.create_session = AsyncMock(return_value=sample_session) + + await conversation_service._obtain_or_create_session("1234567890") + + mock_redis.save_session.assert_awaited_once_with(sample_session) + + +# ============================================================================ +# Test Entry Persistence +# ============================================================================ + + +class TestEntryPersistence: + """Tests for conversation entry persistence methods.""" + + @pytest.mark.asyncio + async def test_save_conversation_turn_with_conversacion_type( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + ) -> None: + """Test saving conversation turn with CONVERSACION type.""" + await conversation_service._save_conversation_turn( + session_id="test_session", + user_text="Hello", + assistant_text="Hi there", + entry_type="CONVERSACION", + canal="whatsapp", + ) + + assert mock_firestore.save_entry.await_count == 2 + # Verify user entry + user_call = mock_firestore.save_entry.await_args_list[0] + assert user_call[0][0] == "test_session" + user_entry = user_call[0][1] + assert user_entry.entity == "user" + assert user_entry.text == "Hello" + assert user_entry.type == "CONVERSACION" + assert user_entry.canal == "whatsapp" + # Verify assistant entry + assistant_call = mock_firestore.save_entry.await_args_list[1] + assistant_entry = assistant_call[0][1] + assert assistant_entry.entity == "assistant" + assert assistant_entry.text == "Hi there" + assert assistant_entry.type == "CONVERSACION" + + @pytest.mark.asyncio + async def test_save_conversation_turn_with_llm_type( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + ) -> None: + """Test saving conversation turn with LLM type.""" + await conversation_service._save_conversation_turn( + session_id="test_session", + user_text="What's the weather?", + assistant_text="It's sunny", + entry_type="LLM", + canal="telegram", + ) + + assert mock_firestore.save_entry.await_count == 2 + assistant_call = mock_firestore.save_entry.await_args_list[1] + assistant_entry = assistant_call[0][1] + assert assistant_entry.type == "LLM" + + @pytest.mark.asyncio + async def test_save_conversation_turn_with_canal( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + ) -> None: + """Test saving conversation turn with canal parameter.""" + await conversation_service._save_conversation_turn( + session_id="test_session", + user_text="Test", + assistant_text="Response", + entry_type="CONVERSACION", + canal="sms", + ) + + user_call = mock_firestore.save_entry.await_args_list[0] + user_entry = user_call[0][1] + assert user_entry.canal == "sms" + + @pytest.mark.asyncio + async def test_save_conversation_turn_without_canal( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + ) -> None: + """Test saving conversation turn without canal parameter.""" + await conversation_service._save_conversation_turn( + session_id="test_session", + user_text="Test", + assistant_text="Response", + entry_type="CONVERSACION", + ) + + user_call = mock_firestore.save_entry.await_args_list[0] + user_entry = user_call[0][1] + assert user_entry.canal is None + + +# ============================================================================ +# Test Session Updates +# ============================================================================ + + +class TestSessionUpdates: + """Tests for session update methods.""" + + @pytest.mark.asyncio + async def test_update_session_sets_last_message( + self, + conversation_service: ConversationManagerService, + sample_session: ConversationSession, + ) -> None: + """Test that update_session sets last_message.""" + await conversation_service._update_session_after_turn( + session=sample_session, + last_message="New message", + ) + + assert sample_session.last_message == "New message" + + @pytest.mark.asyncio + async def test_update_session_sets_timestamp( + self, + conversation_service: ConversationManagerService, + sample_session: ConversationSession, + ) -> None: + """Test that update_session sets timestamp.""" + old_timestamp = sample_session.last_modified + await conversation_service._update_session_after_turn( + session=sample_session, + last_message="New message", + ) + + assert sample_session.last_modified > old_timestamp + + @pytest.mark.asyncio + async def test_update_session_saves_to_firestore( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + sample_session: ConversationSession, + ) -> None: + """Test that update_session saves to Firestore.""" + await conversation_service._update_session_after_turn( + session=sample_session, + last_message="New message", + ) + + mock_firestore.save_session.assert_awaited_once_with(sample_session) + + @pytest.mark.asyncio + async def test_update_session_saves_to_redis( + self, + conversation_service: ConversationManagerService, + mock_redis: RedisService, + sample_session: ConversationSession, + ) -> None: + """Test that update_session saves to Redis.""" + await conversation_service._update_session_after_turn( + session=sample_session, + last_message="New message", + ) + + mock_redis.save_session.assert_awaited_once_with(sample_session) + + +# ============================================================================ +# Test Quick Reply Path +# ============================================================================ + + +class TestQuickReplyPath: + """Tests for quick reply path handling.""" + + @pytest.mark.asyncio + async def test_quick_reply_path_with_valid_context( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test quick reply path with valid pantalla_contexto.""" + sample_session.pantalla_contexto = "screen_123" + sample_session.last_modified = datetime.now(UTC) + + # Mock quick reply service + mock_response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Quick reply response"), + ) + conversation_service._manage_quick_reply_conversation = AsyncMock( + return_value=mock_response + ) + + result = await conversation_service._handle_quick_reply_path( + request=sample_request, + session=sample_session, + ) + + assert result == mock_response + + @pytest.mark.asyncio + async def test_quick_reply_path_with_stale_context_returns_none( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test quick reply path with stale pantalla_contexto returns None.""" + sample_session.pantalla_contexto = "screen_123" + # Set timestamp to 11 minutes ago (stale) + sample_session.last_modified = datetime.now(UTC) - timedelta(minutes=11) + + result = await conversation_service._handle_quick_reply_path( + request=sample_request, + session=sample_session, + ) + + assert result is None + + @pytest.mark.asyncio + async def test_quick_reply_path_without_context_returns_none( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test quick reply path without pantalla_contexto returns None.""" + sample_session.pantalla_contexto = None + + result = await conversation_service._handle_quick_reply_path( + request=sample_request, + session=sample_session, + ) + + assert result is None + + @pytest.mark.asyncio + async def test_quick_reply_path_saves_entries( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test quick reply path saves conversation entries.""" + sample_session.pantalla_contexto = "screen_123" + sample_session.last_modified = datetime.now(UTC) + + mock_response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Quick reply response"), + ) + conversation_service._manage_quick_reply_conversation = AsyncMock( + return_value=mock_response + ) + + await conversation_service._handle_quick_reply_path( + request=sample_request, + session=sample_session, + ) + + assert mock_firestore.save_entry.await_count == 2 + + @pytest.mark.asyncio + async def test_quick_reply_path_updates_session( + self, + conversation_service: ConversationManagerService, + mock_redis: RedisService, + mock_firestore: FirestoreService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test quick reply path updates session.""" + sample_session.pantalla_contexto = "screen_123" + sample_session.last_modified = datetime.now(UTC) + + mock_response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Quick reply response"), + ) + conversation_service._manage_quick_reply_conversation = AsyncMock( + return_value=mock_response + ) + + await conversation_service._handle_quick_reply_path( + request=sample_request, + session=sample_session, + ) + + mock_firestore.save_session.assert_awaited_once() + mock_redis.save_session.assert_awaited_once() + + +# ============================================================================ +# Test Standard Conversation Path +# ============================================================================ + + +class TestStandardConversation: + """Tests for standard conversation flow.""" + + @pytest.mark.asyncio + async def test_standard_conversation_loads_history( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test standard conversation loads history.""" + await conversation_service._handle_standard_conversation( + request=sample_request, + session=sample_session, + ) + + mock_firestore.get_entries.assert_awaited_once_with( + sample_session.session_id, + limit=60, + ) + + @pytest.mark.asyncio + async def test_standard_conversation_queries_rag( + self, + conversation_service: ConversationManagerService, + mock_rag: RAGServiceBase, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test standard conversation queries RAG service.""" + await conversation_service._handle_standard_conversation( + request=sample_request, + session=sample_session, + ) + + mock_rag.query.assert_awaited_once() + + @pytest.mark.asyncio + async def test_standard_conversation_saves_entries( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test standard conversation saves entries.""" + await conversation_service._handle_standard_conversation( + request=sample_request, + session=sample_session, + ) + + assert mock_firestore.save_entry.await_count == 2 + + @pytest.mark.asyncio + async def test_standard_conversation_updates_session( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + mock_redis: RedisService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test standard conversation updates session.""" + await conversation_service._handle_standard_conversation( + request=sample_request, + session=sample_session, + ) + + # save_session is called in _update_session_after_turn + assert mock_firestore.save_session.await_count >= 1 + assert mock_redis.save_session.await_count >= 1 + + @pytest.mark.asyncio + async def test_standard_conversation_marks_notifications_processed( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test standard conversation marks notifications as processed.""" + # Mock that there are active notifications + conversation_service._get_active_notifications = AsyncMock( + return_value=[Mock(texto="Test notification")] + ) + + await conversation_service._handle_standard_conversation( + request=sample_request, + session=sample_session, + ) + + mock_firestore.update_notification_status.assert_awaited_once() + + @pytest.mark.asyncio + async def test_standard_conversation_without_notifications( + self, + conversation_service: ConversationManagerService, + mock_firestore: FirestoreService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test standard conversation without notifications.""" + conversation_service._get_active_notifications = AsyncMock(return_value=[]) + + await conversation_service._handle_standard_conversation( + request=sample_request, + session=sample_session, + ) + + mock_firestore.update_notification_status.assert_not_awaited() + + +# ============================================================================ +# Test Orchestration +# ============================================================================ + + +class TestOrchestration: + """Tests for main orchestration logic.""" + + @pytest.mark.asyncio + async def test_manage_conversation_applies_dlp( + self, + conversation_service: ConversationManagerService, + mock_dlp: DLPService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test manage_conversation applies DLP obfuscation.""" + conversation_service._obtain_or_create_session = AsyncMock( + return_value=sample_session + ) + conversation_service._handle_standard_conversation = AsyncMock( + return_value=DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Response"), + ) + ) + + await conversation_service.manage_conversation(sample_request) + + mock_dlp.get_obfuscated_string.assert_awaited_once() + assert sample_request.mensaje == "obfuscated message" + + @pytest.mark.asyncio + async def test_manage_conversation_obtains_session( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test manage_conversation obtains session.""" + conversation_service._obtain_or_create_session = AsyncMock( + return_value=sample_session + ) + conversation_service._handle_standard_conversation = AsyncMock( + return_value=DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Response"), + ) + ) + + await conversation_service.manage_conversation(sample_request) + + conversation_service._obtain_or_create_session.assert_awaited_once_with( + "1234567890" + ) + + @pytest.mark.asyncio + async def test_manage_conversation_uses_quick_reply_path_when_valid( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test manage_conversation uses quick reply path when valid.""" + sample_session.pantalla_contexto = "screen_123" + sample_session.last_modified = datetime.now(UTC) + + conversation_service._obtain_or_create_session = AsyncMock( + return_value=sample_session + ) + mock_response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Quick reply"), + ) + conversation_service._handle_quick_reply_path = AsyncMock( + return_value=mock_response + ) + conversation_service._handle_standard_conversation = AsyncMock() + + result = await conversation_service.manage_conversation(sample_request) + + assert result == mock_response + conversation_service._handle_quick_reply_path.assert_awaited_once() + conversation_service._handle_standard_conversation.assert_not_awaited() + + @pytest.mark.asyncio + async def test_manage_conversation_uses_standard_path_when_no_context( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test manage_conversation uses standard path when no context.""" + sample_session.pantalla_contexto = None + + conversation_service._obtain_or_create_session = AsyncMock( + return_value=sample_session + ) + mock_response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Standard response"), + ) + conversation_service._handle_standard_conversation = AsyncMock( + return_value=mock_response + ) + + result = await conversation_service.manage_conversation(sample_request) + + assert result == mock_response + conversation_service._handle_standard_conversation.assert_awaited_once() + + @pytest.mark.asyncio + async def test_manage_conversation_uses_standard_path_when_stale_context( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + sample_session: ConversationSession, + ) -> None: + """Test manage_conversation uses standard path when context is stale.""" + sample_session.pantalla_contexto = "screen_123" + sample_session.last_modified = datetime.now(UTC) - timedelta(minutes=11) + + conversation_service._obtain_or_create_session = AsyncMock( + return_value=sample_session + ) + mock_response = DetectIntentResponse( + responseId=str(uuid4()), + queryResult=QueryResult(responseText="Standard response"), + ) + conversation_service._handle_quick_reply_path = AsyncMock(return_value=None) + conversation_service._handle_standard_conversation = AsyncMock( + return_value=mock_response + ) + + result = await conversation_service.manage_conversation(sample_request) + + assert result == mock_response + conversation_service._handle_standard_conversation.assert_awaited_once() + + @pytest.mark.asyncio + async def test_manage_conversation_handles_exceptions( + self, + conversation_service: ConversationManagerService, + sample_request: ConversationRequest, + ) -> None: + """Test manage_conversation handles exceptions properly.""" + conversation_service._obtain_or_create_session = AsyncMock( + side_effect=Exception("Test error") + ) + + with pytest.raises(Exception, match="Test error"): + await conversation_service.manage_conversation(sample_request) diff --git a/tests/services/test_dlp_service.py b/tests/services/test_dlp_service.py index eedc148..204e631 100644 --- a/tests/services/test_dlp_service.py +++ b/tests/services/test_dlp_service.py @@ -6,7 +6,7 @@ import pytest from google.cloud.dlp_v2 import types from capa_de_integracion.config import Settings -from capa_de_integracion.services.dlp_service import DLPService +from capa_de_integracion.services import DLPService @pytest.fixture @@ -21,7 +21,7 @@ def mock_settings(): @pytest.fixture def service(mock_settings): """Create DLPService instance with mocked client.""" - with patch("capa_de_integracion.services.dlp_service.dlp_v2.DlpServiceAsyncClient"): + with patch("capa_de_integracion.services.dlp.dlp_v2.DlpServiceAsyncClient"): return DLPService(mock_settings) diff --git a/tests/services/test_firestore_service.py b/tests/services/test_firestore_service.py index 3be4280..931f603 100644 --- a/tests/services/test_firestore_service.py +++ b/tests/services/test_firestore_service.py @@ -7,7 +7,7 @@ from inline_snapshot import snapshot from capa_de_integracion.models import ConversationEntry, ConversationSession from capa_de_integracion.models.notification import Notification -from capa_de_integracion.services.firestore_service import FirestoreService +from capa_de_integracion.services.storage import FirestoreService @pytest.mark.vcr diff --git a/tests/services/test_notification_manager.py b/tests/services/test_notification_manager.py index 74c8ad1..7ffbd1b 100644 --- a/tests/services/test_notification_manager.py +++ b/tests/services/test_notification_manager.py @@ -6,10 +6,8 @@ import pytest from capa_de_integracion.config import Settings from capa_de_integracion.models.notification import ExternalNotificationRequest -from capa_de_integracion.services.dlp_service import DLPService -from capa_de_integracion.services.firestore_service import FirestoreService -from capa_de_integracion.services.notification_manager import NotificationManagerService -from capa_de_integracion.services.redis_service import RedisService +from capa_de_integracion.services import DLPService, NotificationManagerService +from capa_de_integracion.services.storage import FirestoreService, RedisService @pytest.fixture @@ -157,3 +155,33 @@ async def test_process_notification_generates_unique_id(service, mock_redis): notification2 = mock_redis.save_or_append_notification.call_args[0][0] assert notification1.id_notificacion != notification2.id_notificacion + + +@pytest.mark.asyncio +async def test_process_notification_firestore_exception_handling( + service, mock_redis, mock_firestore +): + """Test that Firestore exceptions are handled gracefully in background task.""" + import asyncio + + # Make Firestore save fail + mock_firestore.save_or_append_notification = AsyncMock( + side_effect=Exception("Firestore connection error") + ) + + request = ExternalNotificationRequest( + telefono="555-1234", + texto="Test notification", + parametros_ocultos=None, + ) + + await service.process_notification(request) + + # Redis should succeed + mock_redis.save_or_append_notification.assert_called_once() + + # Give the background task time to execute + await asyncio.sleep(0.1) + + # Firestore should have been attempted (and failed) + mock_firestore.save_or_append_notification.assert_called_once() diff --git a/tests/services/test_quick_reply_content.py b/tests/services/test_quick_reply_content.py index 805bdb4..f399e49 100644 --- a/tests/services/test_quick_reply_content.py +++ b/tests/services/test_quick_reply_content.py @@ -1,27 +1,29 @@ """Tests for QuickReplyContentService.""" import json -from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import Mock import pytest from capa_de_integracion.config import Settings from capa_de_integracion.models.quick_replies import QuickReplyScreen -from capa_de_integracion.services.quick_reply_content import QuickReplyContentService +from capa_de_integracion.services import QuickReplyContentService @pytest.fixture -def mock_settings(): +def mock_settings(tmp_path): """Create mock settings for testing.""" settings = Mock(spec=Settings) - settings.base_path = Path("/tmp/test_resources") + # Create the quick_replies directory + quick_replies_dir = tmp_path / "quick_replies" + quick_replies_dir.mkdir() + settings.base_path = tmp_path return settings @pytest.fixture def service(mock_settings): - """Create QuickReplyContentService instance.""" + """Create QuickReplyContentService instance with empty cache.""" return QuickReplyContentService(mock_settings) @@ -59,22 +61,19 @@ async def test_get_quick_replies_whitespace_screen_id(service): @pytest.mark.asyncio -async def test_get_quick_replies_file_not_found(service, tmp_path): - """Test get_quick_replies raises error when file not found.""" - # Set service to use a temp directory where the file won't exist - service.quick_replies_path = tmp_path / "nonexistent_dir" - - with pytest.raises(ValueError, match="Error loading quick replies"): +async def test_get_quick_replies_file_not_found(service): + """Test get_quick_replies raises error when screen not in cache.""" + # Cache is empty (no files loaded), so any screen_id should raise ValueError + with pytest.raises(ValueError, match="Quick reply not found for screen_id"): await service.get_quick_replies("nonexistent") @pytest.mark.asyncio -async def test_get_quick_replies_success(service, tmp_path): - """Test get_quick_replies successfully loads file.""" - # Create test JSON file +async def test_get_quick_replies_success(tmp_path): + """Test get_quick_replies successfully retrieves from cache.""" + # Create test JSON file BEFORE initializing service quick_replies_dir = tmp_path / "quick_replies" quick_replies_dir.mkdir() - service.quick_replies_path = quick_replies_dir test_data = { "header": "Test Header", @@ -97,6 +96,11 @@ async def test_get_quick_replies_success(service, tmp_path): test_file = quick_replies_dir / "test_screen.json" test_file.write_text(json.dumps(test_data), encoding="utf-8") + # Initialize service - it will preload the file into cache + settings = Mock(spec=Settings) + settings.base_path = tmp_path + service = QuickReplyContentService(settings) + result = await service.get_quick_replies("test_screen") assert isinstance(result, QuickReplyScreen) @@ -114,25 +118,30 @@ async def test_get_quick_replies_success(service, tmp_path): @pytest.mark.asyncio -async def test_get_quick_replies_invalid_json(service, tmp_path): - """Test get_quick_replies raises error for invalid JSON.""" +async def test_get_quick_replies_invalid_json(tmp_path): + """Test that invalid JSON files are skipped during cache preload.""" quick_replies_dir = tmp_path / "quick_replies" quick_replies_dir.mkdir() - service.quick_replies_path = quick_replies_dir + # Create invalid JSON file test_file = quick_replies_dir / "invalid.json" test_file.write_text("{ invalid json }", encoding="utf-8") - with pytest.raises(ValueError, match="Invalid JSON format"): + # Initialize service - invalid file should be logged but not crash + settings = Mock(spec=Settings) + settings.base_path = tmp_path + service = QuickReplyContentService(settings) + + # Requesting the invalid screen should raise ValueError (not in cache) + with pytest.raises(ValueError, match="Quick reply not found for screen_id"): await service.get_quick_replies("invalid") @pytest.mark.asyncio -async def test_get_quick_replies_minimal_data(service, tmp_path): - """Test get_quick_replies with minimal data.""" +async def test_get_quick_replies_minimal_data(tmp_path): + """Test get_quick_replies with minimal data from cache.""" quick_replies_dir = tmp_path / "quick_replies" quick_replies_dir.mkdir() - service.quick_replies_path = quick_replies_dir test_data = { "preguntas": [], @@ -141,6 +150,11 @@ async def test_get_quick_replies_minimal_data(service, tmp_path): test_file = quick_replies_dir / "minimal.json" test_file.write_text(json.dumps(test_data), encoding="utf-8") + # Initialize service - it will preload the file + settings = Mock(spec=Settings) + settings.base_path = tmp_path + service = QuickReplyContentService(settings) + result = await service.get_quick_replies("minimal") assert isinstance(result, QuickReplyScreen) @@ -168,3 +182,103 @@ async def test_validate_file_not_exists(service, tmp_path): with pytest.raises(ValueError, match="Quick reply file not found"): service._validate_file(test_file, "test") + + +@pytest.mark.asyncio +async def test_cache_preload_multiple_files(tmp_path): + """Test that cache preloads multiple files correctly.""" + quick_replies_dir = tmp_path / "quick_replies" + quick_replies_dir.mkdir() + + # Create multiple test files + for screen_id in ["home", "pagos", "transferencia"]: + test_data = { + "header": f"{screen_id} header", + "preguntas": [ + {"titulo": f"Q1 for {screen_id}", "respuesta": "Answer 1"}, + ], + } + test_file = quick_replies_dir / f"{screen_id}.json" + test_file.write_text(json.dumps(test_data), encoding="utf-8") + + # Initialize service + settings = Mock(spec=Settings) + settings.base_path = tmp_path + service = QuickReplyContentService(settings) + + # Verify all screens are in cache + home = await service.get_quick_replies("home") + assert home.header == "home header" + + pagos = await service.get_quick_replies("pagos") + assert pagos.header == "pagos header" + + transferencia = await service.get_quick_replies("transferencia") + assert transferencia.header == "transferencia header" + + +@pytest.mark.asyncio +async def test_cache_preload_with_mixed_valid_invalid(tmp_path): + """Test cache preload handles mix of valid and invalid files.""" + quick_replies_dir = tmp_path / "quick_replies" + quick_replies_dir.mkdir() + + # Create valid file + valid_data = {"header": "valid", "preguntas": []} + (quick_replies_dir / "valid.json").write_text( + json.dumps(valid_data), encoding="utf-8", + ) + + # Create invalid file + (quick_replies_dir / "invalid.json").write_text( + "{ invalid }", encoding="utf-8", + ) + + # Initialize service - should not crash + settings = Mock(spec=Settings) + settings.base_path = tmp_path + service = QuickReplyContentService(settings) + + # Valid file should be in cache + valid = await service.get_quick_replies("valid") + assert valid.header == "valid" + + # Invalid file should not be in cache + with pytest.raises(ValueError, match="Quick reply not found"): + await service.get_quick_replies("invalid") + + +@pytest.mark.asyncio +async def test_cache_preload_handles_generic_exception(tmp_path, monkeypatch): + """Test cache preload handles generic exceptions during file processing.""" + from pathlib import Path + from unittest.mock import Mock + + quick_replies_dir = tmp_path / "quick_replies" + quick_replies_dir.mkdir() + + # Create valid JSON file + valid_data = {"header": "test", "preguntas": []} + (quick_replies_dir / "test.json").write_text( + json.dumps(valid_data), encoding="utf-8", + ) + + # Mock _parse_quick_reply_data to raise generic exception + def mock_parse_error(*args, **kwargs): + raise ValueError("Simulated parsing error") + + settings = Mock(spec=Settings) + settings.base_path = tmp_path + + # Initialize service and patch the parsing method + with monkeypatch.context() as m: + service = QuickReplyContentService(settings) + m.setattr(service, "_parse_quick_reply_data", mock_parse_error) + + # Manually call _preload_cache to trigger the exception + service._cache.clear() + service._preload_cache() + + # The file should not be in cache due to the exception + with pytest.raises(ValueError, match="Quick reply not found"): + await service.get_quick_replies("test") diff --git a/tests/services/test_quick_reply_session.py b/tests/services/test_quick_reply_session.py new file mode 100644 index 0000000..cea8172 --- /dev/null +++ b/tests/services/test_quick_reply_session.py @@ -0,0 +1,166 @@ +"""Tests for QuickReplySessionService.""" + +from unittest.mock import AsyncMock, Mock +from uuid import uuid4 + +import pytest + +from capa_de_integracion.models.conversation import ConversationSession +from capa_de_integracion.models.quick_replies import QuickReplyScreen +from capa_de_integracion.services import QuickReplySessionService +from capa_de_integracion.services.quick_reply.content import QuickReplyContentService +from capa_de_integracion.services.storage.firestore import FirestoreService +from capa_de_integracion.services.storage.redis import RedisService + + +@pytest.fixture +def mock_redis(): + """Create mock Redis service.""" + redis = Mock(spec=RedisService) + redis.save_session = AsyncMock() + return redis + + +@pytest.fixture +def mock_firestore(): + """Create mock Firestore service.""" + firestore = Mock(spec=FirestoreService) + firestore.get_session_by_phone = AsyncMock() + firestore.create_session = AsyncMock() + firestore.update_pantalla_contexto = AsyncMock() + return firestore + + +@pytest.fixture +def mock_content(): + """Create mock QuickReplyContentService.""" + content = Mock(spec=QuickReplyContentService) + content.get_quick_replies = AsyncMock() + return content + + +@pytest.fixture +def service(mock_redis, mock_firestore, mock_content): + """Create QuickReplySessionService instance.""" + return QuickReplySessionService( + redis_service=mock_redis, + firestore_service=mock_firestore, + quick_reply_content_service=mock_content, + ) + + +@pytest.mark.asyncio +async def test_validate_phone_empty_string(service): + """Test phone validation with empty string.""" + with pytest.raises(ValueError, match="Phone number is required"): + service._validate_phone("") + + +@pytest.mark.asyncio +async def test_validate_phone_whitespace(service): + """Test phone validation with whitespace only.""" + with pytest.raises(ValueError, match="Phone number is required"): + service._validate_phone(" ") + + +@pytest.mark.asyncio +async def test_start_session_new_user(service, mock_firestore, mock_redis, mock_content): + """Test starting a quick reply session for a new user.""" + # Setup mocks + mock_firestore.get_session_by_phone.return_value = None # No existing session + + # Mock create_session to return a session with the ID that was passed in + def create_session_side_effect(session_id, user_id, telefono, pantalla_contexto): + return ConversationSession.create( + session_id=session_id, + user_id=user_id, + telefono=telefono, + pantalla_contexto=pantalla_contexto, + ) + + mock_firestore.create_session.side_effect = create_session_side_effect + + test_quick_replies = QuickReplyScreen( + header="Home Screen", + body=None, + button=None, + header_section=None, + preguntas=[], + ) + mock_content.get_quick_replies.return_value = test_quick_replies + + # Execute + result = await service.start_quick_reply_session( + telefono="555-1234", + _nombre="John", + pantalla_contexto="home", + ) + + # Verify + assert result.session_id is not None # Session ID should be generated + assert result.quick_replies.header == "Home Screen" + + mock_firestore.get_session_by_phone.assert_called_once_with("555-1234") + mock_firestore.create_session.assert_called_once() + + # Verify create_session was called with correct parameters + call_args = mock_firestore.create_session.call_args + assert call_args[0][2] == "555-1234" # telefono + assert call_args[0][3] == "home" # pantalla_contexto + + mock_redis.save_session.assert_called_once() + mock_content.get_quick_replies.assert_called_once_with("home") + + +@pytest.mark.asyncio +async def test_start_session_existing_user(service, mock_firestore, mock_redis, mock_content): + """Test starting a quick reply session for an existing user.""" + # Setup mocks - existing session + test_session_id = "existing-session-123" + test_session = ConversationSession.create( + session_id=test_session_id, + user_id="user_by_phone_5551234", + telefono="555-1234", + pantalla_contexto="old_screen", + ) + mock_firestore.get_session_by_phone.return_value = test_session + + test_quick_replies = QuickReplyScreen( + header="Payments Screen", + body=None, + button=None, + header_section=None, + preguntas=[], + ) + mock_content.get_quick_replies.return_value = test_quick_replies + + # Execute + result = await service.start_quick_reply_session( + telefono="555-1234", + _nombre="John", + pantalla_contexto="pagos", + ) + + # Verify + assert result.session_id == test_session_id + assert result.quick_replies.header == "Payments Screen" + + mock_firestore.get_session_by_phone.assert_called_once_with("555-1234") + mock_firestore.update_pantalla_contexto.assert_called_once_with( + test_session_id, + "pagos", + ) + mock_firestore.create_session.assert_not_called() + mock_redis.save_session.assert_called_once() + mock_content.get_quick_replies.assert_called_once_with("pagos") + + +@pytest.mark.asyncio +async def test_start_session_invalid_phone(service): + """Test starting session with invalid phone number.""" + with pytest.raises(ValueError, match="Phone number is required"): + await service.start_quick_reply_session( + telefono="", + _nombre="John", + pantalla_contexto="home", + ) diff --git a/tests/services/test_rag_services.py b/tests/services/test_rag_services.py index f8e4f83..e9d5ea4 100644 --- a/tests/services/test_rag_services.py +++ b/tests/services/test_rag_services.py @@ -201,6 +201,25 @@ class TestHTTPRAGService: mock_client.aclose.assert_called_once() + @pytest.mark.asyncio + async def test_http_generic_exception(self): + """Test HTTP RAG service handles generic exceptions during processing.""" + mock_response = Mock() + mock_response.raise_for_status = Mock() + # Make json() raise a generic exception + mock_response.json = Mock(side_effect=ValueError("Invalid response format")) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client_class.return_value = mock_client + + service = HTTPRAGService(endpoint_url="http://test.example.com/rag") + messages = [{"role": "user", "content": "Hello"}] + + with pytest.raises(ValueError, match="Invalid response format"): + await service.query(messages) + class TestRAGModels: """Tests for RAG data models.""" diff --git a/tests/services/test_redis_service.py b/tests/services/test_redis_service.py index a9a5460..10bc800 100644 --- a/tests/services/test_redis_service.py +++ b/tests/services/test_redis_service.py @@ -9,7 +9,7 @@ from inline_snapshot import snapshot from capa_de_integracion.config import Settings from capa_de_integracion.models import ConversationEntry, ConversationSession from capa_de_integracion.models.notification import Notification, NotificationSession -from capa_de_integracion.services.redis_service import RedisService +from capa_de_integracion.services.storage import RedisService class TestConnectionManagement: diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index beeac82..2a97562 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -11,6 +11,7 @@ from capa_de_integracion.dependencies import ( get_firestore_service, get_notification_manager, get_quick_reply_content_service, + get_quick_reply_session_service, get_rag_service, get_redis_service, init_services, @@ -22,10 +23,10 @@ from capa_de_integracion.services import ( DLPService, NotificationManagerService, QuickReplyContentService, + QuickReplySessionService, ) -from capa_de_integracion.services.firestore_service import FirestoreService from capa_de_integracion.services.rag import EchoRAGService, HTTPRAGService -from capa_de_integracion.services.redis_service import RedisService +from capa_de_integracion.services.storage import FirestoreService, RedisService def test_get_redis_service(): @@ -77,6 +78,21 @@ def test_get_quick_reply_content_service(): assert service is service2 +def test_get_quick_reply_session_service(): + """Test get_quick_reply_session_service returns QuickReplySessionService.""" + get_quick_reply_session_service.cache_clear() + get_redis_service.cache_clear() + get_firestore_service.cache_clear() + get_quick_reply_content_service.cache_clear() + + service = get_quick_reply_session_service() + assert isinstance(service, QuickReplySessionService) + + # Should return same instance (cached) + service2 = get_quick_reply_session_service() + assert service is service2 + + def test_get_notification_manager(): """Test get_notification_manager returns NotificationManagerService.""" get_notification_manager.cache_clear() diff --git a/tests/test_routers_simple.py b/tests/test_routers_simple.py index 44e16b6..6863887 100644 --- a/tests/test_routers_simple.py +++ b/tests/test_routers_simple.py @@ -8,6 +8,11 @@ from capa_de_integracion.models import ConversationRequest, DetectIntentResponse from capa_de_integracion.models.notification import ExternalNotificationRequest from capa_de_integracion.models.quick_replies import QuickReplyScreen from capa_de_integracion.routers import conversation, notification, quick_replies +from capa_de_integracion.routers.quick_replies import ( + QuickReplyScreenRequest, + QuickReplyUser, +) +from capa_de_integracion.services.quick_reply.session import QuickReplySessionResponse @pytest.mark.asyncio @@ -136,3 +141,79 @@ async def test_process_notification_general_error(): await notification.process_notification(request, mock_manager) assert exc_info.value.status_code == 500 + + +@pytest.mark.asyncio +async def test_start_quick_reply_session_success(): + """Test quick reply session endpoint with success.""" + mock_service = Mock() + mock_result = QuickReplySessionResponse( + session_id="test-session-123", + quick_replies=QuickReplyScreen( + header="Test Header", + body=None, + button=None, + header_section=None, + preguntas=[], + ), + ) + mock_service.start_quick_reply_session = AsyncMock(return_value=mock_result) + + request = QuickReplyScreenRequest( + usuario=QuickReplyUser(telefono="555-1234", nombre="John"), + pantallaContexto="home", + ) + + response = await quick_replies.start_quick_reply_session(request, mock_service) + + assert response.response_id == "test-session-123" + assert response.quick_replies.header == "Test Header" + mock_service.start_quick_reply_session.assert_called_once_with( + telefono="555-1234", + _nombre="John", + pantalla_contexto="home", + ) + + +@pytest.mark.asyncio +async def test_start_quick_reply_session_value_error(): + """Test quick reply session with ValueError.""" + mock_service = Mock() + mock_service.start_quick_reply_session = AsyncMock( + side_effect=ValueError("Invalid screen"), + ) + + request = QuickReplyScreenRequest( + usuario=QuickReplyUser(telefono="555-1234", nombre="John"), + pantallaContexto="invalid", + ) + + from fastapi import HTTPException + + with pytest.raises(HTTPException) as exc_info: + await quick_replies.start_quick_reply_session(request, mock_service) + + assert exc_info.value.status_code == 400 + assert "Invalid screen" in str(exc_info.value.detail) + + +@pytest.mark.asyncio +async def test_start_quick_reply_session_general_error(): + """Test quick reply session with general Exception.""" + mock_service = Mock() + mock_service.start_quick_reply_session = AsyncMock( + side_effect=RuntimeError("Database error"), + ) + + request = QuickReplyScreenRequest( + usuario=QuickReplyUser(telefono="555-1234", nombre="John"), + pantallaContexto="home", + ) + + from fastapi import HTTPException + + with pytest.raises(HTTPException) as exc_info: + await quick_replies.start_quick_reply_session(request, mock_service) + + assert exc_info.value.status_code == 500 + assert "Internal server error" in str(exc_info.value.detail)