/* * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ package com.example.service.conversation; import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Range; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import java.time.Instant; import java.time.temporal.ChronoUnit; /** Service for managing the lifecycle and data hygiene of conversation histories stored in MemoryStore. It encapsulates the logic for pruning conversation logs to enforce data retention policies. Its primary function, pruneHistory, operates on a Redis Sorted Set (ZSET) for a given session, performing two main tasks: 1) removing all messages older than a configurable time limit (e.g., 30 days) based on their timestamp score, 2) trimming the remaining set to a maximum message count (e.g., 60) by removing the oldest entries, all within a reactive programming context. */ @Service public class ConversationHistoryService { private static final Logger logger = LoggerFactory.getLogger(ConversationHistoryService.class); private static final String MESSAGES_KEY_PREFIX = "conversation:messages:"; private final ReactiveRedisTemplate messageRedisTemplate; @Value("${conversation.context.message.limit:60}") private int messageLimit; @Value("${conversation.context.days.limit:30}") private int daysLimit; @Autowired public ConversationHistoryService(ReactiveRedisTemplate messageRedisTemplate) { this.messageRedisTemplate = messageRedisTemplate; } public Mono pruneHistory(String sessionId) { logger.info("Pruning history for sessionId: {}", sessionId); String messagesKey = MESSAGES_KEY_PREFIX + sessionId; Instant cutoff = Instant.now().minus(daysLimit, ChronoUnit.DAYS); Range scoreRange = Range.of(Range.Bound.inclusive(0d), Range.Bound.inclusive((double) cutoff.toEpochMilli())); logger.info("Removing messages older than {} for sessionId: {}", cutoff, sessionId); Mono removeByScore = messageRedisTemplate.opsForZSet().removeRangeByScore(messagesKey, scoreRange) .doOnSuccess(count -> logger.info("Removed {} old messages for sessionId: {}", count, sessionId)); Mono trimToSize = messageRedisTemplate.opsForZSet().size(messagesKey) .flatMap(size -> { if (size > messageLimit) { logger.info("Current message count {} exceeds limit {} for sessionId: {}. Trimming...", size, messageLimit, sessionId); Range rankRange = Range.of(Range.Bound.inclusive(0L), Range.Bound.inclusive(size - messageLimit - 1)); return messageRedisTemplate.opsForZSet().removeRange(messagesKey, rankRange) .doOnSuccess(count -> logger.info("Trimmed {} messages for sessionId: {}", count, sessionId)); } return Mono.just(0L); }); return removeByScore.then(trimToSize).then() .doOnSuccess(v -> logger.info("Successfully pruned history for sessionId: {}", sessionId)) .doOnError(e -> logger.error("Error pruning history for sessionId: {}", sessionId, e)); } }