diff --git a/pom.xml b/pom.xml index 954f683..9b199f9 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ com.google.cloud libraries-bom - 26.37.0 + 26.40.0 pom import diff --git a/src/main/java/com/example/mapper/notification/ExternalNotRequestMapper.java b/src/main/java/com/example/mapper/notification/ExternalNotRequestMapper.java index 3abe064..b852b6f 100644 --- a/src/main/java/com/example/mapper/notification/ExternalNotRequestMapper.java +++ b/src/main/java/com/example/mapper/notification/ExternalNotRequestMapper.java @@ -30,6 +30,7 @@ public class ExternalNotRequestMapper { private static final String LANGUAGE_CODE = "es"; private static final String TELEPHONE_PARAM_NAME = "telefono"; private static final String NOTIFICATION_TEXT_PARAM = "notification_text"; + private static final String NOTIFICATION_LABEL = "NOTIFICACION"; private static final String PREFIX_PO_PARAM = "notification_po_"; @@ -53,7 +54,7 @@ public class ExternalNotRequestMapper { }); } - TextInputDTO textInput = new TextInputDTO(request.text()); + TextInputDTO textInput = new TextInputDTO(NOTIFICATION_LABEL); QueryInputDTO queryInput = new QueryInputDTO(textInput, null, LANGUAGE_CODE); diff --git a/src/main/java/com/example/service/base/DialogflowClientService.java b/src/main/java/com/example/service/base/DialogflowClientService.java index b6e5412..6bf2b75 100644 --- a/src/main/java/com/example/service/base/DialogflowClientService.java +++ b/src/main/java/com/example/service/base/DialogflowClientService.java @@ -12,6 +12,8 @@ import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.exception.DialogflowClientException; import com.google.api.gax.rpc.ApiException; import com.google.cloud.dialogflow.cx.v3.DetectIntentRequest; +import com.google.cloud.dialogflow.cx.v3.PageName; +import com.google.cloud.dialogflow.cx.v3.QueryParameters; import com.google.cloud.dialogflow.cx.v3.SessionsClient; import com.google.cloud.dialogflow.cx.v3.SessionName; import com.google.cloud.dialogflow.cx.v3.SessionsSettings; @@ -38,6 +40,9 @@ public class DialogflowClientService { private final String dialogflowCxLocation; private final String dialogflowCxAgentId; + private static final String TARGET_FLOW_ID = "00000000-0000-0000-0000-000000000000"; + private static final String TARGET_PAGE_ID = "START_PAGE"; + private final DialogflowRequestMapper dialogflowRequestMapper; private final DialogflowResponseMapper dialogflowResponseMapper; private SessionsClient sessionsClient; @@ -106,6 +111,19 @@ public class DialogflowClientService { detectIntentRequestBuilder.setSession(sessionName.toString()); logger.debug("Set session path {} on the request builder for session: {}", sessionName.toString(), sessionId); + // Configure the query parameters to set the target flow + PageName pageName = PageName.of(dialogflowCxProjectId, dialogflowCxLocation, dialogflowCxAgentId, TARGET_FLOW_ID, TARGET_PAGE_ID); + + QueryParameters.Builder queryParamsBuilder; + if (detectIntentRequestBuilder.hasQueryParams()) { + queryParamsBuilder = detectIntentRequestBuilder.getQueryParams().toBuilder(); + } else { + queryParamsBuilder = QueryParameters.newBuilder(); + } + + queryParamsBuilder.setCurrentPage(pageName.toString()); + detectIntentRequestBuilder.setQueryParams(queryParamsBuilder.build()); + // Build the final DetectIntentRequest Protobuf object DetectIntentRequest detectIntentRequest = detectIntentRequestBuilder.build(); return Mono.fromCallable(() -> { diff --git a/src/main/java/com/example/service/conversation/ConversationManagerService.java b/src/main/java/com/example/service/conversation/ConversationManagerService.java index bb99653..ccbd165 100644 --- a/src/main/java/com/example/service/conversation/ConversationManagerService.java +++ b/src/main/java/com/example/service/conversation/ConversationManagerService.java @@ -33,8 +33,9 @@ import java.util.Optional; @Service public class ConversationManagerService { private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); - private static final long SESSION_RESET_THRESHOLD_HOURS = 24; - private static final String CURRENT_PAGE_PARAM = "currentPage"; + + private static final long SESSION_RESET_THRESHOLD_MINUTES = 30; + private static final String CONV_HISTORY_PARAM = "conversation_history"; private final ExternalConvRequestMapper externalRequestToDialogflowMapper; private final DialogflowClientService dialogflowServiceClient; private final FirestoreConversationService firestoreConversationService; @@ -116,31 +117,32 @@ public class ConversationManagerService { return handleMessageClassification(context, request); } private Mono handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request) { - final String userPhoneNumber = context.primaryPhoneNumber(); - final String userMessageText = context.userMessageText(); - return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) - .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) - .map(notificationSession -> notificationSession.notificaciones().stream() - .filter(notification -> "active".equalsIgnoreCase(notification.status())) - .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) - .orElse(null)) - .filter(Objects::nonNull) - .flatMap((NotificationDTO notification) -> { - String notificationText = notificationContextMapper.toText(notification); - return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .map(conversationContextMapper::toText) - .defaultIfEmpty("") - .flatMap(conversationHistory -> { - String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory); - if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { - return startNotificationConversation(context, request, notification); - } else { - return continueConversationFlow(context, request); - } + final String userPhoneNumber = context.primaryPhoneNumber(); + final String userMessageText = context.userMessageText(); + + return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) + .map(conversationContextMapper::toText) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) + .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) + .map(notificationSession -> notificationSession.notificaciones().stream() + .filter(notification -> "active".equalsIgnoreCase(notification.status())) + .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) + .orElse(null)) + .filter(Objects::nonNull) + .flatMap((NotificationDTO notification) -> { + String notificationText = notificationContextMapper.toText(notification); + String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory); + if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { + return startNotificationConversation(context, request, notification); + } else { + return continueConversationFlow(context, request); + } + }) + .switchIfEmpty(continueConversationFlow(context, request)); }); - }) - .switchIfEmpty(continueConversationFlow(context, request)); -} + } private Mono continueConversationFlow(ConversationContext context, DetectIntentRequestDTO request) { final String userId = context.userId(); final String userMessageText = context.userMessageText(); @@ -153,16 +155,7 @@ public class ConversationManagerService { logger.info("Primary Check (MemoryStore): Looking up session for phone number: {}", userPhoneNumber); return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .flatMap(session -> { - Instant now = Instant.now(); - if (Duration.between(session.lastModified(), now).toHours() < SESSION_RESET_THRESHOLD_HOURS) { - logger.info("Recent Session Found: Session {} is within the 24-hour threshold. Proceeding to Dialogflow.", session.sessionId()); - return processDialogflowRequest(session, request, userId, userMessageText, userPhoneNumber, false); - } else { - logger.info("Old Session Found: Session {} is older than the threshold. Proceeding to full lookup.", session.sessionId()); - return fullLookupAndProcess(session, request, userId, userMessageText, userPhoneNumber); - } - }) + .flatMap(session -> handleMessageClassification(context, request, session)) .switchIfEmpty(Mono.defer(() -> { logger.info("No session found in MemoryStore. Performing full lookup to Firestore."); return fullLookupAndProcess(null, request, userId, userMessageText, userPhoneNumber); @@ -172,6 +165,43 @@ public class ConversationManagerService { return Mono.error(new RuntimeException("Failed to process conversation due to an internal error.", e)); }); } + + private Mono handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) { + final String userPhoneNumber = context.primaryPhoneNumber(); + final String userMessageText = context.userMessageText(); + + return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) + .flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId)) + .map(notificationSession -> notificationSession.notificaciones().stream() + .filter(notification -> "active".equalsIgnoreCase(notification.status())) + .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) + .orElse(null)) + .filter(Objects::nonNull) + .flatMap((NotificationDTO notification) -> { + String conversationHistory = conversationContextMapper.toText(session); + String notificationText = notificationContextMapper.toText(notification); + String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, conversationHistory); + if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { + return startNotificationConversation(context, request, notification); + } else { + return proceedWithConversation(context, request, session); + } + }) + .switchIfEmpty(proceedWithConversation(context, request, session)); + } + + private Mono proceedWithConversation(ConversationContext context, DetectIntentRequestDTO request, ConversationSessionDTO session) { + Instant now = Instant.now(); + if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { + logger.info("Recent Session Found: Session {} is within the 10-minute threshold. Proceeding to Dialogflow.", session.sessionId()); + return processDialogflowRequest(session, request, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false); + } else { + logger.info("Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", session.sessionId()); + String conversationHistory = conversationContextMapper.toTextWithLimits(session); + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); + return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), context.primaryPhoneNumber(), false); + } + } private Mono fullLookupAndProcess(ConversationSessionDTO oldSession, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { return firestoreConversationService.getSessionByTelefono(userPhoneNumber) @@ -181,7 +211,7 @@ public class ConversationManagerService { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("Creating new session {} after full lookup.", newSessionId); ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber); - DetectIntentRequestDTO newRequest = request.withParameter(CURRENT_PAGE_PARAM, conversationHistory); + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, true); }); } @@ -218,12 +248,24 @@ public class ConversationManagerService { .flatMap(session -> { final String sessionId = session.sessionId(); ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); - return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) - .then(dialogflowServiceClient.detectIntent(sessionId, request) - .doOnSuccess(response -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); - memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe(); - })); + Instant now = Instant.now(); + if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { + return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) + .then(dialogflowServiceClient.detectIntent(sessionId, request) + .doOnSuccess(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); + memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe(); + })); + } else { + String conversationHistory = conversationContextMapper.toTextWithLimits(session); + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); + return memoryStoreNotificationService.saveEntry(userId, sessionId, userEntry, userPhoneNumber) + .then(dialogflowServiceClient.detectIntent(sessionId, newRequest) + .doOnSuccess(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); + memoryStoreNotificationService.saveEntry(userId, sessionId, agentEntry, userPhoneNumber).subscribe(); + })); + } }); } private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,String userPhoneNumber) { diff --git a/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java b/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java index 0b554af..3c53a72 100644 --- a/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java +++ b/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java @@ -26,7 +26,7 @@ public class MemoryStoreConversationService { private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class); private static final String SESSION_KEY_PREFIX = "conversation:session:"; private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:"; - private static final Duration SESSION_TTL = Duration.ofHours(24); + private static final Duration SESSION_TTL = Duration.ofDays(30); private final ReactiveRedisTemplate redisTemplate; private final ReactiveRedisTemplate stringRedisTemplate; @@ -49,23 +49,25 @@ public class MemoryStoreConversationService { String sessionKey = SESSION_KEY_PREFIX + sessionId; String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber; - logger.info("Attempting to save entry to Redis for session {}. Entity: {}", sessionId, newEntry.entity().name()); - return redisTemplate.opsForValue().get(sessionKey) - .defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber)) + .switchIfEmpty(Mono.defer(() -> { + logger.info("Creating new session {} in Redis with TTL.", sessionId); + ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber); + return redisTemplate.opsForValue().set(sessionKey, newSession, SESSION_TTL) + .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL)) + .thenReturn(newSession); + })) .flatMap(session -> { ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber); ConversationSessionDTO sessionWithPantallaContexto = (pantallaContexto != null) ? sessionWithUpdatedTelefono.withPantallaContexto(pantallaContexto) : sessionWithUpdatedTelefono; ConversationSessionDTO updatedSession = sessionWithPantallaContexto.withAddedEntry(newEntry); - logger.info("Attempting to set updated session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name()); - - return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL) - .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL)) + return redisTemplate.opsForValue().set(sessionKey, updatedSession) + .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId)) .then(); }) .doOnSuccess(success -> { @@ -97,6 +99,6 @@ public class MemoryStoreConversationService { public Mono updateSession(ConversationSessionDTO session) { String sessionKey = SESSION_KEY_PREFIX + session.sessionId(); logger.info("Attempting to update session {} in Redis.", session.sessionId()); - return redisTemplate.opsForValue().set(sessionKey, session, SESSION_TTL).then(); + return redisTemplate.opsForValue().set(sessionKey, session).then(); } } \ No newline at end of file diff --git a/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java b/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java index e90b0b7..1277772 100644 --- a/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java +++ b/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java @@ -33,7 +33,7 @@ public class MemoryStoreNotificationService { private static final String PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX = "notification:phone_to_notification:"; private static final String CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:session:"; private static final String PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:phone_to_session:"; - private final Duration notificationTtl = Duration.ofMinutes(5); + private final Duration notificationTtl = Duration.ofDays(30); public MemoryStoreNotificationService( ReactiveRedisTemplate notificationRedisTemplate, diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 1294eed..df3ccfd 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -30,8 +30,6 @@ spring.cloud.gcp.firestore.port=${GCP_FIRESTORE_PORT} # ========================================================= spring.data.redis.host=${REDIS_HOST} spring.data.redis.port=${REDIS_PORT} -spring.redis.jedis.pool.enabled=true -spring.redis.notify-keyspace-events=Ex #spring.data.redis.password=23cb4c76-9d96-4c74-b8c0-778fb364877a #spring.data.redis.username=default @@ -67,4 +65,9 @@ google.cloud.dlp.dlpTemplateCompleteFlow=${DLP_TEMPLATE_COMPLETE_FLOW} # ========================================================= # Quick-replies Preset-data # ========================================================= -firestore.data.importer.enabled=true \ No newline at end of file +firestore.data.importer.enabled=true +# ========================================================= +# LOGGING Configuration +# ========================================================= +logging.level.root=INFO +logging.level.com.example=INFO \ No newline at end of file