/* * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ package com.example.service.conversation; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.conversation.*; import com.example.dto.dialogflow.notification.EventInputDTO; import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.mapper.conversation.ConversationEntryMapper; import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.messagefilter.ConversationContextMapper; import com.example.mapper.messagefilter.NotificationContextMapper; import com.example.service.base.DialogflowClientService; import com.example.service.base.MessageEntryFilter; import com.example.service.base.NotificationContextResolver; import com.example.service.notification.MemoryStoreNotificationService; import com.example.service.quickreplies.QuickRepliesManagerService; import com.example.service.llm.LlmResponseTunerService; import com.example.util.SessionIdGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; /** Service acting as the central orchestrator for managing user conversations. It integrates Data Loss Prevention (DLP) for message obfuscation, multi-stage routing, hybrid AI logic, and a reactive write-back persistence layer for conversation history. Routes traffic based on session context: If a 'pantallaContexto' (screen context) is present, it delegates to the QuickRepliesManagerService. Otherwise, it uses a Gemini-based MessageEntryFilter to classify the message against active notifications and history, routing to one of two main flows: a) Standard Conversation (proceedWithConversation): Handles regular dialogue, managing 30-minute session timeouts and injecting conversation history parameter to Dialogflow. b) Notifications (startNotificationConversation): It first asks a Gemini model (NotificationContextResolver) if it can answer the query. If yes, it saves the LLM's response and sends an 'LLM_RESPONSE_PROCESSED' event to Dialogflow. If no ("DIALOGFLOW"), it sends the user's original text to Dialogflow for intent matching. All conversation turns (user, agent, and LLM) are persisted using a reactive write-back cache pattern, saving to Memorystore (Redis) first and then asynchronously to a Firestore subcollection data model (persistConversationTurn). */ @Service public class ConversationManagerService { private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); private static final long SESSION_RESET_THRESHOLD_MINUTES = 30; private static final long SCREEN_CONTEXT_TIMEOUT_MINUTES = 10; // fix for the quick replies screen private static final String CONV_HISTORY_PARAM = "conversation_history"; private static final String HISTORY_PARAM = "historial"; private final ExternalConvRequestMapper externalRequestToDialogflowMapper; private final DialogflowClientService dialogflowServiceClient; private final FirestoreConversationService firestoreConversationService; private final MemoryStoreConversationService memoryStoreConversationService; private final QuickRepliesManagerService quickRepliesManagerService; private final MessageEntryFilter messageEntryFilter; private final MemoryStoreNotificationService memoryStoreNotificationService; private final NotificationContextMapper notificationContextMapper; private final ConversationContextMapper conversationContextMapper; private final DataLossPrevention dataLossPrevention; private final String dlpTemplateCompleteFlow; private final NotificationContextResolver notificationContextResolver; private final LlmResponseTunerService llmResponseTunerService; private final ConversationEntryMapper conversationEntryMapper; public ConversationManagerService( DialogflowClientService dialogflowServiceClient, FirestoreConversationService firestoreConversationService, MemoryStoreConversationService memoryStoreConversationService, ExternalConvRequestMapper externalRequestToDialogflowMapper, QuickRepliesManagerService quickRepliesManagerService, MessageEntryFilter messageEntryFilter, MemoryStoreNotificationService memoryStoreNotificationService, NotificationContextMapper notificationContextMapper, ConversationContextMapper conversationContextMapper, DataLossPrevention dataLossPrevention, NotificationContextResolver notificationContextResolver, LlmResponseTunerService llmResponseTunerService, ConversationEntryMapper conversationEntryMapper, @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { this.dialogflowServiceClient = dialogflowServiceClient; this.firestoreConversationService = firestoreConversationService; this.memoryStoreConversationService = memoryStoreConversationService; this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper; this.quickRepliesManagerService = quickRepliesManagerService; this.messageEntryFilter = messageEntryFilter; this.memoryStoreNotificationService = memoryStoreNotificationService; this.notificationContextMapper = notificationContextMapper; this.conversationContextMapper = conversationContextMapper; this.dataLossPrevention = dataLossPrevention; this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; this.notificationContextResolver = notificationContextResolver; this.llmResponseTunerService = llmResponseTunerService; this.conversationEntryMapper = conversationEntryMapper; } public Mono manageConversation(ExternalConvRequestDTO externalrequest) { return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow) .flatMap(obfuscatedMessage -> { ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO( obfuscatedMessage, externalrequest.user(), externalrequest.channel(), externalrequest.tipo(), externalrequest.pantallaContexto()); return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono()) .flatMap(session -> { boolean isContextStale = false; if (session.lastModified() != null) { long minutesSinceLastUpdate = java.time.Duration.between(session.lastModified(), java.time.Instant.now()).toMinutes(); if (minutesSinceLastUpdate > SCREEN_CONTEXT_TIMEOUT_MINUTES) { isContextStale = true; } } if (session != null && session.pantallaContexto() != null && !session.pantallaContexto().isBlank() && !isContextStale) { logger.info("Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService."); return quickRepliesManagerService.manageConversation(obfuscatedRequest); } // Remove the old QR and continue as normal conversation. if (isContextStale && session.pantallaContexto() != null) { logger.info("Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow."); } return continueManagingConversation(obfuscatedRequest); }) .switchIfEmpty(continueManagingConversation(obfuscatedRequest)); }); } private Mono continueManagingConversation(ExternalConvRequestDTO externalrequest) { final DetectIntentRequestDTO request; try { request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest); logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO"); } catch (IllegalArgumentException e) { logger.error("Error during pre-mapping: {}", e.getMessage()); return Mono.error(new IllegalArgumentException( "Failed to process external request due to mapping error: " + e.getMessage(), e)); } Map params = Optional.ofNullable(request.queryParams()) .map(queryParamsDTO -> queryParamsDTO.parameters()) .orElse(Collections.emptyMap()); Object telefonoObj = params.get("telefono"); if (!(telefonoObj instanceof String) || ((String) telefonoObj).isBlank()) { logger.error("Critical error: parameter is missing, not a String, or blank after mapping."); return Mono.error(new IllegalStateException("Internal error: parameter is invalid.")); } String primaryPhoneNumber = (String) telefonoObj; String resolvedUserId = params.get("usuario_id") instanceof String ? (String) params.get("usuario_id") : null; String userMessageText = request.queryInput().text().text(); final ConversationContext context = new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber); return continueConversationFlow(context, request); } private Mono continueConversationFlow(ConversationContext context, DetectIntentRequestDTO request) { final String userId = context.userId(); final String userMessageText = context.userMessageText(); final String userPhoneNumber = context.primaryPhoneNumber(); if (userPhoneNumber == null || userPhoneNumber.isBlank()) { logger.warn("No phone number provided in request. Cannot manage conversation session without it."); return Mono .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); } logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber); return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) .flatMap(session -> handleMessageClassification(context, request, session)) .switchIfEmpty(Mono.defer(() -> { logger.info("No session found in MemoryStore. Performing full lookup to Firestore."); return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber); })) .onErrorResume(e -> { logger.error("Overall error handling conversation in ConversationManagerService: {}", e.getMessage(), e); return Mono .error(new RuntimeException("Failed to process conversation due to an internal error.", e)); }); } private Mono handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) { final String userPhoneNumber = context.primaryPhoneNumber(); final String userMessageText = context.userMessageText(); return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) .map(notificationSession -> notificationSession.notificaciones().stream() .filter(notification -> "active".equalsIgnoreCase(notification.status())) .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) .orElse(null)) .filter(Objects::nonNull) .flatMap((NotificationDTO notification) -> { return memoryStoreConversationService.getMessages(session.sessionId()).collectList() .map(conversationContextMapper::toTextFromMessages) .defaultIfEmpty("") .flatMap(conversationHistory -> { String notificationText = notificationContextMapper.toText(notification); String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory); if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { return startNotificationConversation(context, request, notification); } else { return proceedWithConversation(context, request, session); } }); }) .switchIfEmpty(proceedWithConversation(context, request, session)); } private Mono proceedWithConversation(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) { Instant now = Instant.now(); if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { logger.info("Recent Session Found: Session {} is within the 30-minute threshold. Proceeding to Dialogflow.", session.sessionId()); return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false); } else { logger.info( "Old Session Found: Session {} is older than the 30-minute threshold.", session.sessionId()); // Generar un nuevo ID de sesión String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("Creating new session {} from old session {} due to timeout.", newSessionId, session.sessionId()); // Crear un nuevo DTO de sesión basado en la antigua, pero con el nuevo ID ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, context.userId(), context.primaryPhoneNumber()); return memoryStoreConversationService.getMessages(session.sessionId()) .collectList() // Adding use the TextWithLimits to truncate according to business rule 30 days/60 messages .map(messages -> conversationContextMapper.toTextWithLimits(session, messages)) .defaultIfEmpty("") .flatMap(conversationHistory -> { // Inject historial (max 60 msgs / 30 días / 50KB) DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); return processDialogflowRequest(newSession, newRequest, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false); }); } } private Mono fullLookupAndProcess(ConversationSessionDTO oldSession, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { return firestoreConversationService.getSessionByTelefono(userPhoneNumber) .flatMap(session -> firestoreConversationService.getMessages(session.sessionId()).collectList() .map(conversationContextMapper::toTextFromMessages) .defaultIfEmpty("") .flatMap(conversationHistory -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("Creating new session {} after full lookup.", newSessionId); ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber); DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, true); })) .switchIfEmpty(Mono.defer(() -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("Creating new session {} after full lookup.", newSessionId); ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber); return processDialogflowRequest(newSession, request, userId, userMessageText, userPhoneNumber, true); })); } private Mono processDialogflowRequest(ConversationSessionDTO session, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber, boolean newSession) { final String finalSessionId = session.sessionId(); ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); return this.persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry)) .doOnSuccess(v -> logger.debug( "User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId)) .doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, e.getMessage(), e)) .then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request) .flatMap(response -> { logger.debug( "RTest eceived Dialogflow CX response for session {}. Initiating agent response persistence.", finalSessionId); ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry)) .thenReturn(response); }) .doOnError( error -> logger.error("Overall error during conversation management for session {}: {}", finalSessionId, error.getMessage(), error)))); } public Mono startNotificationConversation(ConversationContext context, DetectIntentRequestDTO request, NotificationDTO notification) { final String userId = context.userId(); final String userMessageText = context.userMessageText(); final String userPhoneNumber = context.primaryPhoneNumber(); return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) .switchIfEmpty(Mono.defer(() -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.warn("No existing conversation session found for notification reply on phone {}. This is unexpected. Creating new session: {}", userPhoneNumber, newSessionId); return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber)); })) .flatMap(session -> { final String sessionId = session.sessionId(); return memoryStoreConversationService.getMessages(sessionId).collectList() .map(conversationContextMapper::toTextFromMessages) .defaultIfEmpty("") .flatMap(conversationHistory -> { String notificationText = notificationContextMapper.toText(notification); Map filteredParams = notification.parametros().entrySet().stream() .filter(entry -> entry.getKey().startsWith("notification_po_")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); String resolvedContext = notificationContextResolver.resolveContext(userMessageText, notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, userPhoneNumber); if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) { String uuid = UUID.randomUUID().toString(); llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, notification.parametros()); ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, notification.parametros()); return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry)) .then(persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(llmEntry))) .then(Mono.defer(() -> { EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, request.queryInput().languageCode()); DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, request.queryParams()) .withParameter("llm_reponse_uuid", uuid); return dialogflowServiceClient.detectIntent(sessionId, newRequest) .flatMap(response -> { ConversationEntryDTO agentEntry = ConversationEntryDTO .forAgent(response.queryResult()); return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry)) .thenReturn(response); }); })); } else { ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, notification.parametros()); DetectIntentRequestDTO finalRequest; Instant now = Instant.now(); if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { finalRequest = request.withParameters(notification.parametros()); } else { finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) .withParameters(notification.parametros()); } return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry)) .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) .flatMap(response -> { ConversationEntryDTO agentEntry = ConversationEntryDTO .forAgent(response.queryResult()); return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry)) .thenReturn(response); })); } }); }); } private Mono persistConversationTurn(ConversationSessionDTO session, ConversationMessageDTO message) { logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", session.sessionId(), message.type().name()); ConversationSessionDTO updatedSession = session.withLastMessage(message.text()); return memoryStoreConversationService.saveSession(updatedSession) .then(memoryStoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(v -> logger.info( "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", session.sessionId(), message.type().name())) .then(firestoreConversationService.saveSession(updatedSession) .then(firestoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(fsVoid -> logger.debug( "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", session.sessionId(), message.type().name())) .doOnError(fsError -> logger.error( "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", session.sessionId(), message.type().name(), fsError.getMessage(), fsError))) .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", session.sessionId(), message.type().name(), e.getMessage(), e)); } }