From 9fb1088f7d2f28186f447fd96ef4b80c881d8c32 Mon Sep 17 00:00:00 2001 From: PAVEL PALMA Date: Fri, 24 Oct 2025 22:28:55 -0600 Subject: [PATCH] UPDATE 24-Oct --- .../java/com/example/config/RedisConfig.java | 82 ++---- .../controller/DataPurgeController.java | 29 ++ .../controller/SessionPurgeController.java | 37 +++ .../conversation/ConversationMessageDTO.java | 20 ++ .../conversation/ConversationSessionDTO.java | 18 +- .../dialogflow/conversation/MessageType.java | 13 + .../conversation/ConversationEntryMapper.java | 32 +++ .../ConversationMessageMapper.java | 42 +++ .../FirestoreConversationMapper.java | 179 ++---------- .../ConversationContextMapper.java | 72 +---- .../repository/FirestoreBaseRepository.java | 52 ++++ .../service/base/DataPurgeService.java | 106 +++++++ .../service/base/SessionPurgeService.java | 65 +++++ .../ConversationManagerService.java | 265 +++++++++--------- .../FirestoreConversationService.java | 115 ++++---- .../MemoryStoreConversationService.java | 83 +++--- .../FirestoreNotificationConvService.java | 107 ------- .../FirestoreNotificationService.java | 18 ++ .../MemoryStoreNotificationService.java | 55 +--- .../NotificationManagerService.java | 33 ++- .../quickreplies/MemoryStoreQRService.java | 16 +- .../QuickRepliesManagerService.java | 165 ++++++----- .../ConversationManagerServiceTest.java | 20 +- 23 files changed, 863 insertions(+), 761 deletions(-) create mode 100644 src/main/java/com/example/controller/DataPurgeController.java create mode 100644 src/main/java/com/example/controller/SessionPurgeController.java create mode 100644 src/main/java/com/example/dto/dialogflow/conversation/ConversationMessageDTO.java create mode 100644 src/main/java/com/example/dto/dialogflow/conversation/MessageType.java create mode 100644 src/main/java/com/example/mapper/conversation/ConversationEntryMapper.java create mode 100644 src/main/java/com/example/mapper/conversation/ConversationMessageMapper.java create mode 100644 src/main/java/com/example/service/base/DataPurgeService.java create mode 100644 src/main/java/com/example/service/base/SessionPurgeService.java delete mode 100644 src/main/java/com/example/service/notification/FirestoreNotificationConvService.java diff --git a/src/main/java/com/example/config/RedisConfig.java b/src/main/java/com/example/config/RedisConfig.java index ca28bc6..cc74107 100644 --- a/src/main/java/com/example/config/RedisConfig.java +++ b/src/main/java/com/example/config/RedisConfig.java @@ -5,11 +5,10 @@ package com.example.config; +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.notification.NotificationSessionDTO; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; @@ -18,60 +17,35 @@ import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; -/** - * Spring configuration class for setting up Reactive Redis(Memorystore in GCP) - * templates. - * It defines and customizes `ReactiveRedisTemplate` beans for different data - * types - * like `ConversationSessionDTO` and `NotificationDTO`, using Jackson for JSON - * serialization and ensuring proper handling of Java 8 and higher date/time - * objects. - */ @Configuration public class RedisConfig { -@Bean -public ReactiveRedisTemplate reactiveConversationRedisTemplate( - ReactiveRedisConnectionFactory factory) { - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(new JavaTimeModule()); - objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - - Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer<>( - objectMapper, ConversationSessionDTO.class); - - return new ReactiveRedisTemplate<>(factory, RedisSerializationContext - .newSerializationContext(new StringRedisSerializer()) - .value(serializer) - .build()); -} - -@Bean -public ReactiveRedisTemplate reactiveStringRedisTemplate( - ReactiveRedisConnectionFactory factory) { - return new ReactiveRedisTemplate<>(factory, RedisSerializationContext - .newSerializationContext(new StringRedisSerializer()) - .value(new StringRedisSerializer()) - .build()); -} - -@Bean -public ReactiveRedisTemplate reactiveNotificationRedisTemplate( - ReactiveRedisConnectionFactory factory) { - ObjectMapper notificationObjectMapper = new ObjectMapper(); - notificationObjectMapper.registerModule(new JavaTimeModule()); - notificationObjectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - - StringRedisSerializer keySerializer = new StringRedisSerializer(); - Jackson2JsonRedisSerializer valueSerializer = new Jackson2JsonRedisSerializer<>( - notificationObjectMapper, NotificationSessionDTO.class); - - RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext - .newSerializationContext(keySerializer); - - RedisSerializationContext context = builder.value(valueSerializer) - .build(); + @Bean + public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory, ObjectMapper objectMapper) { + Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer<>(objectMapper, ConversationSessionDTO.class); + RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer()); + RedisSerializationContext context = builder.value(serializer).build(); return new ReactiveRedisTemplate<>(factory, context); -} + } + + @Bean + public ReactiveRedisTemplate reactiveNotificationRedisTemplate(ReactiveRedisConnectionFactory factory, ObjectMapper objectMapper) { + Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer<>(objectMapper, NotificationSessionDTO.class); + RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer()); + RedisSerializationContext context = builder.value(serializer).build(); + return new ReactiveRedisTemplate<>(factory, context); + } + + @Bean + public ReactiveRedisTemplate reactiveMessageRedisTemplate(ReactiveRedisConnectionFactory factory, ObjectMapper objectMapper) { + Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer<>(objectMapper, ConversationMessageDTO.class); + RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer()); + RedisSerializationContext context = builder.value(serializer).build(); + return new ReactiveRedisTemplate<>(factory, context); + } + + @Bean + public ReactiveRedisTemplate reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) { + return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string()); + } } \ No newline at end of file diff --git a/src/main/java/com/example/controller/DataPurgeController.java b/src/main/java/com/example/controller/DataPurgeController.java new file mode 100644 index 0000000..75b598b --- /dev/null +++ b/src/main/java/com/example/controller/DataPurgeController.java @@ -0,0 +1,29 @@ +package com.example.controller; + +import com.example.service.base.DataPurgeService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +@RestController +@RequestMapping("/api/v1/data-purge") +public class DataPurgeController { + + private static final Logger logger = LoggerFactory.getLogger(DataPurgeController.class); + private final DataPurgeService dataPurgeService; + + public DataPurgeController(DataPurgeService dataPurgeService) { + this.dataPurgeService = dataPurgeService; + } + + @DeleteMapping("/all") + public Mono purgeAllData() { + logger.warn("Received request to purge all data. This is a destructive operation."); + return dataPurgeService.purgeAllData() + .doOnSuccess(voidResult -> logger.info("Successfully purged all data.")) + .doOnError(error -> logger.error("Error purging all data.", error)); + } +} diff --git a/src/main/java/com/example/controller/SessionPurgeController.java b/src/main/java/com/example/controller/SessionPurgeController.java new file mode 100644 index 0000000..552af5f --- /dev/null +++ b/src/main/java/com/example/controller/SessionPurgeController.java @@ -0,0 +1,37 @@ + +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.controller; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.example.service.base.SessionPurgeService; + +import reactor.core.publisher.Mono; + +@RestController +@RequestMapping("/api/v1/session-purge") +public class SessionPurgeController { + + private static final Logger logger = LoggerFactory.getLogger(SessionPurgeController.class); + private final SessionPurgeService sessionPurgeService; + + public SessionPurgeController(SessionPurgeService sessionPurgeService) { + this.sessionPurgeService = sessionPurgeService; + } + + @DeleteMapping("/conversation/session/{sessionId}") + public Mono deleteSession(@PathVariable String sessionId) { + return sessionPurgeService.deleteSession(sessionId) + .doOnSuccess(voidResult -> logger.info("Successfully deleted session with id: {}", sessionId)) + .doOnError(error -> logger.error("Error deleting session with id: {}", sessionId, error)); + } +} diff --git a/src/main/java/com/example/dto/dialogflow/conversation/ConversationMessageDTO.java b/src/main/java/com/example/dto/dialogflow/conversation/ConversationMessageDTO.java new file mode 100644 index 0000000..af8da0c --- /dev/null +++ b/src/main/java/com/example/dto/dialogflow/conversation/ConversationMessageDTO.java @@ -0,0 +1,20 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.dto.dialogflow.conversation; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.time.Instant; +import java.util.Map; + +@JsonInclude(JsonInclude.Include.NON_NULL) +public record ConversationMessageDTO( + MessageType type, + Instant timestamp, + String text, + Map parameters, + String canal +) { +} \ No newline at end of file diff --git a/src/main/java/com/example/dto/dialogflow/conversation/ConversationSessionDTO.java b/src/main/java/com/example/dto/dialogflow/conversation/ConversationSessionDTO.java index d2d9f0d..146b439 100644 --- a/src/main/java/com/example/dto/dialogflow/conversation/ConversationSessionDTO.java +++ b/src/main/java/com/example/dto/dialogflow/conversation/ConversationSessionDTO.java @@ -21,38 +21,36 @@ public record ConversationSessionDTO( String telefono, Instant createdAt, Instant lastModified, - List entries, + String lastMessage, String pantallaContexto ) { - public ConversationSessionDTO(String sessionId, String userId, String telefono, Instant createdAt, Instant lastModified, List entries, String pantallaContexto) { + public ConversationSessionDTO(String sessionId, String userId, String telefono, Instant createdAt, Instant lastModified, String lastMessage, String pantallaContexto) { this.sessionId = sessionId; this.userId = userId; this.telefono = telefono; this.createdAt = createdAt; this.lastModified = lastModified; - this.entries = Collections.unmodifiableList(new ArrayList<>(entries)); + this.lastMessage = lastMessage; this.pantallaContexto = pantallaContexto; } public static ConversationSessionDTO create(String sessionId, String userId, String telefono) { Instant now = Instant.now(); - return new ConversationSessionDTO(sessionId, userId, telefono, now, now, Collections.emptyList(), null); + return new ConversationSessionDTO(sessionId, userId, telefono, now, now, null, null); } - public ConversationSessionDTO withAddedEntry(ConversationEntryDTO newEntry) { - List updatedEntries = new ArrayList<>(this.entries); - updatedEntries.add(newEntry); - return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, Instant.now(), updatedEntries, this.pantallaContexto); + public ConversationSessionDTO withLastMessage(String lastMessage) { + return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, Instant.now(), lastMessage, this.pantallaContexto); } public ConversationSessionDTO withTelefono(String newTelefono) { if (newTelefono != null && !newTelefono.equals(this.telefono)) { - return new ConversationSessionDTO(this.sessionId, this.userId, newTelefono, this.createdAt, this.lastModified, this.entries, this.pantallaContexto); + return new ConversationSessionDTO(this.sessionId, this.userId, newTelefono, this.createdAt, this.lastModified, this.lastMessage, this.pantallaContexto); } return this; } public ConversationSessionDTO withPantallaContexto(String pantallaContexto) { - return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, this.lastModified, this.entries, pantallaContexto); + return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, this.lastModified, this.lastMessage, pantallaContexto); } } \ No newline at end of file diff --git a/src/main/java/com/example/dto/dialogflow/conversation/MessageType.java b/src/main/java/com/example/dto/dialogflow/conversation/MessageType.java new file mode 100644 index 0000000..9393c93 --- /dev/null +++ b/src/main/java/com/example/dto/dialogflow/conversation/MessageType.java @@ -0,0 +1,13 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.dto.dialogflow.conversation; + +public enum MessageType { + USER, + AGENT, + SYSTEM, + LLM +} \ No newline at end of file diff --git a/src/main/java/com/example/mapper/conversation/ConversationEntryMapper.java b/src/main/java/com/example/mapper/conversation/ConversationEntryMapper.java new file mode 100644 index 0000000..f995ce1 --- /dev/null +++ b/src/main/java/com/example/mapper/conversation/ConversationEntryMapper.java @@ -0,0 +1,32 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.mapper.conversation; + +import com.example.dto.dialogflow.conversation.ConversationEntryDTO; +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; +import com.example.dto.dialogflow.conversation.MessageType; +import org.springframework.stereotype.Component; + +@Component +public class ConversationEntryMapper { + + public ConversationMessageDTO toConversationMessageDTO(ConversationEntryDTO entry) { + MessageType type = switch (entry.entity()) { + case USUARIO -> MessageType.USER; + case AGENTE -> MessageType.AGENT; + case SISTEMA -> MessageType.SYSTEM; + case LLM -> MessageType.LLM; + }; + + return new ConversationMessageDTO( + type, + entry.timestamp(), + entry.text(), + entry.parameters(), + entry.canal() + ); + } +} diff --git a/src/main/java/com/example/mapper/conversation/ConversationMessageMapper.java b/src/main/java/com/example/mapper/conversation/ConversationMessageMapper.java new file mode 100644 index 0000000..42f2a6a --- /dev/null +++ b/src/main/java/com/example/mapper/conversation/ConversationMessageMapper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.mapper.conversation; + +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; +import com.example.dto.dialogflow.conversation.MessageType; +import org.springframework.stereotype.Component; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +@Component +public class ConversationMessageMapper { + + public Map toMap(ConversationMessageDTO message) { + Map map = new HashMap<>(); + map.put("type", message.type().name()); + map.put("timestamp", message.timestamp()); + map.put("text", message.text()); + if (message.parameters() != null) { + map.put("parameters", message.parameters()); + } + if (message.canal() != null) { + map.put("canal", message.canal()); + } + return map; + } + + public ConversationMessageDTO fromMap(Map map) { + return new ConversationMessageDTO( + MessageType.valueOf((String) map.get("type")), + (Instant) map.get("timestamp"), + (String) map.get("text"), + (Map) map.get("parameters"), + (String) map.get("canal") + ); + } +} diff --git a/src/main/java/com/example/mapper/conversation/FirestoreConversationMapper.java b/src/main/java/com/example/mapper/conversation/FirestoreConversationMapper.java index 064c4a9..564e781 100644 --- a/src/main/java/com/example/mapper/conversation/FirestoreConversationMapper.java +++ b/src/main/java/com/example/mapper/conversation/FirestoreConversationMapper.java @@ -1,170 +1,53 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + package com.example.mapper.conversation; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; -import com.example.dto.dialogflow.conversation.ConversationEntryEntity; -import com.example.dto.dialogflow.conversation.ConversationEntryType; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.google.cloud.Timestamp; -import com.google.cloud.firestore.FieldValue; import com.google.cloud.firestore.DocumentSnapshot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.time.Instant; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -/** - * Spring component for mapping data between a `ConversationSessionDTO` and Firestore documents. - * It provides methods to convert a DTO into a format suitable for Firestore storage - * (creating new documents or updating existing ones) and to deserialize a Firestore - * `DocumentSnapshot` back into a `ConversationSessionDTO`, handling data types - * and nested collections correctly. - */ @Component public class FirestoreConversationMapper { - private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationMapper.class); + public ConversationSessionDTO mapFirestoreDocumentToConversationSessionDTO(DocumentSnapshot document) { + if (document == null || !document.exists()) { + return null; + } - // Class-level constants for Firestore field names at the session level - private static final String FIELD_SESSION_ID = "session_id"; - private static final String FIELD_USER_ID = "usuario_id"; - private static final String FIELD_PHONE_NUMBER = "telefono"; - private static final String FIELD_CREATED_AT = "fechaCreacion"; - private static final String FIELD_LAST_UPDATED = "ultimaActualizacion"; - private static final String FIELD_MESSAGES = "mensajes"; - private static final String FIELD_PANTALLA_CONTEXTO = "pantallaContexto"; - - - // Constants for fields within the 'mensajes' sub-documents - private static final String FIELD_MESSAGE_ENTITY = "entidad"; - private static final String FIELD_MESSAGE_TYPE = "tipo"; - private static final String FIELD_MESSAGE_TEXT = "mensaje"; - private static final String FIELD_MESSAGE_TIMESTAMP = "tiempo"; - private static final String FIELD_MESSAGE_PARAMETERS = "parametros"; - private static final String FIELD_MESSAGE_CHANNEL = "canal"; - + Timestamp createdAtTimestamp = document.getTimestamp("createdAt"); + Timestamp lastModifiedTimestamp = document.getTimestamp("lastModified"); - public Map createUpdateMapForSingleEntry(ConversationEntryDTO newEntry) { - Map updates = new HashMap<>(); - Map entryMap = toFirestoreEntryMap(newEntry); - updates.put(FIELD_MESSAGES, FieldValue.arrayUnion(entryMap)); - updates.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now()))); - return updates; + Instant createdAt = (createdAtTimestamp != null) ? createdAtTimestamp.toDate().toInstant() : null; + Instant lastModified = (lastModifiedTimestamp != null) ? lastModifiedTimestamp.toDate().toInstant() : null; + + return new ConversationSessionDTO( + document.getString("sessionId"), + document.getString("userId"), + document.getString("telefono"), + createdAt, + lastModified, + document.getString("lastMessage"), + document.getString("pantallaContexto") + ); } - public Map createNewSessionMapForSingleEntry(String sessionId, String userId, String telefono, ConversationEntryDTO initialEntry) { - return createNewSessionMapForSingleEntry(sessionId, userId, telefono, initialEntry, null); - } - - public Map createNewSessionMapForSingleEntry(String sessionId, String userId, String telefono, ConversationEntryDTO initialEntry, String pantallaContexto) { + public Map createSessionMap(ConversationSessionDTO session) { Map sessionMap = new HashMap<>(); - sessionMap.put(FIELD_SESSION_ID, sessionId); - sessionMap.put(FIELD_USER_ID, userId); - - if (telefono != null && !telefono.trim().isEmpty()) { - sessionMap.put(FIELD_PHONE_NUMBER, telefono); - } else { - sessionMap.put(FIELD_PHONE_NUMBER, null); - } - - if (pantallaContexto != null) { - sessionMap.put(FIELD_PANTALLA_CONTEXTO, pantallaContexto); - } - - - - sessionMap.put(FIELD_CREATED_AT, Timestamp.of(java.util.Date.from(Instant.now()))); - sessionMap.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now()))); - - List> entriesList = new ArrayList<>(); - entriesList.add(toFirestoreEntryMap(initialEntry)); - sessionMap.put(FIELD_MESSAGES, entriesList); - + sessionMap.put("sessionId", session.sessionId()); + sessionMap.put("userId", session.userId()); + sessionMap.put("telefono", session.telefono()); + sessionMap.put("createdAt", session.createdAt()); + sessionMap.put("lastModified", session.lastModified()); + sessionMap.put("lastMessage", session.lastMessage()); + sessionMap.put("pantallaContexto", session.pantallaContexto()); return sessionMap; } - - - - private Map toFirestoreEntryMap(ConversationEntryDTO entry) { - Map entryMap = new HashMap<>(); - entryMap.put(FIELD_MESSAGE_ENTITY, entry.entity().name()); - entryMap.put(FIELD_MESSAGE_TYPE, entry.type().name()); - entryMap.put(FIELD_MESSAGE_TEXT, entry.text()); - entryMap.put(FIELD_MESSAGE_TIMESTAMP, Timestamp.of(java.util.Date.from(entry.timestamp()))); - - if (entry.parameters() != null && !entry.parameters().isEmpty()) { - entryMap.put(FIELD_MESSAGE_PARAMETERS, entry.parameters()); - } - if (entry.canal() != null) { - entryMap.put(FIELD_MESSAGE_CHANNEL, entry.canal()); - } - logger.debug("Created Firestore entry map: {}", entryMap); - return entryMap; - } - - public ConversationSessionDTO mapFirestoreDocumentToConversationSessionDTO(DocumentSnapshot documentSnapshot) { - if (!documentSnapshot.exists()) { - return null; - } - - String sessionId = documentSnapshot.getString(FIELD_SESSION_ID); - String userId = documentSnapshot.getString(FIELD_USER_ID); - String telefono = documentSnapshot.getString(FIELD_PHONE_NUMBER); - String pantallaContexto = documentSnapshot.getString(FIELD_PANTALLA_CONTEXTO); - - Timestamp createdAtFirestore = documentSnapshot.getTimestamp(FIELD_CREATED_AT); - Instant createdAt = (createdAtFirestore != null) ? createdAtFirestore.toDate().toInstant() : null; - - Timestamp lastModifiedFirestore = documentSnapshot.getTimestamp(FIELD_LAST_UPDATED); - Instant lastModified = (lastModifiedFirestore != null) ? lastModifiedFirestore.toDate().toInstant() : null; - - List> rawEntries = (List>) documentSnapshot.get(FIELD_MESSAGES); - - List entries = new ArrayList<>(); - if (rawEntries != null) { - entries = rawEntries.stream() - .map(this::mapFirestoreEntryMapToConversationEntryDTO) - .collect(Collectors.toList()); - } - - return new ConversationSessionDTO(sessionId, userId, telefono, createdAt, lastModified, entries, pantallaContexto); - } - - -private ConversationEntryDTO mapFirestoreEntryMapToConversationEntryDTO(Map entryMap) { - ConversationEntryEntity entity = null; - Object entityObj = entryMap.get(FIELD_MESSAGE_ENTITY); - if (entityObj instanceof String) { - try { - entity = ConversationEntryEntity.valueOf((String) entityObj); - } catch (IllegalArgumentException e) { - logger.warn("Unknown ConversationEntryEntity encountered: {}. Setting entity to null.", entityObj); - } - } - - ConversationEntryType type = null; - Object typeObj = entryMap.get(FIELD_MESSAGE_TYPE); - if (typeObj instanceof String) { - try { - type = ConversationEntryType.valueOf((String) typeObj); - } catch (IllegalArgumentException e) { - logger.warn("Unknown ConversationEntryType encountered: {}. Setting type to null.", typeObj); - } - } - - String text = (String) entryMap.get(FIELD_MESSAGE_TEXT); - - Timestamp timestampFirestore = (Timestamp) entryMap.get(FIELD_MESSAGE_TIMESTAMP); - Instant timestamp = (timestampFirestore != null) ? timestampFirestore.toDate().toInstant() : null; - - Map parameters = (Map) entryMap.get(FIELD_MESSAGE_PARAMETERS); - String canal = (String) entryMap.get(FIELD_MESSAGE_CHANNEL); - - return new ConversationEntryDTO(entity, type, timestamp, text, parameters, canal); - } } \ No newline at end of file diff --git a/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java b/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java index 825138b..a3d28b5 100644 --- a/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java +++ b/src/main/java/com/example/mapper/messagefilter/ConversationContextMapper.java @@ -1,82 +1,36 @@ /* * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. - * Your use of it is subject to your agreement with Google. + * Your use of it is subject to your agreement with Google. */ package com.example.mapper.messagefilter; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @Component public class ConversationContextMapper { - @Value("${conversation.context.message.limit:60}") - private int messageLimit; + private static final int MAX_MESSAGES = 10; - @Value("${conversation.context.days.limit:30}") - private int daysLimit; - - public String toText(ConversationSessionDTO session) { - if (session == null || session.entries() == null || session.entries().isEmpty()) { - return ""; - } - - return session.entries().stream() - .map(this::formatEntry) - .collect(Collectors.joining("")); + public String toText(ConversationSessionDTO session, List messages) { + return toTextFromMessages(messages); } - public String toTextWithLimits(ConversationSessionDTO session) { - if (session == null || session.entries() == null || session.entries().isEmpty()) { - return ""; + public String toTextWithLimits(ConversationSessionDTO session, List messages) { + if (messages.size() > MAX_MESSAGES) { + messages = messages.subList(messages.size() - MAX_MESSAGES, messages.size()); } - - Instant thirtyDaysAgo = Instant.now().minus(daysLimit, ChronoUnit.DAYS); - - List recentEntries = session.entries().stream() - .filter(entry -> entry.timestamp().isAfter(thirtyDaysAgo)) - .sorted(Comparator.comparing(ConversationEntryDTO::timestamp).reversed()) - .limit(messageLimit) - .sorted(Comparator.comparing(ConversationEntryDTO::timestamp)) - .collect(Collectors.toList()); - - return recentEntries.stream() - .map(this::formatEntry) - .collect(Collectors.joining("")); + return toTextFromMessages(messages); } - private String formatEntry(ConversationEntryDTO entry) { - String prefix = "User: "; - if (entry.entity() != null) { - switch (entry.entity()) { - case AGENTE: - prefix = "Agent: "; - break; - case SISTEMA: - prefix = "System: "; - break; - case USUARIO: - default: - prefix = "User: "; - break; - } - } - - String text = prefix + entry.text(); - - if (entry.parameters() != null && !entry.parameters().isEmpty()) { - text += " " + entry.parameters().toString(); - } - - return text; + public String toTextFromMessages(List messages) { + return messages.stream() + .map(message -> String.format("%s: %s", message.type(), message.text())) + .collect(Collectors.joining("\n")); } } \ No newline at end of file diff --git a/src/main/java/com/example/repository/FirestoreBaseRepository.java b/src/main/java/com/example/repository/FirestoreBaseRepository.java index 4e05b4a..1fd5bf0 100644 --- a/src/main/java/com/example/repository/FirestoreBaseRepository.java +++ b/src/main/java/com/example/repository/FirestoreBaseRepository.java @@ -17,9 +17,12 @@ import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.Firestore; import com.google.cloud.firestore.Query; +import com.google.cloud.firestore.QueryDocumentSnapshot; import com.google.cloud.firestore.QuerySnapshot; import com.google.cloud.firestore.WriteBatch; import com.google.cloud.firestore.WriteResult; +import com.google.cloud.firestore.CollectionReference; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; @@ -27,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -114,6 +118,23 @@ public class FirestoreBaseRepository { return future.get(); } + public Flux getDocuments(String collectionPath) { + return Flux.create(sink -> { + ApiFuture future = firestore.collection(collectionPath).get(); + future.addListener(() -> { + try { + QuerySnapshot querySnapshot = future.get(); + if (querySnapshot != null) { + querySnapshot.getDocuments().forEach(sink::next); + } + sink.complete(); + } catch (InterruptedException | ExecutionException e) { + sink.error(e); + } + }, Runnable::run); + }); + } + public Mono getDocumentsByField( String collectionPath, String fieldName, String value) { return Mono.fromCallable( @@ -155,6 +176,15 @@ public class FirestoreBaseRepository { "Document updated: {} with update time: {}", docRef.getPath(), writeResult.getUpdateTime()); } + public void deleteDocument(DocumentReference docRef) + throws InterruptedException, ExecutionException { + Objects.requireNonNull(docRef, "DocumentReference cannot be null."); + ApiFuture future = docRef.delete(); + WriteResult writeResult = future.get(); + logger.debug( + "Document deleted: {} with update time: {}", docRef.getPath(), writeResult.getUpdateTime()); + } + public WriteBatch createBatch() { return firestore.batch(); } @@ -168,4 +198,26 @@ public class FirestoreBaseRepository { public String getAppId() { return appId; } + + public void deleteCollection(String collectionPath, int batchSize) { + try { + CollectionReference collection = firestore.collection(collectionPath); + ApiFuture future = collection.limit(batchSize).get(); + int deleted = 0; + // future.get() blocks on document retrieval + List documents = future.get().getDocuments(); + while (!documents.isEmpty()) { + for (QueryDocumentSnapshot document : documents) { + document.getReference().delete(); + ++deleted; + } + future = collection.limit(batchSize).get(); + documents = future.get().getDocuments(); + } + logger.info("Deleted {} documents from collection {}", deleted, collectionPath); + } catch (Exception e) { + logger.error("Error deleting collection: " + e.getMessage(), e); + throw new RuntimeException("Error deleting collection", e); + } + } } \ No newline at end of file diff --git a/src/main/java/com/example/service/base/DataPurgeService.java b/src/main/java/com/example/service/base/DataPurgeService.java new file mode 100644 index 0000000..4b9987c --- /dev/null +++ b/src/main/java/com/example/service/base/DataPurgeService.java @@ -0,0 +1,106 @@ + +package com.example.service.base; + +import com.example.repository.FirestoreBaseRepository; +import com.google.cloud.firestore.CollectionReference; +import com.google.cloud.firestore.Firestore; +import com.google.cloud.firestore.QueryDocumentSnapshot; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@Service +public class DataPurgeService { + + private static final Logger logger = LoggerFactory.getLogger(DataPurgeService.class); + + private final ReactiveRedisTemplate redisTemplate; + private final FirestoreBaseRepository firestoreBaseRepository; + + private final Firestore firestore; + + @Autowired + public DataPurgeService( + @Qualifier("reactiveRedisTemplate") ReactiveRedisTemplate redisTemplate, + FirestoreBaseRepository firestoreBaseRepository, Firestore firestore) { + this.redisTemplate = redisTemplate; + this.firestoreBaseRepository = firestoreBaseRepository; + this.firestore = firestore; + } + + public Mono purgeAllData() { + return purgeRedis() + .then(purgeFirestore()); + } + + private Mono purgeRedis() { + logger.info("Starting Redis data purge."); + return redisTemplate.getConnectionFactory().getReactiveConnection().serverCommands().flushAll() + .doOnSuccess(v -> logger.info("Successfully purged all data from Redis.")) + .doOnError(e -> logger.error("Error purging data from Redis.", e)) + .then(); + } + + private Mono purgeFirestore() { + logger.info("Starting Firestore data purge."); + return Mono.fromRunnable(() -> { + try { + String appId = firestoreBaseRepository.getAppId(); + String conversationsCollectionPath = String.format("artifacts/%s/conversations", appId); + String notificationsCollectionPath = String.format("artifacts/%s/notifications", appId); + + // Delete 'messages' sub-collections in 'conversations' + logger.info("Deleting 'messages' sub-collections from '{}'", conversationsCollectionPath); + try { + List conversationDocuments = firestore.collection(conversationsCollectionPath).get().get().getDocuments(); + for (QueryDocumentSnapshot document : conversationDocuments) { + String messagesCollectionPath = document.getReference().getPath() + "/messages"; + logger.info("Deleting sub-collection: {}", messagesCollectionPath); + firestoreBaseRepository.deleteCollection(messagesCollectionPath, 50); + } + } catch (Exception e) { + if (e.getMessage().contains("NOT_FOUND")) { + logger.warn("Collection '{}' not found, skipping.", conversationsCollectionPath); + } else { + throw e; + } + } + + // Delete the 'conversations' collection + logger.info("Deleting collection: {}", conversationsCollectionPath); + try { + firestoreBaseRepository.deleteCollection(conversationsCollectionPath, 50); + } catch (Exception e) { + if (e.getMessage().contains("NOT_FOUND")) { + logger.warn("Collection '{}' not found, skipping.", conversationsCollectionPath); + } else { + throw e; + } + } + + // Delete the 'notifications' collection + logger.info("Deleting collection: {}", notificationsCollectionPath); + try { + firestoreBaseRepository.deleteCollection(notificationsCollectionPath, 50); + } catch (Exception e) { + if (e.getMessage().contains("NOT_FOUND")) { + logger.warn("Collection '{}' not found, skipping.", notificationsCollectionPath); + } else { + throw e; + } + } + + logger.info("Successfully purged Firestore collections."); + } catch (Exception e) { + logger.error("Error purging Firestore collections.", e); + throw new RuntimeException("Failed to purge Firestore collections.", e); + } + }).subscribeOn(Schedulers.boundedElastic()).then(); + } +} diff --git a/src/main/java/com/example/service/base/SessionPurgeService.java b/src/main/java/com/example/service/base/SessionPurgeService.java new file mode 100644 index 0000000..4ab3d4d --- /dev/null +++ b/src/main/java/com/example/service/base/SessionPurgeService.java @@ -0,0 +1,65 @@ +/* + * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. + * Your use of it is subject to your agreement with Google. + */ + +package com.example.service.base; + +import com.example.dto.dialogflow.conversation.ConversationSessionDTO; +import com.example.service.conversation.FirestoreConversationService; +import com.example.service.notification.FirestoreNotificationService; +import com.example.service.notification.MemoryStoreNotificationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +public class SessionPurgeService { + + private static final Logger logger = LoggerFactory.getLogger(SessionPurgeService.class); + private static final String SESSION_KEY_PREFIX = "conversation:session:"; + private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:"; + + private final ReactiveRedisTemplate redisTemplate; + private final ReactiveRedisTemplate stringRedisTemplate; + private final FirestoreConversationService firestoreConversationService; + private final MemoryStoreNotificationService memoryStoreNotificationService; + private final FirestoreNotificationService firestoreNotificationService; + + @Autowired + public SessionPurgeService( + ReactiveRedisTemplate redisTemplate, + ReactiveRedisTemplate stringRedisTemplate, + FirestoreConversationService firestoreConversationService, + MemoryStoreNotificationService memoryStoreNotificationService, + FirestoreNotificationService firestoreNotificationService) { + this.redisTemplate = redisTemplate; + this.stringRedisTemplate = stringRedisTemplate; + this.firestoreConversationService = firestoreConversationService; + this.memoryStoreNotificationService = memoryStoreNotificationService; + this.firestoreNotificationService = firestoreNotificationService; + } + + public Mono deleteSession(String sessionId) { + String sessionKey = SESSION_KEY_PREFIX + sessionId; + logger.info("Deleting session {} from all stores.", sessionId); + + return redisTemplate.opsForValue().get(sessionKey) + .flatMap(session -> { + if (session != null && session.telefono() != null) { + String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono(); + return redisTemplate.opsForValue().delete(sessionKey) + .then(stringRedisTemplate.opsForValue().delete(phoneToSessionKey)) + .then(firestoreConversationService.deleteSession(sessionId)) + .then(memoryStoreNotificationService.deleteNotificationSession(session.telefono())) + .then(firestoreNotificationService.deleteNotification(session.telefono())); + } else { + return redisTemplate.opsForValue().delete(sessionKey) + .then(firestoreConversationService.deleteSession(sessionId)); + } + }); + } +} diff --git a/src/main/java/com/example/service/conversation/ConversationManagerService.java b/src/main/java/com/example/service/conversation/ConversationManagerService.java index ac7600e..81e327d 100644 --- a/src/main/java/com/example/service/conversation/ConversationManagerService.java +++ b/src/main/java/com/example/service/conversation/ConversationManagerService.java @@ -7,13 +7,10 @@ package com.example.service.conversation; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; -import com.example.dto.dialogflow.conversation.ConversationContext; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; -import com.example.dto.dialogflow.conversation.ConversationSessionDTO; -import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO; -import com.example.dto.dialogflow.conversation.QueryInputDTO; +import com.example.dto.dialogflow.conversation.*; import com.example.dto.dialogflow.notification.EventInputDTO; import com.example.dto.dialogflow.notification.NotificationDTO; +import com.example.mapper.conversation.ConversationEntryMapper; import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.messagefilter.ConversationContextMapper; import com.example.mapper.messagefilter.NotificationContextMapper; @@ -38,19 +35,6 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; -/** - * Orchestrates the full lifecycle of a user conversation,(this is the core of - * the entire integration layer service), acting as the central point for all - * inbound requests. - * This service manages conversational state by integrating both an in-memory - * cache (for active sessions) - * and a durable database (for conversation history). It intelligently routes - * incoming messages - * to the appropriate handler, which can include a standard Dialogflow agent, a - * notification-specific flow, or a direct LLM-based response. The class also - * ensures data integrity and security by applying Data Loss Prevention (DLP) - * to all incoming user messages before they are processed. - */ @Service public class ConversationManagerService { private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); @@ -71,6 +55,7 @@ public class ConversationManagerService { private final NotificationContextResolver notificationContextResolver; private final LlmResponseTunerService llmResponseTunerService; + private final ConversationEntryMapper conversationEntryMapper; public ConversationManagerService( DialogflowClientService dialogflowServiceClient, @@ -85,6 +70,7 @@ public class ConversationManagerService { DataLossPrevention dataLossPrevention, NotificationContextResolver notificationContextResolver, LlmResponseTunerService llmResponseTunerService, + ConversationEntryMapper conversationEntryMapper, @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { this.dialogflowServiceClient = dialogflowServiceClient; this.firestoreConversationService = firestoreConversationService; @@ -99,6 +85,7 @@ public class ConversationManagerService { this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; this.notificationContextResolver = notificationContextResolver; this.llmResponseTunerService = llmResponseTunerService; + this.conversationEntryMapper = conversationEntryMapper; } @@ -160,29 +147,31 @@ public class ConversationManagerService { final String userMessageText = context.userMessageText(); return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) - .map(conversationContextMapper::toText) - .defaultIfEmpty("") - .flatMap(conversationHistory -> { - return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) - .flatMap(notificationId -> memoryStoreNotificationService - .getCachedNotificationSession(notificationId)) - .map(notificationSession -> notificationSession.notificaciones().stream() - .filter(notification -> "active".equalsIgnoreCase(notification.status())) - .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) - .orElse(null)) - .filter(Objects::nonNull) - .flatMap((NotificationDTO notification) -> { - String notificationText = notificationContextMapper.toText(notification); - String classification = messageEntryFilter.classifyMessage(userMessageText, - notificationText, conversationHistory); - if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { - return startNotificationConversation(context, request, notification); - } else { - return continueConversationFlow(context, request); - } - }) - .switchIfEmpty(continueConversationFlow(context, request)); - }); + .flatMap(session -> memoryStoreConversationService.getMessages(session.sessionId()).collectList() + .map(conversationContextMapper::toTextFromMessages) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) + .flatMap(notificationId -> memoryStoreNotificationService + .getCachedNotificationSession(notificationId)) + .map(notificationSession -> notificationSession.notificaciones().stream() + .filter(notification -> "active".equalsIgnoreCase(notification.status())) + .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) + .orElse(null)) + .filter(Objects::nonNull) + .flatMap((NotificationDTO notification) -> { + String notificationText = notificationContextMapper.toText(notification); + String classification = messageEntryFilter.classifyMessage(userMessageText, + notificationText, conversationHistory); + if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { + return startNotificationConversation(context, request, notification); + } else { + return continueConversationFlow(context, request); + } + }) + .switchIfEmpty(continueConversationFlow(context, request)); + })) + .switchIfEmpty(continueConversationFlow(context, request)); } private Mono continueConversationFlow(ConversationContext context, @@ -225,15 +214,19 @@ public class ConversationManagerService { .orElse(null)) .filter(Objects::nonNull) .flatMap((NotificationDTO notification) -> { - String conversationHistory = conversationContextMapper.toText(session); - String notificationText = notificationContextMapper.toText(notification); - String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, - conversationHistory); - if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { - return startNotificationConversation(context, request, notification); - } else { - return proceedWithConversation(context, request, session); - } + return memoryStoreConversationService.getMessages(session.sessionId()).collectList() + .map(conversationContextMapper::toTextFromMessages) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + String notificationText = notificationContextMapper.toText(notification); + String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, + conversationHistory); + if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { + return startNotificationConversation(context, request, notification); + } else { + return proceedWithConversation(context, request, session); + } + }); }) .switchIfEmpty(proceedWithConversation(context, request, session)); } @@ -250,27 +243,40 @@ public class ConversationManagerService { logger.info( "Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", session.sessionId()); - String conversationHistory = conversationContextMapper.toTextWithLimits(session); - DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); - return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), - context.primaryPhoneNumber(), false); + return memoryStoreConversationService.getMessages(session.sessionId()).collectList() + .map(conversationContextMapper::toTextFromMessages) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); + return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), + context.primaryPhoneNumber(), false); + }); } } private Mono fullLookupAndProcess(ConversationSessionDTO oldSession, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { return firestoreConversationService.getSessionByTelefono(userPhoneNumber) - .map(conversationContextMapper::toTextWithLimits) - .defaultIfEmpty("") - .flatMap(conversationHistory -> { + .flatMap(session -> firestoreConversationService.getMessages(session.sessionId()).collectList() + .map(conversationContextMapper::toTextFromMessages) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + String newSessionId = SessionIdGenerator.generateStandardSessionId(); + logger.info("Creating new session {} after full lookup.", newSessionId); + ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, + userPhoneNumber); + DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); + return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, + true); + })) + .switchIfEmpty(Mono.defer(() -> { String newSessionId = SessionIdGenerator.generateStandardSessionId(); logger.info("Creating new session {} after full lookup.", newSessionId); ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber); - DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); - return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber, + return processDialogflowRequest(newSession, request, userId, userMessageText, userPhoneNumber, true); - }); + })); } private Mono processDialogflowRequest(ConversationSessionDTO session, @@ -280,7 +286,7 @@ public class ConversationManagerService { ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); - return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber) + return this.persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry)) .doOnSuccess(v -> logger.debug( "User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId)) @@ -292,7 +298,7 @@ public class ConversationManagerService { "Received Dialogflow CX response for session {}. Initiating agent response persistence.", finalSessionId); ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); - return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber) + return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry)) .thenReturn(response); }) .doOnError( @@ -315,85 +321,90 @@ public class ConversationManagerService { })) .flatMap(session -> { final String sessionId = session.sessionId(); - String conversationHistory = conversationContextMapper.toTextWithLimits(session); - String notificationText = notificationContextMapper.toText(notification); + return memoryStoreConversationService.getMessages(sessionId).collectList() + .map(conversationContextMapper::toTextFromMessages) + .defaultIfEmpty("") + .flatMap(conversationHistory -> { + String notificationText = notificationContextMapper.toText(notification); - Map filteredParams = notification.parametros().entrySet().stream() - .filter(entry -> entry.getKey().startsWith("notification_po_")) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map filteredParams = notification.parametros().entrySet().stream() + .filter(entry -> entry.getKey().startsWith("notification_po_")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - String resolvedContext = notificationContextResolver.resolveContext(userMessageText, - notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, - userPhoneNumber); + String resolvedContext = notificationContextResolver.resolveContext(userMessageText, + notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, + userPhoneNumber); - if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) { - String uuid = UUID.randomUUID().toString(); - llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); + if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) { + String uuid = UUID.randomUUID().toString(); + llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, - notification.parametros()); - ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, - notification.parametros()); + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, + notification.parametros()); + ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, + notification.parametros()); - return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber) - .then(persistConversationTurn(userId, sessionId, llmEntry, userPhoneNumber)) - .then(Mono.defer(() -> { - EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); - QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, - request.queryInput().languageCode()); - DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, - request.queryParams()) - .withParameter("llm_reponse_uuid", uuid); - - return dialogflowServiceClient.detectIntent(sessionId, newRequest) - .flatMap(response -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgent(response.queryResult()); - return persistConversationTurn(userId, sessionId, agentEntry, - userPhoneNumber) - .thenReturn(response); - }); - })); - } else { - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, - notification.parametros()); + return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry)) + .then(persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(llmEntry))) + .then(Mono.defer(() -> { + EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); + QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, + request.queryInput().languageCode()); + DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, + request.queryParams()) + .withParameter("llm_reponse_uuid", uuid); + + return dialogflowServiceClient.detectIntent(sessionId, newRequest) + .flatMap(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgent(response.queryResult()); + return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry)) + .thenReturn(response); + }); + })); + } else { + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, + notification.parametros()); - DetectIntentRequestDTO finalRequest; - Instant now = Instant.now(); - if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { - finalRequest = request.withParameters(notification.parametros()); - } else { - finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) - .withParameters(notification.parametros()); - } - return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber) - .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) - .flatMap(response -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgent(response.queryResult()); - return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber) - .thenReturn(response); - })); - } + DetectIntentRequestDTO finalRequest; + Instant now = Instant.now(); + if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { + finalRequest = request.withParameters(notification.parametros()); + } else { + finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) + .withParameters(notification.parametros()); + } + return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry)) + .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) + .flatMap(response -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgent(response.queryResult()); + return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry)) + .thenReturn(response); + })); + } + }); }); } - private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber) { - logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, - entry.type().name()); - return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + private Mono persistConversationTurn(ConversationSessionDTO session, ConversationMessageDTO message) { + logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", session.sessionId(), + message.type().name()); + ConversationSessionDTO updatedSession = session.withLastMessage(message.text()); + return memoryStoreConversationService.saveSession(updatedSession) + .then(memoryStoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(v -> logger.info( "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name())) - .then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + session.sessionId(), message.type().name())) + .then(firestoreConversationService.saveSession(updatedSession) + .then(firestoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(fsVoid -> logger.debug( "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", - sessionId, entry.type().name())) + session.sessionId(), message.type().name())) .doOnError(fsError -> logger.error( "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", - sessionId, entry.type().name(), fsError.getMessage(), fsError))) - .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, - entry.type().name(), e.getMessage(), e)); + session.sessionId(), message.type().name(), fsError.getMessage(), fsError))) + .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", session.sessionId(), + message.type().name(), e.getMessage(), e)); } } \ No newline at end of file diff --git a/src/main/java/com/example/service/conversation/FirestoreConversationService.java b/src/main/java/com/example/service/conversation/FirestoreConversationService.java index 7c09ebf..70753c3 100644 --- a/src/main/java/com/example/service/conversation/FirestoreConversationService.java +++ b/src/main/java/com/example/service/conversation/FirestoreConversationService.java @@ -5,86 +5,76 @@ package com.example.service.conversation; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.exception.FirestorePersistenceException; +import com.example.mapper.conversation.ConversationMessageMapper; import com.example.mapper.conversation.FirestoreConversationMapper; import com.example.repository.FirestoreBaseRepository; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; -import com.google.cloud.firestore.WriteBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; -/** - * Service for managing conversation sessions in Firestore. - * It handles the persistence of conversation entries, either by creating - * a new document for a new session or appending an entry to an existing - * session document using a Firestore batch. The service also provides - * methods for retrieving a complete conversation session from Firestore. - */ @Service public class FirestoreConversationService { private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class); private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations"; + private static final String MESSAGES_SUBCOLLECTION = "messages"; private final FirestoreBaseRepository firestoreBaseRepository; private final FirestoreConversationMapper firestoreConversationMapper; + private final ConversationMessageMapper conversationMessageMapper; - public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper) { + public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper, ConversationMessageMapper conversationMessageMapper) { this.firestoreBaseRepository = firestoreBaseRepository; this.firestoreConversationMapper = firestoreConversationMapper; + this.conversationMessageMapper = conversationMessageMapper; } - public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { - return saveEntry(userId, sessionId, newEntry, userPhoneNumber, null); - } - - public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber, String pantallaContexto) { - logger.info("Attempting to save conversation entry to Firestore for session {}. Entity: {}", sessionId, newEntry.entity().name()); + public Mono saveSession(ConversationSessionDTO session) { return Mono.fromRunnable(() -> { - DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); - WriteBatch batch = firestoreBaseRepository.createBatch(); - + DocumentReference sessionDocRef = getSessionDocumentReference(session.sessionId()); try { - DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef); - - if (documentSnapshot != null && documentSnapshot.exists()) { - // Update: Append the new entry using arrayUnion and update lastModified - Map updates = firestoreConversationMapper.createUpdateMapForSingleEntry(newEntry); - if (pantallaContexto != null) { - updates.put("pantallaContexto", pantallaContexto); - } - batch.update(sessionDocRef, updates); - logger.info("Appending entry to existing conversation session for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name()); - } else { - // Create: Start a new session with the first entry. - // Pass userId and userPhoneNumber to the mapper to be stored as fields in the document. - Map newSessionMap = firestoreConversationMapper.createNewSessionMapForSingleEntry(sessionId, userId, userPhoneNumber, newEntry, pantallaContexto); - batch.set(sessionDocRef, newSessionMap); - logger.info("Creating new conversation session with first entry for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name()); - } - firestoreBaseRepository.commitBatch(batch); - logger.info("Successfully committed batch for session {} to Firestore.", sessionId); - } catch (ExecutionException e) { - logger.error("Error saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e); - throw new FirestorePersistenceException("Failed to save conversation entry to Firestore for session " + sessionId, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Thread interrupted while saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e); - throw new FirestorePersistenceException("Saving conversation entry was interrupted for session " + sessionId, e); + firestoreBaseRepository.setDocument(sessionDocRef, firestoreConversationMapper.createSessionMap(session)); + } catch (ExecutionException | InterruptedException e) { + handleException(e, session.sessionId()); } }).subscribeOn(Schedulers.boundedElastic()).then(); } - public Mono getConversationSession(String userId, String sessionId) { - logger.info("Attempting to retrieve conversation session for session {} (user ID {} for context).", sessionId, userId); + public Mono saveMessage(String sessionId, ConversationMessageDTO message) { + return Mono.fromRunnable(() -> { + DocumentReference messageDocRef = getSessionDocumentReference(sessionId).collection(MESSAGES_SUBCOLLECTION).document(); + try { + firestoreBaseRepository.setDocument(messageDocRef, conversationMessageMapper.toMap(message)); + } catch (ExecutionException | InterruptedException e) { + handleException(e, sessionId); + } + }).subscribeOn(Schedulers.boundedElastic()).then(); + } + + public Flux getMessages(String sessionId) { + String messagesPath = getConversationCollectionPath() + "/" + sessionId + "/" + MESSAGES_SUBCOLLECTION; + return firestoreBaseRepository.getDocuments(messagesPath) + .map(documentSnapshot -> { + if (documentSnapshot != null && documentSnapshot.exists()) { + return conversationMessageMapper.fromMap(documentSnapshot.getData()); + } + return null; + }) + .filter(Objects::nonNull); + } + + public Mono getConversationSession(String sessionId) { + logger.info("Attempting to retrieve conversation session for session {}.", sessionId); return Mono.fromCallable(() -> { DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); try { @@ -95,16 +85,16 @@ public class FirestoreConversationService { return sessionDTO; } logger.info("Conversation session not found for session {}.", sessionId); - return null; // Or Mono.empty() if this method returned Mono> + return null; } catch (InterruptedException | ExecutionException e) { - logger.error("Error retrieving conversation session from Firestore for session {}: {}", sessionId, e.getMessage(), e); - throw new FirestorePersistenceException("Failed to retrieve conversation session from Firestore for session " + sessionId, e); + handleException(e, sessionId); + return null; } }).subscribeOn(Schedulers.boundedElastic()); } public Mono getSessionByTelefono(String userPhoneNumber) { - return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "userPhoneNumber", userPhoneNumber) + return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "telefono", userPhoneNumber) .map(documentSnapshot -> { if (documentSnapshot != null && documentSnapshot.exists()) { ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot); @@ -115,7 +105,18 @@ public class FirestoreConversationService { }); } - + public Mono deleteSession(String sessionId) { + logger.info("Attempting to delete conversation session for session {}.", sessionId); + return Mono.fromRunnable(() -> { + DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); + try { + firestoreBaseRepository.deleteDocument(sessionDocRef); + logger.info("Successfully deleted conversation session for session {}.", sessionId); + } catch (InterruptedException | ExecutionException e) { + handleException(e, sessionId); + } + }).subscribeOn(Schedulers.boundedElastic()).then(); + } private String getConversationCollectionPath() { return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId()); @@ -125,4 +126,12 @@ public class FirestoreConversationService { String collectionPath = getConversationCollectionPath(); return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId); } + + private void handleException(Exception e, String sessionId) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + logger.error("Error processing Firestore operation for session {}: {}", sessionId, e.getMessage(), e); + throw new FirestorePersistenceException("Failed to process Firestore operation for session " + sessionId, e); + } } \ No newline at end of file diff --git a/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java b/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java index b45c3ee..2948364 100644 --- a/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java +++ b/src/main/java/com/example/service/conversation/MemoryStoreConversationService.java @@ -4,96 +4,69 @@ */ package com.example.service.conversation; + +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.stereotype.Service; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; -/** - * Service for managing conversation sessions using a memory store (Redis). - * It caches and retrieves `ConversationSessionDTO` objects, maintaining a mapping - * from a user's phone number to their active session ID. This service uses - * a time-to-live (TTL) to manage session expiration and provides a fast - * reactive interface for persisting new conversation entries and fetching sessions. - */ @Service public class MemoryStoreConversationService { private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class); private static final String SESSION_KEY_PREFIX = "conversation:session:"; private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:"; + private static final String MESSAGES_KEY_PREFIX = "conversation:messages:"; private static final Duration SESSION_TTL = Duration.ofDays(30); private final ReactiveRedisTemplate redisTemplate; private final ReactiveRedisTemplate stringRedisTemplate; + private final ReactiveRedisTemplate messageRedisTemplate; @Autowired public MemoryStoreConversationService( ReactiveRedisTemplate redisTemplate, ReactiveRedisTemplate stringRedisTemplate, - FirestoreConversationService firestoreConversationService) { + ReactiveRedisTemplate messageRedisTemplate) { this.redisTemplate = redisTemplate; this.stringRedisTemplate = stringRedisTemplate; + this.messageRedisTemplate = messageRedisTemplate; } - public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { - return saveEntry(userId, sessionId, newEntry, userPhoneNumber, null); + public Mono saveMessage(String sessionId, ConversationMessageDTO message) { + String messagesKey = MESSAGES_KEY_PREFIX + sessionId; + return messageRedisTemplate.opsForList().rightPush(messagesKey, message).then(); } - public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber, String pantallaContexto) { - String sessionKey = SESSION_KEY_PREFIX + sessionId; - String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber; - - logger.info("Attempting to save entry to Memorystore for session {}. Entity: {}", sessionId, newEntry.entity().name()); - - return redisTemplate.opsForValue().get(sessionKey) - .doOnSuccess(session -> { - if (session != null) { - logger.info("Found existing session in Memorystore: {}", sessionKey); - } else { - logger.info("No session found in Memorystore for key: {}", sessionKey); - } - }) - .switchIfEmpty(Mono.defer(() -> { - logger.info("Creating new session {} in Memorystore with TTL.", sessionId); - ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber); - return redisTemplate.opsForValue().set(sessionKey, newSession, SESSION_TTL) - .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL)) - .thenReturn(newSession); - })) - .flatMap(session -> { - ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber); - ConversationSessionDTO sessionWithPantallaContexto = (pantallaContexto != null) ? sessionWithUpdatedTelefono.withPantallaContexto(pantallaContexto) : sessionWithUpdatedTelefono; - ConversationSessionDTO updatedSession = sessionWithPantallaContexto.withAddedEntry(newEntry); - - logger.info("Attempting to set updated session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name()); - - return redisTemplate.opsForValue().set(sessionKey, updatedSession) - .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId)) - .then(); - }) - .doOnSuccess(success -> { - logger.info("Successfully saved updated session and phone mapping to Redis for session {}. Entity Type: {}", sessionId, newEntry.entity().name()); - }) - .doOnError(e -> logger.error("Error appending entry to Memorystore for session {}: {}", sessionId, e.getMessage(), e)); + public Mono saveSession(ConversationSessionDTO session) { + String sessionKey = SESSION_KEY_PREFIX + session.sessionId(); + String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono(); + return redisTemplate.opsForValue().set(sessionKey, session, SESSION_TTL) + .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, session.sessionId(), SESSION_TTL)) + .then(); } + + public Flux getMessages(String sessionId) { + String messagesKey = MESSAGES_KEY_PREFIX + sessionId; + return messageRedisTemplate.opsForList().range(messagesKey, 0, -1); + } + public Mono getSessionByTelefono(String telefono) { if (telefono == null || telefono.isBlank()) { return Mono.empty(); } String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + telefono; return stringRedisTemplate.opsForValue().get(phoneToSessionKey) - .flatMap(sessionId -> { - return redisTemplate.opsForValue().get(SESSION_KEY_PREFIX + sessionId); - }) + .flatMap(sessionId -> redisTemplate.opsForValue().get(SESSION_KEY_PREFIX + sessionId)) .doOnSuccess(session -> { if (session != null) { - logger.info("Successfully retrieved session by phone number"); + logger.info("Successfully retrieved session by phone number"); } else { logger.info("No session found in Redis for phone number."); } @@ -106,4 +79,12 @@ public class MemoryStoreConversationService { logger.info("Attempting to update session {} in Memorystore.", session.sessionId()); return redisTemplate.opsForValue().set(sessionKey, session).then(); } + + public Mono deleteSession(String sessionId) { + String sessionKey = SESSION_KEY_PREFIX + sessionId; + String messagesKey = MESSAGES_KEY_PREFIX + sessionId; + logger.info("Deleting session {} from Memorystore.", sessionId); + return redisTemplate.opsForValue().delete(sessionKey) + .then(messageRedisTemplate.opsForList().delete(messagesKey)).then(); + } } \ No newline at end of file diff --git a/src/main/java/com/example/service/notification/FirestoreNotificationConvService.java b/src/main/java/com/example/service/notification/FirestoreNotificationConvService.java deleted file mode 100644 index be46ecd..0000000 --- a/src/main/java/com/example/service/notification/FirestoreNotificationConvService.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. - * Your use of it is subject to your agreement with Google. - */ - -package com.example.service.notification; - -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; -import com.example.dto.dialogflow.conversation.ConversationSessionDTO; -import com.example.exception.FirestorePersistenceException; -import com.example.mapper.conversation.FirestoreConversationMapper; -import com.example.repository.FirestoreBaseRepository; -import com.google.cloud.firestore.DocumentReference; -import com.google.cloud.firestore.DocumentSnapshot; -import com.google.cloud.firestore.WriteBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import java.util.Map; -import java.util.concurrent.ExecutionException; - -/** - * Service for managing notification conversation sessions in Firestore. - * It handles the persistence of conversation entries, either by creating - * a new document for a new session or appending an entry to an existing - * session document using a Firestore batch. The service also provides - * methods for retrieving a complete conversation session from Firestore. - */ -@Service -public class FirestoreNotificationConvService { - - private static final Logger logger = LoggerFactory.getLogger(FirestoreNotificationConvService.class); - private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversation-notifications"; - private final FirestoreBaseRepository firestoreBaseRepository; - private final FirestoreConversationMapper firestoreConversationMapper; - - public FirestoreNotificationConvService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper) { - this.firestoreBaseRepository = firestoreBaseRepository; - this.firestoreConversationMapper = firestoreConversationMapper; - } - - public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { - logger.info("Attempting to save conversation entry to Firestore for session {}. Entity: {}", sessionId, newEntry.entity().name()); - return Mono.fromRunnable(() -> { - DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); - // Synchronize on the session ID to prevent race conditions when creating a new session. - synchronized (sessionId.intern()) { - WriteBatch batch = firestoreBaseRepository.createBatch(); - try { - if (firestoreBaseRepository.documentExists(sessionDocRef)) { - // Update: Append the new entry using arrayUnion and update lastModified - Map updates = firestoreConversationMapper.createUpdateMapForSingleEntry(newEntry); - batch.update(sessionDocRef, updates); - logger.info("Appending entry to existing conversation session for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name()); - } else { - // Create: Start a new session with the first entry. - // Pass userId and userPhoneNumber to the mapper to be stored as fields in the document. - Map newSessionMap = firestoreConversationMapper.createNewSessionMapForSingleEntry(sessionId, userId, userPhoneNumber, newEntry); - batch.set(sessionDocRef, newSessionMap); - logger.info("Creating new conversation session with first entry for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name()); - } - firestoreBaseRepository.commitBatch(batch); - logger.info("Successfully committed batch for session {} to Firestore.", sessionId); - } catch (ExecutionException e) { - logger.error("Error saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e); - throw new FirestorePersistenceException("Failed to save conversation entry to Firestore for session " + sessionId, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Thread interrupted while saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e); - throw new FirestorePersistenceException("Saving conversation entry was interrupted for session " + sessionId, e); - } - } - }).subscribeOn(Schedulers.boundedElastic()).then(); - } - - public Mono getConversationSession(String userId, String sessionId) { - logger.info("Attempting to retrieve conversation session for session {} (user ID {} for context).", sessionId, userId); - return Mono.fromCallable(() -> { - DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); - try { - DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef); - if (documentSnapshot != null && documentSnapshot.exists()) { - ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot); - logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionId); - return sessionDTO; - } - logger.info("Conversation session not found for session {}.", sessionId); - return null; // Or Mono.empty() if this method returned Mono> - } catch (InterruptedException | ExecutionException e) { - logger.error("Error retrieving conversation session from Firestore for session {}: {}", sessionId, e.getMessage(), e); - throw new FirestorePersistenceException("Failed to retrieve conversation session from Firestore for session " + sessionId, e); - } - }).subscribeOn(Schedulers.boundedElastic()); - } - - private String getConversationCollectionPath() { - return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId()); - } - - private DocumentReference getSessionDocumentReference(String sessionId) { - String collectionPath = getConversationCollectionPath(); - return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId); - } -} \ No newline at end of file diff --git a/src/main/java/com/example/service/notification/FirestoreNotificationService.java b/src/main/java/com/example/service/notification/FirestoreNotificationService.java index b741521..1fbb427 100644 --- a/src/main/java/com/example/service/notification/FirestoreNotificationService.java +++ b/src/main/java/com/example/service/notification/FirestoreNotificationService.java @@ -167,4 +167,22 @@ public class FirestoreNotificationService { .subscribeOn(Schedulers.boundedElastic()) .then(); } + + public Mono deleteNotification(String notificationId) { + logger.info("Attempting to delete notification session {} from Firestore.", notificationId); + return Mono.fromRunnable(() -> { + try { + DocumentReference notificationDocRef = getNotificationDocumentReference(notificationId); + firestoreBaseRepository.deleteDocument(notificationDocRef); + logger.info("Successfully deleted notification session {} from Firestore.", notificationId); + } catch (ExecutionException e) { + logger.error("Error deleting notification session {} from Firestore: {}", notificationId, e.getMessage(), e); + throw new FirestorePersistenceException("Failed to delete notification session " + notificationId, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Thread interrupted while deleting notification session {} from Firestore: {}", notificationId, e.getMessage(), e); + throw new FirestorePersistenceException("Deleting notification session was interrupted for " + notificationId, e); + } + }).subscribeOn(Schedulers.boundedElastic()).then(); + } } \ No newline at end of file diff --git a/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java b/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java index 4475a0f..2d2cd0b 100644 --- a/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java +++ b/src/main/java/com/example/service/notification/MemoryStoreNotificationService.java @@ -5,8 +5,6 @@ package com.example.service.notification; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; -import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationSessionDTO; import com.fasterxml.jackson.databind.ObjectMapper; @@ -26,26 +24,17 @@ public class MemoryStoreNotificationService { private static final Logger logger = LoggerFactory.getLogger(MemoryStoreNotificationService.class); private final ReactiveRedisTemplate notificationRedisTemplate; - private final ReactiveRedisTemplate conversationRedisTemplate; private final ReactiveRedisTemplate stringRedisTemplate; - private final FirestoreNotificationConvService firestoreNotificationConvService; private static final String NOTIFICATION_KEY_PREFIX = "notification:"; private static final String PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX = "notification:phone_to_notification:"; - private static final String CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:session:"; - private static final String PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:phone_to_session:"; private final Duration notificationTtl = Duration.ofDays(30); public MemoryStoreNotificationService( ReactiveRedisTemplate notificationRedisTemplate, - ReactiveRedisTemplate conversationRedisTemplate, ReactiveRedisTemplate stringRedisTemplate, - FirestoreNotificationConvService firestoreNotificationConvService, ObjectMapper objectMapper) { this.notificationRedisTemplate = notificationRedisTemplate; - this.conversationRedisTemplate = conversationRedisTemplate; this.stringRedisTemplate = stringRedisTemplate; - this.firestoreNotificationConvService = firestoreNotificationConvService; - } public Mono saveOrAppendNotificationEntry(NotificationDTO newEntry) { @@ -120,42 +109,12 @@ public class MemoryStoreNotificationService { e.getMessage(), e)); } - public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { - String sessionKey = CONVERSATION_SESSION_KEY_PREFIX + sessionId; - String phoneToSessionKey = PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX + userPhoneNumber; - logger.info("Attempting to save entry to Redis for session {}. Entity: {}", sessionId, newEntry.entity().name()); - return conversationRedisTemplate.opsForValue().get(sessionKey) - .defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber)) - .flatMap(session -> { - ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber); - ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withAddedEntry(newEntry); - logger.info("Attempting to set updated session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name()); - return conversationRedisTemplate.opsForValue().set(sessionKey, updatedSession, notificationTtl) - .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, notificationTtl)) - .then(); - }) - .doOnSuccess(success -> logger.info("Successfully saved updated session and phone mapping to Redis for session {}. Entity Type: {}", sessionId, newEntry.entity().name())) - .then(firestoreNotificationConvService.saveEntry(userId, sessionId, newEntry, userPhoneNumber)) - .doOnError(e -> logger.error("Error appending entry to Redis for session {}: {}", sessionId, e.getMessage(), e)); - } - - public Mono getSessionByTelefono(String telefono) { - if (telefono == null || telefono.isBlank()) { - return Mono.empty(); - } - String phoneToSessionKey = PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX + telefono; - return stringRedisTemplate.opsForValue().get(phoneToSessionKey) - .flatMap(sessionId -> { - logger.debug("Found session ID {} for phone number. Retrieving session data.", sessionId); - return conversationRedisTemplate.opsForValue().get(CONVERSATION_SESSION_KEY_PREFIX + sessionId); - }) - .doOnSuccess(session -> { - if (session != null) { - logger.info("Successfully retrieved session {} by phone number.", session.sessionId()); - } else { - logger.info("No session found in Redis for phone number."); - } - }) - .doOnError(e -> logger.error("Error retrieving session by phone number: {}",e.getMessage(), e)); + public Mono deleteNotificationSession(String phoneNumber) { + String notificationKey = NOTIFICATION_KEY_PREFIX + phoneNumber; + String phoneToNotificationKey = PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX + phoneNumber; + logger.info("Deleting notification session for phone number {}.", phoneNumber); + return notificationRedisTemplate.opsForValue().delete(notificationKey) + .then(stringRedisTemplate.opsForValue().delete(phoneToNotificationKey)) + .then(); } } \ No newline at end of file diff --git a/src/main/java/com/example/service/notification/NotificationManagerService.java b/src/main/java/com/example/service/notification/NotificationManagerService.java index 09cd755..49fd852 100644 --- a/src/main/java/com/example/service/notification/NotificationManagerService.java +++ b/src/main/java/com/example/service/notification/NotificationManagerService.java @@ -9,8 +9,10 @@ import com.example.dto.dialogflow.notification.ExternalNotRequestDTO; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.conversation.ConversationEntryDTO; +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.notification.NotificationDTO; +import com.example.mapper.conversation.ConversationEntryMapper; import com.example.mapper.notification.ExternalNotRequestMapper; import com.example.service.base.DialogflowClientService; import com.example.service.conversation.DataLossPrevention; @@ -42,6 +44,7 @@ public class NotificationManagerService { private final FirestoreConversationService firestoreConversationService; private final DataLossPrevention dataLossPrevention; private final String dlpTemplateCompleteFlow; + private final ConversationEntryMapper conversationEntryMapper; @Value("${dialogflow.default-language-code:es}") private String defaultLanguageCode; @@ -55,6 +58,7 @@ public class NotificationManagerService { ExternalNotRequestMapper externalNotRequestMapper, DataLossPrevention dataLossPrevention, + ConversationEntryMapper conversationEntryMapper, @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { this.dialogflowClientService = dialogflowClientService; @@ -65,6 +69,7 @@ public class NotificationManagerService { this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; this.memoryStoreConversationService = memoryStoreConversationService; this.firestoreConversationService = firestoreConversationService; + this.conversationEntryMapper = conversationEntryMapper; } public Mono processNotification(ExternalNotRequestDTO externalRequest) { @@ -115,7 +120,7 @@ public class NotificationManagerService { } ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), prefixedParameters); - return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono) + return persistConversationTurn(session, systemEntry) .thenReturn(session); }) .switchIfEmpty(Mono.defer(() -> { @@ -130,8 +135,9 @@ public class NotificationManagerService { } ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), prefixedParameters); - return persistConversationTurn(userId, newSessionId, systemEntry, telefono) - .then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono))); + ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, telefono); + return persistConversationTurn(newSession, systemEntry) + .then(Mono.just(newSession)); })); return persistenceMono.then(sessionMono) @@ -149,27 +155,30 @@ public class NotificationManagerService { }); } - private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber) { - logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, + private Mono persistConversationTurn(ConversationSessionDTO session, ConversationEntryDTO entry) { + logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", session.sessionId(), entry.type().name()); + ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(entry); + ConversationSessionDTO updatedSession = session.withLastMessage(message.text()); - return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + return memoryStoreConversationService.saveSession(updatedSession) + .then(memoryStoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(v -> { logger.info( "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name()); + session.sessionId(), entry.type().name()); - firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) + firestoreConversationService.saveSession(updatedSession) + .then(firestoreConversationService.saveMessage(session.sessionId(), message)) .subscribe( fsVoid -> logger.debug( "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", - sessionId, entry.type().name()), + session.sessionId(), entry.type().name()), fsError -> logger.error( "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", - sessionId, entry.type().name(), fsError.getMessage(), fsError)); + session.sessionId(), entry.type().name(), fsError.getMessage(), fsError)); }) - .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, + .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", session.sessionId(), entry.type().name(), e.getMessage(), e)); } } \ No newline at end of file diff --git a/src/main/java/com/example/service/quickreplies/MemoryStoreQRService.java b/src/main/java/com/example/service/quickreplies/MemoryStoreQRService.java index 8398495..0d02ef8 100644 --- a/src/main/java/com/example/service/quickreplies/MemoryStoreQRService.java +++ b/src/main/java/com/example/service/quickreplies/MemoryStoreQRService.java @@ -6,7 +6,9 @@ package com.example.service.quickreplies; import com.example.dto.dialogflow.conversation.ConversationEntryDTO; +import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; +import com.example.mapper.conversation.ConversationEntryMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -21,22 +23,30 @@ public class MemoryStoreQRService { private static final Logger logger = LoggerFactory.getLogger(MemoryStoreQRService.class); private static final String SESSION_KEY_PREFIX = "qr:session:"; private static final String PHONE_TO_SESSION_KEY_PREFIX = "qr:phone_to_session:"; + private static final String MESSAGES_KEY_PREFIX = "qr:messages:"; private static final Duration SESSION_TTL = Duration.ofHours(24); private final ReactiveRedisTemplate redisTemplate; private final ReactiveRedisTemplate stringRedisTemplate; + private final ReactiveRedisTemplate messageRedisTemplate; + private final ConversationEntryMapper conversationEntryMapper; @Autowired public MemoryStoreQRService( ReactiveRedisTemplate redisTemplate, - ReactiveRedisTemplate stringRedisTemplate) { + ReactiveRedisTemplate stringRedisTemplate, + ReactiveRedisTemplate messageRedisTemplate, + ConversationEntryMapper conversationEntryMapper) { this.redisTemplate = redisTemplate; this.stringRedisTemplate = stringRedisTemplate; + this.messageRedisTemplate = messageRedisTemplate; + this.conversationEntryMapper = conversationEntryMapper; } public Mono saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { String sessionKey = SESSION_KEY_PREFIX + sessionId; String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber; + String messagesKey = MESSAGES_KEY_PREFIX + sessionId; logger.info("Attempting to save entry to Redis for quick reply session {}. Entity: {}", sessionId, newEntry.entity().name()); @@ -45,13 +55,15 @@ public class MemoryStoreQRService { .defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber)) .flatMap(session -> { ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber); - ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withAddedEntry(newEntry); + ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withLastMessage(newEntry.text()); + ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(newEntry); logger.info("Attempting to set updated quick reply session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name()); return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL) .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL)) + .then(messageRedisTemplate.opsForList().rightPush(messagesKey, message)) .then(); }) .doOnSuccess(success -> { diff --git a/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java b/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java index 3b3c37c..809cde3 100644 --- a/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java +++ b/src/main/java/com/example/service/quickreplies/QuickRepliesManagerService.java @@ -6,13 +6,11 @@ package com.example.service.quickreplies; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; -import com.example.dto.dialogflow.conversation.ConversationEntryEntity; -import com.example.dto.dialogflow.conversation.ConversationEntryType; -import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO; +import com.example.dto.dialogflow.conversation.*; import com.example.dto.quickreplies.QuickReplyScreenRequestDTO; import com.example.dto.quickreplies.QuestionDTO; import com.example.dto.quickreplies.QuickReplyDTO; +import com.example.mapper.conversation.ConversationEntryMapper; import com.example.service.conversation.FirestoreConversationService; import com.example.service.conversation.MemoryStoreConversationService; import com.example.util.SessionIdGenerator; @@ -36,16 +34,19 @@ public class QuickRepliesManagerService { private final FirestoreConversationService firestoreConversationService; private final QuickReplyContentService quickReplyContentService; private final ConversationManagerService conversationManagerService; + private final ConversationEntryMapper conversationEntryMapper; public QuickRepliesManagerService( @Lazy ConversationManagerService conversationManagerService, MemoryStoreConversationService memoryStoreConversationService, FirestoreConversationService firestoreConversationService, - QuickReplyContentService quickReplyContentService) { + QuickReplyContentService quickReplyContentService, + ConversationEntryMapper conversationEntryMapper) { this.conversationManagerService = conversationManagerService; this.memoryStoreConversationService = memoryStoreConversationService; this.firestoreConversationService = firestoreConversationService; this.quickReplyContentService = quickReplyContentService; + this.conversationEntryMapper = conversationEntryMapper; } public Mono startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) { @@ -67,8 +68,8 @@ public class QuickRepliesManagerService { "Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :", null, null); - return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber, - externalRequest.pantallaContexto()) + ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber).withPantallaContexto(externalRequest.pantallaContexto()); + return persistConversationTurn(newSession, systemEntry) .then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto())) .map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); }); @@ -86,100 +87,96 @@ public class QuickRepliesManagerService { .switchIfEmpty(Mono.error( new IllegalStateException("No quick reply session found for phone number"))) .flatMap(session -> { - String userId = session.userId(); - String sessionId = session.sessionId(); - ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message()); + return memoryStoreConversationService.getMessages(session.sessionId()).collectList().flatMap(messages -> { + ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message()); - List entries = session.entries(); - int lastInitIndex = IntStream.range(0, entries.size()) - .map(i -> entries.size() - 1 - i) - .filter(i -> { - ConversationEntryDTO entry = entries.get(i); - return entry.entity() == ConversationEntryEntity.SISTEMA - && entry.type() == ConversationEntryType.INICIO; - }) - .findFirst() - .orElse(-1); + int lastInitIndex = IntStream.range(0, messages.size()) + .map(i -> messages.size() - 1 - i) + .filter(i -> { + ConversationMessageDTO message = messages.get(i); + return message.type() == MessageType.SYSTEM; + }) + .findFirst() + .orElse(-1); - long userMessagesCount; - if (lastInitIndex != -1) { - userMessagesCount = entries.subList(lastInitIndex + 1, entries.size()).stream() - .filter(e -> e.entity() == ConversationEntryEntity.USUARIO) - .count(); - } else { - userMessagesCount = 0; - } + long userMessagesCount; + if (lastInitIndex != -1) { + userMessagesCount = messages.subList(lastInitIndex + 1, messages.size()).stream() + .filter(e -> e.type() == MessageType.USER) + .count(); + } else { + userMessagesCount = 0; + } - if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow - // This is the second message of the flow. Return the full list. - return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, - session.pantallaContexto()) - .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) - .flatMap(quickReplyDTO -> { - ConversationEntryDTO agentEntry = ConversationEntryDTO - .forAgentWithMessage(quickReplyDTO.toString()); - return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber, - session.pantallaContexto()) - .thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); - }); - } else if (userMessagesCount == 1) { // Is the second user message in the QR flow - // This is the third message of the flow. Filter and end. - return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, - session.pantallaContexto()) - .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) - .flatMap(quickReplyDTO -> { - List matchedPreguntas = quickReplyDTO.preguntas().stream() - .filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim())) - .toList(); + if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow + // This is the second message of the flow. Return the full list. + return persistConversationTurn(session, userEntry) + .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) + .flatMap(quickReplyDTO -> { + ConversationEntryDTO agentEntry = ConversationEntryDTO + .forAgentWithMessage(quickReplyDTO.toString()); + return persistConversationTurn(session, agentEntry) + .thenReturn(new DetectIntentResponseDTO(session.sessionId(), null, quickReplyDTO)); + }); + } else if (userMessagesCount == 1) { // Is the second user message in the QR flow + // This is the third message of the flow. Filter and end. + return persistConversationTurn(session, userEntry) + .then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) + .flatMap(quickReplyDTO -> { + List matchedPreguntas = quickReplyDTO.preguntas().stream() + .filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim())) + .toList(); - if (!matchedPreguntas.isEmpty()) { - // Matched question, return the answer - String respuesta = matchedPreguntas.get(0).respuesta(); - QueryResultDTO queryResult = new QueryResultDTO(respuesta, null); - DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId, - queryResult, null); + if (!matchedPreguntas.isEmpty()) { + // Matched question, return the answer + String respuesta = matchedPreguntas.get(0).respuesta(); + QueryResultDTO queryResult = new QueryResultDTO(respuesta, null); + DetectIntentResponseDTO response = new DetectIntentResponseDTO(session.sessionId(), + queryResult, null); - return memoryStoreConversationService - .updateSession(session.withPantallaContexto(null)) - .then(persistConversationTurn(userId, sessionId, - ConversationEntryDTO.forAgentWithMessage(respuesta), - userPhoneNumber, null)) - .thenReturn(response); - } else { - // No match, delegate to Dialogflow - return memoryStoreConversationService - .updateSession(session.withPantallaContexto(null)) - .then(conversationManagerService.manageConversation(externalRequest)); - } - }); - } else { - // Should not happen. End the flow. - return memoryStoreConversationService.updateSession(session.withPantallaContexto(null)) - .then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null, - new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList())))); - } + return memoryStoreConversationService + .updateSession(session.withPantallaContexto(null)) + .then(persistConversationTurn(session, + ConversationEntryDTO.forAgentWithMessage(respuesta))) + .thenReturn(response); + } else { + // No match, delegate to Dialogflow + return memoryStoreConversationService + .updateSession(session.withPantallaContexto(null)) + .then(conversationManagerService.manageConversation(externalRequest)); + } + }); + } else { + // Should not happen. End the flow. + return memoryStoreConversationService.updateSession(session.withPantallaContexto(null)) + .then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null, + new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList())))); + } + }); }); } - private Mono persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, - String userPhoneNumber, String pantallaContexto) { + private Mono persistConversationTurn(ConversationSessionDTO session, ConversationEntryDTO entry) { logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.", - sessionId, entry.type().name()); - return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) + session.sessionId(), entry.type().name()); + ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(entry); + ConversationSessionDTO updatedSession = session.withLastMessage(message.text()); + return memoryStoreConversationService.saveSession(updatedSession) + .then(memoryStoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(v -> logger.info( "Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.", - sessionId, entry.type().name())) - .then(firestoreConversationService - .saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) + session.sessionId(), entry.type().name())) + .then(firestoreConversationService.saveSession(updatedSession) + .then(firestoreConversationService.saveMessage(session.sessionId(), message)) .doOnSuccess(fsVoid -> logger.debug( "Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.", - sessionId, entry.type().name())) + session.sessionId(), entry.type().name())) .doOnError(fsError -> logger.error( "Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}", - sessionId, entry.type().name(), fsError.getMessage(), fsError))) + session.sessionId(), entry.type().name(), fsError.getMessage(), fsError))) .doOnError( e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}", - sessionId, entry.type().name(), e.getMessage(), e)); + session.sessionId(), entry.type().name(), e.getMessage(), e)); } } \ No newline at end of file diff --git a/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java b/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java index 8c6e936..a411275 100644 --- a/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java +++ b/src/test/java/com/example/service/conversation/ConversationManagerServiceTest.java @@ -7,10 +7,9 @@ package com.example.service.conversation; import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO; -import com.example.dto.dialogflow.conversation.ConversationContext; -import com.example.dto.dialogflow.conversation.ConversationEntryDTO; -import com.example.dto.dialogflow.conversation.ConversationSessionDTO; +import com.example.dto.dialogflow.conversation.*; import com.example.dto.dialogflow.notification.NotificationDTO; +import com.example.mapper.conversation.ConversationEntryMapper; import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.messagefilter.ConversationContextMapper; import com.example.mapper.messagefilter.NotificationContextMapper; @@ -25,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -62,6 +62,8 @@ public class ConversationManagerServiceTest { private NotificationContextResolver notificationContextResolver; @Mock private LlmResponseTunerService llmResponseTunerService; + @Mock + private ConversationEntryMapper conversationEntryMapper; @InjectMocks private ConversationManagerService conversationManagerService; @@ -80,13 +82,19 @@ public class ConversationManagerServiceTest { NotificationDTO notification = new NotificationDTO("1", "1234567890", Instant.now(), "test text", "test_event", "es", Collections.emptyMap(), "active"); ConversationSessionDTO session = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber); - when(memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)).thenReturn(Mono.just(session)); - when(conversationContextMapper.toTextWithLimits(session)).thenReturn("history"); + when(memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)).thenReturn(Mono.just(session)); + when(memoryStoreConversationService.getMessages(anyString())).thenReturn(Flux.empty()); + when(conversationContextMapper.toTextFromMessages(any())).thenReturn("history"); when(notificationContextMapper.toText(notification)).thenReturn("notification text"); when(notificationContextResolver.resolveContext(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString())) .thenReturn(resolvedContext); when(llmResponseTunerService.setValue(anyString(), anyString())).thenReturn(Mono.empty()); - when(memoryStoreNotificationService.saveEntry(anyString(), anyString(), any(ConversationEntryDTO.class), anyString())).thenReturn(Mono.empty()); + when(memoryStoreConversationService.saveSession(any(ConversationSessionDTO.class))).thenReturn(Mono.empty()); + when(memoryStoreConversationService.saveMessage(anyString(), any(ConversationMessageDTO.class))).thenReturn(Mono.empty()); + when(firestoreConversationService.saveSession(any(ConversationSessionDTO.class))).thenReturn(Mono.empty()); + when(firestoreConversationService.saveMessage(anyString(), any(ConversationMessageDTO.class))).thenReturn(Mono.empty()); + when(conversationEntryMapper.toConversationMessageDTO(any(ConversationEntryDTO.class))).thenReturn(new ConversationMessageDTO(MessageType.USER, Instant.now(), "text", null, null)); + when(dialogflowServiceClient.detectIntent(anyString(), any(DetectIntentRequestDTO.class))).thenReturn(Mono.just(new DetectIntentResponseDTO(sessionId, new QueryResultDTO(resolvedContext, null), null))); // When Mono result = conversationManagerService.startNotificationConversation(context, request, notification);