Misc improvements
This commit is contained in:
132
src/capa_de_integracion/services/notifications.py
Normal file
132
src/capa_de_integracion/services/notifications.py
Normal file
@@ -0,0 +1,132 @@
|
||||
"""Notification manager service for processing push notifications."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from uuid import uuid4
|
||||
|
||||
from capa_de_integracion.config import Settings
|
||||
from capa_de_integracion.models.notification import (
|
||||
ExternalNotificationRequest,
|
||||
Notification,
|
||||
)
|
||||
from capa_de_integracion.services.dlp import DLPService
|
||||
from capa_de_integracion.services.storage.firestore import FirestoreService
|
||||
from capa_de_integracion.services.storage.redis import RedisService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PREFIX_PO_PARAM = "notification_po_"
|
||||
|
||||
|
||||
class NotificationManagerService:
|
||||
"""Manages notification processing and integration with conversations.
|
||||
|
||||
Handles push notifications from external systems, stores them in
|
||||
Redis/Firestore, and triggers Dialogflow event detection.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
settings: Settings,
|
||||
redis_service: RedisService,
|
||||
firestore_service: FirestoreService,
|
||||
dlp_service: DLPService,
|
||||
) -> None:
|
||||
"""Initialize notification manager.
|
||||
|
||||
Args:
|
||||
settings: Application settings
|
||||
dialogflow_client: Dialogflow CX client
|
||||
redis_service: Redis caching service
|
||||
firestore_service: Firestore persistence service
|
||||
dlp_service: Data Loss Prevention service
|
||||
|
||||
"""
|
||||
self.settings = settings
|
||||
self.redis_service = redis_service
|
||||
self.firestore_service = firestore_service
|
||||
self.dlp_service = dlp_service
|
||||
self.event_name = "notificacion"
|
||||
self.default_language_code = "es"
|
||||
|
||||
logger.info("NotificationManagerService initialized")
|
||||
|
||||
async def process_notification(
|
||||
self,
|
||||
external_request: ExternalNotificationRequest,
|
||||
) -> None:
|
||||
"""Process a push notification from external system.
|
||||
|
||||
Flow:
|
||||
1. Validate phone number
|
||||
2. Obfuscate sensitive data (DLP - TODO)
|
||||
3. Create notification entry
|
||||
4. Save to Redis and Firestore
|
||||
5. Get or create conversation session
|
||||
6. Add notification to conversation history
|
||||
7. Trigger Dialogflow event
|
||||
|
||||
Args:
|
||||
external_request: External notification request
|
||||
|
||||
Returns:
|
||||
Dialogflow detect intent response
|
||||
|
||||
Raises:
|
||||
ValueError: If phone number is missing
|
||||
|
||||
"""
|
||||
telefono = external_request.telefono
|
||||
|
||||
# Obfuscate sensitive data using DLP
|
||||
obfuscated_text = await self.dlp_service.get_obfuscated_string(
|
||||
external_request.texto,
|
||||
self.settings.dlp_template_complete_flow,
|
||||
)
|
||||
|
||||
# Prepare parameters with prefix
|
||||
parameters = {}
|
||||
if external_request.parametros_ocultos:
|
||||
for key, value in external_request.parametros_ocultos.items():
|
||||
parameters[f"{PREFIX_PO_PARAM}{key}"] = value
|
||||
|
||||
# Create notification entry
|
||||
new_notification_id = str(uuid4())
|
||||
new_notification_entry = Notification.create(
|
||||
id_notificacion=new_notification_id,
|
||||
telefono=telefono,
|
||||
texto=obfuscated_text,
|
||||
nombre_evento_dialogflow=self.event_name,
|
||||
codigo_idioma_dialogflow=self.default_language_code,
|
||||
parametros=parameters,
|
||||
status="active",
|
||||
)
|
||||
|
||||
# Save notification to Redis (with async Firestore write-back)
|
||||
await self.redis_service.save_or_append_notification(new_notification_entry)
|
||||
logger.info(
|
||||
"Notification for phone %s cached. Kicking off async Firestore write-back",
|
||||
telefono,
|
||||
)
|
||||
|
||||
# Fire-and-forget Firestore write (matching Java's .subscribe() behavior)
|
||||
async def save_notification_to_firestore() -> None:
|
||||
try:
|
||||
await self.firestore_service.save_or_append_notification(
|
||||
new_notification_entry,
|
||||
)
|
||||
logger.debug(
|
||||
"Notification entry persisted to Firestore for phone %s",
|
||||
telefono,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Background: Error during notification persistence "
|
||||
"to Firestore for phone %s",
|
||||
telefono,
|
||||
)
|
||||
|
||||
# Fire and forget - don't await
|
||||
_task = asyncio.create_task(save_notification_to_firestore())
|
||||
# Store reference to prevent premature garbage collection
|
||||
del _task
|
||||
Reference in New Issue
Block a user