UPDATE FixPack 1.5 06-sept

This commit is contained in:
PAVEL PALMA
2025-09-07 10:40:20 -06:00
parent 37c1b31b7c
commit c92178e925
7 changed files with 124 additions and 58 deletions

View File

@@ -12,6 +12,8 @@ import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.exception.DialogflowClientException;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.dialogflow.cx.v3.DetectIntentRequest;
import com.google.cloud.dialogflow.cx.v3.PageName;
import com.google.cloud.dialogflow.cx.v3.QueryParameters;
import com.google.cloud.dialogflow.cx.v3.SessionsClient;
import com.google.cloud.dialogflow.cx.v3.SessionName;
import com.google.cloud.dialogflow.cx.v3.SessionsSettings;
@@ -38,6 +40,9 @@ public class DialogflowClientService {
private final String dialogflowCxLocation;
private final String dialogflowCxAgentId;
private static final String TARGET_FLOW_ID = "00000000-0000-0000-0000-000000000000";
private static final String TARGET_PAGE_ID = "START_PAGE";
private final DialogflowRequestMapper dialogflowRequestMapper;
private final DialogflowResponseMapper dialogflowResponseMapper;
private SessionsClient sessionsClient;
@@ -106,6 +111,19 @@ public class DialogflowClientService {
detectIntentRequestBuilder.setSession(sessionName.toString());
logger.debug("Set session path {} on the request builder for session: {}", sessionName.toString(), sessionId);
// Configure the query parameters to set the target flow
PageName pageName = PageName.of(dialogflowCxProjectId, dialogflowCxLocation, dialogflowCxAgentId, TARGET_FLOW_ID, TARGET_PAGE_ID);
QueryParameters.Builder queryParamsBuilder;
if (detectIntentRequestBuilder.hasQueryParams()) {
queryParamsBuilder = detectIntentRequestBuilder.getQueryParams().toBuilder();
} else {
queryParamsBuilder = QueryParameters.newBuilder();
}
queryParamsBuilder.setCurrentPage(pageName.toString());
detectIntentRequestBuilder.setQueryParams(queryParamsBuilder.build());
// Build the final DetectIntentRequest Protobuf object
DetectIntentRequest detectIntentRequest = detectIntentRequestBuilder.build();
return Mono.fromCallable(() -> {

View File

@@ -33,8 +33,9 @@ import java.util.Optional;
@Service
public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
private static final long SESSION_RESET_THRESHOLD_HOURS = 24;
private static final String CURRENT_PAGE_PARAM = "currentPage";
private static final long SESSION_RESET_THRESHOLD_MINUTES = 30;
private static final String CONV_HISTORY_PARAM = "conversation_history";
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
@@ -116,31 +117,32 @@ public class ConversationManagerService {
return handleMessageClassification(context, request);
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String notificationText = notificationContextMapper.toText(notification);
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return continueConversationFlow(context, request);
}
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return continueConversationFlow(context, request);
}
})
.switchIfEmpty(continueConversationFlow(context, request));
});
})
.switchIfEmpty(continueConversationFlow(context, request));
}
}
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context, DetectIntentRequestDTO request) {
final String userId = context.userId();
final String userMessageText = context.userMessageText();
@@ -153,16 +155,7 @@ public class ConversationManagerService {
logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber);
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.flatMap(session -> {
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toHours() < SESSION_RESET_THRESHOLD_HOURS) {
logger.info("Recent Session Found: Session {} is within the 24-hour threshold. Proceeding to Dialogflow.", session.sessionId());
return processDialogflowRequest(session, request, userId, userMessageText, userPhoneNumber, false);
} else {
logger.info("Old Session Found: Session {} is older than the threshold. Proceeding to full lookup.", session.sessionId());
return fullLookupAndProcess(session, request, userId, userMessageText, userPhoneNumber);
}
})
.flatMap(session -> handleMessageClassification(context, request, session))
.switchIfEmpty(Mono.defer(() -> {
logger.info("No session found in MemoryStore. Performing full lookup to Firestore.");
return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber);
@@ -172,6 +165,43 @@ public class ConversationManagerService {
return Mono.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
});
}
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) {
final String userPhoneNumber = context.primaryPhoneNumber();
final String userMessageText = context.userMessageText();
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
.map(notificationSession -> notificationSession.notificaciones().stream()
.filter(notification -> "active".equalsIgnoreCase(notification.status()))
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.orElse(null))
.filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> {
String conversationHistory = conversationContextMapper.toText(session);
String notificationText = notificationContextMapper.toText(notification);
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory);
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
return startNotificationConversation(context, request, notification);
} else {
return proceedWithConversation(context, request, session);
}
})
.switchIfEmpty(proceedWithConversation(context, request, session));
}
private Mono<DetectIntentResponseDTO> proceedWithConversation(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) {
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.", session.sessionId());
return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false);
} else {
logger.info("Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", session.sessionId());
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false);
}
}
private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) {
return firestoreConversationService.getSessionByTelefono(userPhoneNumber)
@@ -181,7 +211,7 @@ public class ConversationManagerService {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CURRENT_PAGE_PARAM, conversationHistory);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, true);
});
}
@@ -218,12 +248,24 @@ public class ConversationManagerService {
.flatMap(session -> {
final String sessionId = session.sessionId();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, request)
.doOnSuccess(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe();
}));
Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, request)
.doOnSuccess(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe();
}));
} else {
String conversationHistory = conversationContextMapper.toTextWithLimits(session);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber)
.then(dialogflowServiceClient.detectIntent(sessionId, newRequest)
.doOnSuccess(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe();
}));
}
});
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,String userPhoneNumber) {

View File

@@ -26,7 +26,7 @@ public class MemoryStoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class);
private static final String SESSION_KEY_PREFIX = "conversation:session:";
private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:";
private static final Duration SESSION_TTL = Duration.ofHours(24);
private static final Duration SESSION_TTL = Duration.ofDays(30);
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
@@ -49,23 +49,25 @@ public class MemoryStoreConversationService {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber;
logger.info("Attempting to save entry to Redis for session {}. Entity: {}", sessionId, newEntry.entity().name());
return redisTemplate.opsForValue().get(sessionKey)
.defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber))
.switchIfEmpty(Mono.defer(() -> {
logger.info("Creating new session {} in Redis with TTL.", sessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber);
return redisTemplate.opsForValue().set(sessionKey, newSession, SESSION_TTL)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL))
.thenReturn(newSession);
}))
.flatMap(session -> {
ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber);
ConversationSessionDTO sessionWithPantallaContexto = (pantallaContexto != null) ? sessionWithUpdatedTelefono.withPantallaContexto(pantallaContexto) : sessionWithUpdatedTelefono;
ConversationSessionDTO updatedSession = sessionWithPantallaContexto.withAddedEntry(newEntry);
logger.info("Attempting to set updated session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name());
return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL))
return redisTemplate.opsForValue().set(sessionKey, updatedSession)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId))
.then();
})
.doOnSuccess(success -> {
@@ -97,6 +99,6 @@ public class MemoryStoreConversationService {
public Mono<Void> updateSession(ConversationSessionDTO session) {
String sessionKey = SESSION_KEY_PREFIX + session.sessionId();
logger.info("Attempting to update session {} in Redis.", session.sessionId());
return redisTemplate.opsForValue().set(sessionKey, session, SESSION_TTL).then();
return redisTemplate.opsForValue().set(sessionKey, session).then();
}
}

View File

@@ -33,7 +33,7 @@ public class MemoryStoreNotificationService {
private static final String PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX = "notification:phone_to_notification:";
private static final String CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:session:";
private static final String PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:phone_to_session:";
private final Duration notificationTtl = Duration.ofMinutes(5);
private final Duration notificationTtl = Duration.ofDays(30);
public MemoryStoreNotificationService(
ReactiveRedisTemplate<String, NotificationSessionDTO> notificationRedisTemplate,