182 lines
9.1 KiB
Java
182 lines
9.1 KiB
Java
/*
|
|
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
|
|
* Your use of it is subject to your agreement with Google.
|
|
*/
|
|
|
|
package com.example.service.quickreplies;
|
|
|
|
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
|
|
import com.example.dto.dialogflow.conversation.*;
|
|
import com.example.dto.quickreplies.QuickReplyScreenRequestDTO;
|
|
import com.example.dto.quickreplies.QuestionDTO;
|
|
import com.example.dto.quickreplies.QuickReplyDTO;
|
|
import com.example.mapper.conversation.ConversationEntryMapper;
|
|
import com.example.service.conversation.FirestoreConversationService;
|
|
import com.example.service.conversation.MemoryStoreConversationService;
|
|
import com.example.util.SessionIdGenerator;
|
|
import java.time.Instant;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.stream.IntStream;
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
import com.example.service.conversation.ConversationManagerService;
|
|
import org.springframework.context.annotation.Lazy;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
@Service
|
|
public class QuickRepliesManagerService {
|
|
private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class);
|
|
private final MemoryStoreConversationService memoryStoreConversationService;
|
|
private final FirestoreConversationService firestoreConversationService;
|
|
private final QuickReplyContentService quickReplyContentService;
|
|
private final ConversationManagerService conversationManagerService;
|
|
private final ConversationEntryMapper conversationEntryMapper;
|
|
|
|
public QuickRepliesManagerService(
|
|
@Lazy ConversationManagerService conversationManagerService,
|
|
MemoryStoreConversationService memoryStoreConversationService,
|
|
FirestoreConversationService firestoreConversationService,
|
|
QuickReplyContentService quickReplyContentService,
|
|
ConversationEntryMapper conversationEntryMapper) {
|
|
this.conversationManagerService = conversationManagerService;
|
|
this.memoryStoreConversationService = memoryStoreConversationService;
|
|
this.firestoreConversationService = firestoreConversationService;
|
|
this.quickReplyContentService = quickReplyContentService;
|
|
this.conversationEntryMapper = conversationEntryMapper;
|
|
}
|
|
|
|
public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) {
|
|
String userPhoneNumber = externalRequest.user().telefono();
|
|
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
|
|
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
|
|
return Mono
|
|
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
|
|
}
|
|
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
|
|
.flatMap(session -> Mono.just(session.sessionId()))
|
|
.switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId))
|
|
.flatMap(sessionId -> {
|
|
String userId = "user_by_phone_" + userPhoneNumber.replaceAll("[^0-9]", "");
|
|
ConversationEntryDTO systemEntry = new ConversationEntryDTO(
|
|
ConversationEntryEntity.SISTEMA,
|
|
ConversationEntryType.INICIO,
|
|
Instant.now(),
|
|
"Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :",
|
|
null,
|
|
null);
|
|
ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber).withPantallaContexto(externalRequest.pantallaContexto());
|
|
return persistConversationTurn(newSession, systemEntry)
|
|
.then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto()))
|
|
.map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
|
|
});
|
|
}
|
|
|
|
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalRequest) {
|
|
String userPhoneNumber = externalRequest.user().telefono();
|
|
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
|
|
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
|
|
return Mono
|
|
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
|
|
}
|
|
|
|
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
|
|
.switchIfEmpty(Mono.error(
|
|
new IllegalStateException("No quick reply session found for phone number")))
|
|
.flatMap(session -> {
|
|
|
|
return memoryStoreConversationService.getMessages(session.sessionId()).collectList().flatMap(messages -> {
|
|
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message());
|
|
|
|
int lastInitIndex = IntStream.range(0, messages.size())
|
|
.map(i -> messages.size() - 1 - i)
|
|
.filter(i -> {
|
|
ConversationMessageDTO message = messages.get(i);
|
|
return message.type() == MessageType.SYSTEM;
|
|
})
|
|
.findFirst()
|
|
.orElse(-1);
|
|
|
|
long userMessagesCount;
|
|
if (lastInitIndex != -1) {
|
|
userMessagesCount = messages.subList(lastInitIndex + 1, messages.size()).stream()
|
|
.filter(e -> e.type() == MessageType.USER)
|
|
.count();
|
|
} else {
|
|
userMessagesCount = 0;
|
|
}
|
|
|
|
if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow
|
|
// This is the second message of the flow. Return the full list.
|
|
return persistConversationTurn(session, userEntry)
|
|
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
|
|
.flatMap(quickReplyDTO -> {
|
|
ConversationEntryDTO agentEntry = ConversationEntryDTO
|
|
.forAgentWithMessage(quickReplyDTO.toString());
|
|
return persistConversationTurn(session, agentEntry)
|
|
.thenReturn(new DetectIntentResponseDTO(session.sessionId(), null, quickReplyDTO));
|
|
});
|
|
} else if (userMessagesCount == 1) { // Is the second user message in the QR flow
|
|
// This is the third message of the flow. Filter and end.
|
|
return persistConversationTurn(session, userEntry)
|
|
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
|
|
.flatMap(quickReplyDTO -> {
|
|
List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
|
|
.filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim()))
|
|
.toList();
|
|
|
|
if (!matchedPreguntas.isEmpty()) {
|
|
// Matched question, return the answer
|
|
String respuesta = matchedPreguntas.get(0).respuesta();
|
|
QueryResultDTO queryResult = new QueryResultDTO(respuesta, null);
|
|
DetectIntentResponseDTO response = new DetectIntentResponseDTO(session.sessionId(),
|
|
queryResult, null);
|
|
|
|
return memoryStoreConversationService
|
|
.updateSession(session.withPantallaContexto(null))
|
|
.then(persistConversationTurn(session,
|
|
ConversationEntryDTO.forAgentWithMessage(respuesta)))
|
|
.thenReturn(response);
|
|
} else {
|
|
// No match, delegate to Dialogflow
|
|
return memoryStoreConversationService
|
|
.updateSession(session.withPantallaContexto(null))
|
|
.then(conversationManagerService.manageConversation(externalRequest));
|
|
}
|
|
});
|
|
} else {
|
|
// Should not happen. End the flow.
|
|
return memoryStoreConversationService.updateSession(session.withPantallaContexto(null))
|
|
.then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null,
|
|
new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList()))));
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
private Mono<Void> persistConversationTurn(ConversationSessionDTO session, ConversationEntryDTO entry) {
|
|
logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.",
|
|
session.sessionId(), entry.type().name());
|
|
ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(entry);
|
|
ConversationSessionDTO updatedSession = session.withLastMessage(message.text());
|
|
return memoryStoreConversationService.saveSession(updatedSession)
|
|
.then(memoryStoreConversationService.saveMessage(session.sessionId(), message))
|
|
.doOnSuccess(v -> logger.info(
|
|
"Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.",
|
|
session.sessionId(), entry.type().name()))
|
|
.then(firestoreConversationService.saveSession(updatedSession)
|
|
.then(firestoreConversationService.saveMessage(session.sessionId(), message))
|
|
.doOnSuccess(fsVoid -> logger.debug(
|
|
"Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.",
|
|
session.sessionId(), entry.type().name()))
|
|
.doOnError(fsError -> logger.error(
|
|
"Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}",
|
|
session.sessionId(), entry.type().name(), fsError.getMessage(), fsError)))
|
|
.doOnError(
|
|
e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}",
|
|
session.sessionId(), entry.type().name(), e.getMessage(), e));
|
|
}
|
|
} |