diff --git a/src/main/java/com/example/service/conversation/ConversationHistoryService.java b/src/main/java/com/example/service/conversation/ConversationHistoryService.java new file mode 100644 index 0000000..57096de --- /dev/null +++ b/src/main/java/com/example/service/conversation/ConversationHistoryService.java @@ -0,0 +1,78 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.service.conversation; + +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.domain.Range; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +/** +Service for managing the lifecycle and data hygiene of conversation histories stored in MemoryStore. +It encapsulates the logic for pruning conversation logs to enforce data retention policies. +Its primary function, pruneHistory, operates on a Redis Sorted Set (ZSET) for a given session, +performing two main tasks: + +1) removing all messages older than a configurable time limit (e.g., 30 days) +based on their timestamp score, + +2) trimming the remaining set to a maximum message count +(e.g., 60) by removing the oldest entries, all within a reactive programming context. +*/ +@Service +public class ConversationHistoryService { + + private static final Logger logger = LoggerFactory.getLogger(ConversationHistoryService.class); + + private static final String MESSAGES_KEY_PREFIX = "conversation:messages:"; + + private final ReactiveRedisTemplate messageRedisTemplate; + + @Value("${conversation.context.message.limit:60}") + private int messageLimit; + + @Value("${conversation.context.days.limit:30}") + private int daysLimit; + + @Autowired + public ConversationHistoryService(ReactiveRedisTemplate messageRedisTemplate) { + this.messageRedisTemplate = messageRedisTemplate; + } + + public Mono pruneHistory(String sessionId) { + logger.info("Pruning history for sessionId: {}", sessionId); + String messagesKey = MESSAGES_KEY_PREFIX + sessionId; + + Instant cutoff = Instant.now().minus(daysLimit, ChronoUnit.DAYS); + Range scoreRange = Range.of(Range.Bound.inclusive(0d), Range.Bound.inclusive((double) cutoff.toEpochMilli())); + logger.info("Removing messages older than {} for sessionId: {}", cutoff, sessionId); + Mono removeByScore = messageRedisTemplate.opsForZSet().removeRangeByScore(messagesKey, scoreRange) + .doOnSuccess(count -> logger.info("Removed {} old messages for sessionId: {}", count, sessionId)); + + + Mono trimToSize = messageRedisTemplate.opsForZSet().size(messagesKey) + .flatMap(size -> { + if (size > messageLimit) { + logger.info("Current message count {} exceeds limit {} for sessionId: {}. Trimming...", size, messageLimit, sessionId); + Range rankRange = Range.of(Range.Bound.inclusive(0L), Range.Bound.inclusive(size - messageLimit - 1)); + return messageRedisTemplate.opsForZSet().removeRange(messagesKey, rankRange) + .doOnSuccess(count -> logger.info("Trimmed {} messages for sessionId: {}", count, sessionId)); + } + return Mono.just(0L); + }); + return removeByScore.then(trimToSize).then() + .doOnSuccess(v -> logger.info("Successfully pruned history for sessionId: {}", sessionId)) + .doOnError(e -> logger.error("Error pruning history for sessionId: {}", sessionId, e)); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/service/conversation/ConversationManagerService.java b/src/main/java/com/example/service/conversation/ConversationManagerService.java index 81e327d..da10103 100644 --- a/src/main/java/com/example/service/conversation/ConversationManagerService.java +++ b/src/main/java/com/example/service/conversation/ConversationManagerService.java @@ -35,6 +35,29 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; +/** +Service acting as the central orchestrator for managing user conversations. +It integrates Data Loss Prevention (DLP) for message obfuscation, multi-stage routing, +hybrid AI logic, and a reactive write-back persistence layer for conversation history. + +Routes traffic based on session context: +If a 'pantallaContexto' (screen context) is present, it delegates to the QuickRepliesManagerService. +Otherwise, it uses a Gemini-based MessageEntryFilter to classify the message against +active notifications and history, routing to one of two main flows: +a) Standard Conversation (proceedWithConversation): Handles regular dialogue, + +managing 30-minute session timeouts and injecting conversation history parameter to Dialogflow. + +b) Notifications (startNotificationConversation): +It first asks a Gemini model (NotificationContextResolver) if it can answer the +query. If yes, it saves the LLM's response and sends an 'LLM_RESPONSE_PROCESSED' +event to Dialogflow. If no ("DIALOGFLOW"), it sends the user's original text +to Dialogflow for intent matching. + +All conversation turns (user, agent, and LLM) are persisted using a reactive write-back +cache pattern, saving to Memorystore (Redis) first and then asynchronously to a +Firestore subcollection data model (persistConversationTurn). +*/ @Service public class ConversationManagerService { private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); diff --git a/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java b/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java index ae57eef..f731c53 100644 --- a/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java +++ b/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java @@ -9,6 +9,7 @@ import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Range; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.stereotype.Service; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; @@ -16,6 +17,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; + @Service public class MemoryStoreConversationService { private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class); @@ -26,22 +28,27 @@ public class MemoryStoreConversationService { private final ReactiveRedisTemplate redisTemplate; private final ReactiveRedisTemplate stringRedisTemplate; private final ReactiveRedisTemplate messageRedisTemplate; + private final ConversationHistoryService conversationHistoryService; @Autowired public MemoryStoreConversationService( ReactiveRedisTemplate redisTemplate, ReactiveRedisTemplate stringRedisTemplate, - ReactiveRedisTemplate messageRedisTemplate) { + ReactiveRedisTemplate messageRedisTemplate, + ConversationHistoryService conversationHistoryService) { this.redisTemplate = redisTemplate; this.stringRedisTemplate = stringRedisTemplate; this.messageRedisTemplate = messageRedisTemplate; + this.conversationHistoryService = conversationHistoryService; } public Mono saveMessage(String sessionId, ConversationMessageDTO message) { String messagesKey = MESSAGES_KEY_PREFIX + sessionId; - return messageRedisTemplate.opsForList().rightPush(messagesKey, message).then(); + double score = message.timestamp().toEpochMilli(); + return messageRedisTemplate.opsForZSet().add(messagesKey, message, score) + .then(conversationHistoryService.pruneHistory(sessionId)); } public Mono saveSession(ConversationSessionDTO session) { @@ -54,7 +61,7 @@ public class MemoryStoreConversationService { public Flux getMessages(String sessionId) { String messagesKey = MESSAGES_KEY_PREFIX + sessionId; - return messageRedisTemplate.opsForList().range(messagesKey, 0, -1); + return messageRedisTemplate.opsForZSet().range(messagesKey, Range.of(Range.Bound.inclusive(0L), Range.Bound.inclusive(-1L))); } public Mono getSessionByTelefono(String telefono) { @@ -91,10 +98,10 @@ public class MemoryStoreConversationService { String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono(); return redisTemplate.opsForValue().delete(sessionKey) .then(stringRedisTemplate.opsForValue().delete(phoneToSessionKey)) - .then(messageRedisTemplate.opsForList().delete(messagesKey)); + .then(messageRedisTemplate.delete(messagesKey)); } else { return redisTemplate.opsForValue().delete(sessionKey) - .then(messageRedisTemplate.opsForList().delete(messagesKey)); + .then(messageRedisTemplate.delete(messagesKey)); } }).then(); }