From 2ad649c3215dd27242b565ef58ab2918c81a0d42 Mon Sep 17 00:00:00 2001 From: PAVEL PALMA Date: Mon, 29 Sep 2025 02:10:37 -0600 Subject: [PATCH] Update 25-sept --- .../LlmResponseTunerController.java | 85 +++ .../dto/llm/webhook/SessionInfoDTO.java | 23 + .../dto/llm/webhook/WebhookRequestDTO.java | 17 + .../dto/llm/webhook/WebhookResponseDTO.java | 24 + .../exception/GlobalExceptionHandler.java | 44 ++ .../ConversationContextMapper.java | 12 +- .../service/base/MessageEntryFilter.java | 7 +- .../base/NotificationContextResolver.java | 14 +- .../ConversationManagerService.java | 683 +++++++++--------- .../conversation/DataLossPreventionImpl.java | 8 + .../service/llm/LlmResponseTunerService.java | 13 + .../llm/LlmResponseTunerServiceImpl.java | 33 + .../QuickRepliesManagerService.java | 280 +++---- src/main/resources/application-dev.properties | 7 +- .../prompts/message_filter_prompt.txt | 125 ++-- .../ConversationManagerServiceTest.java | 99 +++ .../llm/LlmResponseTunerServiceImplTest.java | 57 ++ 17 files changed, 993 insertions(+), 538 deletions(-) create mode 100644 src/main/java/com/example/controller/LlmResponseTunerController.java create mode 100644 src/main/java/com/example/dto/llm/webhook/SessionInfoDTO.java create mode 100644 src/main/java/com/example/dto/llm/webhook/WebhookRequestDTO.java create mode 100644 src/main/java/com/example/dto/llm/webhook/WebhookResponseDTO.java create mode 100644 src/main/java/com/example/exception/GlobalExceptionHandler.java create mode 100644 src/main/java/com/example/service/llm/LlmResponseTunerService.java create mode 100644 src/main/java/com/example/service/llm/LlmResponseTunerServiceImpl.java create mode 100644 src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java create mode 100644 src/test/java/com/example/service/llm/LlmResponseTunerServiceImplTest.java diff --git a/src/main/java/com/example/controller/LlmResponseTunerController.java b/src/main/java/com/example/controller/LlmResponseTunerController.java new file mode 100644 index 0000000..555ddb0 --- /dev/null +++ b/src/main/java/com/example/controller/LlmResponseTunerController.java @@ -0,0 +1,85 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.controller; + +import com.example.dto.llm.webhook.WebhookRequestDTO; +import com.example.dto.llm.webhook.SessionInfoDTO; +import com.example.dto.llm.webhook.WebhookResponseDTO; +import com.example.service.llm.LlmResponseTunerService; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +@RestController +@RequestMapping("/api/v1/llm") +public class LlmResponseTunerController { + + private static final Logger logger = LoggerFactory.getLogger(LlmResponseTunerController.class); + + private final LlmResponseTunerService llmResponseTunerService; + + public LlmResponseTunerController(LlmResponseTunerService llmResponseTunerService) { + this.llmResponseTunerService = llmResponseTunerService; + } + + @PostMapping("/tune-response") + public Mono tuneResponse(@RequestBody WebhookRequestDTO request) { + String uuid = (String) request.getSessionInfo().getParameters().get("uuid"); + return llmResponseTunerService + .getValue(uuid) + .map( + value -> { + Map parameters = new HashMap<>(); + parameters.put("webhook_success", true); + parameters.put("response", value); + SessionInfoDTO sessionInfo = new SessionInfoDTO(parameters); + return new WebhookResponseDTO(sessionInfo); + }) + .defaultIfEmpty(createErrorResponse("No response found for the given UUID.", false)) + .onErrorResume( + e -> { + logger.error("Error tuning response: {}", e.getMessage()); + return Mono.just( + createErrorResponse("An internal error occurred.", true)); + }); + } + + private WebhookResponseDTO createErrorResponse(String errorMessage, boolean isError) { + Map parameters = new HashMap<>(); + parameters.put("webhook_success", false); + parameters.put("error_message", errorMessage); + SessionInfoDTO sessionInfo = new SessionInfoDTO(parameters); + return new WebhookResponseDTO(sessionInfo); + } + + @ExceptionHandler(Exception.class) + public ResponseEntity> handleException(Exception e) { + logger.error("An unexpected error occurred: {}", e.getMessage()); + Map response = new HashMap<>(); + response.put("error", "Internal Server Error"); + response.put("message", "An unexpected error occurred. Please try again later."); + return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR); + } + + @ExceptionHandler(IllegalArgumentException.class) + public ResponseEntity> handleIllegalArgumentException( + IllegalArgumentException e) { + logger.error("Bad request: {}", e.getMessage()); + Map response = new HashMap<>(); + response.put("error", "Bad Request"); + response.put("message", e.getMessage()); + return new ResponseEntity<>(response, HttpStatus.BAD_REQUEST); + } +} diff --git a/src/main/java/com/example/dto/llm/webhook/SessionInfoDTO.java b/src/main/java/com/example/dto/llm/webhook/SessionInfoDTO.java new file mode 100644 index 0000000..f21a543 --- /dev/null +++ b/src/main/java/com/example/dto/llm/webhook/SessionInfoDTO.java @@ -0,0 +1,23 @@ +package com.example.dto.llm.webhook; + +import java.util.Map; + +public class SessionInfoDTO { + + private Map parameters; + + public SessionInfoDTO() { + } + + public SessionInfoDTO(Map parameters) { + this.parameters = parameters; + } + + public Map getParameters() { + return parameters; + } + + public void setParameters(Map parameters) { + this.parameters = parameters; + } +} diff --git a/src/main/java/com/example/dto/llm/webhook/WebhookRequestDTO.java b/src/main/java/com/example/dto/llm/webhook/WebhookRequestDTO.java new file mode 100644 index 0000000..a646c16 --- /dev/null +++ b/src/main/java/com/example/dto/llm/webhook/WebhookRequestDTO.java @@ -0,0 +1,17 @@ +package com.example.dto.llm.webhook; + +public class WebhookRequestDTO { + + private SessionInfoDTO sessionInfo; + + public WebhookRequestDTO() { + } + + public SessionInfoDTO getSessionInfo() { + return sessionInfo; + } + + public void setSessionInfo(SessionInfoDTO sessionInfo) { + this.sessionInfo = sessionInfo; + } +} diff --git a/src/main/java/com/example/dto/llm/webhook/WebhookResponseDTO.java b/src/main/java/com/example/dto/llm/webhook/WebhookResponseDTO.java new file mode 100644 index 0000000..9014739 --- /dev/null +++ b/src/main/java/com/example/dto/llm/webhook/WebhookResponseDTO.java @@ -0,0 +1,24 @@ +package com.example.dto.llm.webhook; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class WebhookResponseDTO { + + @JsonProperty("sessionInfo") + private SessionInfoDTO sessionInfo; + + public WebhookResponseDTO() { + } + + public WebhookResponseDTO(SessionInfoDTO sessionInfo) { + this.sessionInfo = sessionInfo; + } + + public SessionInfoDTO getSessionInfo() { + return sessionInfo; + } + + public void setSessionInfo(SessionInfoDTO sessionInfo) { + this.sessionInfo = sessionInfo; + } +} diff --git a/src/main/java/com/example/exception/GlobalExceptionHandler.java b/src/main/java/com/example/exception/GlobalExceptionHandler.java new file mode 100644 index 0000000..1312c3c --- /dev/null +++ b/src/main/java/com/example/exception/GlobalExceptionHandler.java @@ -0,0 +1,44 @@ +package com.example.exception; + +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; + +@ControllerAdvice +public class GlobalExceptionHandler { + + private static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class); + + @ExceptionHandler(DialogflowClientException.class) + public ResponseEntity> handleDialogflowClientException( + DialogflowClientException ex) { + Map error = new HashMap<>(); + error.put("error", "Error communicating with Dialogflow"); + error.put("message", ex.getMessage()); + logger.error("DialogflowClientException: {}", ex.getMessage()); + return new ResponseEntity<>(error, HttpStatus.SERVICE_UNAVAILABLE); + } + + @ExceptionHandler(GeminiClientException.class) + public ResponseEntity> handleGeminiClientException(GeminiClientException ex) { + Map error = new HashMap<>(); + error.put("error", "Error communicating with Gemini"); + error.put("message", ex.getMessage()); + logger.error("GeminiClientException: {}", ex.getMessage()); + return new ResponseEntity<>(error, HttpStatus.SERVICE_UNAVAILABLE); + } + + @ExceptionHandler(Exception.class) + public ResponseEntity> handleAllExceptions(Exception ex) { + Map error = new HashMap<>(); + error.put("error", "Internal Server Error"); + error.put("message", ex.getMessage()); + logger.error("An unexpected error occurred: {}", ex.getMessage(), ex); + return new ResponseEntity<>(error, HttpStatus.INTERNAL_SERVER_ERROR); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java b/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java index a5a6135..825138b 100644 --- a/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java +++ b/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java @@ -7,6 +7,7 @@ package com.example.mapper.messagefilter; import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.time.Instant; @@ -18,8 +19,11 @@ import java.util.stream.Collectors; @Component public class ConversationContextMapper { - private static final int MESSAGE_LIMIT = 60; - private static final int DAYS_LIMIT = 30; + @Value("${conversation.context.message.limit:60}") + private int messageLimit; + + @Value("${conversation.context.days.limit:30}") + private int daysLimit; public String toText(ConversationSessionDTO session) { if (session == null || session.entries() == null || session.entries().isEmpty()) { @@ -36,12 +40,12 @@ public class ConversationContextMapper { return ""; } - Instant thirtyDaysAgo = Instant.now().minus(DAYS_LIMIT, ChronoUnit.DAYS); + Instant thirtyDaysAgo = Instant.now().minus(daysLimit, ChronoUnit.DAYS); List recentEntries = session.entries().stream() .filter(entry -> entry.timestamp().isAfter(thirtyDaysAgo)) .sorted(Comparator.comparing(ConversationEntryDTO::timestamp).reversed()) - .limit(MESSAGE_LIMIT) + .limit(messageLimit) .sorted(Comparator.comparing(ConversationEntryDTO::timestamp)) .collect(Collectors.toList()); diff --git a/src/main/java/com/example/service/base/MessageEntryFilter.java b/src/main/java/com/example/service/base/MessageEntryFilter.java index a021357..171812b 100644 --- a/src/main/java/com/example/service/base/MessageEntryFilter.java +++ b/src/main/java/com/example/service/base/MessageEntryFilter.java @@ -87,9 +87,6 @@ public class MessageEntryFilter { String classificationPrompt = String.format( this.promptTemplate, - CATEGORY_NOTIFICATION, CATEGORY_NOTIFICATION, CATEGORY_CONVERSATION, CATEGORY_CONVERSATION, - CATEGORY_CONVERSATION, CATEGORY_CONVERSATION, CATEGORY_CONVERSATION, CATEGORY_NOTIFICATION, - CATEGORY_NOTIFICATION, CATEGORY_CONVERSATION, CATEGORY_CONVERSATION, conversationHistory, interruptingNotification, queryInputText @@ -106,8 +103,8 @@ public class MessageEntryFilter { geminiModelNameClassifier, classifierTopP ); - - String resultCategory = switch (geminiResponse != null ? geminiResponse.trim().toUpperCase() : "") { + + String resultCategory = switch (geminiResponse != null ? geminiResponse.strip().toUpperCase() : "") { case CATEGORY_CONVERSATION -> { logger.info("Classified as {}.", CATEGORY_CONVERSATION); yield CATEGORY_CONVERSATION; diff --git a/src/main/java/com/example/service/base/NotificationContextResolver.java b/src/main/java/com/example/service/base/NotificationContextResolver.java index 758ccd1..a0392a8 100644 --- a/src/main/java/com/example/service/base/NotificationContextResolver.java +++ b/src/main/java/com/example/service/base/NotificationContextResolver.java @@ -2,6 +2,7 @@ * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ + package com.example.service.base; import com.example.service.notification.MemoryStoreNotificationService; @@ -75,9 +76,10 @@ public class NotificationContextResolver { public String resolveContext(String queryInputText, String notificationsJson, String conversationJson, String metadata, String userId, String sessionId, String userPhoneNumber) { - logger.debug("resolveContext -> queryInputText: {}, notificationsJson: {}, conversationJson: {}, metadata: {}", queryInputText, notificationsJson, conversationJson, metadata); + logger.debug("resolveContext -> queryInputText: {}, notificationsJson: {}, conversationJson: {}, metadata: {}", + queryInputText, notificationsJson, conversationJson, metadata); if (queryInputText == null || queryInputText.isBlank()) { - logger.warn("Query input text for context resolution is null or blank. Returning {}.", CATEGORY_DIALOGFLOW); + logger.warn("Query input text for context resolution is null or blank.", CATEGORY_DIALOGFLOW); return CATEGORY_DIALOGFLOW; } @@ -107,15 +109,15 @@ public class NotificationContextResolver { if (geminiResponse != null && !geminiResponse.isBlank()) { if (geminiResponse.trim().equalsIgnoreCase(CATEGORY_DIALOGFLOW)) { - logger.info("Resolved to {}.", CATEGORY_DIALOGFLOW); + logger.debug("Resolved to {}. Input: '{}'", CATEGORY_DIALOGFLOW, queryInputText); return CATEGORY_DIALOGFLOW; } else { - logger.info("Resolved to a specific response."); + logger.debug("Resolved to a specific response. Input: '{}'", queryInputText); return geminiResponse; } } else { - logger.warn("Gemini returned a null or blank response. Returning {}.", - CATEGORY_DIALOGFLOW); + logger.warn("Gemini returned a null or blank response", + queryInputText, CATEGORY_DIALOGFLOW); return CATEGORY_DIALOGFLOW; } } catch (Exception e) { diff --git a/src/main/java/com/example/service/conversation/ConversationManagerService.java b/src/main/java/com/example/service/conversation/ConversationManagerService.java index 92994ac..bad9333 100644 --- a/src/main/java/com/example/service/conversation/ConversationManagerService.java +++ b/src/main/java/com/example/service/conversation/ConversationManagerService.java @@ -2,6 +2,7 @@ * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ + package com.example.service.conversation; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; @@ -10,7 +11,8 @@ import com.example.dto.dialogflow.conversation.ConversationContext; import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO; -import com.example.dto.dialogflow.conversation.QueryResultDTO; +import com.example.dto.dialogflow.conversation.QueryInputDTO; +import com.example.dto.dialogflow.notification.EventInputDTO; import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.messagefilter.ConversationContextMapper; @@ -20,6 +22,7 @@ import com.example.service.base.MessageEntryFilter; import com.example.service.base.NotificationContextResolver; import com.example.service.notification.MemoryStoreNotificationService; import com.example.service.quickreplies.QuickRepliesManagerService; +import com.example.service.llm.LlmResponseTunerService; import com.example.util.SessionIdGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,379 +35,381 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; +/** + * Orchestrates the full lifecycle of a user conversation,(this is the core of + * the entire integration layer service), acting as the central point for all + * inbound requests. + * This service manages conversational state by integrating both an in-memory + * cache (for active sessions) + * and a durable database (for conversation history). It intelligently routes + * incoming messages + * to the appropriate handler, which can include a standard Dialogflow agent, a + * notification-specific flow, or a direct LLM-based response. The class also + * ensures data integrity and security by applying Data Loss Prevention (DLP) + * to all incoming user messages before they are processed. + */ @Service public class ConversationManagerService { - private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); + private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); - private static final long SESSION_RESET_THRESHOLD_MINUTES = 30; - private static final String CONV_HISTORY_PARAM = "conversation_history"; - private final ExternalConvRequestMapper externalRequestToDialogflowMapper; - private final DialogflowClientService dialogflowServiceClient; - private final FirestoreConversationService firestoreConversationService; - private final MemoryStoreConversationService memoryStoreConversationService; - private final QuickRepliesManagerService quickRepliesManagerService; - private final MessageEntryFilter messageEntryFilter; - private final MemoryStoreNotificationService memoryStoreNotificationService; - private final NotificationContextMapper notificationContextMapper; - private final ConversationContextMapper conversationContextMapper; - private final DataLossPrevention dataLossPrevention; - private final String dlpTemplateCompleteFlow; + private static final long SESSION_RESET_THRESHOLD_MINUTES = 30; + private static final String CONV_HISTORY_PARAM = "conversation_history"; + private final ExternalConvRequestMapper externalRequestToDialogflowMapper; + private final DialogflowClientService dialogflowServiceClient; + private final FirestoreConversationService firestoreConversationService; + private final MemoryStoreConversationService memoryStoreConversationService; + private final QuickRepliesManagerService quickRepliesManagerService; + private final MessageEntryFilter messageEntryFilter; + private final MemoryStoreNotificationService memoryStoreNotificationService; + private final NotificationContextMapper notificationContextMapper; + private final ConversationContextMapper conversationContextMapper; + private final DataLossPrevention dataLossPrevention; + private final String dlpTemplateCompleteFlow; - private final NotificationContextResolver notificationContextResolver; + private final NotificationContextResolver notificationContextResolver; + private final LlmResponseTunerService llmResponseTunerService; - public ConversationManagerService( - DialogflowClientService dialogflowServiceClient, - FirestoreConversationService firestoreConversationService, - MemoryStoreConversationService memoryStoreConversationService, - ExternalConvRequestMapper externalRequestToDialogflowMapper, - QuickRepliesManagerService quickRepliesManagerService, - MessageEntryFilter messageEntryFilter, - MemoryStoreNotificationService memoryStoreNotificationService, - NotificationContextMapper notificationContextMapper, - ConversationContextMapper conversationContextMapper, - DataLossPrevention dataLossPrevention, - NotificationContextResolver notificationContextResolver, - @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { - this.dialogflowServiceClient = dialogflowServiceClient; - this.firestoreConversationService = firestoreConversationService; - this.memoryStoreConversationService = memoryStoreConversationService; - this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper; - this.quickRepliesManagerService = quickRepliesManagerService; - this.messageEntryFilter = messageEntryFilter; - this.memoryStoreNotificationService = memoryStoreNotificationService; - this.notificationContextMapper = notificationContextMapper; - this.conversationContextMapper = conversationContextMapper; - this.dataLossPrevention = dataLossPrevention; - this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; - this.notificationContextResolver = notificationContextResolver; + public ConversationManagerService( + DialogflowClientService dialogflowServiceClient, + FirestoreConversationService firestoreConversationService, + MemoryStoreConversationService memoryStoreConversationService, + ExternalConvRequestMapper externalRequestToDialogflowMapper, + QuickRepliesManagerService quickRepliesManagerService, + MessageEntryFilter messageEntryFilter, + MemoryStoreNotificationService memoryStoreNotificationService, + NotificationContextMapper notificationContextMapper, + ConversationContextMapper conversationContextMapper, + DataLossPrevention dataLossPrevention, + NotificationContextResolver notificationContextResolver, + LlmResponseTunerService llmResponseTunerService, + @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { + this.dialogflowServiceClient = dialogflowServiceClient; + this.firestoreConversationService = firestoreConversationService; + this.memoryStoreConversationService = memoryStoreConversationService; + this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper; + this.quickRepliesManagerService = quickRepliesManagerService; + this.messageEntryFilter = messageEntryFilter; + this.memoryStoreNotificationService = memoryStoreNotificationService; + this.notificationContextMapper = notificationContextMapper; + this.conversationContextMapper = conversationContextMapper; + this.dataLossPrevention = dataLossPrevention; + this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; + this.notificationContextResolver = notificationContextResolver; + this.llmResponseTunerService = llmResponseTunerService; - } + } - public Mono manageConversation(ExternalConvRequestDTO externalrequest) { - return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow) - .flatMap(obfuscatedMessage -> { - ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO( - obfuscatedMessage, - externalrequest.user(), - externalrequest.channel(), - externalrequest.tipo(), - externalrequest.pantallaContexto()); - return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono()) - .flatMap(session -> { + public Mono manageConversation(ExternalConvRequestDTO externalrequest) { + return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow) + .flatMap(obfuscatedMessage -> { + ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO( + obfuscatedMessage, + externalrequest.user(), + externalrequest.channel(), + externalrequest.tipo(), + externalrequest.pantallaContexto()); + return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono()) + .flatMap(session -> { + if (session != null && session.pantallaContexto() != null + && !session.pantallaContexto().isBlank()) { + logger.info( + "Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService."); + return quickRepliesManagerService.manageConversation(obfuscatedRequest); + } + return continueManagingConversation(obfuscatedRequest); + }) + .switchIfEmpty(continueManagingConversation(obfuscatedRequest)); + }); + } - if (session != null && session.pantallaContexto() != null - && !session.pantallaContexto().isBlank()) { - logger.info( - "Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService."); - return quickRepliesManagerService.manageConversation(obfuscatedRequest); - } - return continueManagingConversation(obfuscatedRequest); - }) - .switchIfEmpty(continueManagingConversation(obfuscatedRequest)); - }); - } + private Mono continueManagingConversation(ExternalConvRequestDTO externalrequest) { + final DetectIntentRequestDTO request; + try { + request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest); + logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO"); + } catch (IllegalArgumentException e) { + logger.error("Error during pre-mapping: {}", e.getMessage()); + return Mono.error(new IllegalArgumentException( + "Failed to process external request due to mapping error: " + e.getMessage(), e)); + } - private Mono continueManagingConversation(ExternalConvRequestDTO externalrequest) { - final DetectIntentRequestDTO request; - try { - request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest); - logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO"); - } catch (IllegalArgumentException e) { - logger.error("Error during pre-mapping: {}", e.getMessage()); - return Mono.error(new IllegalArgumentException( - "Failed to process external request due to mapping error: " + e.getMessage(), e)); - } + Map params = Optional.ofNullable(request.queryParams()) + .map(queryParamsDTO -> queryParamsDTO.parameters()) + .orElse(Collections.emptyMap()); - final ConversationContext context; - try { - context = resolveAndValidateRequest(request); - } catch (IllegalArgumentException e) { - logger.error("Validation error for incoming request: {}", e.getMessage()); - return Mono.error(e); - } + Object telefonoObj = params.get("telefono"); + if (!(telefonoObj instanceof String) || ((String) telefonoObj).isBlank()) { + logger.error("Critical error: parameter is missing, not a String, or blank after mapping."); + return Mono.error(new IllegalStateException("Internal error: parameter is invalid.")); + } - return handleMessageClassification(context, request); - } + String primaryPhoneNumber = (String) telefonoObj; + String resolvedUserId = params.get("usuario_id") instanceof String ? (String) params.get("usuario_id") : null; + String userMessageText = request.queryInput().text().text(); + final ConversationContext context = new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber); - private Mono handleMessageClassification(ConversationContext context, - DetectIntentRequestDTO request) { - final String userPhoneNumber = context.primaryPhoneNumber(); - final String userMessageText = context.userMessageText(); + return handleMessageClassification(context, request); + } - return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .map(conversationContextMapper::toText) - .defaultIfEmpty("") - .flatMap(conversationHistory -> { - return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) - .flatMap(notificationId -> memoryStoreNotificationService - .getCachedNotificationSession(notificationId)) - .map(notificationSession -> notificationSession.notificaciones().stream() - .filter(notification -> "active".equalsIgnoreCase(notification.status())) - .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) - .orElse(null)) - .filter(Objects::nonNull) - .flatMap((NotificationDTO notification) -> { - String notificationText = notificationContextMapper.toText(notification); - String classification = messageEntryFilter.classifyMessage(userMessageText, - notificationText, conversationHistory); - if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { - return startNotificationConversation(context, request, notification); - } else { - return continueConversationFlow(context, request); - } - }) - .switchIfEmpty(continueConversationFlow(context, request)); - }); - } + private Mono handleMessageClassification(ConversationContext context, + DetectIntentRequestDTO request) { + final String userPhoneNumber = context.primaryPhoneNumber(); + final String userMessageText = context.userMessageText(); - private Mono continueConversationFlow(ConversationContext context, - DetectIntentRequestDTO request) { - final String userId = context.userId(); - final String userMessageText = context.userMessageText(); - final String userPhoneNumber = context.primaryPhoneNumber(); + return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) + .map(conversationContextMapper::toText) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) + .flatMap(notificationId -> memoryStoreNotificationService + .getCachedNotificationSession(notificationId)) + .map(notificationSession -> notificationSession.notificaciones().stream() + .filter(notification -> "active".equalsIgnoreCase(notification.status())) + .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) + .orElse(null)) + .filter(Objects::nonNull) + .flatMap((NotificationDTO notification) -> { + String notificationText = notificationContextMapper.toText(notification); + String classification = messageEntryFilter.classifyMessage(userMessageText, + notificationText, conversationHistory); + if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { + return startNotificationConversation(context, request, notification); + } else { + return continueConversationFlow(context, request); + } + }) + .switchIfEmpty(continueConversationFlow(context, request)); + }); + } - if (userPhoneNumber == null || userPhoneNumber.isBlank()) { - logger.warn("No phone number provided in request. Cannot manage conversation session without it."); - return Mono - .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); - } + private Mono continueConversationFlow(ConversationContext context, + DetectIntentRequestDTO request) { + final String userId = context.userId(); + final String userMessageText = context.userMessageText(); + final String userPhoneNumber = context.primaryPhoneNumber(); - logger.info("Primary Check (MemoryStore): Looking up session for phone number"); - return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .flatMap(session -> handleMessageClassification(context, request, session)) - .switchIfEmpty(Mono.defer(() -> { - logger.info("No session found in MemoryStore. Performing full lookup to Firestore."); - return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber); - })) - .onErrorResume(e -> { - logger.error("Overall error handling conversation in ConversationManagerService: {}", - e.getMessage(), e); - return Mono - .error(new RuntimeException("Failed to process conversation due to an internal error.", e)); - }); - } + if (userPhoneNumber == null || userPhoneNumber.isBlank()) { + logger.warn("No phone number provided in request. Cannot manage conversation session without it."); + return Mono + .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); + } - private Mono handleMessageClassification(ConversationContext context, - DetectIntentRequestDTO request, ConversationSessionDTO session) { - final String userPhoneNumber = context.primaryPhoneNumber(); - final String userMessageText = context.userMessageText(); + logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber); + return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) + .flatMap(session -> handleMessageClassification(context, request, session)) + .switchIfEmpty(Mono.defer(() -> { + logger.info("No session found in MemoryStore. Performing full lookup to Firestore."); + return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber); + })) + .onErrorResume(e -> { + logger.error("Overall error handling conversation in ConversationManagerService: {}", + e.getMessage(), e); + return Mono + .error(new RuntimeException("Failed to process conversation due to an internal error.", e)); + }); + } - return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) - .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) - .map(notificationSession -> notificationSession.notificaciones().stream() - .filter(notification -> "active".equalsIgnoreCase(notification.status())) - .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) - .orElse(null)) - .filter(Objects::nonNull) - .flatMap((NotificationDTO notification) -> { - String conversationHistory = conversationContextMapper.toText(session); - String notificationText = notificationContextMapper.toText(notification); - String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, - conversationHistory); - if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { - return startNotificationConversation(context, request, notification); - } else { - return proceedWithConversation(context, request, session); - } - }) - .switchIfEmpty(proceedWithConversation(context, request, session)); - } + private Mono handleMessageClassification(ConversationContext context, + DetectIntentRequestDTO request, ConversationSessionDTO session) { + final String userPhoneNumber = context.primaryPhoneNumber(); + final String userMessageText = context.userMessageText(); - private Mono proceedWithConversation(ConversationContext context, - DetectIntentRequestDTO request, ConversationSessionDTO session) { - Instant now = Instant.now(); - if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { - logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.", - session.sessionId()); - return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), - context.primaryPhoneNumber(), false); - } else { - logger.info( - "Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", - session.sessionId()); - String conversationHistory = conversationContextMapper.toTextWithLimits(session); - DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); - return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), - context.primaryPhoneNumber(), false); - } - } + return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) + .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) + .map(notificationSession -> notificationSession.notificaciones().stream() + .filter(notification -> "active".equalsIgnoreCase(notification.status())) + .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) + .orElse(null)) + .filter(Objects::nonNull) + .flatMap((NotificationDTO notification) -> { + String conversationHistory = conversationContextMapper.toText(session); + String notificationText = notificationContextMapper.toText(notification); + String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, + conversationHistory); + if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { + return startNotificationConversation(context, request, notification); + } else { + return proceedWithConversation(context, request, session); + } + }) + .switchIfEmpty(proceedWithConversation(context, request, session)); + } - private Mono fullLookupAndProcess(ConversationSessionDTO oldSession, - DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { - return firestoreConversationService.getSessionByTelefono(userPhoneNumber) - .map(conversationContextMapper::toTextWithLimits) - .defaultIfEmpty("") - .flatMap(conversationHistory -> { - String newSessionId = SessionIdGenerator.generateStandardSessionId(); - logger.info("Creating new session {} after full lookup.", newSessionId); - ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, - userPhoneNumber); - DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); - return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, - true); - }); - } + private Mono proceedWithConversation(ConversationContext context, + DetectIntentRequestDTO request, ConversationSessionDTO session) { + Instant now = Instant.now(); + if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { + logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.", + session.sessionId()); + return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), + context.primaryPhoneNumber(), false); + } else { + logger.info( + "Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", + session.sessionId()); + String conversationHistory = conversationContextMapper.toTextWithLimits(session); + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); + return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), + context.primaryPhoneNumber(), false); + } + } - private Mono processDialogflowRequest(ConversationSessionDTO session, - DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber, - boolean newSession) { - final String finalSessionId = session.sessionId(); + private Mono fullLookupAndProcess(ConversationSessionDTO oldSession, + DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { + return firestoreConversationService.getSessionByTelefono(userPhoneNumber) + .map(conversationContextMapper::toTextWithLimits) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + String newSessionId = SessionIdGenerator.generateStandardSessionId(); + logger.info("Creating new session {} after full lookup.", newSessionId); + ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, + userPhoneNumber); + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); + return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, + true); + }); + } - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); + private Mono processDialogflowRequest(ConversationSessionDTO session, + DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber, + boolean newSession) { + final String finalSessionId = session.sessionId(); - return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber) - .doOnSuccess(v -> logger.debug( - "User entry successfully persisted for session {}. Proceeding to Dialogflow...", - finalSessionId)) - .doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, - e.getMessage(), e)) - .then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request) - .flatMap(response -> { - logger.debug( - "Received Dialogflow CX response for session {}. Initiating agent response persistence.", - finalSessionId); - ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); - return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber) - .thenReturn(response); - }) - .doOnError( - error -> logger.error("Overall error during conversation management for session {}: {}", - finalSessionId, error.getMessage(), error)))); - } + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); - private Mono startNotificationConversation(ConversationContext context, - DetectIntentRequestDTO request, NotificationDTO notification) { - final String userId = context.userId(); - final String userMessageText = context.userMessageText(); - final String userPhoneNumber = context.primaryPhoneNumber(); + return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber) + .doOnSuccess(v -> logger.debug( + "User entry successfully persisted for session {}. Proceeding to Dialogflow...", + finalSessionId)) + .doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, + e.getMessage(), e)) + .then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request) + .flatMap(response -> { + logger.debug( + "Received Dialogflow CX response for session {}. Initiating agent response persistence.", + finalSessionId); + ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); + return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber) + .thenReturn(response); + }) + .doOnError( + error -> logger.error("Overall error during conversation management for session {}: {}", + finalSessionId, error.getMessage(), error)))); + } - return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber) - .switchIfEmpty(Mono.defer(() -> { - String newSessionId = SessionIdGenerator.generateStandardSessionId(); - logger.info("No existing notification session found for phone number. Creating new session: {}", - newSessionId); - return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber)); - })) - .flatMap(session -> { - final String sessionId = session.sessionId(); - String conversationHistory = conversationContextMapper.toTextWithLimits(session); - String notificationText = notificationContextMapper.toText(notification); + public Mono startNotificationConversation(ConversationContext context, + DetectIntentRequestDTO request, NotificationDTO notification) { + final String userId = context.userId(); + final String userMessageText = context.userMessageText(); + final String userPhoneNumber = context.primaryPhoneNumber(); - Map filteredParams = notification.parametros().entrySet().stream() - .filter(entry -> entry.getKey().startsWith("notification_po_")) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber) + .switchIfEmpty(Mono.defer(() -> { + String newSessionId = SessionIdGenerator.generateStandardSessionId(); + logger.info("No existing notification session found for phone number {}. Creating new session: {}", + userPhoneNumber, newSessionId); + return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber)); + })) + .flatMap(session -> { + final String sessionId = session.sessionId(); + String conversationHistory = conversationContextMapper.toTextWithLimits(session); + String notificationText = notificationContextMapper.toText(notification); - String resolvedContext = notificationContextResolver.resolveContext(userMessageText, - notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, - userPhoneNumber); + Map filteredParams = notification.parametros().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("notification_po_")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) { - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, - notification.parametros()); - ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, - notification.parametros()); + String resolvedContext = notificationContextResolver.resolveContext(userMessageText, + notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, + userPhoneNumber); - return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber) - .then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber)) - .then(Mono.fromCallable(() -> { - QueryResultDTO queryResult = new QueryResultDTO(resolvedContext, - java.util.Collections.emptyMap()); - return new DetectIntentResponseDTO(null, queryResult); - })); - } + if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) { + String uuid = UUID.randomUUID().toString(); + llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, - notification.parametros()); + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, + notification.parametros()); + ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, + notification.parametros()); - DetectIntentRequestDTO finalRequest; - Instant now = Instant.now(); - if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { - finalRequest = request.withParameters(notification.parametros()); - } else { - finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) - .withParameters(notification.parametros()); - } + return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber) + .then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber)) + .then(Mono.defer(() -> { + EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); + QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, + request.queryInput().languageCode()); + DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, + request.queryParams()) + .withParameter("llm_reponse_uuid", uuid); + return dialogflowServiceClient.detectIntent(sessionId, newRequest) + .flatMap(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgent(response.queryResult()); + return persistNotificationTurn(userId, sessionId, agentEntry, + userPhoneNumber) + .thenReturn(response); + }); + })); + } - return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) - .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) - .flatMap(response -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgent(response.queryResult()); - return memoryStoreNotificationService - .saveEntry(userId, sessionId, agentEntry, userPhoneNumber) - .thenReturn(response); - })); - }); - } + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, + notification.parametros()); - private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber) { - logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, - entry.type().name()); - return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) - .doOnSuccess(v -> logger.info( - "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name())) - .then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) - .doOnSuccess(fsVoid -> logger.debug( - "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", - sessionId, entry.type().name())) - .doOnError(fsError -> logger.error( - "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", - sessionId, entry.type().name(), fsError.getMessage(), fsError))) - .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, - entry.type().name(), e.getMessage(), e)); - } + DetectIntentRequestDTO finalRequest; + Instant now = Instant.now(); + if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { + finalRequest = request.withParameters(notification.parametros()); + } else { + finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) + .withParameters(notification.parametros()); + } - private Mono persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber) { - logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.", - sessionId, - entry.type().name()); - return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) - .doOnSuccess(v -> logger.info( - "Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name())) - .doOnError(e -> logger.error( - "Error during primary Redis write for notification session {}. Type: {}: {}", sessionId, - entry.type().name(), e.getMessage(), e)); - } + return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) + .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) + .flatMap(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgent(response.queryResult()); + return memoryStoreNotificationService + .saveEntry(userId, sessionId, agentEntry, userPhoneNumber) + .thenReturn(response); + })); + }); + } + + private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, + String userPhoneNumber) { + logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, + entry.type().name()); + return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + .doOnSuccess(v -> logger.info( + "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", + sessionId, entry.type().name())) + .then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + .doOnSuccess(fsVoid -> logger.debug( + "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", + sessionId, entry.type().name())) + .doOnError(fsError -> logger.error( + "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", + sessionId, entry.type().name(), fsError.getMessage(), fsError))) + .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, + entry.type().name(), e.getMessage(), e)); + } + + private Mono persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry, + String userPhoneNumber) { + logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.", + sessionId, + entry.type().name()); + return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + .doOnSuccess(v -> logger.info( + "Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.", + sessionId, entry.type().name())) + .doOnError(e -> logger.error( + "Error during primary Redis write for notification session {}. Type: {}: {}", sessionId, + entry.type().name(), e.getMessage(), e)); + } - private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) { - Map params = Optional.ofNullable(request.queryParams()) - .map(queryParamsDTO -> queryParamsDTO.parameters()) - .orElse(Collections.emptyMap()); - String primaryPhoneNumber = null; - Object telefonoObj = params.get("telefono"); // Get from map - if (telefonoObj instanceof String) { - primaryPhoneNumber = (String) telefonoObj; - } else if (telefonoObj != null) { - logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.", - telefonoObj.getClass().getName()); - } - if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) { - throw new IllegalArgumentException( - "Phone number (telefono) is required in query parameters for conversation management."); - } - String resolvedUserId = null; - Object userIdObj = params.get("usuario_id"); - if (userIdObj instanceof String) { - resolvedUserId = (String) userIdObj; - } else if (userIdObj != null) { - logger.warn("Parameter 'userId' in query_params is not a String (type: {}). Expected String.", - userIdObj.getClass().getName()); - } - if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) { - resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", ""); - logger.warn("User ID not provided in query parameters. Using derived ID from phone number"); - } - if (request.queryInput() == null || request.queryInput().text() == null || - request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) { - throw new IllegalArgumentException("Dialogflow query input text is required."); - } - String userMessageText = request.queryInput().text().text(); - return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber); - } } \ No newline at end of file diff --git a/src/main/java/com/example/service/conversation/DataLossPreventionImpl.java b/src/main/java/com/example/service/conversation/DataLossPreventionImpl.java index 371c0c0..f4131cc 100644 --- a/src/main/java/com/example/service/conversation/DataLossPreventionImpl.java +++ b/src/main/java/com/example/service/conversation/DataLossPreventionImpl.java @@ -21,6 +21,14 @@ import com.example.util.TextObfuscator; import reactor.core.publisher.Mono; +/** +Implements a data loss prevention service by integrating with the +Google Cloud Data Loss Prevention (DLP) API. This service is responsible for +scanning a given text input to identify and obfuscate sensitive information based on +a specified DLP template. If the DLP API detects sensitive findings, the +original text is obfuscated to protect user data; otherwise, the original +text is returned. +*/ @Service public class DataLossPreventionImpl implements DataLossPrevention { diff --git a/src/main/java/com/example/service/llm/LlmResponseTunerService.java b/src/main/java/com/example/service/llm/LlmResponseTunerService.java new file mode 100644 index 0000000..de93a39 --- /dev/null +++ b/src/main/java/com/example/service/llm/LlmResponseTunerService.java @@ -0,0 +1,13 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.service.llm; + +import reactor.core.publisher.Mono; + +public interface LlmResponseTunerService { + Mono getValue(String key); + Mono setValue(String key, String value); +} \ No newline at end of file diff --git a/src/main/java/com/example/service/llm/LlmResponseTunerServiceImpl.java b/src/main/java/com/example/service/llm/LlmResponseTunerServiceImpl.java new file mode 100644 index 0000000..2ea0a19 --- /dev/null +++ b/src/main/java/com/example/service/llm/LlmResponseTunerServiceImpl.java @@ -0,0 +1,33 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.service.llm; + +import java.time.Duration; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +public class LlmResponseTunerServiceImpl implements LlmResponseTunerService { + + private final ReactiveRedisTemplate reactiveStringRedisTemplate; + private final String llmPreResponseCollectionName = "llm-pre-response:"; + private final Duration ttl = Duration.ofHours(1); + + public LlmResponseTunerServiceImpl(ReactiveRedisTemplate reactiveStringRedisTemplate) { + this.reactiveStringRedisTemplate = reactiveStringRedisTemplate; + } + + @Override + public Mono getValue(String key) { + return reactiveStringRedisTemplate.opsForValue().get(llmPreResponseCollectionName + key); + } + + @Override + public Mono setValue(String key, String value) { + return reactiveStringRedisTemplate.opsForValue().set(llmPreResponseCollectionName + key, value, ttl).then(); + } +} diff --git a/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java b/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java index 890e14c..3b3c37c 100644 --- a/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java +++ b/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java @@ -31,155 +31,155 @@ import reactor.core.publisher.Mono; @Service public class QuickRepliesManagerService { - private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class); - private final MemoryStoreConversationService memoryStoreConversationService; - private final FirestoreConversationService firestoreConversationService; - private final QuickReplyContentService quickReplyContentService; - private final ConversationManagerService conversationManagerService; + private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class); + private final MemoryStoreConversationService memoryStoreConversationService; + private final FirestoreConversationService firestoreConversationService; + private final QuickReplyContentService quickReplyContentService; + private final ConversationManagerService conversationManagerService; - public QuickRepliesManagerService( - @Lazy ConversationManagerService conversationManagerService, - MemoryStoreConversationService memoryStoreConversationService, - FirestoreConversationService firestoreConversationService, - QuickReplyContentService quickReplyContentService) { - this.conversationManagerService = conversationManagerService; - this.memoryStoreConversationService = memoryStoreConversationService; - this.firestoreConversationService = firestoreConversationService; - this.quickReplyContentService = quickReplyContentService; - } + public QuickRepliesManagerService( + @Lazy ConversationManagerService conversationManagerService, + MemoryStoreConversationService memoryStoreConversationService, + FirestoreConversationService firestoreConversationService, + QuickReplyContentService quickReplyContentService) { + this.conversationManagerService = conversationManagerService; + this.memoryStoreConversationService = memoryStoreConversationService; + this.firestoreConversationService = firestoreConversationService; + this.quickReplyContentService = quickReplyContentService; + } - public Mono startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) { - String userPhoneNumber = externalRequest.user().telefono(); - if (userPhoneNumber == null || userPhoneNumber.isBlank()) { - logger.warn("No phone number provided in request. Cannot manage conversation session without it."); - return Mono - .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); - } - return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .flatMap(session -> Mono.just(session.sessionId())) - .switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId)) - .flatMap(sessionId -> { - String userId = "user_by_phone_" + userPhoneNumber.replaceAll("[^0-9]", ""); - ConversationEntryDTO systemEntry = new ConversationEntryDTO( - ConversationEntryEntity.SISTEMA, - ConversationEntryType.INICIO, - Instant.now(), - "Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :", - null, - null); - return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber, - externalRequest.pantallaContexto()) - .then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto())) - .map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); - }); - } + public Mono startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) { + String userPhoneNumber = externalRequest.user().telefono(); + if (userPhoneNumber == null || userPhoneNumber.isBlank()) { + logger.warn("No phone number provided in request. Cannot manage conversation session without it."); + return Mono + .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); + } + return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) + .flatMap(session -> Mono.just(session.sessionId())) + .switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId)) + .flatMap(sessionId -> { + String userId = "user_by_phone_" + userPhoneNumber.replaceAll("[^0-9]", ""); + ConversationEntryDTO systemEntry = new ConversationEntryDTO( + ConversationEntryEntity.SISTEMA, + ConversationEntryType.INICIO, + Instant.now(), + "Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :", + null, + null); + return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber, + externalRequest.pantallaContexto()) + .then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto())) + .map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); + }); + } - public Mono manageConversation(ExternalConvRequestDTO externalRequest) { - String userPhoneNumber = externalRequest.user().telefono(); - if (userPhoneNumber == null || userPhoneNumber.isBlank()) { - logger.warn("No phone number provided in request. Cannot manage conversation session without it."); - return Mono - .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); - } + public Mono manageConversation(ExternalConvRequestDTO externalRequest) { + String userPhoneNumber = externalRequest.user().telefono(); + if (userPhoneNumber == null || userPhoneNumber.isBlank()) { + logger.warn("No phone number provided in request. Cannot manage conversation session without it."); + return Mono + .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); + } - return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .switchIfEmpty(Mono.error( - new IllegalStateException("No quick reply session found for phone number" ))) - .flatMap(session -> { - String userId = session.userId(); - String sessionId = session.sessionId(); + return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) + .switchIfEmpty(Mono.error( + new IllegalStateException("No quick reply session found for phone number"))) + .flatMap(session -> { + String userId = session.userId(); + String sessionId = session.sessionId(); - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message()); + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message()); - List entries = session.entries(); - int lastInitIndex = IntStream.range(0, entries.size()) - .map(i -> entries.size() - 1 - i) - .filter(i -> { - ConversationEntryDTO entry = entries.get(i); - return entry.entity() == ConversationEntryEntity.SISTEMA - && entry.type() == ConversationEntryType.INICIO; - }) - .findFirst() - .orElse(-1); + List entries = session.entries(); + int lastInitIndex = IntStream.range(0, entries.size()) + .map(i -> entries.size() - 1 - i) + .filter(i -> { + ConversationEntryDTO entry = entries.get(i); + return entry.entity() == ConversationEntryEntity.SISTEMA + && entry.type() == ConversationEntryType.INICIO; + }) + .findFirst() + .orElse(-1); - long userMessagesCount; - if (lastInitIndex != -1) { - userMessagesCount = entries.subList(lastInitIndex + 1, entries.size()).stream() - .filter(e -> e.entity() == ConversationEntryEntity.USUARIO) - .count(); - } else { - userMessagesCount = 0; - } + long userMessagesCount; + if (lastInitIndex != -1) { + userMessagesCount = entries.subList(lastInitIndex + 1, entries.size()).stream() + .filter(e -> e.entity() == ConversationEntryEntity.USUARIO) + .count(); + } else { + userMessagesCount = 0; + } - if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow - // This is the second message of the flow. Return the full list. - return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, - session.pantallaContexto()) - .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) - .flatMap(quickReplyDTO -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgentWithMessage(quickReplyDTO.toString()); - return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber, - session.pantallaContexto()) - .thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); - }); - } else if (userMessagesCount == 1) { // Is the second user message in the QR flow - // This is the third message of the flow. Filter and end. - return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, - session.pantallaContexto()) - .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) - .flatMap(quickReplyDTO -> { - List matchedPreguntas = quickReplyDTO.preguntas().stream() - .filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim())) - .toList(); + if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow + // This is the second message of the flow. Return the full list. + return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, + session.pantallaContexto()) + .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) + .flatMap(quickReplyDTO -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgentWithMessage(quickReplyDTO.toString()); + return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber, + session.pantallaContexto()) + .thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); + }); + } else if (userMessagesCount == 1) { // Is the second user message in the QR flow + // This is the third message of the flow. Filter and end. + return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, + session.pantallaContexto()) + .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) + .flatMap(quickReplyDTO -> { + List matchedPreguntas = quickReplyDTO.preguntas().stream() + .filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim())) + .toList(); - if (!matchedPreguntas.isEmpty()) { - // Matched question, return the answer - String respuesta = matchedPreguntas.get(0).respuesta(); - QueryResultDTO queryResult = new QueryResultDTO(respuesta, null); - DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId, - queryResult, null); + if (!matchedPreguntas.isEmpty()) { + // Matched question, return the answer + String respuesta = matchedPreguntas.get(0).respuesta(); + QueryResultDTO queryResult = new QueryResultDTO(respuesta, null); + DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId, + queryResult, null); - return memoryStoreConversationService - .updateSession(session.withPantallaContexto(null)) - .then(persistConversationTurn(userId, sessionId, - ConversationEntryDTO.forAgentWithMessage(respuesta), - userPhoneNumber, null)) - .thenReturn(response); - } else { - // No match, delegate to Dialogflow - return memoryStoreConversationService - .updateSession(session.withPantallaContexto(null)) - .then(conversationManagerService.manageConversation(externalRequest)); - } - }); - } else { - // Should not happen. End the flow. - return memoryStoreConversationService.updateSession(session.withPantallaContexto(null)) - .then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null, - new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList())))); - } - }); - } + return memoryStoreConversationService + .updateSession(session.withPantallaContexto(null)) + .then(persistConversationTurn(userId, sessionId, + ConversationEntryDTO.forAgentWithMessage(respuesta), + userPhoneNumber, null)) + .thenReturn(response); + } else { + // No match, delegate to Dialogflow + return memoryStoreConversationService + .updateSession(session.withPantallaContexto(null)) + .then(conversationManagerService.manageConversation(externalRequest)); + } + }); + } else { + // Should not happen. End the flow. + return memoryStoreConversationService.updateSession(session.withPantallaContexto(null)) + .then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null, + new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList())))); + } + }); + } - private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber, String pantallaContexto) { - logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.", - sessionId, entry.type().name()); - return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) - .doOnSuccess(v -> logger.info( - "Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name())) - .then(firestoreConversationService - .saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) - .doOnSuccess(fsVoid -> logger.debug( - "Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.", - sessionId, entry.type().name())) - .doOnError(fsError -> logger.error( - "Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}", - sessionId, entry.type().name(), fsError.getMessage(), fsError))) - .doOnError( - e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}", - sessionId, entry.type().name(), e.getMessage(), e)); - } + private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, + String userPhoneNumber, String pantallaContexto) { + logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.", + sessionId, entry.type().name()); + return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) + .doOnSuccess(v -> logger.info( + "Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.", + sessionId, entry.type().name())) + .then(firestoreConversationService + .saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) + .doOnSuccess(fsVoid -> logger.debug( + "Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.", + sessionId, entry.type().name())) + .doOnError(fsError -> logger.error( + "Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}", + sessionId, entry.type().name(), fsError.getMessage(), fsError))) + .doOnError( + e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}", + sessionId, entry.type().name(), e.getMessage(), e)); + } } \ No newline at end of file diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 068cefd..2bfa7c2 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -70,4 +70,9 @@ firestore.data.importer.enabled=false # LOGGING Configuration # ========================================================= logging.level.root=INFO -logging.level.com.example=INFO \ No newline at end of file +logging.level.com.example=INFO +# ========================================================= +# ConversationContext Configuration +# ========================================================= +conversation.context.message.limit=${CONVERSATION_CONTEXT_MESSAGE_LIMIT} +conversation.context.days.limit=${CONVERSATION_CONTEXT_DAYS_LIMIT} \ No newline at end of file diff --git a/src/main/resources/prompts/message_filter_prompt.txt b/src/main/resources/prompts/message_filter_prompt.txt index ac95ac7..c9bc8cf 100644 --- a/src/main/resources/prompts/message_filter_prompt.txt +++ b/src/main/resources/prompts/message_filter_prompt.txt @@ -1,54 +1,93 @@ -You are an expert AI classification agent. -Your task is to analyze a user's final message after an external notification interrupts an ongoing conversation. +Hay un sistema de conversaciones entre un agente y un usuario. Durante +la conversación, una notificación puede entrar a la conversación de forma +abrupta, de tal forma que la siguiente interacción del usuario después +de la notificación puede corresponder a la conversación que estaba +sucediendo o puede ser un seguimiento a la notificación. -You will receive three pieces of information: -1. `CONVERSATION_HISTORY`: The dialogue between the agent and the user *before* the notification. -2. `INTERRUPTING_NOTIFICATION`: The specific alert that appeared. -3. `USER_FINAL_INPUT`: The user's message that you must classify. +Tu tarea es identificar si la siguiente interacción del usuario es un +seguimiento a la notificación o una continuación de la conversación. -Your goal is to determine if the `USER_FINAL_INPUT` is a reaction to the `INTERRUPTING_NOTIFICATION` or a continuation of the `CONVERSATION_HISTORY`. +Recibirás esta información: -**Classification Rules:** +- HISTORIAL_CONVERSACION: El diálogo entre el agente y el usuario antes + de la notificación. +- INTERRUPCION_NOTIFICACION: La notificación. Esta puede o no traer parámetros + los cuales refieren a detalles específicos de la notificación. Por ejemplo: + { "vigencia": “12 de septiembre de 2025”, "credito_tipo" : "platinum" } +- INTERACCION_USUARIO: La siguiente interacción del usuario después de + la notificación. -* **`%s`**: Classify as `%s` if the `USER_FINAL_INPUT` is a direct question, comment, or reaction related to the `INTERRUPTING_NOTIFICATION`. The user has switched their focus to the notification. -* **`%s`**: Classify as `%s` if the `USER_FINAL_INPUT` ignores the notification and continues the topic from the `CONVERSATION_HISTORY`. -* **Ambiguity Rule**: If the input is ambiguous (e.g., "ok, thanks"), default to `%s`. It's safer to assume the user is concluding the original topic. -* **Acknowledgement Rule**: If the user briefly acknowledges the notification but immediately pivots back to the original conversation (e.g., "Okay thank you, but back to my question about loans..."), classify it as `%s`. The PRIMARY INTENT is to continue the original dialogue. +Reglas: +- Solo debes responder una palabra: NOTIFICATION o CONVERSATION. No agregues + o inventes otra palabra. +- Clasifica como NOTIFICATION si la siguiente interacción del usuario + es una clara respuesta o seguimiento a la notificación. +- Clasifica como CONVERSATION si la siguiente interacción del usuario + es un claro seguimiento al histórico de la conversación. +- Si la siguiente interacción del usuario es ambigua, clasifica + como CONVERSATION. -Your response must be a single word: `%s` or `%s`. Do not add any other text, punctuation, or explanations. +Ejemplos: ---- -**Examples (Few-Shot Learning):** +Ejemplo 1: +HISTORIAL_CONVERSACION: + Agente: Claro, para un crédito de vehículo, las tasas actuales inician en el 1.2%% mensual. + Usuario: Entiendo, ¿y el plazo máximo de cuánto sería? +INTERRUPCION_NOTIFICACION: + Tu pago de la tarjeta de crédito por $1,500.00 ha sido procesado. +INTERACCION_USUARIO: + perfecto, cuando es la fecha de corte? +Clasificación: NOTIFICACION -**Example 1:** -`CONVERSATION_HISTORY`: -Agent: Claro, para un crédito de vehículo, las tasas actuales inician en el 1.2%% mensual. -User: Entiendo, ¿y el plazo máximo de cuánto sería? -`INTERRUPTING_NOTIFICATION`: Tu pago de la tarjeta de crédito por $1,500.00 ha sido procesado. -`USER_FINAL_INPUT`: ¡Perfecto! Justo de eso quería saber, ¿ese pago ya se ve reflejado en el cupo disponible? -Classification: %s +Ejemplo 2: +HISTORIAL_CONVERSACION: + Agente: No es necesario, puedes completar todo el proceso para abrir tu cuenta desde nuestra app. + Usuario: Ok + Agente: ¿Necesitas algo más? +INTERRUPCION_NOTIFICACION: + Tu estado de cuenta de Julio ya está disponible. + Parametros: {"fecha_corte": "30 de Agosto del 2025", "tipo_cuenta": "credito"} +INTERACCION_USUARIO: + que documentos necesito? +Clasificación: CONVERSACION -**Example 2:** -`CONVERSATION_HISTORY`: -Agent: No es necesario, puedes completar todo el proceso para abrir tu cuenta desde nuestra app. -User: Ok, suena fácil. -`INTERRUPTING_NOTIFICATION`: Tu estado de cuenta de Julio ya está disponible. -`USER_FINAL_INPUT`: Bueno, y qué documentos necesito tener a la mano para hacerlo en la app? -Classification: %s +Ejemplo 3: +HISTORIAL_CONVERSACION: + Agente: Ese fondo de inversión tiene un perfil de alto riesgo, pero históricamente ha dado un rendimiento superior al 15%% anual. + Usuario: ok, entiendo +INTERRUPCION_NOTIFICACION: + Alerta: Tu cuenta de ahorros tiene un saldo bajo de $50.00. + Parametros: {"fecha_retiro": "5 de septiembre del 2025", "tipo_cuenta": "ahorros"} +INTERACCION_USUARIO: + cuando fue el ultimo retiro? +Clasificación: NOTIFICACION -**Example 3:** -`CONVERSATION_HISTORY`: -Agent: Ese fondo de inversión tiene un perfil de alto riesgo, pero históricamente ha dado un rendimiento superior al 15%% anual. -User: Suena interesante... -`INTERRUPTING_NOTIFICATION`: Alerta: Tu cuenta de ahorros tiene un saldo bajo de $50.00. -`USER_FINAL_INPUT`: Umm, ok gracias cuando fue el ultimo retiro?. Pero volviendo al fondo, ¿cuál es la inversión mínima para entrar? -Classification: %s ---- +Ejemplo 4: +HISTORIAL_CONVERSACION: + Usuario: Que es el CAT? + Agente: El CAT (Costo Anual Total) es un indicador financiero, expresado en un porcentaje anual, que refleja el costo total de un crédito, incluyendo no solo la tasa de interés, sino también todas las comisiones, gastos y otros cobros que genera. +INTERRUPCION_NOTIFICACION: + Alerta: Se realizó un retiro en efectivo por $100. +INTERACCION_USUARIO: + y este se aplica solo si dejo de pagar? +Clasificación: CONVERSACION -**Task:** +Ejemplo 5: +HISTORIAL_CONVERSACION: + Usuario: Cual es la tasa de hipoteca que manejan? + Agente: La tasa de una hipoteca depende tanto de factores económicos generales (inflación, tasas de referencia del banco central) como de factores individuales del solicitante (historial crediticio, monto del pago inicial, ingresos, endeudamiento, etc.) +INTERRUPCION_NOTIFICACION: + Hola, [Alias]: Pasó algo con la captura de tu INE y no se completó tu solicitud de tarjeta de crédito con folio 3421. + Parametros: {“solicitud_tarjeta_credito_vigencia”: “12 de septiembre de 2025”, “solicitud_tarjeta_credito_error”: “Error con el formato de la captura”, “solicitud_tarjeta_credito_tipo” : “platinum” } +INTERACCION_USUARIO: + cual fue el error? +Clasificación: NOTIFICACION -`CONVERSATION_HISTORY`: -%s -`INTERRUPTING_NOTIFICATION`: %s -`USER_FINAL_INPUT`: %s -Classification: \ No newline at end of file +Tarea: +HISTORIAL_CONVERSACION: + %s +INTERRUPCION_NOTIFICACION: + %s +INTERACCION_USUARIO: + %s +Clasificación: diff --git a/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java b/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java new file mode 100644 index 0000000..8c6e936 --- /dev/null +++ b/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.service.conversation; + +import com.example.dto.dialogflow.base.DetectIntentRequestDTO; +import com.example.dto.dialogflow.base.DetectIntentResponseDTO; +import com.example.dto.dialogflow.conversation.ConversationContext; +import com.example.dto.dialogflow.conversation.ConversationEntryDTO; +import com.example.dto.dialogflow.conversation.ConversationSessionDTO; +import com.example.dto.dialogflow.notification.NotificationDTO; +import com.example.mapper.conversation.ExternalConvRequestMapper; +import com.example.mapper.messagefilter.ConversationContextMapper; +import com.example.mapper.messagefilter.NotificationContextMapper; +import com.example.service.base.DialogflowClientService; +import com.example.service.base.MessageEntryFilter; +import com.example.service.base.NotificationContextResolver; +import com.example.service.llm.LlmResponseTunerService; +import com.example.service.notification.MemoryStoreNotificationService; +import com.example.service.quickreplies.QuickRepliesManagerService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Instant; +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ConversationManagerServiceTest { + + @Mock + private ExternalConvRequestMapper externalRequestToDialogflowMapper; + @Mock + private DialogflowClientService dialogflowServiceClient; + @Mock + private FirestoreConversationService firestoreConversationService; + @Mock + private MemoryStoreConversationService memoryStoreConversationService; + @Mock + private QuickRepliesManagerService quickRepliesManagerService; + @Mock + private MessageEntryFilter messageEntryFilter; + @Mock + private MemoryStoreNotificationService memoryStoreNotificationService; + @Mock + private NotificationContextMapper notificationContextMapper; + @Mock + private ConversationContextMapper conversationContextMapper; + @Mock + private DataLossPrevention dataLossPrevention; + @Mock + private NotificationContextResolver notificationContextResolver; + @Mock + private LlmResponseTunerService llmResponseTunerService; + + @InjectMocks + private ConversationManagerService conversationManagerService; + + @Test + void startNotificationConversation_shouldSaveResolvedContextAndReturnIt() { + // Given + String userId = "test-user"; + String userPhoneNumber = "1234567890"; + String userMessageText = "test message"; + String sessionId = "test-session"; + String resolvedContext = "resolved context"; + + ConversationContext context = new ConversationContext(userId, null, userMessageText, userPhoneNumber); + DetectIntentRequestDTO request = new DetectIntentRequestDTO(null, null); + NotificationDTO notification = new NotificationDTO("1", "1234567890", Instant.now(), "test text", "test_event", "es", Collections.emptyMap(), "active"); + ConversationSessionDTO session = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber); + + when(memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)).thenReturn(Mono.just(session)); + when(conversationContextMapper.toTextWithLimits(session)).thenReturn("history"); + when(notificationContextMapper.toText(notification)).thenReturn("notification text"); + when(notificationContextResolver.resolveContext(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString())) + .thenReturn(resolvedContext); + when(llmResponseTunerService.setValue(anyString(), anyString())).thenReturn(Mono.empty()); + when(memoryStoreNotificationService.saveEntry(anyString(), anyString(), any(ConversationEntryDTO.class), anyString())).thenReturn(Mono.empty()); + + // When + Mono result = conversationManagerService.startNotificationConversation(context, request, notification); + + // Then + StepVerifier.create(result) + .expectNextMatches(response -> response.queryResult().responseText().equals(resolvedContext)) + .verifyComplete(); + } +} \ No newline at end of file diff --git a/src/test/java/com/example/service/llm/LlmResponseTunerServiceImplTest.java b/src/test/java/com/example/service/llm/LlmResponseTunerServiceImplTest.java new file mode 100644 index 0000000..82f6727 --- /dev/null +++ b/src/test/java/com/example/service/llm/LlmResponseTunerServiceImplTest.java @@ -0,0 +1,57 @@ +package com.example.service.llm; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.core.ReactiveValueOperations; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class LlmResponseTunerServiceImplTest { + + @Mock + private ReactiveRedisTemplate reactiveStringRedisTemplate; + + @Mock + private ReactiveValueOperations reactiveValueOperations; + + @InjectMocks + private LlmResponseTunerServiceImpl llmResponseTunerService; + + private final String llmPreResponseCollectionName = "llm-pre-response:"; + + @BeforeEach + void setUp() { + when(reactiveStringRedisTemplate.opsForValue()).thenReturn(reactiveValueOperations); + } + + @Test + void getValue_shouldReturnValueFromRedis() { + String key = "test_key"; + String expectedValue = "test_value"; + + when(reactiveValueOperations.get(llmPreResponseCollectionName + key)).thenReturn(Mono.just(expectedValue)); + + StepVerifier.create(llmResponseTunerService.getValue(key)) + .expectNext(expectedValue) + .verifyComplete(); + } + + @Test + void setValue_shouldSetValueInRedis() { + String key = "test_key"; + String value = "test_value"; + + when(reactiveValueOperations.set(llmPreResponseCollectionName + key, value)).thenReturn(Mono.just(true)); + + StepVerifier.create(llmResponseTunerService.setValue(key, value)) + .verifyComplete(); + } +}