From 52a674959cb3558fa46b27817deb6a1fa7bd3c80 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Fri, 20 Feb 2026 04:38:32 +0000 Subject: [PATCH] . --- pyproject.toml | 4 + src/capa_de_integracion/__init__.py | 7 +- src/capa_de_integracion/config.py | 9 +- src/capa_de_integracion/dependencies.py | 6 +- src/capa_de_integracion/exceptions.py | 9 +- src/capa_de_integracion/main.py | 5 +- src/capa_de_integracion/models/__init__.py | 16 +-- .../models/conversation.py | 1 + .../models/notification.py | 14 +-- .../routers/conversation.py | 25 ++-- .../routers/notification.py | 23 ++-- .../routers/quick_replies.py | 44 +++---- src/capa_de_integracion/services/__init__.py | 6 +- .../services/conversation_manager.py | 81 ++++++------- .../services/dlp_service.py | 40 +++---- .../services/firestore_service.py | 97 ++++++++-------- .../services/notification_manager.py | 36 +++--- .../services/quick_reply_content.py | 33 +++--- .../services/rag_service.py | 29 +++-- .../services/redis_service.py | 107 ++++++++++-------- 20 files changed, 309 insertions(+), 283 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 29eee4d..3af6502 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,3 +38,7 @@ dev = [ "ruff>=0.15.1", "ty>=0.0.17", ] + +[tool.ruff.lint] +select = ['ALL'] +ignore = [] diff --git a/src/capa_de_integracion/__init__.py b/src/capa_de_integracion/__init__.py index b26f3af..cccccb2 100644 --- a/src/capa_de_integracion/__init__.py +++ b/src/capa_de_integracion/__init__.py @@ -1,11 +1,10 @@ -""" -Copyright 2025 Google. This software is provided as-is, +"""Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google. Capa de Integración - Conversational AI Orchestrator Service """ -from .main import main, app +from .main import app, main -__all__ = ["main", "app"] +__all__ = ["app", "main"] diff --git a/src/capa_de_integracion/config.py b/src/capa_de_integracion/config.py index 2d1a37c..982a032 100644 --- a/src/capa_de_integracion/config.py +++ b/src/capa_de_integracion/config.py @@ -1,4 +1,5 @@ from pathlib import Path + from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict @@ -18,11 +19,11 @@ class Settings(BaseSettings): # Firestore firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID") firestore_host: str = Field( - default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST" + default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST", ) firestore_port: int = Field(default=443, alias="GCP_FIRESTORE_PORT") firestore_importer_enabled: bool = Field( - default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE" + default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE", ) # Redis @@ -35,10 +36,10 @@ class Settings(BaseSettings): # Conversation Context conversation_context_message_limit: int = Field( - default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT" + default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT", ) conversation_context_days_limit: int = Field( - default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT" + default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT", ) # Logging diff --git a/src/capa_de_integracion/dependencies.py b/src/capa_de_integracion/dependencies.py index 637d9ad..d302d88 100644 --- a/src/capa_de_integracion/dependencies.py +++ b/src/capa_de_integracion/dependencies.py @@ -1,14 +1,15 @@ from functools import lru_cache + from .config import settings from .services import ( ConversationManagerService, + DLPService, NotificationManagerService, QuickReplyContentService, - DLPService, ) -from .services.redis_service import RedisService from .services.firestore_service import FirestoreService from .services.rag_service import RAGService +from .services.redis_service import RedisService @lru_cache(maxsize=1) @@ -70,7 +71,6 @@ def get_conversation_manager() -> ConversationManagerService: def init_services(settings) -> None: """Initialize services (placeholder for compatibility).""" # Services are lazy-loaded via lru_cache, no explicit init needed - pass async def startup_services() -> None: diff --git a/src/capa_de_integracion/exceptions.py b/src/capa_de_integracion/exceptions.py index 4f3eae9..e19affd 100644 --- a/src/capa_de_integracion/exceptions.py +++ b/src/capa_de_integracion/exceptions.py @@ -1,17 +1,16 @@ class FirestorePersistenceException(Exception): - """ - Exception raised when Firestore operations fail. + """Exception raised when Firestore operations fail. This is typically caught and logged without failing the request. """ - def __init__(self, message: str, cause: Exception | None = None): - """ - Initialize Firestore persistence exception. + def __init__(self, message: str, cause: Exception | None = None) -> None: + """Initialize Firestore persistence exception. Args: message: Error message cause: Original exception that caused this error + """ super().__init__(message) self.cause = cause diff --git a/src/capa_de_integracion/main.py b/src/capa_de_integracion/main.py index ade1974..676790a 100644 --- a/src/capa_de_integracion/main.py +++ b/src/capa_de_integracion/main.py @@ -5,9 +5,8 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from .config import settings +from .dependencies import init_services, shutdown_services, startup_services from .routers import conversation_router, notification_router, quick_replies_router -from .dependencies import init_services, startup_services, shutdown_services - # Configure logging logging.basicConfig( @@ -64,7 +63,7 @@ async def health_check(): return {"status": "healthy", "service": "capa-de-integracion"} -def main(): +def main() -> None: """Entry point for CLI.""" import uvicorn diff --git a/src/capa_de_integracion/models/__init__.py b/src/capa_de_integracion/models/__init__.py index 1661a8d..0d29f2c 100644 --- a/src/capa_de_integracion/models/__init__.py +++ b/src/capa_de_integracion/models/__init__.py @@ -1,29 +1,29 @@ """Data models module.""" from .conversation import ( - User, - ConversationSession, ConversationEntry, ConversationRequest, + ConversationSession, DetectIntentResponse, QueryResult, + User, ) from .notification import ( ExternalNotificationRequest, - NotificationSession, Notification, + NotificationSession, ) __all__ = [ - # Conversation - "User", - "ConversationSession", "ConversationEntry", "ConversationRequest", + "ConversationSession", "DetectIntentResponse", - "QueryResult", # Notification "ExternalNotificationRequest", - "NotificationSession", "Notification", + "NotificationSession", + "QueryResult", + # Conversation + "User", ] diff --git a/src/capa_de_integracion/models/conversation.py b/src/capa_de_integracion/models/conversation.py index a2dbfde..b65765f 100644 --- a/src/capa_de_integracion/models/conversation.py +++ b/src/capa_de_integracion/models/conversation.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import Any, Literal + from pydantic import BaseModel, Field diff --git a/src/capa_de_integracion/models/notification.py b/src/capa_de_integracion/models/notification.py index 81888ee..9dbd957 100644 --- a/src/capa_de_integracion/models/notification.py +++ b/src/capa_de_integracion/models/notification.py @@ -1,17 +1,17 @@ from datetime import datetime from typing import Any + from pydantic import BaseModel, Field class Notification(BaseModel): - """ - Individual notification event record. + """Individual notification event record. Represents a notification to be stored in Firestore and cached in Redis. """ idNotificacion: str = Field( - ..., alias="idNotificacion", description="Unique notification ID" + ..., alias="idNotificacion", description="Unique notification ID", ) telefono: str = Field(..., alias="telefono", description="User phone number") timestampCreacion: datetime = Field( @@ -36,7 +36,7 @@ class Notification(BaseModel): description="Session parameters for Dialogflow", ) status: str = Field( - default="active", alias="status", description="Notification status" + default="active", alias="status", description="Notification status", ) model_config = {"populate_by_name": True} @@ -52,8 +52,7 @@ class Notification(BaseModel): parametros: dict[str, Any] | None = None, status: str = "active", ) -> "Notification": - """ - Create a new Notification with auto-filled timestamp. + """Create a new Notification with auto-filled timestamp. Args: id_notificacion: Unique notification ID @@ -66,6 +65,7 @@ class Notification(BaseModel): Returns: New Notification instance with current timestamp + """ return cls( idNotificacion=id_notificacion, @@ -109,7 +109,7 @@ class ExternalNotificationRequest(BaseModel): texto: str = Field(..., min_length=1) telefono: str = Field(..., alias="telefono", description="User phone number") parametros_ocultos: dict[str, Any] | None = Field( - None, alias="parametrosOcultos", description="Hidden parameters (metadata)" + None, alias="parametrosOcultos", description="Hidden parameters (metadata)", ) model_config = {"populate_by_name": True} diff --git a/src/capa_de_integracion/routers/conversation.py b/src/capa_de_integracion/routers/conversation.py index 4883621..85cce75 100644 --- a/src/capa_de_integracion/routers/conversation.py +++ b/src/capa_de_integracion/routers/conversation.py @@ -1,31 +1,32 @@ import logging +from typing import Annotated + from fastapi import APIRouter, Depends, HTTPException -from ..models import ConversationRequest, DetectIntentResponse -from ..services import ConversationManagerService -from ..dependencies import get_conversation_manager - +from capa_de_integracion.dependencies import get_conversation_manager +from capa_de_integracion.models import ConversationRequest, DetectIntentResponse +from capa_de_integracion.services import ConversationManagerService logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"]) -@router.post("/detect-intent", response_model=DetectIntentResponse) +@router.post("/detect-intent") async def detect_intent( request: ConversationRequest, - conversation_manager: ConversationManagerService = Depends( - get_conversation_manager - ), + conversation_manager: Annotated[ConversationManagerService, Depends( + get_conversation_manager, + )], ) -> DetectIntentResponse: - """ - Detect user intent and manage conversation. + """Detect user intent and manage conversation. Args: request: External conversation request from client Returns: Dialogflow detect intent response + """ try: logger.info("Received detect-intent request") @@ -34,9 +35,9 @@ async def detect_intent( return response except ValueError as e: - logger.error(f"Validation error: {str(e)}", exc_info=True) + logger.error(f"Validation error: {e!s}", exc_info=True) raise HTTPException(status_code=400, detail=str(e)) except Exception as e: - logger.error(f"Error processing detect-intent: {str(e)}", exc_info=True) + logger.error(f"Error processing detect-intent: {e!s}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") diff --git a/src/capa_de_integracion/routers/notification.py b/src/capa_de_integracion/routers/notification.py index a64e715..b672024 100644 --- a/src/capa_de_integracion/routers/notification.py +++ b/src/capa_de_integracion/routers/notification.py @@ -1,10 +1,11 @@ import logging +from typing import Annotated + from fastapi import APIRouter, Depends, HTTPException -from ..models.notification import ExternalNotificationRequest -from ..services.notification_manager import NotificationManagerService -from ..dependencies import get_notification_manager - +from capa_de_integracion.dependencies import get_notification_manager +from capa_de_integracion.models.notification import ExternalNotificationRequest +from capa_de_integracion.services.notification_manager import NotificationManagerService logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"]) @@ -13,12 +14,11 @@ router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"]) @router.post("/notification", status_code=200) async def process_notification( request: ExternalNotificationRequest, - notification_manager: NotificationManagerService = Depends( - get_notification_manager - ), + notification_manager: Annotated[NotificationManagerService, Depends( + get_notification_manager, + )], ) -> None: - """ - Process push notification from external system. + """Process push notification from external system. This endpoint receives notifications (e.g., "Your card was blocked") and: 1. Stores them in Redis/Firestore @@ -37,6 +37,7 @@ async def process_notification( Raises: HTTPException: 400 if validation fails, 500 for internal errors + """ try: logger.info("Received notification request") @@ -45,9 +46,9 @@ async def process_notification( # Match Java behavior: process but don't return response body except ValueError as e: - logger.error(f"Validation error: {str(e)}", exc_info=True) + logger.error(f"Validation error: {e!s}", exc_info=True) raise HTTPException(status_code=400, detail=str(e)) except Exception as e: - logger.error(f"Error processing notification: {str(e)}", exc_info=True) + logger.error(f"Error processing notification: {e!s}", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") diff --git a/src/capa_de_integracion/routers/quick_replies.py b/src/capa_de_integracion/routers/quick_replies.py index 8794e35..30ac7bf 100644 --- a/src/capa_de_integracion/routers/quick_replies.py +++ b/src/capa_de_integracion/routers/quick_replies.py @@ -1,18 +1,19 @@ import logging -from fastapi import APIRouter, Depends, HTTPException +from typing import Annotated from uuid import uuid4 + +from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel -from ..models.quick_replies import QuickReplyScreen -from ..services.quick_reply_content import QuickReplyContentService -from ..services.redis_service import RedisService -from ..services.firestore_service import FirestoreService -from ..dependencies import ( - get_redis_service, +from capa_de_integracion.dependencies import ( get_firestore_service, get_quick_reply_content_service, + get_redis_service, ) - +from capa_de_integracion.models.quick_replies import QuickReplyScreen +from capa_de_integracion.services.firestore_service import FirestoreService +from capa_de_integracion.services.quick_reply_content import QuickReplyContentService +from capa_de_integracion.services.redis_service import RedisService logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"]) @@ -38,14 +39,13 @@ class QuickReplyScreenResponse(BaseModel): @router.post("/screen") async def start_quick_reply_session( request: QuickReplyScreenRequest, - redis_service: RedisService = Depends(get_redis_service), - firestore_service: FirestoreService = Depends(get_firestore_service), - quick_reply_content_service: QuickReplyContentService = Depends( - get_quick_reply_content_service - ), + redis_service: Annotated[RedisService, Depends(get_redis_service)], + firestore_service: Annotated[FirestoreService, Depends(get_firestore_service)], + quick_reply_content_service: Annotated[QuickReplyContentService, Depends( + get_quick_reply_content_service, + )], ) -> QuickReplyScreenResponse: - """ - Start a quick reply FAQ session for a specific screen. + """Start a quick reply FAQ session for a specific screen. Creates a conversation session with pantalla_contexto set, loads the quick reply questions for the screen, and returns them. @@ -55,39 +55,41 @@ async def start_quick_reply_session( Returns: Detect intent response with quick reply questions + """ try: telefono = request.usuario.telefono pantalla_contexto = request.pantallaContexto if not telefono or not telefono.strip(): - raise ValueError("Phone number is required") + msg = "Phone number is required" + raise ValueError(msg) session = await firestore_service.get_session_by_phone(telefono) if session: session_id = session.sessionId await firestore_service.update_pantalla_contexto( - session_id, pantalla_contexto + session_id, pantalla_contexto, ) session.pantallaContexto = pantalla_contexto else: session_id = str(uuid4()) user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" session = await firestore_service.create_session( - session_id, user_id, telefono, pantalla_contexto + session_id, user_id, telefono, pantalla_contexto, ) # Cache session await redis_service.save_session(session) logger.info( - f"Created quick reply session {session_id} for screen: {pantalla_contexto}" + f"Created quick reply session {session_id} for screen: {pantalla_contexto}", ) # Load quick replies quick_replies = await quick_reply_content_service.get_quick_replies( - pantalla_contexto + pantalla_contexto, ) return QuickReplyScreenResponse( - responseId=session_id, quick_replies=quick_replies + responseId=session_id, quick_replies=quick_replies, ) except ValueError as e: diff --git a/src/capa_de_integracion/services/__init__.py b/src/capa_de_integracion/services/__init__.py index 4953fab..5ee9703 100644 --- a/src/capa_de_integracion/services/__init__.py +++ b/src/capa_de_integracion/services/__init__.py @@ -1,13 +1,13 @@ """Services module.""" from .conversation_manager import ConversationManagerService -from .notification_manager import NotificationManagerService from .dlp_service import DLPService +from .notification_manager import NotificationManagerService from .quick_reply_content import QuickReplyContentService __all__ = [ - "QuickReplyContentService", "ConversationManagerService", - "NotificationManagerService", "DLPService", + "NotificationManagerService", + "QuickReplyContentService", ] diff --git a/src/capa_de_integracion/services/conversation_manager.py b/src/capa_de_integracion/services/conversation_manager.py index aa7894e..42dfef6 100644 --- a/src/capa_de_integracion/services/conversation_manager.py +++ b/src/capa_de_integracion/services/conversation_manager.py @@ -1,21 +1,21 @@ import logging -from uuid import uuid4 from datetime import datetime, timedelta +from uuid import uuid4 -from ..config import Settings -from ..models import ( +from capa_de_integracion.config import Settings +from capa_de_integracion.models import ( + ConversationEntry, ConversationRequest, + ConversationSession, DetectIntentResponse, QueryResult, - ConversationSession, - ConversationEntry, ) -from .redis_service import RedisService -from .firestore_service import FirestoreService -from .dlp_service import DLPService -from .rag_service import RAGService -from .quick_reply_content import QuickReplyContentService +from .dlp_service import DLPService +from .firestore_service import FirestoreService +from .quick_reply_content import QuickReplyContentService +from .rag_service import RAGService +from .redis_service import RedisService logger = logging.getLogger(__name__) @@ -35,7 +35,7 @@ class ConversationManagerService: redis_service: RedisService, firestore_service: FirestoreService, dlp_service: DLPService, - ): + ) -> None: """Initialize conversation manager.""" self.settings = settings self.rag_service = rag_service @@ -47,16 +47,16 @@ class ConversationManagerService: logger.info("ConversationManagerService initialized successfully") async def manage_conversation( - self, request: ConversationRequest + self, request: ConversationRequest, ) -> DetectIntentResponse: - """ - Main entry point for managing conversations. + """Main entry point for managing conversations. Args: request: External conversation request from client Returns: Detect intent response from Dialogflow + """ try: # Step 1: DLP obfuscation @@ -75,7 +75,7 @@ class ConversationManagerService: session_id = str(uuid4()) user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" session = await self.firestore_service.create_session( - session_id, user_id, telefono + session_id, user_id, telefono, ) await self.redis_service.save_session(session) @@ -85,10 +85,10 @@ class ConversationManagerService: if self._is_pantalla_context_valid(session.lastModified): logger.info( f"Detected 'pantallaContexto' in session: {session.pantallaContexto}. " - f"Delegating to QuickReplies flow." + f"Delegating to QuickReplies flow.", ) response = await self._manage_quick_reply_conversation( - request, session.pantallaContexto + request, session.pantallaContexto, ) if response: # Save user message to Firestore @@ -101,7 +101,7 @@ class ConversationManagerService: canal=getattr(request, "canal", None), ) await self.firestore_service.save_entry( - session.sessionId, user_entry + session.sessionId, user_entry, ) # Save quick reply response to Firestore @@ -119,7 +119,7 @@ class ConversationManagerService: canal=getattr(request, "canal", None), ) await self.firestore_service.save_entry( - session.sessionId, assistant_entry + session.sessionId, assistant_entry, ) # Update session with last message and timestamp @@ -131,14 +131,14 @@ class ConversationManagerService: return response else: logger.info( - "Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow." + "Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow.", ) # Step 3: Continue with standard conversation flow nickname = request.usuario.nickname logger.info( - f"Primary Check (Redis): Looking up session for phone: {telefono}" + f"Primary Check (Redis): Looking up session for phone: {telefono}", ) # Step 3a: Load conversation history from Firestore @@ -165,7 +165,7 @@ class ConversationManagerService: logger.info("Sending query to RAG service") assistant_response = await self.rag_service.query(messages) logger.info( - f"Received response from RAG service: {assistant_response[:100]}..." + f"Received response from RAG service: {assistant_response[:100]}...", ) # Step 3e: Save user message to Firestore @@ -205,7 +205,7 @@ class ConversationManagerService: logger.info(f"Marked {len(notifications)} notifications as processed") # Step 3i: Return response object - response = DetectIntentResponse( + return DetectIntentResponse( responseId=str(uuid4()), queryResult=QueryResult( responseText=assistant_response, @@ -214,10 +214,9 @@ class ConversationManagerService: quick_replies=None, ) - return response except Exception as e: - logger.error(f"Error managing conversation: {str(e)}", exc_info=True) + logger.error(f"Error managing conversation: {e!s}", exc_info=True) raise def _is_pantalla_context_valid(self, last_modified: datetime) -> bool: @@ -252,17 +251,16 @@ class ConversationManagerService: # If no match, use first question as default or delegate to normal flow if not matched_answer: logger.warning( - f"No matching quick reply found for message: '{request.mensaje}'." + f"No matching quick reply found for message: '{request.mensaje}'.", ) # Create response with the matched quick reply answer - response = DetectIntentResponse( + return DetectIntentResponse( responseId=str(uuid4()), queryResult=QueryResult(responseText=matched_answer, parameters=None), quick_replies=quick_reply_screen, ) - return response async def _get_active_notifications(self, telefono: str) -> list: """Retrieve active notifications for a user from Redis or Firestore. @@ -272,20 +270,21 @@ class ConversationManagerService: Returns: List of active Notification objects + """ try: # Try Redis first notification_session = await self.redis_service.get_notification_session( - telefono + telefono, ) # If not in Redis, try Firestore if not notification_session: # Firestore uses phone as document ID for notifications - from ..models.notification import NotificationSession + from capa_de_integracion.models.notification import NotificationSession doc_ref = self.firestore_service.db.collection( - self.firestore_service.notifications_collection + self.firestore_service.notifications_collection, ).document(telefono) doc = await doc_ref.get() @@ -295,18 +294,17 @@ class ConversationManagerService: # Filter for active notifications only if notification_session and notification_session.notificaciones: - active_notifications = [ + return [ notif for notif in notification_session.notificaciones if notif.status == "active" ] - return active_notifications return [] except Exception as e: logger.error( - f"Error retrieving notifications for {telefono}: {str(e)}", + f"Error retrieving notifications for {telefono}: {e!s}", exc_info=True, ) return [] @@ -330,6 +328,7 @@ class ConversationManagerService: Returns: List of messages in OpenAI format [{"role": "...", "content": "..."}] + """ messages = [] @@ -341,7 +340,7 @@ class ConversationManagerService: { "role": "system", "content": f"Historial de conversación:\n{conversation_context}", - } + }, ) # Add system message with notifications if available @@ -355,7 +354,7 @@ class ConversationManagerService: { "role": "system", "content": f"Notificaciones pendientes para el usuario:\n{notifications_text}", - } + }, ) # Add system message with user context @@ -372,11 +371,12 @@ class ConversationManagerService: Args: telefono: User phone number + """ try: # Update status in Firestore await self.firestore_service.update_notification_status( - telefono, "processed" + telefono, "processed", ) # Update or delete from Redis @@ -386,7 +386,7 @@ class ConversationManagerService: except Exception as e: logger.error( - f"Error marking notifications as processed for {telefono}: {str(e)}", + f"Error marking notifications as processed for {telefono}: {e!s}", exc_info=True, ) @@ -408,13 +408,14 @@ class ConversationManagerService: Returns: Formatted conversation text + """ if not entries: return "" # Filter by date (30 days) cutoff_date = datetime.now() - timedelta( - days=self.settings.conversation_context_days_limit + days=self.settings.conversation_context_days_limit, ) recent_entries = [ e for e in entries if e.timestamp and e.timestamp >= cutoff_date @@ -439,6 +440,7 @@ class ConversationManagerService: Returns: Formatted text, truncated if necessary + """ if not entries: return "" @@ -470,6 +472,7 @@ class ConversationManagerService: Returns: Formatted string (e.g., "User: hello", "Assistant: hi there") + """ # Map entity to prefix (fixed bug from Java port!) prefix = "User: " if entry.entity == "user" else "Assistant: " diff --git a/src/capa_de_integracion/services/dlp_service.py b/src/capa_de_integracion/services/dlp_service.py index b87234d..3e3de7c 100644 --- a/src/capa_de_integracion/services/dlp_service.py +++ b/src/capa_de_integracion/services/dlp_service.py @@ -1,5 +1,4 @@ -""" -Copyright 2025 Google. This software is provided as-is, without warranty or +"""Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google. @@ -8,29 +7,28 @@ Data Loss Prevention service for obfuscating sensitive information. import logging import re + from google.cloud import dlp_v2 from google.cloud.dlp_v2 import types -from ..config import Settings - +from capa_de_integracion.config import Settings logger = logging.getLogger(__name__) class DLPService: - """ - Service for detecting and obfuscating sensitive data using Google Cloud DLP. + """Service for detecting and obfuscating sensitive data using Google Cloud DLP. Integrates with the DLP API to scan text for PII and other sensitive information, then obfuscates findings based on their info type. """ - def __init__(self, settings: Settings): - """ - Initialize DLP service. + def __init__(self, settings: Settings) -> None: + """Initialize DLP service. Args: settings: Application settings + """ self.settings = settings self.project_id = settings.gcp_project_id @@ -40,8 +38,7 @@ class DLPService: logger.info("DLP Service initialized") async def get_obfuscated_string(self, text: str, template_id: str) -> str: - """ - Inspect text for sensitive data and obfuscate findings. + """Inspect text for sensitive data and obfuscate findings. Args: text: Text to inspect and obfuscate @@ -52,6 +49,7 @@ class DLPService: Raises: Exception: If DLP API call fails (returns original text on error) + """ try: # Build content item @@ -63,7 +61,7 @@ class DLPService: # Build inspect config finding_limits = types.InspectConfig.FindingLimits( - max_findings_per_item=0 # No limit + max_findings_per_item=0, # No limit ) inspect_config = types.InspectConfig( @@ -91,8 +89,7 @@ class DLPService: if findings_count > 0: return self._obfuscate_text(response, text) - else: - return text + return text except Exception as e: logger.error( @@ -102,8 +99,7 @@ class DLPService: return text def _obfuscate_text(self, response: types.InspectContentResponse, text: str) -> str: - """ - Obfuscate sensitive findings in text. + """Obfuscate sensitive findings in text. Args: response: DLP inspect content response with findings @@ -111,6 +107,7 @@ class DLPService: Returns: Text with sensitive data obfuscated + """ # Filter findings by likelihood (> POSSIBLE, which is value 3) findings = [ @@ -127,7 +124,7 @@ class DLPService: info_type = finding.info_type.name logger.info( - f"InfoType: {info_type} | Likelihood: {finding.likelihood.value}" + f"InfoType: {info_type} | Likelihood: {finding.likelihood.value}", ) # Obfuscate based on info type @@ -136,13 +133,11 @@ class DLPService: text = text.replace(quote, replacement) # Clean up consecutive DIRECCION tags - text = self._clean_direccion(text) + return self._clean_direccion(text) - return text def _get_replacement(self, info_type: str, quote: str) -> str | None: - """ - Get replacement text for a given info type. + """Get replacement text for a given info type. Args: info_type: DLP info type name @@ -150,6 +145,7 @@ class DLPService: Returns: Replacement text or None to skip + """ replacements = { "CREDIT_CARD_NUMBER": f"**** **** **** {self._get_last4(quote)}", @@ -190,7 +186,7 @@ class DLPService: pattern = r"\[DIRECCION\](?:(?:,\s*|\s+)\[DIRECCION\])*" return re.sub(pattern, "[DIRECCION]", text).strip() - async def close(self): + async def close(self) -> None: """Close DLP client.""" await self.dlp_client.transport.close() logger.info("DLP client closed") diff --git a/src/capa_de_integracion/services/firestore_service.py b/src/capa_de_integracion/services/firestore_service.py index 925a1ea..bb7fc13 100644 --- a/src/capa_de_integracion/services/firestore_service.py +++ b/src/capa_de_integracion/services/firestore_service.py @@ -1,11 +1,11 @@ import logging from datetime import datetime + from google.cloud import firestore -from ..config import Settings -from ..models import ConversationSession, ConversationEntry -from ..models.notification import Notification - +from capa_de_integracion.config import Settings +from capa_de_integracion.models import ConversationEntry, ConversationSession +from capa_de_integracion.models.notification import Notification logger = logging.getLogger(__name__) @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) class FirestoreService: """Service for Firestore operations on conversations.""" - def __init__(self, settings: Settings): + def __init__(self, settings: Settings) -> None: """Initialize Firestore client.""" self.settings = settings self.db = firestore.AsyncClient( @@ -28,10 +28,10 @@ class FirestoreService: f"artifacts/{settings.gcp_project_id}/notifications" ) logger.info( - f"Firestore client initialized for project: {settings.gcp_project_id}" + f"Firestore client initialized for project: {settings.gcp_project_id}", ) - async def close(self): + async def close(self) -> None: """Close Firestore client.""" self.db.close() logger.info("Firestore client closed") @@ -56,20 +56,20 @@ class FirestoreService: return session except Exception as e: - logger.error( - f"Error retrieving session {session_id} from Firestore: {str(e)}" + logger.exception( + f"Error retrieving session {session_id} from Firestore: {e!s}", ) return None async def get_session_by_phone(self, telefono: str) -> ConversationSession | None: - """ - Retrieve most recent conversation session from Firestore by phone number. + """Retrieve most recent conversation session from Firestore by phone number. Args: telefono: User phone number Returns: Most recent session for this phone, or None if not found + """ try: query = ( @@ -84,7 +84,7 @@ class FirestoreService: data = doc.to_dict() session = ConversationSession.model_validate(data) logger.debug( - f"Retrieved session from Firestore for phone {telefono}: {session.sessionId}" + f"Retrieved session from Firestore for phone {telefono}: {session.sessionId}", ) return session @@ -92,8 +92,8 @@ class FirestoreService: return None except Exception as e: - logger.error( - f"Error querying session by phone {telefono} from Firestore: {str(e)}" + logger.exception( + f"Error querying session by phone {telefono} from Firestore: {e!s}", ) return None @@ -107,8 +107,8 @@ class FirestoreService: return True except Exception as e: - logger.error( - f"Error saving session {session.sessionId} to Firestore: {str(e)}" + logger.exception( + f"Error saving session {session.sessionId} to Firestore: {e!s}", ) return False @@ -134,6 +134,7 @@ class FirestoreService: Raises: Exception: If session creation or save fails + """ session = ConversationSession.create( session_id=session_id, @@ -166,13 +167,13 @@ class FirestoreService: return True except Exception as e: - logger.error( - f"Error saving entry for session {session_id} to Firestore: {str(e)}" + logger.exception( + f"Error saving entry for session {session_id} to Firestore: {e!s}", ) return False async def get_entries( - self, session_id: str, limit: int = 10 + self, session_id: str, limit: int = 10, ) -> list[ConversationEntry]: """Retrieve recent conversation entries from Firestore.""" try: @@ -181,7 +182,7 @@ class FirestoreService: # Get entries ordered by timestamp descending query = entries_ref.order_by( - "timestamp", direction=firestore.Query.DESCENDING + "timestamp", direction=firestore.Query.DESCENDING, ).limit(limit) docs = query.stream() @@ -198,8 +199,8 @@ class FirestoreService: return entries except Exception as e: - logger.error( - f"Error retrieving entries for session {session_id} from Firestore: {str(e)}" + logger.exception( + f"Error retrieving entries for session {session_id} from Firestore: {e!s}", ) return [] @@ -219,13 +220,13 @@ class FirestoreService: return True except Exception as e: - logger.error( - f"Error deleting session {session_id} from Firestore: {str(e)}" + logger.exception( + f"Error deleting session {session_id} from Firestore: {e!s}", ) return False async def update_pantalla_contexto( - self, session_id: str, pantalla_contexto: str | None + self, session_id: str, pantalla_contexto: str | None, ) -> bool: """Update the pantallaContexto field for a conversation session. @@ -235,6 +236,7 @@ class FirestoreService: Returns: True if update was successful, False otherwise + """ try: doc_ref = self._session_ref(session_id) @@ -242,7 +244,7 @@ class FirestoreService: if not doc.exists: logger.warning( - f"Session {session_id} not found in Firestore. Cannot update pantallaContexto" + f"Session {session_id} not found in Firestore. Cannot update pantallaContexto", ) return False @@ -250,17 +252,17 @@ class FirestoreService: { "pantallaContexto": pantalla_contexto, "lastModified": datetime.now(), - } + }, ) logger.debug( - f"Updated pantallaContexto for session {session_id} in Firestore" + f"Updated pantallaContexto for session {session_id} in Firestore", ) return True except Exception as e: - logger.error( - f"Error updating pantallaContexto for session {session_id} in Firestore: {str(e)}" + logger.exception( + f"Error updating pantallaContexto for session {session_id} in Firestore: {e!s}", ) return False @@ -269,22 +271,23 @@ class FirestoreService: def _notification_ref(self, notification_id: str): """Get Firestore document reference for notification.""" return self.db.collection(self.notifications_collection).document( - notification_id + notification_id, ) async def save_or_append_notification(self, new_entry: Notification) -> None: - """ - Save or append notification entry to Firestore. + """Save or append notification entry to Firestore. Args: new_entry: Notification entry to save Raises: ValueError: If phone number is missing + """ phone_number = new_entry.telefono if not phone_number or not phone_number.strip(): - raise ValueError("Phone number is required to manage notification entries") + msg = "Phone number is required to manage notification entries" + raise ValueError(msg) # Use phone number as document ID notification_session_id = phone_number @@ -301,10 +304,10 @@ class FirestoreService: { "notificaciones": firestore.ArrayUnion([entry_dict]), "ultimaActualizacion": datetime.now(), - } + }, ) logger.info( - f"Successfully appended notification entry to session {notification_session_id} in Firestore" + f"Successfully appended notification entry to session {notification_session_id} in Firestore", ) else: # Create new notification session @@ -317,23 +320,23 @@ class FirestoreService: } await doc_ref.set(new_session_data) logger.info( - f"Successfully created new notification session {notification_session_id} in Firestore" + f"Successfully created new notification session {notification_session_id} in Firestore", ) except Exception as e: logger.error( - f"Error saving notification to Firestore for phone {phone_number}: {str(e)}", + f"Error saving notification to Firestore for phone {phone_number}: {e!s}", exc_info=True, ) raise async def update_notification_status(self, session_id: str, status: str) -> None: - """ - Update the status of all notifications in a session. + """Update the status of all notifications in a session. Args: session_id: Notification session ID (phone number) status: New status value + """ try: doc_ref = self._notification_ref(session_id) @@ -341,7 +344,7 @@ class FirestoreService: if not doc.exists: logger.warning( - f"Notification session {session_id} not found in Firestore. Cannot update status" + f"Notification session {session_id} not found in Firestore. Cannot update status", ) return @@ -357,16 +360,16 @@ class FirestoreService: { "notificaciones": updated_notifications, "ultimaActualizacion": datetime.now(), - } + }, ) logger.info( - f"Successfully updated notification status to '{status}' for session {session_id} in Firestore" + f"Successfully updated notification status to '{status}' for session {session_id} in Firestore", ) except Exception as e: logger.error( - f"Error updating notification status in Firestore for session {session_id}: {str(e)}", + f"Error updating notification status in Firestore for session {session_id}: {e!s}", exc_info=True, ) raise @@ -375,18 +378,18 @@ class FirestoreService: """Delete notification session from Firestore.""" try: logger.info( - f"Deleting notification session {notification_id} from Firestore" + f"Deleting notification session {notification_id} from Firestore", ) doc_ref = self._notification_ref(notification_id) await doc_ref.delete() logger.info( - f"Successfully deleted notification session {notification_id} from Firestore" + f"Successfully deleted notification session {notification_id} from Firestore", ) return True except Exception as e: logger.error( - f"Error deleting notification session {notification_id} from Firestore: {str(e)}", + f"Error deleting notification session {notification_id} from Firestore: {e!s}", exc_info=True, ) return False diff --git a/src/capa_de_integracion/services/notification_manager.py b/src/capa_de_integracion/services/notification_manager.py index f29a722..877b6ab 100644 --- a/src/capa_de_integracion/services/notification_manager.py +++ b/src/capa_de_integracion/services/notification_manager.py @@ -2,12 +2,15 @@ import asyncio import logging from uuid import uuid4 -from ..config import Settings -from ..models.notification import ExternalNotificationRequest, Notification -from .redis_service import RedisService -from .firestore_service import FirestoreService -from .dlp_service import DLPService +from capa_de_integracion.config import Settings +from capa_de_integracion.models.notification import ( + ExternalNotificationRequest, + Notification, +) +from .dlp_service import DLPService +from .firestore_service import FirestoreService +from .redis_service import RedisService logger = logging.getLogger(__name__) @@ -15,8 +18,7 @@ PREFIX_PO_PARAM = "notification_po_" class NotificationManagerService: - """ - Manages notification processing and integration with conversations. + """Manages notification processing and integration with conversations. Handles push notifications from external systems, stores them in Redis/Firestore, and triggers Dialogflow event detection. @@ -28,9 +30,8 @@ class NotificationManagerService: redis_service: RedisService, firestore_service: FirestoreService, dlp_service: DLPService, - ): - """ - Initialize notification manager. + ) -> None: + """Initialize notification manager. Args: settings: Application settings @@ -38,6 +39,7 @@ class NotificationManagerService: redis_service: Redis caching service firestore_service: Firestore persistence service dlp_service: Data Loss Prevention service + """ self.settings = settings self.redis_service = redis_service @@ -49,10 +51,9 @@ class NotificationManagerService: logger.info("NotificationManagerService initialized") async def process_notification( - self, external_request: ExternalNotificationRequest + self, external_request: ExternalNotificationRequest, ) -> None: - """ - Process a push notification from external system. + """Process a push notification from external system. Flow: 1. Validate phone number @@ -71,6 +72,7 @@ class NotificationManagerService: Raises: ValueError: If phone number is missing + """ telefono = external_request.telefono @@ -101,17 +103,17 @@ class NotificationManagerService: # Save notification to Redis (with async Firestore write-back) await self.redis_service.save_or_append_notification(new_notification_entry) logger.info( - f"Notification for phone {telefono} cached. Kicking off async Firestore write-back" + f"Notification for phone {telefono} cached. Kicking off async Firestore write-back", ) # Fire-and-forget Firestore write (matching Java's .subscribe() behavior) - async def save_notification_to_firestore(): + async def save_notification_to_firestore() -> None: try: await self.firestore_service.save_or_append_notification( - new_notification_entry + new_notification_entry, ) logger.debug( - f"Notification entry persisted to Firestore for phone {telefono}" + f"Notification entry persisted to Firestore for phone {telefono}", ) except Exception as e: logger.error( diff --git a/src/capa_de_integracion/services/quick_reply_content.py b/src/capa_de_integracion/services/quick_reply_content.py index 85168fd..03f5565 100644 --- a/src/capa_de_integracion/services/quick_reply_content.py +++ b/src/capa_de_integracion/services/quick_reply_content.py @@ -1,9 +1,11 @@ import json import logging -from ..config import Settings -from ..models.quick_replies import QuickReplyScreen, QuickReplyQuestions - +from capa_de_integracion.config import Settings +from capa_de_integracion.models.quick_replies import ( + QuickReplyQuestions, + QuickReplyScreen, +) logger = logging.getLogger(__name__) @@ -11,23 +13,22 @@ logger = logging.getLogger(__name__) class QuickReplyContentService: """Service for loading quick reply screen content from JSON files.""" - def __init__(self, settings: Settings): - """ - Initialize quick reply content service. + def __init__(self, settings: Settings) -> None: + """Initialize quick reply content service. Args: settings: Application settings + """ self.settings = settings self.quick_replies_path = settings.base_path / "quick_replies" logger.info( - f"QuickReplyContentService initialized with path: {self.quick_replies_path}" + f"QuickReplyContentService initialized with path: {self.quick_replies_path}", ) async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen: - """ - Load quick reply screen content by ID. + """Load quick reply screen content by ID. Args: screen_id: Screen identifier (e.g., "pagos", "home") @@ -37,6 +38,7 @@ class QuickReplyContentService: Raises: ValueError: If the quick reply file is not found + """ if not screen_id or not screen_id.strip(): logger.warning("screen_id is null or empty. Returning empty quick replies") @@ -53,11 +55,12 @@ class QuickReplyContentService: try: if not file_path.exists(): logger.warning(f"Quick reply file not found: {file_path}") + msg = f"Quick reply file not found for screen_id: {screen_id}" raise ValueError( - f"Quick reply file not found for screen_id: {screen_id}" + msg, ) - with open(file_path, "r", encoding="utf-8") as f: + with open(file_path, encoding="utf-8") as f: data = json.load(f) # Parse questions @@ -80,20 +83,22 @@ class QuickReplyContentService: ) logger.info( - f"Successfully loaded {len(preguntas)} quick replies for screen: {screen_id}" + f"Successfully loaded {len(preguntas)} quick replies for screen: {screen_id}", ) return quick_reply except json.JSONDecodeError as e: logger.error(f"Error parsing JSON file {file_path}: {e}", exc_info=True) + msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}" raise ValueError( - f"Invalid JSON format in quick reply file for screen_id: {screen_id}" + msg, ) from e except Exception as e: logger.error( f"Error loading quick replies for screen {screen_id}: {e}", exc_info=True, ) + msg = f"Error loading quick replies for screen_id: {screen_id}" raise ValueError( - f"Error loading quick replies for screen_id: {screen_id}" + msg, ) from e diff --git a/src/capa_de_integracion/services/rag_service.py b/src/capa_de_integracion/services/rag_service.py index 0d2184d..27da0db 100644 --- a/src/capa_de_integracion/services/rag_service.py +++ b/src/capa_de_integracion/services/rag_service.py @@ -1,9 +1,9 @@ import logging + import httpx from pydantic import BaseModel, Field -from ..config import Settings - +from capa_de_integracion.config import Settings logger = logging.getLogger(__name__) @@ -28,8 +28,7 @@ class RAGResponse(BaseModel): class RAGService: - """ - Highly concurrent HTTP client for calling RAG endpoints. + """Highly concurrent HTTP client for calling RAG endpoints. Uses httpx AsyncClient with connection pooling for optimal performance when handling multiple concurrent requests. @@ -41,15 +40,15 @@ class RAGService: max_connections: int = 100, max_keepalive_connections: int = 20, timeout: float = 30.0, - ): - """ - Initialize RAG service with connection pooling. + ) -> None: + """Initialize RAG service with connection pooling. Args: settings: Application settings max_connections: Maximum number of concurrent connections max_keepalive_connections: Maximum number of idle connections to keep alive timeout: Request timeout in seconds + """ self.settings = settings self.rag_endpoint_url = settings.rag_endpoint_url @@ -70,12 +69,11 @@ class RAGService: logger.info( f"RAGService initialized with endpoint: {self.rag_endpoint_url}, " - f"max_connections: {max_connections}, timeout: {timeout}s" + f"max_connections: {max_connections}, timeout: {timeout}s", ) async def query(self, messages: list[dict[str, str]]) -> str: - """ - Send conversation history to RAG endpoint and get response. + """Send conversation history to RAG endpoint and get response. Args: messages: OpenAI-style conversation history @@ -87,6 +85,7 @@ class RAGService: Raises: httpx.HTTPError: If HTTP request fails ValueError: If response format is invalid + """ try: # Validate and construct request @@ -113,20 +112,20 @@ class RAGService: return rag_response.response except httpx.HTTPStatusError as e: - logger.error( - f"HTTP error calling RAG endpoint: {e.response.status_code} - {e.response.text}" + logger.exception( + f"HTTP error calling RAG endpoint: {e.response.status_code} - {e.response.text}", ) raise except httpx.RequestError as e: - logger.error(f"Request error calling RAG endpoint: {str(e)}") + logger.exception(f"Request error calling RAG endpoint: {e!s}") raise except Exception as e: logger.error( - f"Unexpected error calling RAG endpoint: {str(e)}", exc_info=True + f"Unexpected error calling RAG endpoint: {e!s}", exc_info=True, ) raise - async def close(self): + async def close(self) -> None: """Close the HTTP client and release connections.""" await self._client.aclose() logger.info("RAGService client closed") diff --git a/src/capa_de_integracion/services/redis_service.py b/src/capa_de_integracion/services/redis_service.py index e50b38c..d02f6f2 100644 --- a/src/capa_de_integracion/services/redis_service.py +++ b/src/capa_de_integracion/services/redis_service.py @@ -1,12 +1,12 @@ import json import logging from datetime import datetime + from redis.asyncio import Redis -from ..config import Settings -from ..models import ConversationSession -from ..models.notification import NotificationSession, Notification - +from capa_de_integracion.config import Settings +from capa_de_integracion.models import ConversationSession +from capa_de_integracion.models.notification import Notification, NotificationSession logger = logging.getLogger(__name__) @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class RedisService: """Service for Redis operations on conversation sessions.""" - def __init__(self, settings: Settings): + def __init__(self, settings: Settings) -> None: """Initialize Redis client.""" self.settings = settings self.redis: Redis | None = None @@ -22,7 +22,7 @@ class RedisService: self.notification_ttl = 2592000 # 30 days in seconds self.qr_session_ttl = 86400 # 24 hours in seconds - async def connect(self): + async def connect(self) -> None: """Connect to Redis.""" self.redis = Redis( host=self.settings.redis_host, @@ -31,10 +31,10 @@ class RedisService: decode_responses=True, ) logger.info( - f"Connected to Redis at {self.settings.redis_host}:{self.settings.redis_port}" + f"Connected to Redis at {self.settings.redis_host}:{self.settings.redis_port}", ) - async def close(self): + async def close(self) -> None: """Close Redis connection.""" if self.redis: await self.redis.close() @@ -49,17 +49,18 @@ class RedisService: return f"conversation:phone:{phone}" async def get_session(self, session_id_or_phone: str) -> ConversationSession | None: - """ - Retrieve conversation session from Redis by session ID or phone number. + """Retrieve conversation session from Redis by session ID or phone number. Args: session_id_or_phone: Either a session ID or phone number Returns: Conversation session or None if not found + """ if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) # First try as phone number (lookup session ID) phone_key = self._phone_to_session_key(session_id_or_phone) @@ -86,17 +87,17 @@ class RedisService: logger.debug(f"Retrieved session from Redis: {session_id}") return session except Exception as e: - logger.error(f"Error deserializing session {session_id}: {str(e)}") + logger.exception(f"Error deserializing session {session_id}: {e!s}") return None async def save_session(self, session: ConversationSession) -> bool: - """ - Save conversation session to Redis with TTL. + """Save conversation session to Redis with TTL. Also stores phone-to-session mapping for lookup by phone number. """ if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._session_key(session.sessionId) phone_key = self._phone_to_session_key(session.telefono) @@ -110,17 +111,18 @@ class RedisService: await self.redis.setex(phone_key, self.session_ttl, session.sessionId) logger.debug( - f"Saved session to Redis: {session.sessionId} for phone: {session.telefono}" + f"Saved session to Redis: {session.sessionId} for phone: {session.telefono}", ) return True except Exception as e: - logger.error(f"Error saving session {session.sessionId} to Redis: {str(e)}") + logger.exception(f"Error saving session {session.sessionId} to Redis: {e!s}") return False async def delete_session(self, session_id: str) -> bool: """Delete conversation session from Redis.""" if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._session_key(session_id) @@ -129,13 +131,14 @@ class RedisService: logger.debug(f"Deleted session from Redis: {session_id}") return result > 0 except Exception as e: - logger.error(f"Error deleting session {session_id} from Redis: {str(e)}") + logger.exception(f"Error deleting session {session_id} from Redis: {e!s}") return False async def exists(self, session_id: str) -> bool: """Check if session exists in Redis.""" if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._session_key(session_id) return await self.redis.exists(key) > 0 @@ -147,8 +150,7 @@ class RedisService: return f"conversation:messages:{session_id}" async def save_message(self, session_id: str, message) -> bool: - """ - Save a conversation message to Redis sorted set. + """Save a conversation message to Redis sorted set. Messages are stored in a sorted set with timestamp as score. @@ -158,9 +160,11 @@ class RedisService: Returns: True if successful, False otherwise + """ if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._messages_key(session_id) @@ -178,14 +182,13 @@ class RedisService: logger.debug(f"Saved message to Redis: {session_id}") return True except Exception as e: - logger.error( - f"Error saving message to Redis for session {session_id}: {str(e)}" + logger.exception( + f"Error saving message to Redis for session {session_id}: {e!s}", ) return False async def get_messages(self, session_id: str) -> list: - """ - Retrieve all conversation messages for a session from Redis. + """Retrieve all conversation messages for a session from Redis. Returns messages ordered by timestamp (oldest first). @@ -194,9 +197,11 @@ class RedisService: Returns: List of message dictionaries (parsed from JSON) + """ if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._messages_key(session_id) @@ -214,16 +219,16 @@ class RedisService: try: messages.append(json.loads(msg_str)) except json.JSONDecodeError as e: - logger.error(f"Error parsing message JSON: {str(e)}") + logger.exception(f"Error parsing message JSON: {e!s}") continue logger.debug( - f"Retrieved {len(messages)} messages from Redis for session: {session_id}" + f"Retrieved {len(messages)} messages from Redis for session: {session_id}", ) return messages except Exception as e: - logger.error( - f"Error retrieving messages from Redis for session {session_id}: {str(e)}" + logger.exception( + f"Error retrieving messages from Redis for session {session_id}: {e!s}", ) return [] @@ -238,21 +243,23 @@ class RedisService: return f"notification:phone_to_notification:{phone}" async def save_or_append_notification(self, new_entry: Notification) -> None: - """ - Save or append notification entry to session. + """Save or append notification entry to session. Args: new_entry: Notification entry to save Raises: ValueError: If phone number is missing + """ if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) phone_number = new_entry.telefono if not phone_number or not phone_number.strip(): - raise ValueError("Phone number is required to manage notification entries") + msg = "Phone number is required to manage notification entries" + raise ValueError(msg) # Use phone number as session ID for notifications notification_session_id = phone_number @@ -262,7 +269,7 @@ class RedisService: if existing_session: # Append to existing session - updated_notifications = existing_session.notificaciones + [new_entry] + updated_notifications = [*existing_session.notificaciones, new_entry] updated_session = NotificationSession( sessionId=notification_session_id, telefono=phone_number, @@ -286,7 +293,8 @@ class RedisService: async def _cache_notification_session(self, session: NotificationSession) -> bool: """Cache notification session in Redis.""" if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._notification_key(session.sessionId) phone_key = self._phone_to_notification_key(session.telefono) @@ -302,17 +310,18 @@ class RedisService: logger.debug(f"Cached notification session: {session.sessionId}") return True except Exception as e: - logger.error( - f"Error caching notification session {session.sessionId}: {str(e)}" + logger.exception( + f"Error caching notification session {session.sessionId}: {e!s}", ) return False async def get_notification_session( - self, session_id: str + self, session_id: str, ) -> NotificationSession | None: """Retrieve notification session from Redis.""" if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._notification_key(session_id) data = await self.redis.get(key) @@ -327,15 +336,16 @@ class RedisService: logger.info(f"Notification session {session_id} retrieved from Redis") return session except Exception as e: - logger.error( - f"Error deserializing notification session {session_id}: {str(e)}" + logger.exception( + f"Error deserializing notification session {session_id}: {e!s}", ) return None async def get_notification_id_for_phone(self, phone: str) -> str | None: """Get notification session ID for a phone number.""" if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) key = self._phone_to_notification_key(phone) session_id = await self.redis.get(key) @@ -350,7 +360,8 @@ class RedisService: async def delete_notification_session(self, phone_number: str) -> bool: """Delete notification session from Redis.""" if not self.redis: - raise RuntimeError("Redis client not connected") + msg = "Redis client not connected" + raise RuntimeError(msg) notification_key = self._notification_key(phone_number) phone_key = self._phone_to_notification_key(phone_number) @@ -361,7 +372,7 @@ class RedisService: await self.redis.delete(phone_key) return True except Exception as e: - logger.error( - f"Error deleting notification session for phone {phone_number}: {str(e)}" + logger.exception( + f"Error deleting notification session for phone {phone_number}: {e!s}", ) return False