diff --git a/src/main/java/com/example/service/conversation/ConversationManagerService.java b/src/main/java/com/example/service/conversation/ConversationManagerService.java index bad9333..ac7600e 100644 --- a/src/main/java/com/example/service/conversation/ConversationManagerService.java +++ b/src/main/java/com/example/service/conversation/ConversationManagerService.java @@ -301,83 +301,82 @@ public class ConversationManagerService { } public Mono startNotificationConversation(ConversationContext context, - DetectIntentRequestDTO request, NotificationDTO notification) { - final String userId = context.userId(); - final String userMessageText = context.userMessageText(); - final String userPhoneNumber = context.primaryPhoneNumber(); + DetectIntentRequestDTO request, NotificationDTO notification) { + final String userId = context.userId(); + final String userMessageText = context.userMessageText(); + final String userPhoneNumber = context.primaryPhoneNumber(); + + return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) + .switchIfEmpty(Mono.defer(() -> { + String newSessionId = SessionIdGenerator.generateStandardSessionId(); + logger.warn("No existing conversation session found for notification reply on phone {}. This is unexpected. Creating new session: {}", + userPhoneNumber, newSessionId); + return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber)); + })) + .flatMap(session -> { + final String sessionId = session.sessionId(); + String conversationHistory = conversationContextMapper.toTextWithLimits(session); + String notificationText = notificationContextMapper.toText(notification); - return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber) - .switchIfEmpty(Mono.defer(() -> { - String newSessionId = SessionIdGenerator.generateStandardSessionId(); - logger.info("No existing notification session found for phone number {}. Creating new session: {}", - userPhoneNumber, newSessionId); - return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber)); - })) - .flatMap(session -> { - final String sessionId = session.sessionId(); - String conversationHistory = conversationContextMapper.toTextWithLimits(session); - String notificationText = notificationContextMapper.toText(notification); + Map filteredParams = notification.parametros().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("notification_po_")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Map filteredParams = notification.parametros().entrySet().stream() - .filter(entry -> entry.getKey().startsWith("notification_po_")) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + String resolvedContext = notificationContextResolver.resolveContext(userMessageText, + notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, + userPhoneNumber); - String resolvedContext = notificationContextResolver.resolveContext(userMessageText, - notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, - userPhoneNumber); + if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) { + String uuid = UUID.randomUUID().toString(); + llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); - if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) { - String uuid = UUID.randomUUID().toString(); - llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, + notification.parametros()); + ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, + notification.parametros()); - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, - notification.parametros()); - ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, - notification.parametros()); + return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber) + .then(persistConversationTurn(userId, sessionId, llmEntry, userPhoneNumber)) + .then(Mono.defer(() -> { + EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); + QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, + request.queryInput().languageCode()); + DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, + request.queryParams()) + .withParameter("llm_reponse_uuid", uuid); + + return dialogflowServiceClient.detectIntent(sessionId, newRequest) + .flatMap(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgent(response.queryResult()); + return persistConversationTurn(userId, sessionId, agentEntry, + userPhoneNumber) + .thenReturn(response); + }); + })); + } else { + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, + notification.parametros()); - return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber) - .then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber)) - .then(Mono.defer(() -> { - EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); - QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, - request.queryInput().languageCode()); - DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, - request.queryParams()) - .withParameter("llm_reponse_uuid", uuid); - return dialogflowServiceClient.detectIntent(sessionId, newRequest) - .flatMap(response -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgent(response.queryResult()); - return persistNotificationTurn(userId, sessionId, agentEntry, - userPhoneNumber) - .thenReturn(response); - }); - })); - } - - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, - notification.parametros()); - - DetectIntentRequestDTO finalRequest; - Instant now = Instant.now(); - if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { - finalRequest = request.withParameters(notification.parametros()); - } else { - finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) - .withParameters(notification.parametros()); - } - - return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) - .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) - .flatMap(response -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgent(response.queryResult()); - return memoryStoreNotificationService - .saveEntry(userId, sessionId, agentEntry, userPhoneNumber) - .thenReturn(response); - })); - }); - } + DetectIntentRequestDTO finalRequest; + Instant now = Instant.now(); + if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { + finalRequest = request.withParameters(notification.parametros()); + } else { + finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) + .withParameters(notification.parametros()); + } + return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber) + .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) + .flatMap(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgent(response.queryResult()); + return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber) + .thenReturn(response); + })); + } + }); + } private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber) { @@ -397,19 +396,4 @@ public class ConversationManagerService { .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e)); } - - private Mono persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber) { - logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.", - sessionId, - entry.type().name()); - return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) - .doOnSuccess(v -> logger.info( - "Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name())) - .doOnError(e -> logger.error( - "Error during primary Redis write for notification session {}. Type: {}: {}", sessionId, - entry.type().name(), e.getMessage(), e)); - } - } \ No newline at end of file diff --git a/src/main/java/com/example/service/notification/NotificationManagerService.java b/src/main/java/com/example/service/notification/NotificationManagerService.java index 2450c4f..09cd755 100644 --- a/src/main/java/com/example/service/notification/NotificationManagerService.java +++ b/src/main/java/com/example/service/notification/NotificationManagerService.java @@ -14,13 +14,14 @@ import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.mapper.notification.ExternalNotRequestMapper; import com.example.service.base.DialogflowClientService; import com.example.service.conversation.DataLossPrevention; +import com.example.service.conversation.FirestoreConversationService; +import com.example.service.conversation.MemoryStoreConversationService; import com.example.util.SessionIdGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; - import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -29,142 +30,146 @@ import java.util.Objects; @Service public class NotificationManagerService { - private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class); - private static final String eventName = "notificacion"; - private static final String PREFIX_PO_PARAM = "notification_po_"; + private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class); + private static final String eventName = "notificacion"; + private static final String PREFIX_PO_PARAM = "notification_po_"; - private final DialogflowClientService dialogflowClientService; - private final FirestoreNotificationService firestoreNotificationService; - private final MemoryStoreNotificationService memoryStoreNotificationService; - private final FirestoreNotificationConvService firestoreConversationService; - private final ExternalNotRequestMapper externalNotRequestMapper; + private final DialogflowClientService dialogflowClientService; + private final FirestoreNotificationService firestoreNotificationService; + private final MemoryStoreNotificationService memoryStoreNotificationService; + private final ExternalNotRequestMapper externalNotRequestMapper; + private final MemoryStoreConversationService memoryStoreConversationService; + private final FirestoreConversationService firestoreConversationService; + private final DataLossPrevention dataLossPrevention; + private final String dlpTemplateCompleteFlow; - private final DataLossPrevention dataLossPrevention; - private final String dlpTemplateCompleteFlow; + @Value("${dialogflow.default-language-code:es}") + private String defaultLanguageCode; - @Value("${dialogflow.default-language-code:es}") - private String defaultLanguageCode; + public NotificationManagerService( + DialogflowClientService dialogflowClientService, + FirestoreNotificationService firestoreNotificationService, + MemoryStoreNotificationService memoryStoreNotificationService, + MemoryStoreConversationService memoryStoreConversationService, + FirestoreConversationService firestoreConversationService, + + ExternalNotRequestMapper externalNotRequestMapper, + DataLossPrevention dataLossPrevention, + @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { + + this.dialogflowClientService = dialogflowClientService; + this.firestoreNotificationService = firestoreNotificationService; + this.memoryStoreNotificationService = memoryStoreNotificationService; + this.externalNotRequestMapper = externalNotRequestMapper; + this.dataLossPrevention = dataLossPrevention; + this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; + this.memoryStoreConversationService = memoryStoreConversationService; + this.firestoreConversationService = firestoreConversationService; + } - public NotificationManagerService( - DialogflowClientService dialogflowClientService, - FirestoreNotificationService firestoreNotificationService, - MemoryStoreNotificationService memoryStoreNotificationService, - FirestoreNotificationConvService firestoreConversationService, - ExternalNotRequestMapper externalNotRequestMapper, - DataLossPrevention dataLossPrevention, - @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { - this.dialogflowClientService = dialogflowClientService; - this.firestoreNotificationService = firestoreNotificationService; - this.memoryStoreNotificationService = memoryStoreNotificationService; - this.firestoreConversationService = firestoreConversationService; - this.externalNotRequestMapper = externalNotRequestMapper; - this.dataLossPrevention = dataLossPrevention; - this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; - } + public Mono processNotification(ExternalNotRequestDTO externalRequest) { + Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null."); - public Mono processNotification(ExternalNotRequestDTO externalRequest) { - Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null."); - - String telefono = externalRequest.phoneNumber(); - if (telefono == null || telefono.isBlank()) { - logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification."); - return Mono.error(new IllegalArgumentException("Phone number is required.")); - } + String telefono = externalRequest.phoneNumber(); + if (telefono == null || telefono.isBlank()) { + logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification."); + return Mono.error(new IllegalArgumentException("Phone number is required.")); + } - return dataLossPrevention.getObfuscatedString(externalRequest.text(), dlpTemplateCompleteFlow) - .flatMap(obfuscatedMessage -> { - ExternalNotRequestDTO obfuscatedRequest = new ExternalNotRequestDTO( - obfuscatedMessage, - externalRequest.phoneNumber(), - externalRequest.hiddenParameters() - ); - // 1. Persist the incoming notification entry - String newNotificationId = SessionIdGenerator.generateStandardSessionId(); - Map parameters = new HashMap<>(); - if (obfuscatedRequest.hiddenParameters() != null) { - obfuscatedRequest.hiddenParameters().forEach((key, value) -> parameters.put(PREFIX_PO_PARAM + key, value)); - } + return dataLossPrevention.getObfuscatedString(externalRequest.text(), dlpTemplateCompleteFlow) + .flatMap(obfuscatedMessage -> { + ExternalNotRequestDTO obfuscatedRequest = new ExternalNotRequestDTO( + obfuscatedMessage, + externalRequest.phoneNumber(), + externalRequest.hiddenParameters() + ); - NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(), - obfuscatedRequest.text(), eventName, defaultLanguageCode, parameters, "active"); - Mono persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) - .doOnSuccess(v -> { - logger.info("Notification for phone cached. Kicking off async Firestore write-back."); - firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) - .subscribe( - ignored -> logger.debug( - "Background: Notification entry persistence initiated for phone in Firestore."), - e -> logger.error( - "Background: Error during notification entry persistence for phone in Firestore: {}", e.getMessage(), e)); - }); + String newNotificationId = SessionIdGenerator.generateStandardSessionId(); + Map parameters = new HashMap<>(); + if (obfuscatedRequest.hiddenParameters() != null) { + obfuscatedRequest.hiddenParameters().forEach((key, value) -> parameters.put(PREFIX_PO_PARAM + key, value)); + } - // 2. Resolve or create a conversation session - Mono sessionMono = memoryStoreNotificationService.getSessionByTelefono(telefono) - .doOnNext(session -> logger.info("Found existing conversation session {} for phone number", - session.sessionId())) - .flatMap(session -> { - Map prefixedParameters = new HashMap<>(); - if (obfuscatedRequest.hiddenParameters() != null) { - obfuscatedRequest.hiddenParameters() - .forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value)); - } - ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), - prefixedParameters); - return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono) - .thenReturn(session); - }) - .switchIfEmpty(Mono.defer(() -> { - String newSessionId = SessionIdGenerator.generateStandardSessionId(); - logger.info("No existing conversation session found for phone number. Creating new session: {}", newSessionId); - String userId = "user_by_phone_" + telefono; - Map prefixedParameters = new HashMap<>(); - if (obfuscatedRequest.hiddenParameters() != null) { - obfuscatedRequest.hiddenParameters() - .forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value)); - } - ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), - prefixedParameters); - return persistConversationTurn(userId, newSessionId, systemEntry, telefono) - .then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono))); - })); - - // 3. Send notification text to Dialogflow using the resolved conversation - // session - return persistenceMono.then(sessionMono) - .flatMap(session -> { - final String sessionId = session.sessionId(); - logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId); - - DetectIntentRequestDTO detectIntentRequest = externalNotRequestMapper.map(obfuscatedRequest); - - return dialogflowClientService.detectIntent(sessionId, detectIntentRequest); - }) - .doOnSuccess(response -> logger - .info("Finished processing notification. Dialogflow response received for phone")) - .doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e)); + NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(), + obfuscatedRequest.text(), eventName, defaultLanguageCode, parameters, "active"); + Mono persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) + .doOnSuccess(v -> { + logger.info("Notification for phone {} cached. Kicking off async Firestore write-back.", telefono); + firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) + .subscribe( + ignored -> logger.debug( + "Background: Notification entry persistence initiated for phone {} in Firestore.", telefono), + e -> logger.error( + "Background: Error during notification entry persistence for phone {} in Firestore: {}", + telefono, e.getMessage(), e)); }); - } - private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber) { - logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, - entry.type().name()); + Mono sessionMono = memoryStoreConversationService.getSessionByTelefono(telefono) + .doOnNext(session -> logger.info("Found existing conversation session {} for phone number {}", + session.sessionId(), telefono)) + .flatMap(session -> { + Map prefixedParameters = new HashMap<>(); + if (obfuscatedRequest.hiddenParameters() != null) { + obfuscatedRequest.hiddenParameters() + .forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value)); + } + ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), + prefixedParameters); + return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono) + .thenReturn(session); + }) + .switchIfEmpty(Mono.defer(() -> { + String newSessionId = SessionIdGenerator.generateStandardSessionId(); + logger.info("No existing conversation session found for phone number {}. Creating new session: {}", + telefono, newSessionId); + String userId = "user_by_phone_" + telefono; + Map prefixedParameters = new HashMap<>(); + if (obfuscatedRequest.hiddenParameters() != null) { + obfuscatedRequest.hiddenParameters() + .forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value)); + } + ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), + prefixedParameters); + return persistConversationTurn(userId, newSessionId, systemEntry, telefono) + .then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono))); + })); - return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) - .doOnSuccess(v -> { - logger.info( - "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name()); - firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) - .subscribe( - fsVoid -> logger.debug( - "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", - sessionId, entry.type().name()), - fsError -> logger.error( - "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", - sessionId, entry.type().name(), fsError.getMessage(), fsError)); - }) - .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, - entry.type().name(), e.getMessage(), e)); - } -} + return persistenceMono.then(sessionMono) + .flatMap(session -> { + final String sessionId = session.sessionId(); + logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId); + + DetectIntentRequestDTO detectIntentRequest = externalNotRequestMapper.map(obfuscatedRequest); + + return dialogflowClientService.detectIntent(sessionId, detectIntentRequest); + }) + .doOnSuccess(response -> logger + .info("Finished processing notification. Dialogflow response received for phone {}.", telefono)) + .doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e)); + }); + } + + private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, + String userPhoneNumber) { + logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, + entry.type().name()); + + return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + .doOnSuccess(v -> { + logger.info( + "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", + sessionId, entry.type().name()); + + firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + .subscribe( + fsVoid -> logger.debug( + "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", + sessionId, entry.type().name()), + fsError -> logger.error( + "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", + sessionId, entry.type().name(), fsError.getMessage(), fsError)); + }) + .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, + entry.type().name(), e.getMessage(), e)); + } +} \ No newline at end of file