UPDATE 10-sept

This commit is contained in:
PAVEL PALMA
2025-09-10 10:24:55 -06:00
parent 11888d2632
commit 9b1824bc48
15 changed files with 582 additions and 234 deletions

View File

@@ -3,18 +3,21 @@
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.conversation;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationContext;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import com.example.dto.dialogflow.conversation.QueryResultDTO;
import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.conversation.ExternalConvRequestMapper;
import com.example.mapper.messagefilter.ConversationContextMapper;
import com.example.mapper.messagefilter.NotificationContextMapper;
import com.example.service.base.DialogflowClientService;
import com.example.service.base.MessageEntryFilter;
import com.example.service.base.NotificationContextResolver;
import com.example.service.notification.MemoryStoreNotificationService;
import com.example.service.quickreplies.QuickRepliesManagerService;
import com.example.util.SessionIdGenerator;
@@ -29,37 +32,41 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@Service
public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
private static final long SESSION_RESET_THRESHOLD_MINUTES = 30;
private static final String CONV_HISTORY_PARAM = "conversation_history";
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreConversationService memoryStoreConversationService;
private final QuickRepliesManagerService quickRepliesManagerService;
private final MessageEntryFilter messageEntryFilter;
private final MemoryStoreNotificationService memoryStoreNotificationService;
private final NotificationContextMapper notificationContextMapper;
private final ConversationContextMapper conversationContextMapper;
private final DataLossPrevention dataLossPrevention;
private final String dlpTemplateCompleteFlow;
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
public ConversationManagerService(
DialogflowClientService dialogflowServiceClient,
FirestoreConversationService firestoreConversationService,
MemoryStoreConversationService memoryStoreConversationService,
ExternalConvRequestMapper externalRequestToDialogflowMapper,
QuickRepliesManagerService quickRepliesManagerService,
MessageEntryFilter messageEntryFilter,
MemoryStoreNotificationService memoryStoreNotificationService,
NotificationContextMapper notificationContextMapper,
ConversationContextMapper conversationContextMapper,
DataLossPrevention dataLossPrevention,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
private static final long SESSION_RESET_THRESHOLD_MINUTES = 30;
private static final String CONV_HISTORY_PARAM = "conversation_history";
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreConversationService memoryStoreConversationService;
private final QuickRepliesManagerService quickRepliesManagerService;
private final MessageEntryFilter messageEntryFilter;
private final MemoryStoreNotificationService memoryStoreNotificationService;
private final NotificationContextMapper notificationContextMapper;
private final ConversationContextMapper conversationContextMapper;
private final DataLossPrevention dataLossPrevention;
private final String dlpTemplateCompleteFlow;
private final NotificationContextResolver notificationContextResolver;
public ConversationManagerService(
DialogflowClientService dialogflowServiceClient,
FirestoreConversationService firestoreConversationService,
MemoryStoreConversationService memoryStoreConversationService,
ExternalConvRequestMapper externalRequestToDialogflowMapper,
QuickRepliesManagerService quickRepliesManagerService,
MessageEntryFilter messageEntryFilter,
MemoryStoreNotificationService memoryStoreNotificationService,
NotificationContextMapper notificationContextMapper,
ConversationContextMapper conversationContextMapper,
DataLossPrevention dataLossPrevention,
NotificationContextResolver notificationContextResolver,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowServiceClient = dialogflowServiceClient;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreConversationService = memoryStoreConversationService;
@@ -71,204 +78,269 @@ public class ConversationManagerService {
this.conversationContextMapper = conversationContextMapper;
this.dataLossPrevention = dataLossPrevention;
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
}
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalrequest) {
return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow)
.flatMap(obfuscatedMessage -> {
ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO(
obfuscatedMessage,
externalrequest.user(),
externalrequest.channel(),
externalrequest.tipo(),
externalrequest.pantallaContexto()
);
return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono())
.flatMap(session -> {
if (session != null && session.pantallaContexto() != null && !session.pantallaContexto().isBlank()) {
logger.info("Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService.");
return quickRepliesManagerService.manageConversation(obfuscatedRequest);
this.notificationContextResolver = notificationContextResolver;
}
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalrequest) {
return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow)
.flatMap(obfuscatedMessage -> {
ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO(
obfuscatedMessage,
externalrequest.user(),
externalrequest.channel(),
externalrequest.tipo(),
externalrequest.pantallaContexto());
return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono())
.flatMap(session -> {
if (session != null && session.pantallaContexto() != null
&& !session.pantallaContexto().isBlank()) {
logger.info(
"Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService.");
return quickRepliesManagerService.manageConversation(obfuscatedRequest);
}
return continueManagingConversation(obfuscatedRequest);
})
.switchIfEmpty(continueManagingConversation(obfuscatedRequest));
})
.switchIfEmpty(continueManagingConversation(obfuscatedRequest));
});
}
private Mono<DetectIntentResponseDTO> continueManagingConversation(ExternalConvRequestDTO externalrequest) {
final DetectIntentRequestDTO request;
try {
request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest);
logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO");
} catch (IllegalArgumentException e) {
logger.error("Error during pre-mapping: {}", e.getMessage());
return Mono.error(new IllegalArgumentException(
"Failed to process external request due to mapping error: " + e.getMessage(), e));
}
private Mono<DetectIntentResponseDTO> continueManagingConversation(ExternalConvRequestDTO externalrequest) {
final DetectIntentRequestDTO request;
try {
request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(externalrequest);
logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO");
} catch (IllegalArgumentException e) {
logger.error("Error during pre-mapping: {}", e.getMessage());
return Mono.error(new IllegalArgumentException(
"Failed to process external request due to mapping error: " + e.getMessage(), e));
}
final ConversationContext context;
try {
context = resolveAndValidateRequest(request);
} catch (IllegalArgumentException e) {
logger.error("Validation error for incoming request: {}", e.getMessage());
return Mono.error(e);
}
return handleMessageClassification(context, request);
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return continueConversationFlow(context, request);
}
})
.switchIfEmpty(continueConversationFlow(context, request));
});
}
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context, DetectIntentRequestDTO request) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber);
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> handleMessageClassification(context, request, session))
.switchIfEmpty(Mono.defer(() -> {
logger.info("No session found in MemoryStore. Performing full lookup to Firestore.");
return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber);
}))
.onErrorResume(e -> {
logger.error("Overall error handling conversation in ConversationManagerService: {}", e.getMessage(), e);
return Mono.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
});
final ConversationContext context;
try {
context = resolveAndValidateRequest(request);
} catch (IllegalArgumentException e) {
logger.error("Validation error for incoming request: {}", e.getMessage());
return Mono.error(e);
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return handleMessageClassification(context, request);
}
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String conversationHistory = conversationContextMapper.toText(session);
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context,
DetectIntentRequestDTO request) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService
.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory);
String classification = messageEntryFilter.classifyMessage(userMessageText,
notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
return startNotificationConversation(context, request, notification);
} else {
return proceedWithConversation(context, request, session);
return continueConversationFlow(context, request);
}
})
.switchIfEmpty(proceedWithConversation(context, request, session));
}
})
.switchIfEmpty(continueConversationFlow(context, request));
});
}
private Mono<DetectIntentResponseDTO> proceedWithConversation(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) {
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.", session.sessionId());
return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false);
} else {
logger.info("Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", session.sessionId());
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false);
}
}
private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) {
return firestoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toTextWithLimits)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, true);
});
}
private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber, boolean newSession) {
final String finalSessionId = session.sessionId();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
.doOnSuccess(v -> logger.debug("User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId))
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, e.getMessage(), e))
.then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request)
.flatMap(response -> {
logger.debug("Received Dialogflow CX response for session {}. Initiating agent response persistence.", finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
})
.doOnError(error -> logger.error("Overall error during conversation management for session {}: {}", finalSessionId, error.getMessage(), error))
));
}
private Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context, DetectIntentRequestDTO request, NotificationDTO notification) {
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context,
DetectIntentRequestDTO request) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber);
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> handleMessageClassification(context, request, session))
.switchIfEmpty(Mono.defer(() -> {
logger.info("No session found in MemoryStore. Performing full lookup to Firestore.");
return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber);
}))
.onErrorResume(e -> {
logger.error("Overall error handling conversation in ConversationManagerService: {}",
e.getMessage(), e);
return Mono
.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
});
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context,
DetectIntentRequestDTO request, ConversationSessionDTO session) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String conversationHistory = conversationContextMapper.toText(session);
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText,
conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return proceedWithConversation(context, request, session);
}
})
.switchIfEmpty(proceedWithConversation(context, request, session));
}
private Mono<DetectIntentResponseDTO> proceedWithConversation(ConversationContext context,
DetectIntentRequestDTO request, ConversationSessionDTO session) {
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.",
session.sessionId());
return processDialogflowRequest(session, request, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
} else {
logger.info(
"Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.",
session.sessionId());
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
}
}
private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) {
return firestoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toTextWithLimits)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId,
userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber,
true);
});
}
private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber,
boolean newSession) {
final String finalSessionId = session.sessionId();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
.doOnSuccess(v -> logger.debug(
"User entry successfully persisted for session {}. Proceeding to Dialogflow...",
finalSessionId))
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId,
e.getMessage(), e))
.then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request)
.flatMap(response -> {
logger.debug(
"Received Dialogflow CX response for session {}. Initiating agent response persistence.",
finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
})
.doOnError(
error -> logger.error("Overall error during conversation management for session {}: {}",
finalSessionId, error.getMessage(), error))));
}
private Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context,
DetectIntentRequestDTO request, NotificationDTO notification) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing notification session found for phone number {}. Creating new session: {}",
userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing notification session found for phone number {}. Creating new session: {}",
userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}))
.flatMap(session -> {
final String sessionId = session.sessionId();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, request)
.doOnSuccess(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe();
final String sessionId = session.sessionId();
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
String notificationText = notificationContextMapper.toText(notification);
Map<String, Object> filteredParams = notification.parametros().entrySet().stream()
.filter(entry -> entry.getKey().startsWith("po_"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
String resolvedContext = notificationContextResolver.resolveContext(userMessageText,
notificationText, conversationHistory, filteredParams.toString(), userId, sessionId,
userPhoneNumber);
if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) {
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext,
notification.parametros());
return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber)
.then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber))
.then(Mono.fromCallable(() -> {
QueryResultDTO queryResult = new QueryResultDTO(resolvedContext,
java.util.Collections.emptyMap());
return new DetectIntentResponseDTO(null, queryResult);
}));
}
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
DetectIntentRequestDTO finalRequest;
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
finalRequest = request.withParameters(notification.parametros());
} else {
finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory)
.withParameters(notification.parametros());
}
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
.flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult());
return memoryStoreNotificationService
.saveEntry(userId, sessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
}));
} else {
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, newRequest)
.doOnSuccess(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe();
}));
}
});
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,String userPhoneNumber) {
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
@@ -284,42 +356,54 @@ public class ConversationManagerService {
sessionId, entry.type().name(), fsError.getMessage(), fsError)))
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
}
private Mono<Void> persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name());
return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.doOnError(e -> logger.error("Error during primary Redis write for notification session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
Map<String, Object> params = Optional.ofNullable(request.queryParams())
.map(queryParamsDTO -> queryParamsDTO.parameters())
.orElse(Collections.emptyMap());
String primaryPhoneNumber = null;
Object telefonoObj = params.get("telefono"); // Get from map
if (telefonoObj instanceof String) {
primaryPhoneNumber = (String) telefonoObj;
primaryPhoneNumber = (String) telefonoObj;
} else if (telefonoObj != null) {
logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.",
telefonoObj.getClass().getName());
logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.",
telefonoObj.getClass().getName());
}
if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) {
throw new IllegalArgumentException(
"Phone number (telefono) is required in query parameters for conversation management.");
throw new IllegalArgumentException(
"Phone number (telefono) is required in query parameters for conversation management.");
}
String resolvedUserId = null;
Object userIdObj = params.get("usuario_id");
if (userIdObj instanceof String) {
resolvedUserId = (String) userIdObj;
resolvedUserId = (String) userIdObj;
} else if (userIdObj != null) {
logger.warn("Parameter 'userId' in query_params is not a String (type: {}). Expected String.",
userIdObj.getClass().getName());
logger.warn("Parameter 'userId' in query_params is not a String (type: {}). Expected String.",
userIdObj.getClass().getName());
}
if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) {
resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", "");
logger.warn("User ID not provided in query parameters. Using derived ID from phone number: {}",
resolvedUserId);
resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", "");
logger.warn("User ID not provided in query parameters. Using derived ID from phone number: {}",
resolvedUserId);
}
if (request.queryInput() == null || request.queryInput().text() == null ||
request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) {
throw new IllegalArgumentException("Dialogflow query input text is required.");
throw new IllegalArgumentException("Dialogflow query input text is required.");
}
String userMessageText = request.queryInput().text().text();
return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber);
}
}
}