FIX BUG_679

This commit is contained in:
PAVEL PALMA
2025-10-23 10:11:20 -06:00
parent 6448d88b7a
commit 4169858861
2 changed files with 204 additions and 215 deletions

View File

@@ -301,83 +301,82 @@ public class ConversationManagerService {
} }
public Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context, public Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context,
DetectIntentRequestDTO request, NotificationDTO notification) { DetectIntentRequestDTO request, NotificationDTO notification) {
final String userId = context.userId(); final String userId = context.userId();
final String userMessageText = context.userMessageText(); final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber(); final String userPhoneNumber = context.primaryPhoneNumber();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.warn("No existing conversation session found for notification reply on phone {}. This is unexpected. Creating new session: {}",
userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}))
.flatMap(session -> {
final String sessionId = session.sessionId();
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
String notificationText = notificationContextMapper.toText(notification);
return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber) Map<String, Object> filteredParams = notification.parametros().entrySet().stream()
.switchIfEmpty(Mono.defer(() -> { .filter(entry -> entry.getKey().startsWith("notification_po_"))
String newSessionId = SessionIdGenerator.generateStandardSessionId(); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
logger.info("No existing notification session found for phone number {}. Creating new session: {}",
userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}))
.flatMap(session -> {
final String sessionId = session.sessionId();
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
String notificationText = notificationContextMapper.toText(notification);
Map<String, Object> filteredParams = notification.parametros().entrySet().stream() String resolvedContext = notificationContextResolver.resolveContext(userMessageText,
.filter(entry -> entry.getKey().startsWith("notification_po_")) notificationText, conversationHistory, filteredParams.toString(), userId, sessionId,
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); userPhoneNumber);
String resolvedContext = notificationContextResolver.resolveContext(userMessageText, if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) {
notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, String uuid = UUID.randomUUID().toString();
userPhoneNumber); llmResponseTunerService.setValue(uuid, resolvedContext).subscribe();
if (!NotificationContextResolver.CATEGORY_DIALOGFLOW.equals(resolvedContext)) { ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
String uuid = UUID.randomUUID().toString(); notification.parametros());
llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext,
notification.parametros());
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber)
notification.parametros()); .then(persistConversationTurn(userId, sessionId, llmEntry, userPhoneNumber))
ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, .then(Mono.defer(() -> {
notification.parametros()); EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED");
QueryInputDTO queryInput = new QueryInputDTO(null, eventInput,
request.queryInput().languageCode());
DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput,
request.queryParams())
.withParameter("llm_reponse_uuid", uuid);
return dialogflowServiceClient.detectIntent(sessionId, newRequest)
.flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult());
return persistConversationTurn(userId, sessionId, agentEntry,
userPhoneNumber)
.thenReturn(response);
});
}));
} else {
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
return persistNotificationTurn(userId, sessionId, userEntry, userPhoneNumber) DetectIntentRequestDTO finalRequest;
.then(persistNotificationTurn(userId, sessionId, llmEntry, userPhoneNumber)) Instant now = Instant.now();
.then(Mono.defer(() -> { if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); finalRequest = request.withParameters(notification.parametros());
QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, } else {
request.queryInput().languageCode()); finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory)
DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, .withParameters(notification.parametros());
request.queryParams()) }
.withParameter("llm_reponse_uuid", uuid); return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber)
return dialogflowServiceClient.detectIntent(sessionId, newRequest) .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
.flatMap(response -> { .flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult()); .forAgent(response.queryResult());
return persistNotificationTurn(userId, sessionId, agentEntry, return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber)
userPhoneNumber) .thenReturn(response);
.thenReturn(response); }));
}); }
})); });
} }
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros());
DetectIntentRequestDTO finalRequest;
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
finalRequest = request.withParameters(notification.parametros());
} else {
finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory)
.withParameters(notification.parametros());
}
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
.flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult());
return memoryStoreNotificationService
.saveEntry(userId, sessionId, agentEntry, userPhoneNumber)
.thenReturn(response);
}));
});
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) { String userPhoneNumber) {
@@ -397,19 +396,4 @@ public class ConversationManagerService {
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e)); entry.type().name(), e.getMessage(), e));
} }
private Mono<Void> persistNotificationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for notification session {}. Type: {}. Writing to Redis first.",
sessionId,
entry.type().name());
return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> logger.info(
"Entry saved to Redis for notification session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()))
.doOnError(e -> logger.error(
"Error during primary Redis write for notification session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
} }

View File

@@ -14,13 +14,14 @@ import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.notification.ExternalNotRequestMapper; import com.example.mapper.notification.ExternalNotRequestMapper;
import com.example.service.base.DialogflowClientService; import com.example.service.base.DialogflowClientService;
import com.example.service.conversation.DataLossPrevention; import com.example.service.conversation.DataLossPrevention;
import com.example.service.conversation.FirestoreConversationService;
import com.example.service.conversation.MemoryStoreConversationService;
import com.example.util.SessionIdGenerator; import com.example.util.SessionIdGenerator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -29,142 +30,146 @@ import java.util.Objects;
@Service @Service
public class NotificationManagerService { public class NotificationManagerService {
private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class); private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class);
private static final String eventName = "notificacion"; private static final String eventName = "notificacion";
private static final String PREFIX_PO_PARAM = "notification_po_"; private static final String PREFIX_PO_PARAM = "notification_po_";
private final DialogflowClientService dialogflowClientService; private final DialogflowClientService dialogflowClientService;
private final FirestoreNotificationService firestoreNotificationService; private final FirestoreNotificationService firestoreNotificationService;
private final MemoryStoreNotificationService memoryStoreNotificationService; private final MemoryStoreNotificationService memoryStoreNotificationService;
private final FirestoreNotificationConvService firestoreConversationService; private final ExternalNotRequestMapper externalNotRequestMapper;
private final ExternalNotRequestMapper externalNotRequestMapper; private final MemoryStoreConversationService memoryStoreConversationService;
private final FirestoreConversationService firestoreConversationService;
private final DataLossPrevention dataLossPrevention;
private final String dlpTemplateCompleteFlow;
private final DataLossPrevention dataLossPrevention; @Value("${dialogflow.default-language-code:es}")
private final String dlpTemplateCompleteFlow; private String defaultLanguageCode;
@Value("${dialogflow.default-language-code:es}") public NotificationManagerService(
private String defaultLanguageCode; DialogflowClientService dialogflowClientService,
FirestoreNotificationService firestoreNotificationService,
MemoryStoreNotificationService memoryStoreNotificationService,
MemoryStoreConversationService memoryStoreConversationService,
FirestoreConversationService firestoreConversationService,
ExternalNotRequestMapper externalNotRequestMapper,
DataLossPrevention dataLossPrevention,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowClientService = dialogflowClientService;
this.firestoreNotificationService = firestoreNotificationService;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.externalNotRequestMapper = externalNotRequestMapper;
this.dataLossPrevention = dataLossPrevention;
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
this.memoryStoreConversationService = memoryStoreConversationService;
this.firestoreConversationService = firestoreConversationService;
}
public NotificationManagerService( public Mono<DetectIntentResponseDTO> processNotification(ExternalNotRequestDTO externalRequest) {
DialogflowClientService dialogflowClientService, Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null.");
FirestoreNotificationService firestoreNotificationService,
MemoryStoreNotificationService memoryStoreNotificationService,
FirestoreNotificationConvService firestoreConversationService,
ExternalNotRequestMapper externalNotRequestMapper,
DataLossPrevention dataLossPrevention,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowClientService = dialogflowClientService;
this.firestoreNotificationService = firestoreNotificationService;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.firestoreConversationService = firestoreConversationService;
this.externalNotRequestMapper = externalNotRequestMapper;
this.dataLossPrevention = dataLossPrevention;
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
}
public Mono<DetectIntentResponseDTO> processNotification(ExternalNotRequestDTO externalRequest) { String telefono = externalRequest.phoneNumber();
Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null."); if (telefono == null || telefono.isBlank()) {
logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification.");
String telefono = externalRequest.phoneNumber(); return Mono.error(new IllegalArgumentException("Phone number is required."));
if (telefono == null || telefono.isBlank()) { }
logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification.");
return Mono.error(new IllegalArgumentException("Phone number is required."));
}
return dataLossPrevention.getObfuscatedString(externalRequest.text(), dlpTemplateCompleteFlow) return dataLossPrevention.getObfuscatedString(externalRequest.text(), dlpTemplateCompleteFlow)
.flatMap(obfuscatedMessage -> { .flatMap(obfuscatedMessage -> {
ExternalNotRequestDTO obfuscatedRequest = new ExternalNotRequestDTO( ExternalNotRequestDTO obfuscatedRequest = new ExternalNotRequestDTO(
obfuscatedMessage, obfuscatedMessage,
externalRequest.phoneNumber(), externalRequest.phoneNumber(),
externalRequest.hiddenParameters() externalRequest.hiddenParameters()
); );
// 1. Persist the incoming notification entry
String newNotificationId = SessionIdGenerator.generateStandardSessionId();
Map<String, Object> parameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters().forEach((key, value) -> parameters.put(PREFIX_PO_PARAM + key, value));
}
NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(), String newNotificationId = SessionIdGenerator.generateStandardSessionId();
obfuscatedRequest.text(), eventName, defaultLanguageCode, parameters, "active"); Map<String, Object> parameters = new HashMap<>();
Mono<Void> persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) if (obfuscatedRequest.hiddenParameters() != null) {
.doOnSuccess(v -> { obfuscatedRequest.hiddenParameters().forEach((key, value) -> parameters.put(PREFIX_PO_PARAM + key, value));
logger.info("Notification for phone cached. Kicking off async Firestore write-back."); }
firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry)
.subscribe(
ignored -> logger.debug(
"Background: Notification entry persistence initiated for phone in Firestore."),
e -> logger.error(
"Background: Error during notification entry persistence for phone in Firestore: {}", e.getMessage(), e));
});
// 2. Resolve or create a conversation session NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(),
Mono<ConversationSessionDTO> sessionMono = memoryStoreNotificationService.getSessionByTelefono(telefono) obfuscatedRequest.text(), eventName, defaultLanguageCode, parameters, "active");
.doOnNext(session -> logger.info("Found existing conversation session {} for phone number", Mono<Void> persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry)
session.sessionId())) .doOnSuccess(v -> {
.flatMap(session -> { logger.info("Notification for phone {} cached. Kicking off async Firestore write-back.", telefono);
Map<String, Object> prefixedParameters = new HashMap<>(); firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry)
if (obfuscatedRequest.hiddenParameters() != null) { .subscribe(
obfuscatedRequest.hiddenParameters() ignored -> logger.debug(
.forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value)); "Background: Notification entry persistence initiated for phone {} in Firestore.", telefono),
} e -> logger.error(
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), "Background: Error during notification entry persistence for phone {} in Firestore: {}",
prefixedParameters); telefono, e.getMessage(), e));
return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono)
.thenReturn(session);
})
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing conversation session found for phone number. Creating new session: {}", newSessionId);
String userId = "user_by_phone_" + telefono;
Map<String, Object> prefixedParameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters()
.forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value));
}
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters);
return persistConversationTurn(userId, newSessionId, systemEntry, telefono)
.then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono)));
}));
// 3. Send notification text to Dialogflow using the resolved conversation
// session
return persistenceMono.then(sessionMono)
.flatMap(session -> {
final String sessionId = session.sessionId();
logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId);
DetectIntentRequestDTO detectIntentRequest = externalNotRequestMapper.map(obfuscatedRequest);
return dialogflowClientService.detectIntent(sessionId, detectIntentRequest);
})
.doOnSuccess(response -> logger
.info("Finished processing notification. Dialogflow response received for phone"))
.doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e));
}); });
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, Mono<ConversationSessionDTO> sessionMono = memoryStoreConversationService.getSessionByTelefono(telefono)
String userPhoneNumber) { .doOnNext(session -> logger.info("Found existing conversation session {} for phone number {}",
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, session.sessionId(), telefono))
entry.type().name()); .flatMap(session -> {
Map<String, Object> prefixedParameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters()
.forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value));
}
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters);
return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono)
.thenReturn(session);
})
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing conversation session found for phone number {}. Creating new session: {}",
telefono, newSessionId);
String userId = "user_by_phone_" + telefono;
Map<String, Object> prefixedParameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters()
.forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value));
}
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters);
return persistConversationTurn(userId, newSessionId, systemEntry, telefono)
.then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono)));
}));
return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber) return persistenceMono.then(sessionMono)
.doOnSuccess(v -> { .flatMap(session -> {
logger.info( final String sessionId = session.sessionId();
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId);
sessionId, entry.type().name());
firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) DetectIntentRequestDTO detectIntentRequest = externalNotRequestMapper.map(obfuscatedRequest);
.subscribe(
fsVoid -> logger.debug( return dialogflowClientService.detectIntent(sessionId, detectIntentRequest);
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", })
sessionId, entry.type().name()), .doOnSuccess(response -> logger
fsError -> logger.error( .info("Finished processing notification. Dialogflow response received for phone {}.", telefono))
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", .doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e));
sessionId, entry.type().name(), fsError.getMessage(), fsError)); });
}) }
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e)); private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
} String userPhoneNumber) {
} logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> {
logger.info(
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name());
firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.subscribe(
fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()),
fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError));
})
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
}