""" Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google. Notification manager service for processing push notifications. """ import logging from datetime import datetime from ..config import Settings from ..models import DetectIntentResponseDTO from ..models.notification import ExternalNotRequestDTO, NotificationDTO from ..models.conversation import ConversationSessionDTO, ConversationEntryDTO from ..utils.session_id import generate_session_id from .dialogflow_client import DialogflowClientService from .redis_service import RedisService from .firestore_service import FirestoreService from .dlp_service import DLPService logger = logging.getLogger(__name__) PREFIX_PO_PARAM = "notification_po_" class NotificationManagerService: """ Manages notification processing and integration with conversations. Handles push notifications from external systems, stores them in Redis/Firestore, and triggers Dialogflow event detection. """ def __init__( self, settings: Settings, dialogflow_client: DialogflowClientService, redis_service: RedisService, firestore_service: FirestoreService, dlp_service: DLPService, ): """ Initialize notification manager. Args: settings: Application settings dialogflow_client: Dialogflow CX client redis_service: Redis caching service firestore_service: Firestore persistence service dlp_service: Data Loss Prevention service """ self.settings = settings self.dialogflow_client = dialogflow_client self.redis_service = redis_service self.firestore_service = firestore_service self.dlp_service = dlp_service self.default_language_code = settings.dialogflow_default_language self.event_name = "notificacion" logger.info("NotificationManagerService initialized") async def process_notification( self, external_request: ExternalNotRequestDTO ) -> DetectIntentResponseDTO: """ Process a push notification from external system. Flow: 1. Validate phone number 2. Obfuscate sensitive data (DLP - TODO) 3. Create notification entry 4. Save to Redis and Firestore 5. Get or create conversation session 6. Add notification to conversation history 7. Trigger Dialogflow event Args: external_request: External notification request Returns: Dialogflow detect intent response Raises: ValueError: If phone number is missing """ telefono = external_request.telefono if not telefono or not telefono.strip(): logger.warning("No phone number provided in notification request") raise ValueError("Phone number is required") # Obfuscate sensitive data using DLP obfuscated_text = await self.dlp_service.get_obfuscated_string( external_request.texto, self.settings.dlp_template_complete_flow, ) # Prepare parameters with prefix parameters = {} if external_request.parametros_ocultos: for key, value in external_request.parametros_ocultos.items(): parameters[f"{PREFIX_PO_PARAM}{key}"] = value # Create notification entry new_notification_id = generate_session_id() new_notification_entry = NotificationDTO( idNotificacion=new_notification_id, telefono=telefono, timestampCreacion=datetime.now(), texto=obfuscated_text, nombreEventoDialogflow=self.event_name, codigoIdiomaDialogflow=self.default_language_code, parametros=parameters, status="active", ) # Save notification to Redis (with async Firestore write-back) await self.redis_service.save_or_append_notification(new_notification_entry) logger.info( f"Notification for phone {telefono} cached. Kicking off async Firestore write-back" ) # Fire-and-forget Firestore write # In production, consider using asyncio.create_task() with proper error handling try: await self.firestore_service.save_or_append_notification( new_notification_entry ) logger.debug( f"Notification entry persisted to Firestore for phone {telefono}" ) except Exception as e: logger.error( f"Background: Error during notification persistence to Firestore for phone {telefono}: {e}", exc_info=True, ) # Get or create conversation session session = await self._get_or_create_conversation_session( telefono, obfuscated_text, parameters ) # Send notification event to Dialogflow logger.info( f"Sending notification text to Dialogflow using conversation session: {session.sessionId}" ) response = await self.dialogflow_client.detect_intent_event( session_id=session.sessionId, event_name=self.event_name, parameters=parameters, language_code=self.default_language_code, ) logger.info( f"Finished processing notification. Dialogflow response received for phone {telefono}" ) return response async def _get_or_create_conversation_session( self, telefono: str, notification_text: str, parameters: dict ) -> ConversationSessionDTO: """ Get existing conversation session or create a new one. Also persists system entry for the notification. Args: telefono: User phone number notification_text: Notification text content parameters: Notification parameters Returns: Conversation session """ # Try to get existing session by phone # TODO: Need to implement get_session_by_telefono in Redis service # For now, we'll create a new session new_session_id = generate_session_id() user_id = f"user_by_phone_{telefono}" logger.info( f"Creating new conversation session {new_session_id} for notification (phone: {telefono})" ) # Create system entry for notification system_entry = ConversationEntryDTO( entity="SISTEMA", type="SISTEMA", timestamp=datetime.now(), text=notification_text, parameters=parameters, intent=None, ) # Create new session new_session = ConversationSessionDTO( sessionId=new_session_id, userId=user_id, telefono=telefono, createdAt=datetime.now(), lastModified=datetime.now(), lastMessage=notification_text, pantallaContexto=None, ) # Persist conversation turn (session + system entry) await self._persist_conversation_turn(new_session, system_entry) return new_session async def _persist_conversation_turn( self, session: ConversationSessionDTO, entry: ConversationEntryDTO ) -> None: """ Persist conversation turn to Redis and Firestore. Uses write-through caching: writes to Redis first, then async to Firestore. Args: session: Conversation session entry: Conversation entry to persist """ logger.debug( f"Starting Write-Back persistence for notification session {session.sessionId}. " f"Type: {entry.type}. Writing to Redis first" ) # Update session with last message updated_session = ConversationSessionDTO( **session.model_dump(), lastMessage=entry.text, lastModified=datetime.now(), ) # Save to Redis await self.redis_service.save_session(updated_session) logger.info( f"Entry saved to Redis for notification session {session.sessionId}. " f"Type: {entry.type}. Kicking off async Firestore write-back" ) # Fire-and-forget Firestore writes try: await self.firestore_service.save_session(updated_session) await self.firestore_service.save_entry(session.sessionId, entry) logger.debug( f"Asynchronously (Write-Back): Entry successfully saved to Firestore " f"for notification session {session.sessionId}. Type: {entry.type}" ) except Exception as e: logger.error( f"Asynchronously (Write-Back): Failed to save entry to Firestore " f"for notification session {session.sessionId}. Type: {entry.type}: {e}", exc_info=True, )