UPDATE 30-Oct
This commit is contained in:
@@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
|
||||
* Your use of it is subject to your agreement with Google.
|
||||
*/
|
||||
|
||||
package com.example.service.conversation;
|
||||
|
||||
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.core.ReactiveRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
/**
|
||||
Service for managing the lifecycle and data hygiene of conversation histories stored in MemoryStore.
|
||||
It encapsulates the logic for pruning conversation logs to enforce data retention policies.
|
||||
Its primary function, pruneHistory, operates on a Redis Sorted Set (ZSET) for a given session,
|
||||
performing two main tasks:
|
||||
|
||||
1) removing all messages older than a configurable time limit (e.g., 30 days)
|
||||
based on their timestamp score,
|
||||
|
||||
2) trimming the remaining set to a maximum message count
|
||||
(e.g., 60) by removing the oldest entries, all within a reactive programming context.
|
||||
*/
|
||||
@Service
|
||||
public class ConversationHistoryService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConversationHistoryService.class);
|
||||
|
||||
private static final String MESSAGES_KEY_PREFIX = "conversation:messages:";
|
||||
|
||||
private final ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate;
|
||||
|
||||
@Value("${conversation.context.message.limit:60}")
|
||||
private int messageLimit;
|
||||
|
||||
@Value("${conversation.context.days.limit:30}")
|
||||
private int daysLimit;
|
||||
|
||||
@Autowired
|
||||
public ConversationHistoryService(ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate) {
|
||||
this.messageRedisTemplate = messageRedisTemplate;
|
||||
}
|
||||
|
||||
public Mono<Void> pruneHistory(String sessionId) {
|
||||
logger.info("Pruning history for sessionId: {}", sessionId);
|
||||
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
|
||||
|
||||
Instant cutoff = Instant.now().minus(daysLimit, ChronoUnit.DAYS);
|
||||
Range<Double> scoreRange = Range.of(Range.Bound.inclusive(0d), Range.Bound.inclusive((double) cutoff.toEpochMilli()));
|
||||
logger.info("Removing messages older than {} for sessionId: {}", cutoff, sessionId);
|
||||
Mono<Long> removeByScore = messageRedisTemplate.opsForZSet().removeRangeByScore(messagesKey, scoreRange)
|
||||
.doOnSuccess(count -> logger.info("Removed {} old messages for sessionId: {}", count, sessionId));
|
||||
|
||||
|
||||
Mono<Long> trimToSize = messageRedisTemplate.opsForZSet().size(messagesKey)
|
||||
.flatMap(size -> {
|
||||
if (size > messageLimit) {
|
||||
logger.info("Current message count {} exceeds limit {} for sessionId: {}. Trimming...", size, messageLimit, sessionId);
|
||||
Range<Long> rankRange = Range.of(Range.Bound.inclusive(0L), Range.Bound.inclusive(size - messageLimit - 1));
|
||||
return messageRedisTemplate.opsForZSet().removeRange(messagesKey, rankRange)
|
||||
.doOnSuccess(count -> logger.info("Trimmed {} messages for sessionId: {}", count, sessionId));
|
||||
}
|
||||
return Mono.just(0L);
|
||||
});
|
||||
return removeByScore.then(trimToSize).then()
|
||||
.doOnSuccess(v -> logger.info("Successfully pruned history for sessionId: {}", sessionId))
|
||||
.doOnError(e -> logger.error("Error pruning history for sessionId: {}", sessionId, e));
|
||||
}
|
||||
}
|
||||
@@ -35,6 +35,29 @@ import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
Service acting as the central orchestrator for managing user conversations.
|
||||
It integrates Data Loss Prevention (DLP) for message obfuscation, multi-stage routing,
|
||||
hybrid AI logic, and a reactive write-back persistence layer for conversation history.
|
||||
|
||||
Routes traffic based on session context:
|
||||
If a 'pantallaContexto' (screen context) is present, it delegates to the QuickRepliesManagerService.
|
||||
Otherwise, it uses a Gemini-based MessageEntryFilter to classify the message against
|
||||
active notifications and history, routing to one of two main flows:
|
||||
a) Standard Conversation (proceedWithConversation): Handles regular dialogue,
|
||||
|
||||
managing 30-minute session timeouts and injecting conversation history parameter to Dialogflow.
|
||||
|
||||
b) Notifications (startNotificationConversation):
|
||||
It first asks a Gemini model (NotificationContextResolver) if it can answer the
|
||||
query. If yes, it saves the LLM's response and sends an 'LLM_RESPONSE_PROCESSED'
|
||||
event to Dialogflow. If no ("DIALOGFLOW"), it sends the user's original text
|
||||
to Dialogflow for intent matching.
|
||||
|
||||
All conversation turns (user, agent, and LLM) are persisted using a reactive write-back
|
||||
cache pattern, saving to Memorystore (Redis) first and then asynchronously to a
|
||||
Firestore subcollection data model (persistConversationTurn).
|
||||
*/
|
||||
@Service
|
||||
public class ConversationManagerService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.core.ReactiveRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
|
||||
@@ -16,6 +17,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import java.time.Duration;
|
||||
|
||||
|
||||
@Service
|
||||
public class MemoryStoreConversationService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class);
|
||||
@@ -26,22 +28,27 @@ public class MemoryStoreConversationService {
|
||||
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
|
||||
private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
|
||||
private final ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate;
|
||||
private final ConversationHistoryService conversationHistoryService;
|
||||
|
||||
|
||||
@Autowired
|
||||
public MemoryStoreConversationService(
|
||||
ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate,
|
||||
ReactiveRedisTemplate<String, String> stringRedisTemplate,
|
||||
ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate) {
|
||||
ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate,
|
||||
ConversationHistoryService conversationHistoryService) {
|
||||
this.redisTemplate = redisTemplate;
|
||||
this.stringRedisTemplate = stringRedisTemplate;
|
||||
this.messageRedisTemplate = messageRedisTemplate;
|
||||
this.conversationHistoryService = conversationHistoryService;
|
||||
}
|
||||
|
||||
|
||||
public Mono<Void> saveMessage(String sessionId, ConversationMessageDTO message) {
|
||||
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
|
||||
return messageRedisTemplate.opsForList().rightPush(messagesKey, message).then();
|
||||
double score = message.timestamp().toEpochMilli();
|
||||
return messageRedisTemplate.opsForZSet().add(messagesKey, message, score)
|
||||
.then(conversationHistoryService.pruneHistory(sessionId));
|
||||
}
|
||||
|
||||
public Mono<Void> saveSession(ConversationSessionDTO session) {
|
||||
@@ -54,7 +61,7 @@ public class MemoryStoreConversationService {
|
||||
|
||||
public Flux<ConversationMessageDTO> getMessages(String sessionId) {
|
||||
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
|
||||
return messageRedisTemplate.opsForList().range(messagesKey, 0, -1);
|
||||
return messageRedisTemplate.opsForZSet().range(messagesKey, Range.of(Range.Bound.inclusive(0L), Range.Bound.inclusive(-1L)));
|
||||
}
|
||||
|
||||
public Mono<ConversationSessionDTO> getSessionByTelefono(String telefono) {
|
||||
@@ -91,10 +98,10 @@ public class MemoryStoreConversationService {
|
||||
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono();
|
||||
return redisTemplate.opsForValue().delete(sessionKey)
|
||||
.then(stringRedisTemplate.opsForValue().delete(phoneToSessionKey))
|
||||
.then(messageRedisTemplate.opsForList().delete(messagesKey));
|
||||
.then(messageRedisTemplate.delete(messagesKey));
|
||||
} else {
|
||||
return redisTemplate.opsForValue().delete(sessionKey)
|
||||
.then(messageRedisTemplate.opsForList().delete(messagesKey));
|
||||
.then(messageRedisTemplate.delete(messagesKey));
|
||||
}
|
||||
}).then();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user