Files
int-layer/src.bak/main/java/com/example/service/conversation/ConversationHistoryService.java
2026-02-20 20:38:58 +00:00

78 lines
3.7 KiB
Java

/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.conversation;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
/**
 * Service that prunes per-session conversation histories held in a Redis Sorted Set (ZSET).
 *
 * <p>The key for a session is {@code "conversation:messages:" + sessionId}. Pruning runs in
 * two sequential phases:
 * <ol>
 *   <li>remove every entry whose score lies in {@code [0, cutoff]}, where {@code cutoff} is
 *       "now minus {@code conversation.context.days.limit} days" expressed in epoch
 *       milliseconds (default 30 days);</li>
 *   <li>if more than {@code conversation.context.message.limit} entries remain (default 60),
 *       remove the lowest-ranked entries until exactly that many are left.</li>
 * </ol>
 *
 * <p>NOTE(review): this presumes ZSET scores are epoch-millisecond timestamps written by the
 * producer of these entries — confirm against the code that adds messages.
 */
@Service
public class ConversationHistoryService {

    private static final Logger logger = LoggerFactory.getLogger(ConversationHistoryService.class);
    private static final String MESSAGES_KEY_PREFIX = "conversation:messages:";

    private final ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate;

    /** Maximum number of entries kept per session after pruning. */
    @Value("${conversation.context.message.limit:60}")
    private int messageLimit;

    /** Retention window in days; entries scored at or before "now - daysLimit" are removed. */
    @Value("${conversation.context.days.limit:30}")
    private int daysLimit;

    /**
     * Creates the service.
     *
     * @param messageRedisTemplate reactive Redis template used for all ZSET operations
     */
    @Autowired
    public ConversationHistoryService(ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate) {
        this.messageRedisTemplate = messageRedisTemplate;
    }

    /**
     * Prunes the stored history of one session: first by age, then by count.
     *
     * <p>All work, including the computation of the age cutoff and the progress logging, is
     * deferred until the returned {@link Mono} is subscribed. Each subscription therefore uses
     * a cutoff relative to the moment it actually runs, so a delayed subscription or a
     * {@code retry()} does not reuse a stale timestamp.
     *
     * @param sessionId identifier appended to the key prefix to locate the session's ZSET
     * @return a {@link Mono} that completes when both phases have finished, or errors with
     *         whatever the underlying Redis operations emit (the error is logged and propagated)
     */
    public Mono<Void> pruneHistory(String sessionId) {
        Mono<Void> pruning = Mono.defer(() -> {
            logger.info("Pruning history for sessionId: {}", sessionId);
            String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
            // then(...) subscribes to the trim phase only after the age-based removal completes.
            return removeExpired(messagesKey, sessionId)
                    .then(trimToLimit(messagesKey, sessionId))
                    .then();
        });
        return pruning
                .doOnSuccess(v -> logger.info("Successfully pruned history for sessionId: {}", sessionId))
                .doOnError(e -> logger.error("Error pruning history for sessionId: {}", sessionId, e));
    }

    /** Removes every entry whose score is within [0, now - daysLimit days] in epoch milliseconds. */
    private Mono<Long> removeExpired(String messagesKey, String sessionId) {
        Instant cutoff = Instant.now().minus(daysLimit, ChronoUnit.DAYS);
        // Lower bound is 0, so entries with a negative score are never matched by this phase.
        Range<Double> scoreRange = Range.of(
                Range.Bound.inclusive(0d),
                Range.Bound.inclusive((double) cutoff.toEpochMilli()));
        logger.info("Removing messages older than {} for sessionId: {}", cutoff, sessionId);
        return messageRedisTemplate.opsForZSet()
                .removeRangeByScore(messagesKey, scoreRange)
                .doOnSuccess(count -> logger.info("Removed {} old messages for sessionId: {}", count, sessionId));
    }

    /** Removes the lowest-ranked entries so that at most messageLimit entries remain. */
    private Mono<Long> trimToLimit(String messagesKey, String sessionId) {
        // size() and removeRange() are two separate Redis calls; the count read here can be
        // out of date by the time the removal executes if other writers touch the same key.
        return messageRedisTemplate.opsForZSet().size(messagesKey)
                .flatMap(size -> {
                    if (size <= messageLimit) {
                        return Mono.just(0L);
                    }
                    logger.info("Current message count {} exceeds limit {} for sessionId: {}. Trimming...", size, messageLimit, sessionId);
                    // Ranks are 0-based and ascending by score: dropping ranks
                    // 0..(size - messageLimit - 1) removes exactly the excess entries.
                    Range<Long> rankRange = Range.of(
                            Range.Bound.inclusive(0L),
                            Range.Bound.inclusive(size - messageLimit - 1));
                    return messageRedisTemplate.opsForZSet()
                            .removeRange(messagesKey, rankRange)
                            .doOnSuccess(count -> logger.info("Trimmed {} messages for sessionId: {}", count, sessionId));
                });
    }
}