/* * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ package com.example.service.conversation; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.conversation.ConversationContext; import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO; import com.example.dto.dialogflow.conversation.QueryResultDTO; import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.messagefilter.ConversationContextMapper; import com.example.mapper.messagefilter.NotificationContextMapper; import com.example.service.base.DialogflowClientService; import com.example.service.base.MessageEntryFilter; import com.example.service.base.NotificationContextResolver; import com.example.service.notification.MemoryStoreNotificationService; import com.example.service.quickreplies.QuickRepliesManagerService; import com.example.util.SessionIdGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @Service public class ConversationManagerService { private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); private static final long SESSION_RESET_THRESHOLD_MINUTES = 30; private static final String CONV_HISTORY_PARAM = "conversation_history"; private final ExternalConvRequestMapper externalRequestToDialogflowMapper; private final DialogflowClientService dialogflowServiceClient; private final FirestoreConversationService firestoreConversationService; private final MemoryStoreConversationService memoryStoreConversationService; private final QuickRepliesManagerService quickRepliesManagerService; private final MessageEntryFilter messageEntryFilter; private final MemoryStoreNotificationService memoryStoreNotificationService; private final NotificationContextMapper notificationContextMapper; private final ConversationContextMapper conversationContextMapper; private final DataLossPrevention dataLossPrevention; private final String dlpTemplateCompleteFlow; private final NotificationContextResolver notificationContextResolver; public ConversationManagerService( DialogflowClientService dialogflowServiceClient, FirestoreConversationService firestoreConversationService, MemoryStoreConversationService memoryStoreConversationService, ExternalConvRequestMapper externalRequestToDialogflowMapper, QuickRepliesManagerService quickRepliesManagerService, MessageEntryFilter messageEntryFilter, MemoryStoreNotificationService memoryStoreNotificationService, NotificationContextMapper notificationContextMapper, ConversationContextMapper conversationContextMapper, DataLossPrevention dataLossPrevention, NotificationContextResolver notificationContextResolver, @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { this.dialogflowServiceClient = dialogflowServiceClient; this.firestoreConversationService = firestoreConversationService; this.memoryStoreConversationService = memoryStoreConversationService; this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper; this.quickRepliesManagerService = quickRepliesManagerService; this.messageEntryFilter = messageEntryFilter; this.memoryStoreNotificationService = memoryStoreNotificationService; this.notificationContextMapper = notificationContextMapper; this.conversationContextMapper = conversationContextMapper; this.dataLossPrevention = dataLossPrevention; this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; this.notificationContextResolver = notificationContextResolver; } public Mono manageConversation(ExternalConvRequestDTO externalrequest) { return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow) .flatMap(obfuscatedMessage -> { ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO( obfuscatedMessage, externalrequest.user(), externalrequest.channel(), externalrequest.tipo(), externalrequest.pantallaContexto()); return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono()) .flatMap(session -> { if (session != null && session.pantallaContexto() != null && !session.pantallaContexto().isBlank()) { logger.info( "Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService."); return quickRepliesManagerService.manageConversation(obfuscatedRequest); } return continueManagingConversation(obfuscatedRequest); }) .switchIfEmpty(continueManagingConversation(obfuscatedRequest)); }); } private Mono continueManagingConversation(ExternalConvRequestDTO externalrequest) { final DetectIntentRequestDTO request; try { request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest); logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO"); } catch (IllegalArgumentException e) { logger.error("Error during pre-mapping: {}", e.getMessage()); return Mono.error(new IllegalArgumentException( "Failed to process external request due to mapping error: " + e.getMessage(), e)); } final ConversationContext context; try { context = resolveAndValidateRequest(request); } catch (IllegalArgumentException e) { logger.error("Validation error for incoming request: {}", e.getMessage()); return Mono.error(e); } return handleMessageClassification(context, request); } private Mono handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request) { final String userPhoneNumber = context.primaryPhoneNumber(); final String userMessageText = context.userMessageText(); return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) .map(conversationContextMapper::toText) .defaultIfEmpty("") .flatMap(conversationHistory -> { return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) .flatMap(notificationId -> memoryStoreNotificationService .getCachedNotificationSession(notificationId)) .map(notificationSession -> notificationSession.notificaciones().stream() .filter(notification -> "active".equalsIgnoreCase(notification.status())) .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) .orElse(null)) .filter(Objects::nonNull) .flatMap((NotificationDTO notification) -> { String notificationText = notificationContextMapper.toText(notification); String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory); if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { return startNotificationConversation(context, request, notification); } else { return continueConversationFlow(context, request); } }) .switchIfEmpty(continueConversationFlow(context, request)); }); } private Mono continueConversationFlow(ConversationContext context, DetectIntentRequestDTO request) { final String userId = context.userId(); final String userMessageText = context.userMessageText(); final String userPhoneNumber = context.primaryPhoneNumber(); if (userPhoneNumber == null || userPhoneNumber.isBlank()) { logger.warn("No phone number provided in request. Cannot manage conversation session without it."); return Mono .error(new IllegalArgumentException("Phone number is required to manage conversation sessions.")); } logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber); return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) .flatMap(session -> handleMessageClassification(context, request, session)) .switchIfEmpty(Mono.defer(() -> { logger.info("No session found in MemoryStore. Performing full lookup to Firestore."); return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber); })) .onErrorResume(e -> { logger.error("Overall error handling conversation in ConversationManagerService: {}", e.getMessage(), e); return Mono .error(new RuntimeException("Failed to process conversation due to an internal error.", e)); }); } private Mono handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) { final String userPhoneNumber = context.primaryPhoneNumber(); final String userMessageText = context.userMessageText(); return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) .map(notificationSession -> notificationSession.notificaciones().stream() .filter(notification -> "active".equalsIgnoreCase(notification.status())) .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) .orElse(null)) .filter(Objects::nonNull) .flatMap((NotificationDTO notification) -> { String conversationHistory = conversationContextMapper.toText(session); String notificationText = notificationContextMapper.toText(notification); String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory); if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { return startNotificationConversation(context, request, notification); } else { return proceedWithConversation(context, request, session); } }) .switchIfEmpty(proceedWithConversation(context, request, session)); } private Mono proceedWithConversation(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) { Instant now = Instant.now(); if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.", session.sessionId()); return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false); } else { logger.info( "Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", session.sessionId()); String conversationHistory = conversationContextMapper.toTextWithLimits(session); DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false); } } private Mono fullLookupAndProcess(ConversationSessionDTO oldSession, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { return firestoreConversationService.getSessionByTelefono(userPhoneNumber) .map(conversationContextMapper::toTextWithLimits) .defaultIfEmpty("") .flatMap(conversationHistory -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("Creating new session {} after full lookup.", newSessionId); ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber); DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, true); }); } private Mono processDialogflowRequest(ConversationSessionDTO session, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber, boolean newSession) { final String finalSessionId = session.sessionId(); ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber) .doOnSuccess(v -> logger.debug( "User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId)) .doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, e.getMessage(), e)) .then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request) .flatMap(response -> { logger.debug( "Received Dialogflow CX response for session {}. Initiating agent response persistence.", finalSessionId); ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber) .thenReturn(response); }) .doOnError( error -> logger.error("Overall error during conversation management for session {}: {}", finalSessionId, error.getMessage(), error)))); } private Mono startNotificationConversation(ConversationContext context, DetectIntentRequestDTO request, NotificationDTO notification) { final String userId = context.userId(); final String userMessageText = context.userMessageText(); final String userPhoneNumber = context.primaryPhoneNumber(); return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber) .switchIfEmpty(Mono.defer(() -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("No existing notification session found for phone number {}. Creating new session: {}", userPhoneNumber, newSessionId); return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber)); })) .flatMap(session -> { final String sessionId = session.sessionId(); String conversationHistory = conversationContextMapper.toTextWithLimits(session); String notificationText = notificationContextMapper.toText(notification); Map filteredParams = notification.parametros().entrySet().stream() .filter(entry -> entry.getKey().startsWith("notification_po_")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); String resolvedContext = notificationContextResolver.resolveContext(userMessageText, notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, userPhoneNumber); if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) { ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, notification.parametros()); ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, notification.parametros()); return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber) .then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber)) .then(Mono.fromCallable(() -> { QueryResultDTO queryResult = new QueryResultDTO(resolvedContext, java.util.Collections.emptyMap()); return new DetectIntentResponseDTO(null, queryResult); })); } ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, notification.parametros()); DetectIntentRequestDTO finalRequest; Instant now = Instant.now(); if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { finalRequest = request.withParameters(notification.parametros()); } else { finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) .withParameters(notification.parametros()); } return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) .flatMap(response -> { ConversationEntryDTO agentEntry = ConversationEntryDTO .forAgent(response.queryResult()); return memoryStoreNotificationService .saveEntry(userId, sessionId, agentEntry, userPhoneNumber) .thenReturn(response); })); }); } private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber) { logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, entry.type().name()); return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) .doOnSuccess(v -> logger.info( "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", sessionId, entry.type().name())) .then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) .doOnSuccess(fsVoid -> logger.debug( "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", sessionId, entry.type().name())) .doOnError(fsError -> logger.error( "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", sessionId, entry.type().name(), fsError.getMessage(), fsError))) .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e)); } private Mono persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber) { logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.", sessionId, entry.type().name()); return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) .doOnSuccess(v -> logger.info( "Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.", sessionId, entry.type().name())) .doOnError(e -> logger.error( "Error during primary Redis write for notification session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e)); } private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) { Map params = Optional.ofNullable(request.queryParams()) .map(queryParamsDTO -> queryParamsDTO.parameters()) .orElse(Collections.emptyMap()); String primaryPhoneNumber = null; Object telefonoObj = params.get("telefono"); // Get from map if (telefonoObj instanceof String) { primaryPhoneNumber = (String) telefonoObj; } else if (telefonoObj != null) { logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.", telefonoObj.getClass().getName()); } if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) { throw new IllegalArgumentException( "Phone number (telefono) is required in query parameters for conversation management."); } String resolvedUserId = null; Object userIdObj = params.get("usuario_id"); if (userIdObj instanceof String) { resolvedUserId = (String) userIdObj; } else if (userIdObj != null) { logger.warn("Parameter 'userId' in query_params is not a String (type: {}). Expected String.", userIdObj.getClass().getName()); } if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) { resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", ""); logger.warn("User ID not provided in query parameters. Using derived ID from phone number: {}", resolvedUserId); } if (request.queryInput() == null || request.queryInput().text() == null || request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) { throw new IllegalArgumentException("Dialogflow query input text is required."); } String userMessageText = request.queryInput().text().text(); return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber); } }