/*
 * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
 * Your use of it is subject to your agreement with Google.
 */

package com.example.service.quickreplies;

import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryEntity;
import com.example.dto.dialogflow.conversation.ConversationEntryType;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import com.example.dto.quickreplies.QuickReplyScreenRequestDTO;
import com.example.dto.quickreplies.QuestionDTO;
import com.example.dto.quickreplies.QuickReplyDTO;
import com.example.service.conversation.FirestoreConversationService;
import com.example.service.conversation.MemoryStoreConversationService;
import com.example.util.SessionIdGenerator;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.example.dto.dialogflow.conversation.QueryResultDTO;
import com.example.service.conversation.ConversationManagerService;
import org.springframework.context.annotation.Lazy;
import reactor.core.publisher.Mono;

/**
 * Drives the quick-reply flow of a conversation.
 *
 * <p>A flow is opened by {@link #startQuickReplySession(QuickReplyScreenRequestDTO)}, which records
 * a {@code SISTEMA}/{@code INICIO} entry for the requested screen. Subsequent user messages are
 * handled by {@link #manageConversation(ExternalConvRequestDTO)}, which decides what to do based on
 * how many user entries exist after the most recent {@code SISTEMA}/{@code INICIO} entry.
 *
 * <p>Every conversation entry is written to the memory store first and then to Firestore.
 */
@Service
public class QuickRepliesManagerService {

    private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class);

    private final MemoryStoreConversationService memoryStoreConversationService;
    private final FirestoreConversationService firestoreConversationService;
    private final QuickReplyContentService quickReplyContentService;
    private final ConversationManagerService conversationManagerService;

    /**
     * Creates the service with its collaborators.
     *
     * @param conversationManagerService     fallback conversation handler; injected lazily
     *                                       (NOTE(review): presumably to break a circular bean
     *                                       dependency - confirm)
     * @param memoryStoreConversationService session/entry store that is written first
     * @param firestoreConversationService   store that is written after the memory store
     * @param quickReplyContentService       source of the quick replies for a given screen
     */
    public QuickRepliesManagerService(
            @Lazy ConversationManagerService conversationManagerService,
            MemoryStoreConversationService memoryStoreConversationService,
            FirestoreConversationService firestoreConversationService,
            QuickReplyContentService quickReplyContentService) {
        this.conversationManagerService = conversationManagerService;
        this.memoryStoreConversationService = memoryStoreConversationService;
        this.firestoreConversationService = firestoreConversationService;
        this.quickReplyContentService = quickReplyContentService;
    }

    /**
     * Starts (or re-starts) a quick-reply flow for the screen named in the request.
     *
     * <p>Reuses the session id already associated with the phone number when one exists, otherwise
     * generates a new one. A {@code SISTEMA}/{@code INICIO} entry is persisted and the quick replies
     * for the screen are returned.
     *
     * @param externalRequest request carrying the user's phone number and the screen context
     * @return a response holding the session id and the quick replies for the screen; an error
     *         signal with {@link IllegalArgumentException} when the phone number is null or blank
     */
    public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) {
        String userPhoneNumber = externalRequest.user().telefono();
        if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
            logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
            return Mono
                    .error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
        }

        return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
                .map(session -> session.sessionId())
                .switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId))
                .flatMap(sessionId -> {
                    // The user id is derived from the digits of the phone number only.
                    String userId = "user_by_phone_" + userPhoneNumber.replaceAll("[^0-9]", "");
                    ConversationEntryDTO systemEntry = new ConversationEntryDTO(
                            ConversationEntryEntity.SISTEMA,
                            ConversationEntryType.INICIO,
                            Instant.now(),
                            "Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :",
                            null,
                            null);
                    return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber,
                            externalRequest.pantallaContexto())
                            .then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto()))
                            .map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
                });
    }

    /**
     * Handles a user message that arrives while a quick-reply flow is active.
     *
     * <ul>
     *   <li>No user message since the last {@code INICIO} entry: persist the message and return the
     *       full quick-reply list (also persisted as an agent entry).</li>
     *   <li>Exactly one earlier user message: persist the message, clear the session's screen
     *       context, and either answer with the question whose title matches the message
     *       (case-insensitive, trimmed) or delegate to {@link ConversationManagerService}.</li>
     *   <li>Anything else: clear the screen context and return a "Flow Error" quick reply.</li>
     * </ul>
     *
     * @param externalRequest request carrying the user's phone number and message
     * @return the response for this turn; an error signal with {@link IllegalArgumentException} when
     *         the phone number is null or blank, or {@link IllegalStateException} when no session
     *         exists for the phone number
     */
    public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalRequest) {
        String userPhoneNumber = externalRequest.user().telefono();
        if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
            logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
            return Mono
                    .error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
        }

        return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
                .switchIfEmpty(Mono.error(
                        new IllegalStateException("No quick reply session found for phone number: " + userPhoneNumber)))
                .flatMap(session -> {
                    String userId = session.userId();
                    String sessionId = session.sessionId();
                    String pantallaContexto = session.pantallaContexto();
                    ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message());

                    long userMessagesCount = countUserMessagesSinceLastInit(session.entries());

                    if (userMessagesCount == 0) {
                        // First user message of the quick-reply flow: return the full list.
                        return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
                                pantallaContexto)
                                .then(quickReplyContentService.getQuickReplies(pantallaContexto))
                                .flatMap(quickReplyDTO -> {
                                    ConversationEntryDTO agentEntry = ConversationEntryDTO
                                            .forAgentWithMessage(quickReplyDTO.toString());
                                    return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber,
                                            pantallaContexto)
                                            .thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
                                });
                    }

                    if (userMessagesCount == 1) {
                        // Second user message of the flow: try to match a question, then end the flow.
                        String rawMessage = externalRequest.message();
                        // Normalised once; a null message simply matches nothing.
                        String normalizedMessage = rawMessage == null ? "" : rawMessage.trim();

                        return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
                                pantallaContexto)
                                .then(quickReplyContentService.getQuickReplies(pantallaContexto))
                                .flatMap(quickReplyDTO -> {
                                    // Receiver is the non-null message so a null title cannot throw.
                                    List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
                                            .filter(p -> normalizedMessage.equalsIgnoreCase(p.titulo()))
                                            .toList();

                                    if (!matchedPreguntas.isEmpty()) {
                                        // Matched question: answer with it and clear the screen context.
                                        String respuesta = matchedPreguntas.get(0).respuesta();
                                        QueryResultDTO queryResult = new QueryResultDTO(respuesta, null);
                                        DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId,
                                                queryResult, null);
                                        return memoryStoreConversationService
                                                .updateSession(session.withPantallaContexto(null))
                                                .then(persistConversationTurn(userId, sessionId,
                                                        ConversationEntryDTO.forAgentWithMessage(respuesta),
                                                        userPhoneNumber, null))
                                                .thenReturn(response);
                                    }

                                    // No match: clear the screen context, then delegate. The delegate call
                                    // is deferred so it is not even invoked until the session update has
                                    // completed.
                                    return memoryStoreConversationService
                                            .updateSession(session.withPantallaContexto(null))
                                            .then(Mono.defer(
                                                    () -> conversationManagerService
                                                            .manageConversation(externalRequest)));
                                });
                    }

                    // Should not happen. End the flow.
                    return memoryStoreConversationService.updateSession(session.withPantallaContexto(null))
                            .then(Mono.just(new DetectIntentResponseDTO(sessionId, null,
                                    new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList()))));
                });
    }

    /**
     * Counts the {@code USUARIO} entries that follow the most recent {@code SISTEMA}/{@code INICIO}
     * entry.
     *
     * @param entries conversation entries in stored order; may be null or empty
     * @return the number of user entries after the last init entry, or {@code 0} when there is no
     *         init entry (or no entries at all)
     */
    private static long countUserMessagesSinceLastInit(List<ConversationEntryDTO> entries) {
        if (entries == null || entries.isEmpty()) {
            return 0;
        }

        // Scan indices from the end so the first hit is the latest init entry.
        int lastInitIndex = IntStream.range(0, entries.size())
                .map(i -> entries.size() - 1 - i)
                .filter(i -> {
                    ConversationEntryDTO entry = entries.get(i);
                    return entry.entity() == ConversationEntryEntity.SISTEMA
                            && entry.type() == ConversationEntryType.INICIO;
                })
                .findFirst()
                .orElse(-1);

        if (lastInitIndex == -1) {
            return 0;
        }

        return entries.subList(lastInitIndex + 1, entries.size()).stream()
                .filter(e -> e.entity() == ConversationEntryEntity.USUARIO)
                .count();
    }

    /**
     * Persists one conversation entry to the memory store and then to Firestore.
     *
     * <p>The Firestore write is chained after the memory-store write, so it only starts once the
     * first write has completed, and a failure of either write is propagated to the subscriber.
     * Each store's failure is logged under its own message.
     *
     * @param userId           id of the user owning the conversation
     * @param sessionId        id of the conversation session
     * @param entry            entry to persist
     * @param userPhoneNumber  phone number associated with the session
     * @param pantallaContexto screen context to store with the entry; may be null
     * @return a {@code Mono} that completes when both writes have completed
     */
    private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
            String userPhoneNumber, String pantallaContexto) {
        logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.",
                sessionId, entry.type().name());

        return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
                .doOnSuccess(v -> logger.info(
                        "Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.",
                        sessionId, entry.type().name()))
                // Attached to the first write only, so Firestore failures are not reported as
                // Redis failures.
                .doOnError(
                        e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}",
                                sessionId, entry.type().name(), e.getMessage(), e))
                // Deferred so the Firestore call is not made unless the first write succeeded.
                .then(Mono.defer(() -> firestoreConversationService
                        .saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
                        .doOnSuccess(fsVoid -> logger.debug(
                                "Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.",
                                sessionId, entry.type().name()))
                        .doOnError(fsError -> logger.error(
                                "Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}",
                                sessionId, entry.type().name(), fsError.getMessage(), fsError))
                        .then()));
    }
}