package com.example.service.notification; import com.example.dto.dialogflow.notification.ExternalNotRequestDTO; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.QueryInputDTO; import com.example.dto.dialogflow.conversation.QueryParamsDTO; import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.service.base.DialogflowClientService; import com.example.util.SessionIdGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; import com.example.dto.dialogflow.conversation.TextInputDTO; @Service public class NotificationManagerService { private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class); private static final String NOTIFICATION_TEXT_PARAM = "notificationText"; private static final String eventName = "notificacion"; private final DialogflowClientService dialogflowClientService; private final FirestoreNotificationService firestoreNotificationService; private final MemoryStoreNotificationService memoryStoreNotificationService; private final FirestoreNotificationConvService firestoreConversationService; @Value("${dialogflow.default-language-code:es}") private String defaultLanguageCode; public NotificationManagerService( DialogflowClientService dialogflowClientService, FirestoreNotificationService firestoreNotificationService, MemoryStoreNotificationService memoryStoreNotificationService, FirestoreNotificationConvService firestoreConversationService) { this.dialogflowClientService = dialogflowClientService; this.firestoreNotificationService = firestoreNotificationService; this.memoryStoreNotificationService = memoryStoreNotificationService; this.firestoreConversationService = firestoreConversationService; } public Mono processNotification(ExternalNotRequestDTO externalRequest) { Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null."); String telefono = externalRequest.phoneNumber(); if (telefono == null || telefono.isBlank()) { logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification."); return Mono.error(new IllegalArgumentException("Phone number is required.")); } // 1. Persist the incoming notification entry String newNotificationId = SessionIdGenerator.generateStandardSessionId(); NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(), externalRequest.text(), eventName, defaultLanguageCode, Collections.emptyMap(), "active"); Mono persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) .doOnSuccess(v -> { logger.info("Notification for phone {} cached. Kicking off async Firestore write-back.", telefono); firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) .subscribe( ignored -> logger.debug( "Background: Notification entry persistence initiated for phone {} in Firestore.",telefono), e -> logger.error( "Background: Error during notification entry persistence for phone {} in Firestore: {}", telefono, e.getMessage(), e)); }); // 2. Resolve or create a conversation session Mono sessionMono = memoryStoreNotificationService.getSessionByTelefono(telefono) .doOnNext(session -> logger.info("Found existing conversation session {} for phone number {}", session.sessionId(), telefono)) .flatMap(session -> { ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(externalRequest.text()); return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono) .thenReturn(session); }) .switchIfEmpty(Mono.defer(() -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("No existing conversation session found for phone number {}. Creating new session: {}",telefono, newSessionId); String userId = "user_by_phone_" + telefono; ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(externalRequest.text()); return persistConversationTurn(userId, newSessionId, systemEntry, telefono) .then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono))); })); // 3. Send notification text to Dialogflow using the resolved conversation // session return persistenceMono.then(sessionMono) .flatMap(session -> { final String sessionId = session.sessionId(); logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId); Map parameters = new HashMap<>(); parameters.put("telefono", telefono); parameters.put(NOTIFICATION_TEXT_PARAM, newNotificationEntry.texto()); // Use a TextInputDTO to correctly build the QueryInputDTO TextInputDTO textInput = new TextInputDTO(newNotificationEntry.texto()); QueryInputDTO queryInput = new QueryInputDTO(textInput, null, defaultLanguageCode); DetectIntentRequestDTO detectIntentRequest = new DetectIntentRequestDTO( queryInput, new QueryParamsDTO(parameters)); return dialogflowClientService.detectIntent(sessionId, detectIntentRequest); }) .doOnSuccess(response -> logger .info("Finished processing notification. Dialogflow response received for phone {}.", telefono)) .doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e)); } private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber) { logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, entry.type().name()); return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) .doOnSuccess(v -> { logger.info( "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", sessionId, entry.type().name()); firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) .subscribe( fsVoid -> logger.debug( "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", sessionId, entry.type().name()), fsError -> logger.error( "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", sessionId, entry.type().name(), fsError.getMessage(), fsError)); }) .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e)); } }