Update 25-sept

This commit is contained in:
PAVEL PALMA
2025-09-29 02:10:37 -06:00
parent 535a1d8ca0
commit 2ad649c321
17 changed files with 993 additions and 538 deletions

View File

@@ -87,9 +87,6 @@ public class MessageEntryFilter {
String classificationPrompt = String.format(
this.promptTemplate,
CATEGORY_NOTIFICATION, CATEGORY_NOTIFICATION, CATEGORY_CONVERSATION, CATEGORY_CONVERSATION,
CATEGORY_CONVERSATION, CATEGORY_CONVERSATION, CATEGORY_CONVERSATION, CATEGORY_NOTIFICATION,
CATEGORY_NOTIFICATION, CATEGORY_CONVERSATION, CATEGORY_CONVERSATION,
conversationHistory,
interruptingNotification,
queryInputText
@@ -106,8 +103,8 @@ public class MessageEntryFilter {
geminiModelNameClassifier,
classifierTopP
);
String resultCategory = switch (geminiResponse != null ? geminiResponse.trim().toUpperCase() : "") {
String resultCategory = switch (geminiResponse != null ? geminiResponse.strip().toUpperCase() : "") {
case CATEGORY_CONVERSATION -> {
logger.info("Classified as {}.", CATEGORY_CONVERSATION);
yield CATEGORY_CONVERSATION;

View File

@@ -2,6 +2,7 @@
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.base;
import com.example.service.notification.MemoryStoreNotificationService;
@@ -75,9 +76,10 @@ public class NotificationContextResolver {
public String resolveContext(String queryInputText, String notificationsJson, String conversationJson,
String metadata, String userId, String sessionId, String userPhoneNumber) {
logger.debug("resolveContext -> queryInputText: {}, notificationsJson: {}, conversationJson: {}, metadata: {}", queryInputText, notificationsJson, conversationJson, metadata);
logger.debug("resolveContext -> queryInputText: {}, notificationsJson: {}, conversationJson: {}, metadata: {}",
queryInputText, notificationsJson, conversationJson, metadata);
if (queryInputText == null || queryInputText.isBlank()) {
logger.warn("Query input text for context resolution is null or blank. Returning {}.", CATEGORY_DIALOGFLOW);
logger.warn("Query input text for context resolution is null or blank.", CATEGORY_DIALOGFLOW);
return CATEGORY_DIALOGFLOW;
}
@@ -107,15 +109,15 @@ public class NotificationContextResolver {
if (geminiResponse != null && !geminiResponse.isBlank()) {
if (geminiResponse.trim().equalsIgnoreCase(CATEGORY_DIALOGFLOW)) {
logger.info("Resolved to {}.", CATEGORY_DIALOGFLOW);
logger.debug("Resolved to {}. Input: '{}'", CATEGORY_DIALOGFLOW, queryInputText);
return CATEGORY_DIALOGFLOW;
} else {
logger.info("Resolved to a specific response.");
logger.debug("Resolved to a specific response. Input: '{}'", queryInputText);
return geminiResponse;
}
} else {
logger.warn("Gemini returned a null or blank response. Returning {}.",
CATEGORY_DIALOGFLOW);
logger.warn("Gemini returned a null or blank response",
queryInputText, CATEGORY_DIALOGFLOW);
return CATEGORY_DIALOGFLOW;
}
} catch (Exception e) {

View File

@@ -2,6 +2,7 @@
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.conversation;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
@@ -10,7 +11,8 @@ import com.example.dto.dialogflow.conversation.ConversationContext;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import com.example.dto.dialogflow.conversation.QueryResultDTO;
import com.example.dto.dialogflow.conversation.QueryInputDTO;
import com.example.dto.dialogflow.notification.EventInputDTO;
import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.conversation.ExternalConvRequestMapper;
import com.example.mapper.messagefilter.ConversationContextMapper;
@@ -20,6 +22,7 @@ import com.example.service.base.MessageEntryFilter;
import com.example.service.base.NotificationContextResolver;
import com.example.service.notification.MemoryStoreNotificationService;
import com.example.service.quickreplies.QuickRepliesManagerService;
import com.example.service.llm.LlmResponseTunerService;
import com.example.util.SessionIdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,379 +35,381 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Orchestrates the full lifecycle of a user conversation,(this is the core of
* the entire integration layer service), acting as the central point for all
* inbound requests.
* This service manages conversational state by integrating both an in-memory
* cache (for active sessions)
* and a durable database (for conversation history). It intelligently routes
* incoming messages
* to the appropriate handler, which can include a standard Dialogflow agent, a
* notification-specific flow, or a direct LLM-based response. The class also
* ensures data integrity and security by applying Data Loss Prevention (DLP)
* to all incoming user messages before they are processed.
*/
@Service
public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
private static final long SESSION_RESET_THRESHOLD_MINUTES = 30;
private static final String CONV_HISTORY_PARAM = "conversation_history";
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreConversationService memoryStoreConversationService;
private final QuickRepliesManagerService quickRepliesManagerService;
private final MessageEntryFilter messageEntryFilter;
private final MemoryStoreNotificationService memoryStoreNotificationService;
private final NotificationContextMapper notificationContextMapper;
private final ConversationContextMapper conversationContextMapper;
private final DataLossPrevention dataLossPrevention;
private final String dlpTemplateCompleteFlow;
private static final long SESSION_RESET_THRESHOLD_MINUTES = 30;
private static final String CONV_HISTORY_PARAM = "conversation_history";
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreConversationService memoryStoreConversationService;
private final QuickRepliesManagerService quickRepliesManagerService;
private final MessageEntryFilter messageEntryFilter;
private final MemoryStoreNotificationService memoryStoreNotificationService;
private final NotificationContextMapper notificationContextMapper;
private final ConversationContextMapper conversationContextMapper;
private final DataLossPrevention dataLossPrevention;
private final String dlpTemplateCompleteFlow;
private final NotificationContextResolver notificationContextResolver;
private final NotificationContextResolver notificationContextResolver;
private final LlmResponseTunerService llmResponseTunerService;
public ConversationManagerService(
DialogflowClientService dialogflowServiceClient,
FirestoreConversationService firestoreConversationService,
MemoryStoreConversationService memoryStoreConversationService,
ExternalConvRequestMapper externalRequestToDialogflowMapper,
QuickRepliesManagerService quickRepliesManagerService,
MessageEntryFilter messageEntryFilter,
MemoryStoreNotificationService memoryStoreNotificationService,
NotificationContextMapper notificationContextMapper,
ConversationContextMapper conversationContextMapper,
DataLossPrevention dataLossPrevention,
NotificationContextResolver notificationContextResolver,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowServiceClient = dialogflowServiceClient;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreConversationService = memoryStoreConversationService;
this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper;
this.quickRepliesManagerService = quickRepliesManagerService;
this.messageEntryFilter = messageEntryFilter;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.notificationContextMapper = notificationContextMapper;
this.conversationContextMapper = conversationContextMapper;
this.dataLossPrevention = dataLossPrevention;
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
this.notificationContextResolver = notificationContextResolver;
public ConversationManagerService(
DialogflowClientService dialogflowServiceClient,
FirestoreConversationService firestoreConversationService,
MemoryStoreConversationService memoryStoreConversationService,
ExternalConvRequestMapper externalRequestToDialogflowMapper,
QuickRepliesManagerService quickRepliesManagerService,
MessageEntryFilter messageEntryFilter,
MemoryStoreNotificationService memoryStoreNotificationService,
NotificationContextMapper notificationContextMapper,
ConversationContextMapper conversationContextMapper,
DataLossPrevention dataLossPrevention,
NotificationContextResolver notificationContextResolver,
LlmResponseTunerService llmResponseTunerService,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowServiceClient = dialogflowServiceClient;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreConversationService = memoryStoreConversationService;
this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper;
this.quickRepliesManagerService = quickRepliesManagerService;
this.messageEntryFilter = messageEntryFilter;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.notificationContextMapper = notificationContextMapper;
this.conversationContextMapper = conversationContextMapper;
this.dataLossPrevention = dataLossPrevention;
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
this.notificationContextResolver = notificationContextResolver;
this.llmResponseTunerService = llmResponseTunerService;
}
}
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalrequest) {
return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow)
.flatMap(obfuscatedMessage -> {
ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO(
obfuscatedMessage,
externalrequest.user(),
externalrequest.channel(),
externalrequest.tipo(),
externalrequest.pantallaContexto());
return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono())
.flatMap(session -> {
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalrequest) {
return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow)
.flatMap(obfuscatedMessage -> {
ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO(
obfuscatedMessage,
externalrequest.user(),
externalrequest.channel(),
externalrequest.tipo(),
externalrequest.pantallaContexto());
return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono())
.flatMap(session -> {
if (session != null && session.pantallaContexto() != null
&& !session.pantallaContexto().isBlank()) {
logger.info(
"Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService.");
return quickRepliesManagerService.manageConversation(obfuscatedRequest);
}
return continueManagingConversation(obfuscatedRequest);
})
.switchIfEmpty(continueManagingConversation(obfuscatedRequest));
});
}
if (session != null && session.pantallaContexto() != null
&& !session.pantallaContexto().isBlank()) {
logger.info(
"Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService.");
return quickRepliesManagerService.manageConversation(obfuscatedRequest);
}
return continueManagingConversation(obfuscatedRequest);
})
.switchIfEmpty(continueManagingConversation(obfuscatedRequest));
});
}
private Mono<DetectIntentResponseDTO> continueManagingConversation(ExternalConvRequestDTO externalrequest) {
final DetectIntentRequestDTO request;
try {
request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest);
logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO");
} catch (IllegalArgumentException e) {
logger.error("Error during pre-mapping: {}", e.getMessage());
return Mono.error(new IllegalArgumentException(
"Failed to process external request due to mapping error: " + e.getMessage(), e));
}
private Mono<DetectIntentResponseDTO> continueManagingConversation(ExternalConvRequestDTO externalrequest) {
final DetectIntentRequestDTO request;
try {
request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest);
logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO");
} catch (IllegalArgumentException e) {
logger.error("Error during pre-mapping: {}", e.getMessage());
return Mono.error(new IllegalArgumentException(
"Failed to process external request due to mapping error: " + e.getMessage(), e));
}
Map<String, Object> params = Optional.ofNullable(request.queryParams())
.map(queryParamsDTO -> queryParamsDTO.parameters())
.orElse(Collections.emptyMap());
final ConversationContext context;
try {
context = resolveAndValidateRequest(request);
} catch (IllegalArgumentException e) {
logger.error("Validation error for incoming request: {}", e.getMessage());
return Mono.error(e);
}
Object telefonoObj = params.get("telefono");
if (!(telefonoObj instanceof String) || ((String) telefonoObj).isBlank()) {
logger.error("Critical error: parameter is missing, not a String, or blank after mapping.");
return Mono.error(new IllegalStateException("Internal error: parameter is invalid."));
}
return handleMessageClassification(context, request);
}
String primaryPhoneNumber = (String) telefonoObj;
String resolvedUserId = params.get("usuario_id") instanceof String ? (String) params.get("usuario_id") : null;
String userMessageText = request.queryInput().text().text();
final ConversationContext context = new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber);
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context,
DetectIntentRequestDTO request) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return handleMessageClassification(context, request);
}
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService
.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText,
notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return continueConversationFlow(context, request);
}
})
.switchIfEmpty(continueConversationFlow(context, request));
});
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context,
DetectIntentRequestDTO request) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context,
DetectIntentRequestDTO request) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService
.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText,
notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return continueConversationFlow(context, request);
}
})
.switchIfEmpty(continueConversationFlow(context, request));
});
}
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context,
DetectIntentRequestDTO request) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
logger.info("Primary Check (MemoryStore): Looking up session for phone number");
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> handleMessageClassification(context, request, session))
.switchIfEmpty(Mono.defer(() -> {
logger.info("No session found in MemoryStore. Performing full lookup to Firestore.");
return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber);
}))
.onErrorResume(e -> {
logger.error("Overall error handling conversation in ConversationManagerService: {}",
e.getMessage(), e);
return Mono
.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
});
}
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context,
DetectIntentRequestDTO request, ConversationSessionDTO session) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber);
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> handleMessageClassification(context, request, session))
.switchIfEmpty(Mono.defer(() -> {
logger.info("No session found in MemoryStore. Performing full lookup to Firestore.");
return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber);
}))
.onErrorResume(e -> {
logger.error("Overall error handling conversation in ConversationManagerService: {}",
e.getMessage(), e);
return Mono
.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
});
}
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String conversationHistory = conversationContextMapper.toText(session);
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText,
conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return proceedWithConversation(context, request, session);
}
})
.switchIfEmpty(proceedWithConversation(context, request, session));
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context,
DetectIntentRequestDTO request, ConversationSessionDTO session) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
private Mono<DetectIntentResponseDTO> proceedWithConversation(ConversationContext context,
DetectIntentRequestDTO request, ConversationSessionDTO session) {
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.",
session.sessionId());
return processDialogflowRequest(session, request, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
} else {
logger.info(
"Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.",
session.sessionId());
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
}
}
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String conversationHistory = conversationContextMapper.toText(session);
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText,
conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return proceedWithConversation(context, request, session);
}
})
.switchIfEmpty(proceedWithConversation(context, request, session));
}
private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) {
return firestoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toTextWithLimits)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId,
userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber,
true);
});
}
private Mono<DetectIntentResponseDTO> proceedWithConversation(ConversationContext context,
DetectIntentRequestDTO request, ConversationSessionDTO session) {
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.",
session.sessionId());
return processDialogflowRequest(session, request, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
} else {
logger.info(
"Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.",
session.sessionId());
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
}
}
private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber,
boolean newSession) {
final String finalSessionId = session.sessionId();
private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) {
return firestoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toTextWithLimits)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId,
userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber,
true);
});
}
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber,
boolean newSession) {
final String finalSessionId = session.sessionId();
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
.doOnSuccess(v -> logger.debug(
"User entry successfully persisted for session {}. Proceeding to Dialogflow...",
finalSessionId))
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId,
e.getMessage(), e))
.then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request)
.flatMap(response -> {
logger.debug(
"Received Dialogflow CX response for session {}. Initiating agent response persistence.",
finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
})
.doOnError(
error -> logger.error("Overall error during conversation management for session {}: {}",
finalSessionId, error.getMessage(), error))));
}
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
private Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context,
DetectIntentRequestDTO request, NotificationDTO notification) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
.doOnSuccess(v -> logger.debug(
"User entry successfully persisted for session {}. Proceeding to Dialogflow...",
finalSessionId))
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId,
e.getMessage(), e))
.then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request)
.flatMap(response -> {
logger.debug(
"Received Dialogflow CX response for session {}. Initiating agent response persistence.",
finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
})
.doOnError(
error -> logger.error("Overall error during conversation management for session {}: {}",
finalSessionId, error.getMessage(), error))));
}
return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing notification session found for phone number. Creating new session: {}",
newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}))
.flatMap(session -> {
final String sessionId = session.sessionId();
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
String notificationText = notificationContextMapper.toText(notification);
public Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context,
DetectIntentRequestDTO request, NotificationDTO notification) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
Map<String, Object> filteredParams = notification.parametros().entrySet().stream()
.filter(entry -> entry.getKey().startsWith("notification_po_"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing notification session found for phone number {}. Creating new session: {}",
userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}))
.flatMap(session -> {
final String sessionId = session.sessionId();
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
String notificationText = notificationContextMapper.toText(notification);
String resolvedContext = notificationContextResolver.resolveContext(userMessageText,
notificationText, conversationHistory, filteredParams.toString(), userId, sessionId,
userPhoneNumber);
Map<String, Object> filteredParams = notification.parametros().entrySet().stream()
.filter(entry -> entry.getKey().startsWith("notification_po_"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) {
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext,
notification.parametros());
String resolvedContext = notificationContextResolver.resolveContext(userMessageText,
notificationText, conversationHistory, filteredParams.toString(), userId, sessionId,
userPhoneNumber);
return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber)
.then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber))
.then(Mono.fromCallable(() -> {
QueryResultDTO queryResult = new QueryResultDTO(resolvedContext,
java.util.Collections.emptyMap());
return new DetectIntentResponseDTO(null, queryResult);
}));
}
if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) {
String uuid = UUID.randomUUID().toString();
llmResponseTunerService.setValue(uuid, resolvedContext).subscribe();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext,
notification.parametros());
DetectIntentRequestDTO finalRequest;
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
finalRequest = request.withParameters(notification.parametros());
} else {
finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory)
.withParameters(notification.parametros());
}
return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber)
.then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber))
.then(Mono.defer(() -> {
EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED");
QueryInputDTO queryInput = new QueryInputDTO(null, eventInput,
request.queryInput().languageCode());
DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput,
request.queryParams())
.withParameter("llm_reponse_uuid", uuid);
return dialogflowServiceClient.detectIntent(sessionId, newRequest)
.flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult());
return persistNotificationTurn(userId, sessionId, agentEntry,
userPhoneNumber)
.thenReturn(response);
});
}));
}
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
.flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult());
return memoryStoreNotificationService
.saveEntry(userId, sessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
}));
});
}
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()))
.doOnError(fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)))
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
DetectIntentRequestDTO finalRequest;
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
finalRequest = request.withParameters(notification.parametros());
} else {
finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory)
.withParameters(notification.parametros());
}
private Mono<Void> persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.",
sessionId,
entry.type().name());
return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.doOnError(e -> logger.error(
"Error during primary Redis write for notification session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
.flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult());
return memoryStoreNotificationService
.saveEntry(userId, sessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
}));
});
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()))
.doOnError(fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)))
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
private Mono<Void> persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.",
sessionId,
entry.type().name());
return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.doOnError(e -> logger.error(
"Error during primary Redis write for notification session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
Map<String, Object> params = Optional.ofNullable(request.queryParams())
.map(queryParamsDTO -> queryParamsDTO.parameters())
.orElse(Collections.emptyMap());
String primaryPhoneNumber = null;
Object telefonoObj = params.get("telefono"); // Get from map
if (telefonoObj instanceof String) {
primaryPhoneNumber = (String) telefonoObj;
} else if (telefonoObj != null) {
logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.",
telefonoObj.getClass().getName());
}
if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) {
throw new IllegalArgumentException(
"Phone number (telefono) is required in query parameters for conversation management.");
}
String resolvedUserId = null;
Object userIdObj = params.get("usuario_id");
if (userIdObj instanceof String) {
resolvedUserId = (String) userIdObj;
} else if (userIdObj != null) {
logger.warn("Parameter 'userId' in query_params is not a String (type: {}). Expected String.",
userIdObj.getClass().getName());
}
if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) {
resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", "");
logger.warn("User ID not provided in query parameters. Using derived ID from phone number");
}
if (request.queryInput() == null || request.queryInput().text() == null ||
request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) {
throw new IllegalArgumentException("Dialogflow query input text is required.");
}
String userMessageText = request.queryInput().text().text();
return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber);
}
}

View File

@@ -21,6 +21,14 @@ import com.example.util.TextObfuscator;
import reactor.core.publisher.Mono;
/**
Implements a data loss prevention service by integrating with the
Google Cloud Data Loss Prevention (DLP) API. This service is responsible for
scanning a given text input to identify and obfuscate sensitive information based on
a specified DLP template. If the DLP API detects sensitive findings, the
original text is obfuscated to protect user data; otherwise, the original
text is returned.
*/
@Service
public class DataLossPreventionImpl implements DataLossPrevention {

View File

@@ -0,0 +1,13 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.llm;
import reactor.core.publisher.Mono;
public interface LlmResponseTunerService {
Mono<String> getValue(String key);
Mono<Void> setValue(String key, String value);
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.llm;
import java.time.Duration;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class LlmResponseTunerServiceImpl implements LlmResponseTunerService {
private final ReactiveRedisTemplate<String, String> reactiveStringRedisTemplate;
private final String llmPreResponseCollectionName = "llm-pre-response:";
private final Duration ttl = Duration.ofHours(1);
public LlmResponseTunerServiceImpl(ReactiveRedisTemplate<String, String> reactiveStringRedisTemplate) {
this.reactiveStringRedisTemplate = reactiveStringRedisTemplate;
}
@Override
public Mono<String> getValue(String key) {
return reactiveStringRedisTemplate.opsForValue().get(llmPreResponseCollectionName + key);
}
@Override
public Mono<Void> setValue(String key, String value) {
return reactiveStringRedisTemplate.opsForValue().set(llmPreResponseCollectionName + key, value, ttl).then();
}
}

View File

@@ -31,155 +31,155 @@ import reactor.core.publisher.Mono;
@Service
public class QuickRepliesManagerService {
private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class);
private final MemoryStoreConversationService memoryStoreConversationService;
private final FirestoreConversationService firestoreConversationService;
private final QuickReplyContentService quickReplyContentService;
private final ConversationManagerService conversationManagerService;
private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class);
private final MemoryStoreConversationService memoryStoreConversationService;
private final FirestoreConversationService firestoreConversationService;
private final QuickReplyContentService quickReplyContentService;
private final ConversationManagerService conversationManagerService;
public QuickRepliesManagerService(
@Lazy ConversationManagerService conversationManagerService,
MemoryStoreConversationService memoryStoreConversationService,
FirestoreConversationService firestoreConversationService,
QuickReplyContentService quickReplyContentService) {
this.conversationManagerService = conversationManagerService;
this.memoryStoreConversationService = memoryStoreConversationService;
this.firestoreConversationService = firestoreConversationService;
this.quickReplyContentService = quickReplyContentService;
}
public QuickRepliesManagerService(
@Lazy ConversationManagerService conversationManagerService,
MemoryStoreConversationService memoryStoreConversationService,
FirestoreConversationService firestoreConversationService,
QuickReplyContentService quickReplyContentService) {
this.conversationManagerService = conversationManagerService;
this.memoryStoreConversationService = memoryStoreConversationService;
this.firestoreConversationService = firestoreConversationService;
this.quickReplyContentService = quickReplyContentService;
}
public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) {
String userPhoneNumber = externalRequest.user().telefono();
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> Mono.just(session.sessionId()))
.switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId))
.flatMap(sessionId -> {
String userId = "user_by_phone_" + userPhoneNumber.replaceAll("[^0-9]", "");
ConversationEntryDTO systemEntry = new ConversationEntryDTO(
ConversationEntryEntity.SISTEMA,
ConversationEntryType.INICIO,
Instant.now(),
"Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :",
null,
null);
return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber,
externalRequest.pantallaContexto())
.then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto()))
.map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
});
}
public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) {
String userPhoneNumber = externalRequest.user().telefono();
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> Mono.just(session.sessionId()))
.switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId))
.flatMap(sessionId -> {
String userId = "user_by_phone_" + userPhoneNumber.replaceAll("[^0-9]", "");
ConversationEntryDTO systemEntry = new ConversationEntryDTO(
ConversationEntryEntity.SISTEMA,
ConversationEntryType.INICIO,
Instant.now(),
"Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :",
null,
null);
return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber,
externalRequest.pantallaContexto())
.then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto()))
.map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
});
}
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalRequest) {
String userPhoneNumber = externalRequest.user().telefono();
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalRequest) {
String userPhoneNumber = externalRequest.user().telefono();
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.switchIfEmpty(Mono.error(
new IllegalStateException("No quick reply session found for phone number" )))
.flatMap(session -> {
String userId = session.userId();
String sessionId = session.sessionId();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.switchIfEmpty(Mono.error(
new IllegalStateException("No quick reply session found for phone number")))
.flatMap(session -> {
String userId = session.userId();
String sessionId = session.sessionId();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message());
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message());
List<ConversationEntryDTO> entries = session.entries();
int lastInitIndex = IntStream.range(0, entries.size())
.map(i -> entries.size() - 1 - i)
.filter(i -> {
ConversationEntryDTO entry = entries.get(i);
return entry.entity() == ConversationEntryEntity.SISTEMA
&& entry.type() == ConversationEntryType.INICIO;
})
.findFirst()
.orElse(-1);
List<ConversationEntryDTO> entries = session.entries();
int lastInitIndex = IntStream.range(0, entries.size())
.map(i -> entries.size() - 1 - i)
.filter(i -> {
ConversationEntryDTO entry = entries.get(i);
return entry.entity() == ConversationEntryEntity.SISTEMA
&& entry.type() == ConversationEntryType.INICIO;
})
.findFirst()
.orElse(-1);
long userMessagesCount;
if (lastInitIndex != -1) {
userMessagesCount = entries.subList(lastInitIndex + 1, entries.size()).stream()
.filter(e -> e.entity() == ConversationEntryEntity.USUARIO)
.count();
} else {
userMessagesCount = 0;
}
long userMessagesCount;
if (lastInitIndex != -1) {
userMessagesCount = entries.subList(lastInitIndex + 1, entries.size()).stream()
.filter(e -> e.entity() == ConversationEntryEntity.USUARIO)
.count();
} else {
userMessagesCount = 0;
}
if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow
// This is the second message of the flow. Return the full list.
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
session.pantallaContexto())
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
.flatMap(quickReplyDTO -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgentWithMessage(quickReplyDTO.toString());
return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber,
session.pantallaContexto())
.thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
});
} else if (userMessagesCount == 1) { // Is the second user message in the QR flow
// This is the third message of the flow. Filter and end.
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
session.pantallaContexto())
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
.flatMap(quickReplyDTO -> {
List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
.filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim()))
.toList();
if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow
// This is the second message of the flow. Return the full list.
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
session.pantallaContexto())
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
.flatMap(quickReplyDTO -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgentWithMessage(quickReplyDTO.toString());
return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber,
session.pantallaContexto())
.thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
});
} else if (userMessagesCount == 1) { // Is the second user message in the QR flow
// This is the third message of the flow. Filter and end.
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
session.pantallaContexto())
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
.flatMap(quickReplyDTO -> {
List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
.filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim()))
.toList();
if (!matchedPreguntas.isEmpty()) {
// Matched question, return the answer
String respuesta = matchedPreguntas.get(0).respuesta();
QueryResultDTO queryResult = new QueryResultDTO(respuesta, null);
DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId,
queryResult, null);
if (!matchedPreguntas.isEmpty()) {
// Matched question, return the answer
String respuesta = matchedPreguntas.get(0).respuesta();
QueryResultDTO queryResult = new QueryResultDTO(respuesta, null);
DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId,
queryResult, null);
return memoryStoreConversationService
.updateSession(session.withPantallaContexto(null))
.then(persistConversationTurn(userId, sessionId,
ConversationEntryDTO.forAgentWithMessage(respuesta),
userPhoneNumber, null))
.thenReturn(response);
} else {
// No match, delegate to Dialogflow
return memoryStoreConversationService
.updateSession(session.withPantallaContexto(null))
.then(conversationManagerService.manageConversation(externalRequest));
}
});
} else {
// Should not happen. End the flow.
return memoryStoreConversationService.updateSession(session.withPantallaContexto(null))
.then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null,
new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList()))));
}
});
}
return memoryStoreConversationService
.updateSession(session.withPantallaContexto(null))
.then(persistConversationTurn(userId, sessionId,
ConversationEntryDTO.forAgentWithMessage(respuesta),
userPhoneNumber, null))
.thenReturn(response);
} else {
// No match, delegate to Dialogflow
return memoryStoreConversationService
.updateSession(session.withPantallaContexto(null))
.then(conversationManagerService.manageConversation(externalRequest));
}
});
} else {
// Should not happen. End the flow.
return memoryStoreConversationService.updateSession(session.withPantallaContexto(null))
.then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null,
new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList()))));
}
});
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber, String pantallaContexto) {
logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.",
sessionId, entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.then(firestoreConversationService
.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
.doOnSuccess(fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.",
sessionId, entry.type().name()))
.doOnError(fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)))
.doOnError(
e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}",
sessionId, entry.type().name(), e.getMessage(), e));
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber, String pantallaContexto) {
logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.",
sessionId, entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.then(firestoreConversationService
.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
.doOnSuccess(fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.",
sessionId, entry.type().name()))
.doOnError(fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)))
.doOnError(
e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}",
sessionId, entry.type().name(), e.getMessage(), e));
}
}