UPDATE 24-Oct

This commit is contained in:
PAVEL PALMA
2025-10-24 22:28:55 -06:00
parent 4169858861
commit 9fb1088f7d
23 changed files with 863 additions and 761 deletions

View File

@@ -5,11 +5,10 @@
package com.example.config; package com.example.config;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.notification.NotificationSessionDTO; import com.example.dto.dialogflow.notification.NotificationSessionDTO;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -18,60 +17,35 @@ import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* Spring configuration class for setting up Reactive Redis(Memorystore in GCP)
* templates.
* It defines and customizes `ReactiveRedisTemplate` beans for different data
* types
* like `ConversationSessionDTO` and `NotificationDTO`, using Jackson for JSON
* serialization and ensuring proper handling of Java 8 and higher date/time
* objects.
*/
@Configuration @Configuration
public class RedisConfig { public class RedisConfig {
@Bean @Bean
public ReactiveRedisTemplate<String, ConversationSessionDTO> reactiveConversationRedisTemplate( public ReactiveRedisTemplate<String, ConversationSessionDTO> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory, ObjectMapper objectMapper) {
ReactiveRedisConnectionFactory factory) { Jackson2JsonRedisSerializer<ConversationSessionDTO> serializer = new Jackson2JsonRedisSerializer<>(objectMapper, ConversationSessionDTO.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, ConversationSessionDTO> builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
ObjectMapper objectMapper = new ObjectMapper(); RedisSerializationContext<String, ConversationSessionDTO> context = builder.value(serializer).build();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
Jackson2JsonRedisSerializer<ConversationSessionDTO> serializer = new Jackson2JsonRedisSerializer<>(
objectMapper, ConversationSessionDTO.class);
return new ReactiveRedisTemplate<>(factory, RedisSerializationContext
.<String, ConversationSessionDTO>newSerializationContext(new StringRedisSerializer())
.value(serializer)
.build());
}
@Bean
public ReactiveRedisTemplate<String, String> reactiveStringRedisTemplate(
ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, RedisSerializationContext
.<String, String>newSerializationContext(new StringRedisSerializer())
.value(new StringRedisSerializer())
.build());
}
@Bean
public ReactiveRedisTemplate<String, NotificationSessionDTO> reactiveNotificationRedisTemplate(
ReactiveRedisConnectionFactory factory) {
ObjectMapper notificationObjectMapper = new ObjectMapper();
notificationObjectMapper.registerModule(new JavaTimeModule());
notificationObjectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<NotificationSessionDTO> valueSerializer = new Jackson2JsonRedisSerializer<>(
notificationObjectMapper, NotificationSessionDTO.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, NotificationSessionDTO> builder = RedisSerializationContext
.newSerializationContext(keySerializer);
RedisSerializationContext<String, NotificationSessionDTO> context = builder.value(valueSerializer)
.build();
return new ReactiveRedisTemplate<>(factory, context); return new ReactiveRedisTemplate<>(factory, context);
} }
@Bean
public ReactiveRedisTemplate<String, NotificationSessionDTO> reactiveNotificationRedisTemplate(ReactiveRedisConnectionFactory factory, ObjectMapper objectMapper) {
Jackson2JsonRedisSerializer<NotificationSessionDTO> serializer = new Jackson2JsonRedisSerializer<>(objectMapper, NotificationSessionDTO.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, NotificationSessionDTO> builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
RedisSerializationContext<String, NotificationSessionDTO> context = builder.value(serializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
@Bean
public ReactiveRedisTemplate<String, ConversationMessageDTO> reactiveMessageRedisTemplate(ReactiveRedisConnectionFactory factory, ObjectMapper objectMapper) {
Jackson2JsonRedisSerializer<ConversationMessageDTO> serializer = new Jackson2JsonRedisSerializer<>(objectMapper, ConversationMessageDTO.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, ConversationMessageDTO> builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
RedisSerializationContext<String, ConversationMessageDTO> context = builder.value(serializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
@Bean
public ReactiveRedisTemplate<String, String> reactiveStringRedisTemplate(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
}
} }

View File

@@ -0,0 +1,29 @@
package com.example.controller;
import com.example.service.base.DataPurgeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/v1/data-purge")
public class DataPurgeController {
private static final Logger logger = LoggerFactory.getLogger(DataPurgeController.class);
private final DataPurgeService dataPurgeService;
public DataPurgeController(DataPurgeService dataPurgeService) {
this.dataPurgeService = dataPurgeService;
}
@DeleteMapping("/all")
public Mono<Void> purgeAllData() {
logger.warn("Received request to purge all data. This is a destructive operation.");
return dataPurgeService.purgeAllData()
.doOnSuccess(voidResult -> logger.info("Successfully purged all data."))
.doOnError(error -> logger.error("Error purging all data.", error));
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.service.base.SessionPurgeService;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/v1/session-purge")
public class SessionPurgeController {
private static final Logger logger = LoggerFactory.getLogger(SessionPurgeController.class);
private final SessionPurgeService sessionPurgeService;
public SessionPurgeController(SessionPurgeService sessionPurgeService) {
this.sessionPurgeService = sessionPurgeService;
}
@DeleteMapping("/conversation/session/{sessionId}")
public Mono<Void> deleteSession(@PathVariable String sessionId) {
return sessionPurgeService.deleteSession(sessionId)
.doOnSuccess(voidResult -> logger.info("Successfully deleted session with id: {}", sessionId))
.doOnError(error -> logger.error("Error deleting session with id: {}", sessionId, error));
}
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.dto.dialogflow.conversation;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.time.Instant;
import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
public record ConversationMessageDTO(
MessageType type,
Instant timestamp,
String text,
Map<String, Object> parameters,
String canal
) {
}

View File

@@ -21,38 +21,36 @@ public record ConversationSessionDTO(
String telefono, String telefono,
Instant createdAt, Instant createdAt,
Instant lastModified, Instant lastModified,
List<ConversationEntryDTO> entries, String lastMessage,
String pantallaContexto String pantallaContexto
) { ) {
public ConversationSessionDTO(String sessionId, String userId, String telefono, Instant createdAt, Instant lastModified, List<ConversationEntryDTO> entries, String pantallaContexto) { public ConversationSessionDTO(String sessionId, String userId, String telefono, Instant createdAt, Instant lastModified, String lastMessage, String pantallaContexto) {
this.sessionId = sessionId; this.sessionId = sessionId;
this.userId = userId; this.userId = userId;
this.telefono = telefono; this.telefono = telefono;
this.createdAt = createdAt; this.createdAt = createdAt;
this.lastModified = lastModified; this.lastModified = lastModified;
this.entries = Collections.unmodifiableList(new ArrayList<>(entries)); this.lastMessage = lastMessage;
this.pantallaContexto = pantallaContexto; this.pantallaContexto = pantallaContexto;
} }
public static ConversationSessionDTO create(String sessionId, String userId, String telefono) { public static ConversationSessionDTO create(String sessionId, String userId, String telefono) {
Instant now = Instant.now(); Instant now = Instant.now();
return new ConversationSessionDTO(sessionId, userId, telefono, now, now, Collections.emptyList(), null); return new ConversationSessionDTO(sessionId, userId, telefono, now, now, null, null);
} }
public ConversationSessionDTO withAddedEntry(ConversationEntryDTO newEntry) { public ConversationSessionDTO withLastMessage(String lastMessage) {
List<ConversationEntryDTO> updatedEntries = new ArrayList<>(this.entries); return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, Instant.now(), lastMessage, this.pantallaContexto);
updatedEntries.add(newEntry);
return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, Instant.now(), updatedEntries, this.pantallaContexto);
} }
public ConversationSessionDTO withTelefono(String newTelefono) { public ConversationSessionDTO withTelefono(String newTelefono) {
if (newTelefono != null && !newTelefono.equals(this.telefono)) { if (newTelefono != null && !newTelefono.equals(this.telefono)) {
return new ConversationSessionDTO(this.sessionId, this.userId, newTelefono, this.createdAt, this.lastModified, this.entries, this.pantallaContexto); return new ConversationSessionDTO(this.sessionId, this.userId, newTelefono, this.createdAt, this.lastModified, this.lastMessage, this.pantallaContexto);
} }
return this; return this;
} }
public ConversationSessionDTO withPantallaContexto(String pantallaContexto) { public ConversationSessionDTO withPantallaContexto(String pantallaContexto) {
return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, this.lastModified, this.entries, pantallaContexto); return new ConversationSessionDTO(this.sessionId, this.userId, this.telefono, this.createdAt, this.lastModified, this.lastMessage, pantallaContexto);
} }
} }

View File

@@ -0,0 +1,13 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.dto.dialogflow.conversation;
public enum MessageType {
USER,
AGENT,
SYSTEM,
LLM
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.mapper.conversation;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.MessageType;
import org.springframework.stereotype.Component;
@Component
public class ConversationEntryMapper {
public ConversationMessageDTO toConversationMessageDTO(ConversationEntryDTO entry) {
MessageType type = switch (entry.entity()) {
case USUARIO -> MessageType.USER;
case AGENTE -> MessageType.AGENT;
case SISTEMA -> MessageType.SYSTEM;
case LLM -> MessageType.LLM;
};
return new ConversationMessageDTO(
type,
entry.timestamp(),
entry.text(),
entry.parameters(),
entry.canal()
);
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.mapper.conversation;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.MessageType;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@Component
public class ConversationMessageMapper {
public Map<String, Object> toMap(ConversationMessageDTO message) {
Map<String, Object> map = new HashMap<>();
map.put("type", message.type().name());
map.put("timestamp", message.timestamp());
map.put("text", message.text());
if (message.parameters() != null) {
map.put("parameters", message.parameters());
}
if (message.canal() != null) {
map.put("canal", message.canal());
}
return map;
}
public ConversationMessageDTO fromMap(Map<String, Object> map) {
return new ConversationMessageDTO(
MessageType.valueOf((String) map.get("type")),
(Instant) map.get("timestamp"),
(String) map.get("text"),
(Map<String, Object>) map.get("parameters"),
(String) map.get("canal")
);
}
}

View File

@@ -1,170 +1,53 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.mapper.conversation; package com.example.mapper.conversation;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryEntity;
import com.example.dto.dialogflow.conversation.ConversationEntryType;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.google.cloud.Timestamp; import com.google.cloud.Timestamp;
import com.google.cloud.firestore.FieldValue;
import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.DocumentSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/**
* Spring component for mapping data between a `ConversationSessionDTO` and Firestore documents.
* It provides methods to convert a DTO into a format suitable for Firestore storage
* (creating new documents or updating existing ones) and to deserialize a Firestore
* `DocumentSnapshot` back into a `ConversationSessionDTO`, handling data types
* and nested collections correctly.
*/
@Component @Component
public class FirestoreConversationMapper { public class FirestoreConversationMapper {
private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationMapper.class); public ConversationSessionDTO mapFirestoreDocumentToConversationSessionDTO(DocumentSnapshot document) {
if (document == null || !document.exists()) {
return null;
}
// Class-level constants for Firestore field names at the session level Timestamp createdAtTimestamp = document.getTimestamp("createdAt");
private static final String FIELD_SESSION_ID = "session_id"; Timestamp lastModifiedTimestamp = document.getTimestamp("lastModified");
private static final String FIELD_USER_ID = "usuario_id";
private static final String FIELD_PHONE_NUMBER = "telefono";
private static final String FIELD_CREATED_AT = "fechaCreacion";
private static final String FIELD_LAST_UPDATED = "ultimaActualizacion";
private static final String FIELD_MESSAGES = "mensajes";
private static final String FIELD_PANTALLA_CONTEXTO = "pantallaContexto";
// Constants for fields within the 'mensajes' sub-documents
private static final String FIELD_MESSAGE_ENTITY = "entidad";
private static final String FIELD_MESSAGE_TYPE = "tipo";
private static final String FIELD_MESSAGE_TEXT = "mensaje";
private static final String FIELD_MESSAGE_TIMESTAMP = "tiempo";
private static final String FIELD_MESSAGE_PARAMETERS = "parametros";
private static final String FIELD_MESSAGE_CHANNEL = "canal";
public Map<String, Object> createUpdateMapForSingleEntry(ConversationEntryDTO newEntry) { Instant createdAt = (createdAtTimestamp != null) ? createdAtTimestamp.toDate().toInstant() : null;
Map<String, Object> updates = new HashMap<>(); Instant lastModified = (lastModifiedTimestamp != null) ? lastModifiedTimestamp.toDate().toInstant() : null;
Map<String, Object> entryMap = toFirestoreEntryMap(newEntry);
updates.put(FIELD_MESSAGES, FieldValue.arrayUnion(entryMap)); return new ConversationSessionDTO(
updates.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now()))); document.getString("sessionId"),
return updates; document.getString("userId"),
document.getString("telefono"),
createdAt,
lastModified,
document.getString("lastMessage"),
document.getString("pantallaContexto")
);
} }
public Map<String, Object> createNewSessionMapForSingleEntry(String sessionId, String userId, String telefono, ConversationEntryDTO initialEntry) { public Map<String, Object> createSessionMap(ConversationSessionDTO session) {
return createNewSessionMapForSingleEntry(sessionId, userId, telefono, initialEntry, null);
}
public Map<String, Object> createNewSessionMapForSingleEntry(String sessionId, String userId, String telefono, ConversationEntryDTO initialEntry, String pantallaContexto) {
Map<String, Object> sessionMap = new HashMap<>(); Map<String, Object> sessionMap = new HashMap<>();
sessionMap.put(FIELD_SESSION_ID, sessionId); sessionMap.put("sessionId", session.sessionId());
sessionMap.put(FIELD_USER_ID, userId); sessionMap.put("userId", session.userId());
sessionMap.put("telefono", session.telefono());
if (telefono != null && !telefono.trim().isEmpty()) { sessionMap.put("createdAt", session.createdAt());
sessionMap.put(FIELD_PHONE_NUMBER, telefono); sessionMap.put("lastModified", session.lastModified());
} else { sessionMap.put("lastMessage", session.lastMessage());
sessionMap.put(FIELD_PHONE_NUMBER, null); sessionMap.put("pantallaContexto", session.pantallaContexto());
}
if (pantallaContexto != null) {
sessionMap.put(FIELD_PANTALLA_CONTEXTO, pantallaContexto);
}
sessionMap.put(FIELD_CREATED_AT, Timestamp.of(java.util.Date.from(Instant.now())));
sessionMap.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now())));
List<Map<String, Object>> entriesList = new ArrayList<>();
entriesList.add(toFirestoreEntryMap(initialEntry));
sessionMap.put(FIELD_MESSAGES, entriesList);
return sessionMap; return sessionMap;
} }
private Map<String, Object> toFirestoreEntryMap(ConversationEntryDTO entry) {
Map<String, Object> entryMap = new HashMap<>();
entryMap.put(FIELD_MESSAGE_ENTITY, entry.entity().name());
entryMap.put(FIELD_MESSAGE_TYPE, entry.type().name());
entryMap.put(FIELD_MESSAGE_TEXT, entry.text());
entryMap.put(FIELD_MESSAGE_TIMESTAMP, Timestamp.of(java.util.Date.from(entry.timestamp())));
if (entry.parameters() != null && !entry.parameters().isEmpty()) {
entryMap.put(FIELD_MESSAGE_PARAMETERS, entry.parameters());
}
if (entry.canal() != null) {
entryMap.put(FIELD_MESSAGE_CHANNEL, entry.canal());
}
logger.debug("Created Firestore entry map: {}", entryMap);
return entryMap;
}
public ConversationSessionDTO mapFirestoreDocumentToConversationSessionDTO(DocumentSnapshot documentSnapshot) {
if (!documentSnapshot.exists()) {
return null;
}
String sessionId = documentSnapshot.getString(FIELD_SESSION_ID);
String userId = documentSnapshot.getString(FIELD_USER_ID);
String telefono = documentSnapshot.getString(FIELD_PHONE_NUMBER);
String pantallaContexto = documentSnapshot.getString(FIELD_PANTALLA_CONTEXTO);
Timestamp createdAtFirestore = documentSnapshot.getTimestamp(FIELD_CREATED_AT);
Instant createdAt = (createdAtFirestore != null) ? createdAtFirestore.toDate().toInstant() : null;
Timestamp lastModifiedFirestore = documentSnapshot.getTimestamp(FIELD_LAST_UPDATED);
Instant lastModified = (lastModifiedFirestore != null) ? lastModifiedFirestore.toDate().toInstant() : null;
List<Map<String, Object>> rawEntries = (List<Map<String, Object>>) documentSnapshot.get(FIELD_MESSAGES);
List<ConversationEntryDTO> entries = new ArrayList<>();
if (rawEntries != null) {
entries = rawEntries.stream()
.map(this::mapFirestoreEntryMapToConversationEntryDTO)
.collect(Collectors.toList());
}
return new ConversationSessionDTO(sessionId, userId, telefono, createdAt, lastModified, entries, pantallaContexto);
}
private ConversationEntryDTO mapFirestoreEntryMapToConversationEntryDTO(Map<String, Object> entryMap) {
ConversationEntryEntity entity = null;
Object entityObj = entryMap.get(FIELD_MESSAGE_ENTITY);
if (entityObj instanceof String) {
try {
entity = ConversationEntryEntity.valueOf((String) entityObj);
} catch (IllegalArgumentException e) {
logger.warn("Unknown ConversationEntryEntity encountered: {}. Setting entity to null.", entityObj);
}
}
ConversationEntryType type = null;
Object typeObj = entryMap.get(FIELD_MESSAGE_TYPE);
if (typeObj instanceof String) {
try {
type = ConversationEntryType.valueOf((String) typeObj);
} catch (IllegalArgumentException e) {
logger.warn("Unknown ConversationEntryType encountered: {}. Setting type to null.", typeObj);
}
}
String text = (String) entryMap.get(FIELD_MESSAGE_TEXT);
Timestamp timestampFirestore = (Timestamp) entryMap.get(FIELD_MESSAGE_TIMESTAMP);
Instant timestamp = (timestampFirestore != null) ? timestampFirestore.toDate().toInstant() : null;
Map<String, Object> parameters = (Map<String, Object>) entryMap.get(FIELD_MESSAGE_PARAMETERS);
String canal = (String) entryMap.get(FIELD_MESSAGE_CHANNEL);
return new ConversationEntryDTO(entity, type, timestamp, text, parameters, canal);
}
} }

View File

@@ -1,82 +1,36 @@
/* /*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google. * Your use of it is subject to your agreement with Google.
*/ */
package com.example.mapper.messagefilter; package com.example.mapper.messagefilter;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Component @Component
public class ConversationContextMapper { public class ConversationContextMapper {
@Value("${conversation.context.message.limit:60}") private static final int MAX_MESSAGES = 10;
private int messageLimit;
@Value("${conversation.context.days.limit:30}") public String toText(ConversationSessionDTO session, List<ConversationMessageDTO> messages) {
private int daysLimit; return toTextFromMessages(messages);
public String toText(ConversationSessionDTO session) {
if (session == null || session.entries() == null || session.entries().isEmpty()) {
return "";
}
return session.entries().stream()
.map(this::formatEntry)
.collect(Collectors.joining(""));
} }
public String toTextWithLimits(ConversationSessionDTO session) { public String toTextWithLimits(ConversationSessionDTO session, List<ConversationMessageDTO> messages) {
if (session == null || session.entries() == null || session.entries().isEmpty()) { if (messages.size() > MAX_MESSAGES) {
return ""; messages = messages.subList(messages.size() - MAX_MESSAGES, messages.size());
} }
return toTextFromMessages(messages);
Instant thirtyDaysAgo = Instant.now().minus(daysLimit, ChronoUnit.DAYS);
List<ConversationEntryDTO> recentEntries = session.entries().stream()
.filter(entry -> entry.timestamp().isAfter(thirtyDaysAgo))
.sorted(Comparator.comparing(ConversationEntryDTO::timestamp).reversed())
.limit(messageLimit)
.sorted(Comparator.comparing(ConversationEntryDTO::timestamp))
.collect(Collectors.toList());
return recentEntries.stream()
.map(this::formatEntry)
.collect(Collectors.joining(""));
} }
private String formatEntry(ConversationEntryDTO entry) { public String toTextFromMessages(List<ConversationMessageDTO> messages) {
String prefix = "User: "; return messages.stream()
if (entry.entity() != null) { .map(message -> String.format("%s: %s", message.type(), message.text()))
switch (entry.entity()) { .collect(Collectors.joining("\n"));
case AGENTE:
prefix = "Agent: ";
break;
case SISTEMA:
prefix = "System: ";
break;
case USUARIO:
default:
prefix = "User: ";
break;
}
}
String text = prefix + entry.text();
if (entry.parameters() != null && !entry.parameters().isEmpty()) {
text += " " + entry.parameters().toString();
}
return text;
} }
} }

View File

@@ -17,9 +17,12 @@ import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.DocumentSnapshot;
import com.google.cloud.firestore.Firestore; import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.Query; import com.google.cloud.firestore.Query;
import com.google.cloud.firestore.QueryDocumentSnapshot;
import com.google.cloud.firestore.QuerySnapshot; import com.google.cloud.firestore.QuerySnapshot;
import com.google.cloud.firestore.WriteBatch; import com.google.cloud.firestore.WriteBatch;
import com.google.cloud.firestore.WriteResult; import com.google.cloud.firestore.WriteResult;
import com.google.cloud.firestore.CollectionReference;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@@ -27,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/** /**
@@ -114,6 +118,23 @@ public class FirestoreBaseRepository {
return future.get(); return future.get();
} }
public Flux<DocumentSnapshot> getDocuments(String collectionPath) {
return Flux.create(sink -> {
ApiFuture<QuerySnapshot> future = firestore.collection(collectionPath).get();
future.addListener(() -> {
try {
QuerySnapshot querySnapshot = future.get();
if (querySnapshot != null) {
querySnapshot.getDocuments().forEach(sink::next);
}
sink.complete();
} catch (InterruptedException | ExecutionException e) {
sink.error(e);
}
}, Runnable::run);
});
}
public Mono<DocumentSnapshot> getDocumentsByField( public Mono<DocumentSnapshot> getDocumentsByField(
String collectionPath, String fieldName, String value) { String collectionPath, String fieldName, String value) {
return Mono.fromCallable( return Mono.fromCallable(
@@ -155,6 +176,15 @@ public class FirestoreBaseRepository {
"Document updated: {} with update time: {}", docRef.getPath(), writeResult.getUpdateTime()); "Document updated: {} with update time: {}", docRef.getPath(), writeResult.getUpdateTime());
} }
public void deleteDocument(DocumentReference docRef)
throws InterruptedException, ExecutionException {
Objects.requireNonNull(docRef, "DocumentReference cannot be null.");
ApiFuture<WriteResult> future = docRef.delete();
WriteResult writeResult = future.get();
logger.debug(
"Document deleted: {} with update time: {}", docRef.getPath(), writeResult.getUpdateTime());
}
public WriteBatch createBatch() { public WriteBatch createBatch() {
return firestore.batch(); return firestore.batch();
} }
@@ -168,4 +198,26 @@ public class FirestoreBaseRepository {
public String getAppId() { public String getAppId() {
return appId; return appId;
} }
public void deleteCollection(String collectionPath, int batchSize) {
try {
CollectionReference collection = firestore.collection(collectionPath);
ApiFuture<QuerySnapshot> future = collection.limit(batchSize).get();
int deleted = 0;
// future.get() blocks on document retrieval
List<QueryDocumentSnapshot> documents = future.get().getDocuments();
while (!documents.isEmpty()) {
for (QueryDocumentSnapshot document : documents) {
document.getReference().delete();
++deleted;
}
future = collection.limit(batchSize).get();
documents = future.get().getDocuments();
}
logger.info("Deleted {} documents from collection {}", deleted, collectionPath);
} catch (Exception e) {
logger.error("Error deleting collection: " + e.getMessage(), e);
throw new RuntimeException("Error deleting collection", e);
}
}
} }

View File

@@ -0,0 +1,106 @@
package com.example.service.base;
import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.CollectionReference;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.QueryDocumentSnapshot;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Service
public class DataPurgeService {
private static final Logger logger = LoggerFactory.getLogger(DataPurgeService.class);
private final ReactiveRedisTemplate<String, ?> redisTemplate;
private final FirestoreBaseRepository firestoreBaseRepository;
private final Firestore firestore;
@Autowired
public DataPurgeService(
@Qualifier("reactiveRedisTemplate") ReactiveRedisTemplate<String, ?> redisTemplate,
FirestoreBaseRepository firestoreBaseRepository, Firestore firestore) {
this.redisTemplate = redisTemplate;
this.firestoreBaseRepository = firestoreBaseRepository;
this.firestore = firestore;
}
public Mono<Void> purgeAllData() {
return purgeRedis()
.then(purgeFirestore());
}
private Mono<Void> purgeRedis() {
logger.info("Starting Redis data purge.");
return redisTemplate.getConnectionFactory().getReactiveConnection().serverCommands().flushAll()
.doOnSuccess(v -> logger.info("Successfully purged all data from Redis."))
.doOnError(e -> logger.error("Error purging data from Redis.", e))
.then();
}
private Mono<Void> purgeFirestore() {
logger.info("Starting Firestore data purge.");
return Mono.fromRunnable(() -> {
try {
String appId = firestoreBaseRepository.getAppId();
String conversationsCollectionPath = String.format("artifacts/%s/conversations", appId);
String notificationsCollectionPath = String.format("artifacts/%s/notifications", appId);
// Delete 'messages' sub-collections in 'conversations'
logger.info("Deleting 'messages' sub-collections from '{}'", conversationsCollectionPath);
try {
List<QueryDocumentSnapshot> conversationDocuments = firestore.collection(conversationsCollectionPath).get().get().getDocuments();
for (QueryDocumentSnapshot document : conversationDocuments) {
String messagesCollectionPath = document.getReference().getPath() + "/messages";
logger.info("Deleting sub-collection: {}", messagesCollectionPath);
firestoreBaseRepository.deleteCollection(messagesCollectionPath, 50);
}
} catch (Exception e) {
if (e.getMessage().contains("NOT_FOUND")) {
logger.warn("Collection '{}' not found, skipping.", conversationsCollectionPath);
} else {
throw e;
}
}
// Delete the 'conversations' collection
logger.info("Deleting collection: {}", conversationsCollectionPath);
try {
firestoreBaseRepository.deleteCollection(conversationsCollectionPath, 50);
} catch (Exception e) {
if (e.getMessage().contains("NOT_FOUND")) {
logger.warn("Collection '{}' not found, skipping.", conversationsCollectionPath);
} else {
throw e;
}
}
// Delete the 'notifications' collection
logger.info("Deleting collection: {}", notificationsCollectionPath);
try {
firestoreBaseRepository.deleteCollection(notificationsCollectionPath, 50);
} catch (Exception e) {
if (e.getMessage().contains("NOT_FOUND")) {
logger.warn("Collection '{}' not found, skipping.", notificationsCollectionPath);
} else {
throw e;
}
}
logger.info("Successfully purged Firestore collections.");
} catch (Exception e) {
logger.error("Error purging Firestore collections.", e);
throw new RuntimeException("Failed to purge Firestore collections.", e);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.base;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.service.conversation.FirestoreConversationService;
import com.example.service.notification.FirestoreNotificationService;
import com.example.service.notification.MemoryStoreNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class SessionPurgeService {
private static final Logger logger = LoggerFactory.getLogger(SessionPurgeService.class);
private static final String SESSION_KEY_PREFIX = "conversation:session:";
private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:";
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreNotificationService memoryStoreNotificationService;
private final FirestoreNotificationService firestoreNotificationService;
@Autowired
public SessionPurgeService(
ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate,
ReactiveRedisTemplate<String, String> stringRedisTemplate,
FirestoreConversationService firestoreConversationService,
MemoryStoreNotificationService memoryStoreNotificationService,
FirestoreNotificationService firestoreNotificationService) {
this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.firestoreNotificationService = firestoreNotificationService;
}
public Mono<Void> deleteSession(String sessionId) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
logger.info("Deleting session {} from all stores.", sessionId);
return redisTemplate.opsForValue().get(sessionKey)
.flatMap(session -> {
if (session != null && session.telefono() != null) {
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono();
return redisTemplate.opsForValue().delete(sessionKey)
.then(stringRedisTemplate.opsForValue().delete(phoneToSessionKey))
.then(firestoreConversationService.deleteSession(sessionId))
.then(memoryStoreNotificationService.deleteNotificationSession(session.telefono()))
.then(firestoreNotificationService.deleteNotification(session.telefono()));
} else {
return redisTemplate.opsForValue().delete(sessionKey)
.then(firestoreConversationService.deleteSession(sessionId));
}
});
}
}

View File

@@ -7,13 +7,10 @@ package com.example.service.conversation;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationContext; import com.example.dto.dialogflow.conversation.*;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import com.example.dto.dialogflow.conversation.QueryInputDTO;
import com.example.dto.dialogflow.notification.EventInputDTO; import com.example.dto.dialogflow.notification.EventInputDTO;
import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.conversation.ConversationEntryMapper;
import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.conversation.ExternalConvRequestMapper;
import com.example.mapper.messagefilter.ConversationContextMapper; import com.example.mapper.messagefilter.ConversationContextMapper;
import com.example.mapper.messagefilter.NotificationContextMapper; import com.example.mapper.messagefilter.NotificationContextMapper;
@@ -38,19 +35,6 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* Orchestrates the full lifecycle of a user conversation,(this is the core of
* the entire integration layer service), acting as the central point for all
* inbound requests.
* This service manages conversational state by integrating both an in-memory
* cache (for active sessions)
* and a durable database (for conversation history). It intelligently routes
* incoming messages
* to the appropriate handler, which can include a standard Dialogflow agent, a
* notification-specific flow, or a direct LLM-based response. The class also
* ensures data integrity and security by applying Data Loss Prevention (DLP)
* to all incoming user messages before they are processed.
*/
@Service @Service
public class ConversationManagerService { public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
@@ -71,6 +55,7 @@ public class ConversationManagerService {
private final NotificationContextResolver notificationContextResolver; private final NotificationContextResolver notificationContextResolver;
private final LlmResponseTunerService llmResponseTunerService; private final LlmResponseTunerService llmResponseTunerService;
private final ConversationEntryMapper conversationEntryMapper;
public ConversationManagerService( public ConversationManagerService(
DialogflowClientService dialogflowServiceClient, DialogflowClientService dialogflowServiceClient,
@@ -85,6 +70,7 @@ public class ConversationManagerService {
DataLossPrevention dataLossPrevention, DataLossPrevention dataLossPrevention,
NotificationContextResolver notificationContextResolver, NotificationContextResolver notificationContextResolver,
LlmResponseTunerService llmResponseTunerService, LlmResponseTunerService llmResponseTunerService,
ConversationEntryMapper conversationEntryMapper,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowServiceClient = dialogflowServiceClient; this.dialogflowServiceClient = dialogflowServiceClient;
this.firestoreConversationService = firestoreConversationService; this.firestoreConversationService = firestoreConversationService;
@@ -99,6 +85,7 @@ public class ConversationManagerService {
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
this.notificationContextResolver = notificationContextResolver; this.notificationContextResolver = notificationContextResolver;
this.llmResponseTunerService = llmResponseTunerService; this.llmResponseTunerService = llmResponseTunerService;
this.conversationEntryMapper = conversationEntryMapper;
} }
@@ -160,29 +147,31 @@ public class ConversationManagerService {
final String userMessageText = context.userMessageText(); final String userMessageText = context.userMessageText();
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber) return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toText) .flatMap(session -> memoryStoreConversationService.getMessages(session.sessionId()).collectList()
.defaultIfEmpty("") .map(conversationContextMapper::toTextFromMessages)
.flatMap(conversationHistory -> { .defaultIfEmpty("")
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber) .flatMap(conversationHistory -> {
.flatMap(notificationId -> memoryStoreNotificationService return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
.getCachedNotificationSession(notificationId)) .flatMap(notificationId -> memoryStoreNotificationService
.map(notificationSession -> notificationSession.notificaciones().stream() .getCachedNotificationSession(notificationId))
.filter(notification -> "active".equalsIgnoreCase(notification.status())) .map(notificationSession -> notificationSession.notificaciones().stream()
.max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion)) .filter(notification -> "active".equalsIgnoreCase(notification.status()))
.orElse(null)) .max(java.util.Comparator.comparing(NotificationDTO::timestampCreacion))
.filter(Objects::nonNull) .orElse(null))
.flatMap((NotificationDTO notification) -> { .filter(Objects::nonNull)
String notificationText = notificationContextMapper.toText(notification); .flatMap((NotificationDTO notification) -> {
String classification = messageEntryFilter.classifyMessage(userMessageText, String notificationText = notificationContextMapper.toText(notification);
notificationText, conversationHistory); String classification = messageEntryFilter.classifyMessage(userMessageText,
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { notificationText, conversationHistory);
return startNotificationConversation(context, request, notification); if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
} else { return startNotificationConversation(context, request, notification);
return continueConversationFlow(context, request); } else {
} return continueConversationFlow(context, request);
}) }
.switchIfEmpty(continueConversationFlow(context, request)); })
}); .switchIfEmpty(continueConversationFlow(context, request));
}))
.switchIfEmpty(continueConversationFlow(context, request));
} }
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context, private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context,
@@ -225,15 +214,19 @@ public class ConversationManagerService {
.orElse(null)) .orElse(null))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.flatMap((NotificationDTO notification) -> { .flatMap((NotificationDTO notification) -> {
String conversationHistory = conversationContextMapper.toText(session); return memoryStoreConversationService.getMessages(session.sessionId()).collectList()
String notificationText = notificationContextMapper.toText(notification); .map(conversationContextMapper::toTextFromMessages)
String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText, .defaultIfEmpty("")
conversationHistory); .flatMap(conversationHistory -> {
if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) { String notificationText = notificationContextMapper.toText(notification);
return startNotificationConversation(context, request, notification); String classification = messageEntryFilter.classifyMessage(userMessageText, notificationText,
} else { conversationHistory);
return proceedWithConversation(context, request, session); if (MessageEntryFilter.CATEGORY_NOTIFICATION.equals(classification)) {
} return startNotificationConversation(context, request, notification);
} else {
return proceedWithConversation(context, request, session);
}
});
}) })
.switchIfEmpty(proceedWithConversation(context, request, session)); .switchIfEmpty(proceedWithConversation(context, request, session));
} }
@@ -250,27 +243,40 @@ public class ConversationManagerService {
logger.info( logger.info(
"Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.", "Old Session Found: Session {} is older than the threshold. Fetching history and continuing with same session.",
session.sessionId()); session.sessionId());
String conversationHistory = conversationContextMapper.toTextWithLimits(session); return memoryStoreConversationService.getMessages(session.sessionId()).collectList()
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); .map(conversationContextMapper::toTextFromMessages)
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(), .defaultIfEmpty("")
context.primaryPhoneNumber(), false); .flatMap(conversationHistory -> {
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(session, newRequest, context.userId(), context.userMessageText(),
context.primaryPhoneNumber(), false);
});
} }
} }
private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession, private Mono<DetectIntentResponseDTO> fullLookupAndProcess(ConversationSessionDTO oldSession,
DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) { DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber) {
return firestoreConversationService.getSessionByTelefono(userPhoneNumber) return firestoreConversationService.getSessionByTelefono(userPhoneNumber)
.map(conversationContextMapper::toTextWithLimits) .flatMap(session -> firestoreConversationService.getMessages(session.sessionId()).collectList()
.defaultIfEmpty("") .map(conversationContextMapper::toTextFromMessages)
.flatMap(conversationHistory -> { .defaultIfEmpty("")
.flatMap(conversationHistory -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId,
userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory);
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber,
true);
}))
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId(); String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("Creating new session {} after full lookup.", newSessionId); logger.info("Creating new session {} after full lookup.", newSessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId,
userPhoneNumber); userPhoneNumber);
DetectIntentRequestDTO newRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory); return processDialogflowRequest(newSession, request, userId, userMessageText, userPhoneNumber,
return processDialogflowRequest(newSession, newRequest, userId, userMessageText, userPhoneNumber,
true); true);
}); }));
} }
private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session, private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session,
@@ -280,7 +286,7 @@ public class ConversationManagerService {
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText); ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber) return this.persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry))
.doOnSuccess(v -> logger.debug( .doOnSuccess(v -> logger.debug(
"User entry successfully persisted for session {}. Proceeding to Dialogflow...", "User entry successfully persisted for session {}. Proceeding to Dialogflow...",
finalSessionId)) finalSessionId))
@@ -292,7 +298,7 @@ public class ConversationManagerService {
"Received Dialogflow CX response for session {}. Initiating agent response persistence.", "Received Dialogflow CX response for session {}. Initiating agent response persistence.",
finalSessionId); finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult()); ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
return persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber) return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry))
.thenReturn(response); .thenReturn(response);
}) })
.doOnError( .doOnError(
@@ -315,85 +321,90 @@ public class ConversationManagerService {
})) }))
.flatMap(session -> { .flatMap(session -> {
final String sessionId = session.sessionId(); final String sessionId = session.sessionId();
String conversationHistory = conversationContextMapper.toTextWithLimits(session); return memoryStoreConversationService.getMessages(sessionId).collectList()
String notificationText = notificationContextMapper.toText(notification); .map(conversationContextMapper::toTextFromMessages)
.defaultIfEmpty("")
.flatMap(conversationHistory -> {
String notificationText = notificationContextMapper.toText(notification);
Map<String, Object> filteredParams = notification.parametros().entrySet().stream() Map<String, Object> filteredParams = notification.parametros().entrySet().stream()
.filter(entry -> entry.getKey().startsWith("notification_po_")) .filter(entry -> entry.getKey().startsWith("notification_po_"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
String resolvedContext = notificationContextResolver.resolveContext(userMessageText, String resolvedContext = notificationContextResolver.resolveContext(userMessageText,
notificationText, conversationHistory, filteredParams.toString(), userId, sessionId, notificationText, conversationHistory, filteredParams.toString(), userId, sessionId,
userPhoneNumber); userPhoneNumber);
if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) { if (!resolvedContext.trim().toUpperCase().contains(NotificationContextResolver.CATEGORY_DIALOGFLOW)) {
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
llmResponseTunerService.setValue(uuid, resolvedContext).subscribe(); llmResponseTunerService.setValue(uuid, resolvedContext).subscribe();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
notification.parametros()); notification.parametros());
ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext, ConversationEntryDTO llmEntry = ConversationEntryDTO.forLlmConversation(resolvedContext,
notification.parametros()); notification.parametros());
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber) return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry))
.then(persistConversationTurn(userId, sessionId, llmEntry, userPhoneNumber)) .then(persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(llmEntry)))
.then(Mono.defer(() -> { .then(Mono.defer(() -> {
EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED"); EventInputDTO eventInput = new EventInputDTO("LLM_RESPONSE_PROCESSED");
QueryInputDTO queryInput = new QueryInputDTO(null, eventInput, QueryInputDTO queryInput = new QueryInputDTO(null, eventInput,
request.queryInput().languageCode()); request.queryInput().languageCode());
DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput, DetectIntentRequestDTO newRequest = new DetectIntentRequestDTO(queryInput,
request.queryParams()) request.queryParams())
.withParameter("llm_reponse_uuid", uuid); .withParameter("llm_reponse_uuid", uuid);
return dialogflowServiceClient.detectIntent(sessionId, newRequest) return dialogflowServiceClient.detectIntent(sessionId, newRequest)
.flatMap(response -> { .flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult()); .forAgent(response.queryResult());
return persistConversationTurn(userId, sessionId, agentEntry, return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry))
userPhoneNumber) .thenReturn(response);
.thenReturn(response); });
}); }));
})); } else {
} else { ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText,
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText, notification.parametros());
notification.parametros());
DetectIntentRequestDTO finalRequest; DetectIntentRequestDTO finalRequest;
Instant now = Instant.now(); Instant now = Instant.now();
if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) { if (Duration.between(session.lastModified(), now).toMinutes() < SESSION_RESET_THRESHOLD_MINUTES) {
finalRequest = request.withParameters(notification.parametros()); finalRequest = request.withParameters(notification.parametros());
} else { } else {
finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory) finalRequest = request.withParameter(CONV_HISTORY_PARAM, conversationHistory)
.withParameters(notification.parametros()); .withParameters(notification.parametros());
} }
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber) return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry))
.then(dialogflowServiceClient.detectIntent(sessionId, finalRequest) .then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
.flatMap(response -> { .flatMap(response -> {
ConversationEntryDTO agentEntry = ConversationEntryDTO ConversationEntryDTO agentEntry = ConversationEntryDTO
.forAgent(response.queryResult()); .forAgent(response.queryResult());
return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber) return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(agentEntry))
.thenReturn(response); .thenReturn(response);
})); }));
} }
});
}); });
} }
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, private Mono<Void> persistConversationTurn(ConversationSessionDTO session, ConversationMessageDTO message) {
String userPhoneNumber) { logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", session.sessionId(),
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, message.type().name());
entry.type().name()); ConversationSessionDTO updatedSession = session.withLastMessage(message.text());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) return memoryStoreConversationService.saveSession(updatedSession)
.then(memoryStoreConversationService.saveMessage(session.sessionId(), message))
.doOnSuccess(v -> logger.info( .doOnSuccess(v -> logger.info(
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name())) session.sessionId(), message.type().name()))
.then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) .then(firestoreConversationService.saveSession(updatedSession)
.then(firestoreConversationService.saveMessage(session.sessionId(), message))
.doOnSuccess(fsVoid -> logger.debug( .doOnSuccess(fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name())) session.sessionId(), message.type().name()))
.doOnError(fsError -> logger.error( .doOnError(fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError))) session.sessionId(), message.type().name(), fsError.getMessage(), fsError)))
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", session.sessionId(),
entry.type().name(), e.getMessage(), e)); message.type().name(), e.getMessage(), e));
} }
} }

View File

@@ -5,86 +5,76 @@
package com.example.service.conversation; package com.example.service.conversation;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.exception.FirestorePersistenceException; import com.example.exception.FirestorePersistenceException;
import com.example.mapper.conversation.ConversationMessageMapper;
import com.example.mapper.conversation.FirestoreConversationMapper; import com.example.mapper.conversation.FirestoreConversationMapper;
import com.example.repository.FirestoreBaseRepository; import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.DocumentSnapshot;
import com.google.cloud.firestore.WriteBatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import java.util.Map; import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
/**
* Service for managing conversation sessions in Firestore.
* It handles the persistence of conversation entries, either by creating
* a new document for a new session or appending an entry to an existing
* session document using a Firestore batch. The service also provides
* methods for retrieving a complete conversation session from Firestore.
*/
@Service @Service
public class FirestoreConversationService { public class FirestoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class); private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class);
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations"; private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations";
private static final String MESSAGES_SUBCOLLECTION = "messages";
private final FirestoreBaseRepository firestoreBaseRepository; private final FirestoreBaseRepository firestoreBaseRepository;
private final FirestoreConversationMapper firestoreConversationMapper; private final FirestoreConversationMapper firestoreConversationMapper;
private final ConversationMessageMapper conversationMessageMapper;
public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper) { public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper, ConversationMessageMapper conversationMessageMapper) {
this.firestoreBaseRepository = firestoreBaseRepository; this.firestoreBaseRepository = firestoreBaseRepository;
this.firestoreConversationMapper = firestoreConversationMapper; this.firestoreConversationMapper = firestoreConversationMapper;
this.conversationMessageMapper = conversationMessageMapper;
} }
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { public Mono<Void> saveSession(ConversationSessionDTO session) {
return saveEntry(userId, sessionId, newEntry, userPhoneNumber, null);
}
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber, String pantallaContexto) {
logger.info("Attempting to save conversation entry to Firestore for session {}. Entity: {}", sessionId, newEntry.entity().name());
return Mono.fromRunnable(() -> { return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); DocumentReference sessionDocRef = getSessionDocumentReference(session.sessionId());
WriteBatch batch = firestoreBaseRepository.createBatch();
try { try {
DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef); firestoreBaseRepository.setDocument(sessionDocRef, firestoreConversationMapper.createSessionMap(session));
} catch (ExecutionException | InterruptedException e) {
if (documentSnapshot != null && documentSnapshot.exists()) { handleException(e, session.sessionId());
// Update: Append the new entry using arrayUnion and update lastModified
Map<String, Object> updates = firestoreConversationMapper.createUpdateMapForSingleEntry(newEntry);
if (pantallaContexto != null) {
updates.put("pantallaContexto", pantallaContexto);
}
batch.update(sessionDocRef, updates);
logger.info("Appending entry to existing conversation session for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name());
} else {
// Create: Start a new session with the first entry.
// Pass userId and userPhoneNumber to the mapper to be stored as fields in the document.
Map<String, Object> newSessionMap = firestoreConversationMapper.createNewSessionMapForSingleEntry(sessionId, userId, userPhoneNumber, newEntry, pantallaContexto);
batch.set(sessionDocRef, newSessionMap);
logger.info("Creating new conversation session with first entry for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name());
}
firestoreBaseRepository.commitBatch(batch);
logger.info("Successfully committed batch for session {} to Firestore.", sessionId);
} catch (ExecutionException e) {
logger.error("Error saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to save conversation entry to Firestore for session " + sessionId, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Thread interrupted while saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Saving conversation entry was interrupted for session " + sessionId, e);
} }
}).subscribeOn(Schedulers.boundedElastic()).then(); }).subscribeOn(Schedulers.boundedElastic()).then();
} }
public Mono<ConversationSessionDTO> getConversationSession(String userId, String sessionId) { public Mono<Void> saveMessage(String sessionId, ConversationMessageDTO message) {
logger.info("Attempting to retrieve conversation session for session {} (user ID {} for context).", sessionId, userId); return Mono.fromRunnable(() -> {
DocumentReference messageDocRef = getSessionDocumentReference(sessionId).collection(MESSAGES_SUBCOLLECTION).document();
try {
firestoreBaseRepository.setDocument(messageDocRef, conversationMessageMapper.toMap(message));
} catch (ExecutionException | InterruptedException e) {
handleException(e, sessionId);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
public Flux<ConversationMessageDTO> getMessages(String sessionId) {
String messagesPath = getConversationCollectionPath() + "/" + sessionId + "/" + MESSAGES_SUBCOLLECTION;
return firestoreBaseRepository.getDocuments(messagesPath)
.map(documentSnapshot -> {
if (documentSnapshot != null && documentSnapshot.exists()) {
return conversationMessageMapper.fromMap(documentSnapshot.getData());
}
return null;
})
.filter(Objects::nonNull);
}
public Mono<ConversationSessionDTO> getConversationSession(String sessionId) {
logger.info("Attempting to retrieve conversation session for session {}.", sessionId);
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try { try {
@@ -95,16 +85,16 @@ public class FirestoreConversationService {
return sessionDTO; return sessionDTO;
} }
logger.info("Conversation session not found for session {}.", sessionId); logger.info("Conversation session not found for session {}.", sessionId);
return null; // Or Mono.empty() if this method returned Mono<Optional<ConversationSessionDTO>> return null;
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
logger.error("Error retrieving conversation session from Firestore for session {}: {}", sessionId, e.getMessage(), e); handleException(e, sessionId);
throw new FirestorePersistenceException("Failed to retrieve conversation session from Firestore for session " + sessionId, e); return null;
} }
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(Schedulers.boundedElastic());
} }
public Mono<ConversationSessionDTO> getSessionByTelefono(String userPhoneNumber) { public Mono<ConversationSessionDTO> getSessionByTelefono(String userPhoneNumber) {
return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "userPhoneNumber", userPhoneNumber) return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "telefono", userPhoneNumber)
.map(documentSnapshot -> { .map(documentSnapshot -> {
if (documentSnapshot != null && documentSnapshot.exists()) { if (documentSnapshot != null && documentSnapshot.exists()) {
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot); ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
@@ -115,7 +105,18 @@ public class FirestoreConversationService {
}); });
} }
public Mono<Void> deleteSession(String sessionId) {
logger.info("Attempting to delete conversation session for session {}.", sessionId);
return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try {
firestoreBaseRepository.deleteDocument(sessionDocRef);
logger.info("Successfully deleted conversation session for session {}.", sessionId);
} catch (InterruptedException | ExecutionException e) {
handleException(e, sessionId);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
private String getConversationCollectionPath() { private String getConversationCollectionPath() {
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId()); return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
@@ -125,4 +126,12 @@ public class FirestoreConversationService {
String collectionPath = getConversationCollectionPath(); String collectionPath = getConversationCollectionPath();
return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId); return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId);
} }
private void handleException(Exception e, String sessionId) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.error("Error processing Firestore operation for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to process Firestore operation for session " + sessionId, e);
}
} }

View File

@@ -4,96 +4,69 @@
*/ */
package com.example.service.conversation; package com.example.service.conversation;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Duration; import java.time.Duration;
/**
* Service for managing conversation sessions using a memory store (Redis).
* It caches and retrieves `ConversationSessionDTO` objects, maintaining a mapping
* from a user's phone number to their active session ID. This service uses
* a time-to-live (TTL) to manage session expiration and provides a fast
* reactive interface for persisting new conversation entries and fetching sessions.
*/
@Service @Service
public class MemoryStoreConversationService { public class MemoryStoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class); private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class);
private static final String SESSION_KEY_PREFIX = "conversation:session:"; private static final String SESSION_KEY_PREFIX = "conversation:session:";
private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:"; private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:";
private static final String MESSAGES_KEY_PREFIX = "conversation:messages:";
private static final Duration SESSION_TTL = Duration.ofDays(30); private static final Duration SESSION_TTL = Duration.ofDays(30);
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate; private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate; private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
private final ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate;
@Autowired @Autowired
public MemoryStoreConversationService( public MemoryStoreConversationService(
ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate, ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate,
ReactiveRedisTemplate<String, String> stringRedisTemplate, ReactiveRedisTemplate<String, String> stringRedisTemplate,
FirestoreConversationService firestoreConversationService) { ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate) {
this.redisTemplate = redisTemplate; this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate; this.stringRedisTemplate = stringRedisTemplate;
this.messageRedisTemplate = messageRedisTemplate;
} }
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { public Mono<Void> saveMessage(String sessionId, ConversationMessageDTO message) {
return saveEntry(userId, sessionId, newEntry, userPhoneNumber, null); String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
return messageRedisTemplate.opsForList().rightPush(messagesKey, message).then();
} }
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber, String pantallaContexto) { public Mono<Void> saveSession(ConversationSessionDTO session) {
String sessionKey = SESSION_KEY_PREFIX + sessionId; String sessionKey = SESSION_KEY_PREFIX + session.sessionId();
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber; String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono();
return redisTemplate.opsForValue().set(sessionKey, session, SESSION_TTL)
logger.info("Attempting to save entry to Memorystore for session {}. Entity: {}", sessionId, newEntry.entity().name()); .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, session.sessionId(), SESSION_TTL))
.then();
return redisTemplate.opsForValue().get(sessionKey)
.doOnSuccess(session -> {
if (session != null) {
logger.info("Found existing session in Memorystore: {}", sessionKey);
} else {
logger.info("No session found in Memorystore for key: {}", sessionKey);
}
})
.switchIfEmpty(Mono.defer(() -> {
logger.info("Creating new session {} in Memorystore with TTL.", sessionId);
ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber);
return redisTemplate.opsForValue().set(sessionKey, newSession, SESSION_TTL)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL))
.thenReturn(newSession);
}))
.flatMap(session -> {
ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber);
ConversationSessionDTO sessionWithPantallaContexto = (pantallaContexto != null) ? sessionWithUpdatedTelefono.withPantallaContexto(pantallaContexto) : sessionWithUpdatedTelefono;
ConversationSessionDTO updatedSession = sessionWithPantallaContexto.withAddedEntry(newEntry);
logger.info("Attempting to set updated session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name());
return redisTemplate.opsForValue().set(sessionKey, updatedSession)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId))
.then();
})
.doOnSuccess(success -> {
logger.info("Successfully saved updated session and phone mapping to Redis for session {}. Entity Type: {}", sessionId, newEntry.entity().name());
})
.doOnError(e -> logger.error("Error appending entry to Memorystore for session {}: {}", sessionId, e.getMessage(), e));
} }
public Flux<ConversationMessageDTO> getMessages(String sessionId) {
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
return messageRedisTemplate.opsForList().range(messagesKey, 0, -1);
}
public Mono<ConversationSessionDTO> getSessionByTelefono(String telefono) { public Mono<ConversationSessionDTO> getSessionByTelefono(String telefono) {
if (telefono == null || telefono.isBlank()) { if (telefono == null || telefono.isBlank()) {
return Mono.empty(); return Mono.empty();
} }
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + telefono; String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + telefono;
return stringRedisTemplate.opsForValue().get(phoneToSessionKey) return stringRedisTemplate.opsForValue().get(phoneToSessionKey)
.flatMap(sessionId -> { .flatMap(sessionId -> redisTemplate.opsForValue().get(SESSION_KEY_PREFIX + sessionId))
return redisTemplate.opsForValue().get(SESSION_KEY_PREFIX + sessionId);
})
.doOnSuccess(session -> { .doOnSuccess(session -> {
if (session != null) { if (session != null) {
logger.info("Successfully retrieved session by phone number"); logger.info("Successfully retrieved session by phone number");
} else { } else {
logger.info("No session found in Redis for phone number."); logger.info("No session found in Redis for phone number.");
} }
@@ -106,4 +79,12 @@ public class MemoryStoreConversationService {
logger.info("Attempting to update session {} in Memorystore.", session.sessionId()); logger.info("Attempting to update session {} in Memorystore.", session.sessionId());
return redisTemplate.opsForValue().set(sessionKey, session).then(); return redisTemplate.opsForValue().set(sessionKey, session).then();
} }
public Mono<Void> deleteSession(String sessionId) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
logger.info("Deleting session {} from Memorystore.", sessionId);
return redisTemplate.opsForValue().delete(sessionKey)
.then(messageRedisTemplate.opsForList().delete(messagesKey)).then();
}
} }

View File

@@ -1,107 +0,0 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.notification;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.exception.FirestorePersistenceException;
import com.example.mapper.conversation.FirestoreConversationMapper;
import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.cloud.firestore.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Service for managing notification conversation sessions in Firestore.
* It handles the persistence of conversation entries, either by creating
* a new document for a new session or appending an entry to an existing
* session document using a Firestore batch. The service also provides
* methods for retrieving a complete conversation session from Firestore.
*/
@Service
public class FirestoreNotificationConvService {
private static final Logger logger = LoggerFactory.getLogger(FirestoreNotificationConvService.class);
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversation-notifications";
private final FirestoreBaseRepository firestoreBaseRepository;
private final FirestoreConversationMapper firestoreConversationMapper;
public FirestoreNotificationConvService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper) {
this.firestoreBaseRepository = firestoreBaseRepository;
this.firestoreConversationMapper = firestoreConversationMapper;
}
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) {
logger.info("Attempting to save conversation entry to Firestore for session {}. Entity: {}", sessionId, newEntry.entity().name());
return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
// Synchronize on the session ID to prevent race conditions when creating a new session.
synchronized (sessionId.intern()) {
WriteBatch batch = firestoreBaseRepository.createBatch();
try {
if (firestoreBaseRepository.documentExists(sessionDocRef)) {
// Update: Append the new entry using arrayUnion and update lastModified
Map<String, Object> updates = firestoreConversationMapper.createUpdateMapForSingleEntry(newEntry);
batch.update(sessionDocRef, updates);
logger.info("Appending entry to existing conversation session for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name());
} else {
// Create: Start a new session with the first entry.
// Pass userId and userPhoneNumber to the mapper to be stored as fields in the document.
Map<String, Object> newSessionMap = firestoreConversationMapper.createNewSessionMapForSingleEntry(sessionId, userId, userPhoneNumber, newEntry);
batch.set(sessionDocRef, newSessionMap);
logger.info("Creating new conversation session with first entry for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name());
}
firestoreBaseRepository.commitBatch(batch);
logger.info("Successfully committed batch for session {} to Firestore.", sessionId);
} catch (ExecutionException e) {
logger.error("Error saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to save conversation entry to Firestore for session " + sessionId, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Thread interrupted while saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Saving conversation entry was interrupted for session " + sessionId, e);
}
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
public Mono<ConversationSessionDTO> getConversationSession(String userId, String sessionId) {
logger.info("Attempting to retrieve conversation session for session {} (user ID {} for context).", sessionId, userId);
return Mono.fromCallable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try {
DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef);
if (documentSnapshot != null && documentSnapshot.exists()) {
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionId);
return sessionDTO;
}
logger.info("Conversation session not found for session {}.", sessionId);
return null; // Or Mono.empty() if this method returned Mono<Optional<ConversationSessionDTO>>
} catch (InterruptedException | ExecutionException e) {
logger.error("Error retrieving conversation session from Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to retrieve conversation session from Firestore for session " + sessionId, e);
}
}).subscribeOn(Schedulers.boundedElastic());
}
private String getConversationCollectionPath() {
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
}
private DocumentReference getSessionDocumentReference(String sessionId) {
String collectionPath = getConversationCollectionPath();
return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId);
}
}

View File

@@ -167,4 +167,22 @@ public class FirestoreNotificationService {
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.then(); .then();
} }
public Mono<Void> deleteNotification(String notificationId) {
logger.info("Attempting to delete notification session {} from Firestore.", notificationId);
return Mono.fromRunnable(() -> {
try {
DocumentReference notificationDocRef = getNotificationDocumentReference(notificationId);
firestoreBaseRepository.deleteDocument(notificationDocRef);
logger.info("Successfully deleted notification session {} from Firestore.", notificationId);
} catch (ExecutionException e) {
logger.error("Error deleting notification session {} from Firestore: {}", notificationId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to delete notification session " + notificationId, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Thread interrupted while deleting notification session {} from Firestore: {}", notificationId, e.getMessage(), e);
throw new FirestorePersistenceException("Deleting notification session was interrupted for " + notificationId, e);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
} }

View File

@@ -5,8 +5,6 @@
package com.example.service.notification; package com.example.service.notification;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.dto.dialogflow.notification.NotificationSessionDTO; import com.example.dto.dialogflow.notification.NotificationSessionDTO;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,26 +24,17 @@ public class MemoryStoreNotificationService {
private static final Logger logger = LoggerFactory.getLogger(MemoryStoreNotificationService.class); private static final Logger logger = LoggerFactory.getLogger(MemoryStoreNotificationService.class);
private final ReactiveRedisTemplate<String, NotificationSessionDTO> notificationRedisTemplate; private final ReactiveRedisTemplate<String, NotificationSessionDTO> notificationRedisTemplate;
private final ReactiveRedisTemplate<String, ConversationSessionDTO> conversationRedisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate; private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
private final FirestoreNotificationConvService firestoreNotificationConvService;
private static final String NOTIFICATION_KEY_PREFIX = "notification:"; private static final String NOTIFICATION_KEY_PREFIX = "notification:";
private static final String PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX = "notification:phone_to_notification:"; private static final String PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX = "notification:phone_to_notification:";
private static final String CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:session:";
private static final String PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX = "conversation-notification:phone_to_session:";
private final Duration notificationTtl = Duration.ofDays(30); private final Duration notificationTtl = Duration.ofDays(30);
public MemoryStoreNotificationService( public MemoryStoreNotificationService(
ReactiveRedisTemplate<String, NotificationSessionDTO> notificationRedisTemplate, ReactiveRedisTemplate<String, NotificationSessionDTO> notificationRedisTemplate,
ReactiveRedisTemplate<String, ConversationSessionDTO> conversationRedisTemplate,
ReactiveRedisTemplate<String, String> stringRedisTemplate, ReactiveRedisTemplate<String, String> stringRedisTemplate,
FirestoreNotificationConvService firestoreNotificationConvService,
ObjectMapper objectMapper) { ObjectMapper objectMapper) {
this.notificationRedisTemplate = notificationRedisTemplate; this.notificationRedisTemplate = notificationRedisTemplate;
this.conversationRedisTemplate = conversationRedisTemplate;
this.stringRedisTemplate = stringRedisTemplate; this.stringRedisTemplate = stringRedisTemplate;
this.firestoreNotificationConvService = firestoreNotificationConvService;
} }
public Mono<Void> saveOrAppendNotificationEntry(NotificationDTO newEntry) { public Mono<Void> saveOrAppendNotificationEntry(NotificationDTO newEntry) {
@@ -120,42 +109,12 @@ public class MemoryStoreNotificationService {
e.getMessage(), e)); e.getMessage(), e));
} }
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) { public Mono<Void> deleteNotificationSession(String phoneNumber) {
String sessionKey = CONVERSATION_SESSION_KEY_PREFIX + sessionId; String notificationKey = NOTIFICATION_KEY_PREFIX + phoneNumber;
String phoneToSessionKey = PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX + userPhoneNumber; String phoneToNotificationKey = PHONE_TO_NOTIFICATION_SESSION_KEY_PREFIX + phoneNumber;
logger.info("Attempting to save entry to Redis for session {}. Entity: {}", sessionId, newEntry.entity().name()); logger.info("Deleting notification session for phone number {}.", phoneNumber);
return conversationRedisTemplate.opsForValue().get(sessionKey) return notificationRedisTemplate.opsForValue().delete(notificationKey)
.defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber)) .then(stringRedisTemplate.opsForValue().delete(phoneToNotificationKey))
.flatMap(session -> { .then();
ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber);
ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withAddedEntry(newEntry);
logger.info("Attempting to set updated session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name());
return conversationRedisTemplate.opsForValue().set(sessionKey, updatedSession, notificationTtl)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, notificationTtl))
.then();
})
.doOnSuccess(success -> logger.info("Successfully saved updated session and phone mapping to Redis for session {}. Entity Type: {}", sessionId, newEntry.entity().name()))
.then(firestoreNotificationConvService.saveEntry(userId, sessionId, newEntry, userPhoneNumber))
.doOnError(e -> logger.error("Error appending entry to Redis for session {}: {}", sessionId, e.getMessage(), e));
}
public Mono<ConversationSessionDTO> getSessionByTelefono(String telefono) {
if (telefono == null || telefono.isBlank()) {
return Mono.empty();
}
String phoneToSessionKey = PHONE_TO_CONVERSATION_SESSION_KEY_PREFIX + telefono;
return stringRedisTemplate.opsForValue().get(phoneToSessionKey)
.flatMap(sessionId -> {
logger.debug("Found session ID {} for phone number. Retrieving session data.", sessionId);
return conversationRedisTemplate.opsForValue().get(CONVERSATION_SESSION_KEY_PREFIX + sessionId);
})
.doOnSuccess(session -> {
if (session != null) {
logger.info("Successfully retrieved session {} by phone number.", session.sessionId());
} else {
logger.info("No session found in Redis for phone number.");
}
})
.doOnError(e -> logger.error("Error retrieving session by phone number: {}",e.getMessage(), e));
} }
} }

View File

@@ -9,8 +9,10 @@ import com.example.dto.dialogflow.notification.ExternalNotRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.conversation.ConversationEntryMapper;
import com.example.mapper.notification.ExternalNotRequestMapper; import com.example.mapper.notification.ExternalNotRequestMapper;
import com.example.service.base.DialogflowClientService; import com.example.service.base.DialogflowClientService;
import com.example.service.conversation.DataLossPrevention; import com.example.service.conversation.DataLossPrevention;
@@ -42,6 +44,7 @@ public class NotificationManagerService {
private final FirestoreConversationService firestoreConversationService; private final FirestoreConversationService firestoreConversationService;
private final DataLossPrevention dataLossPrevention; private final DataLossPrevention dataLossPrevention;
private final String dlpTemplateCompleteFlow; private final String dlpTemplateCompleteFlow;
private final ConversationEntryMapper conversationEntryMapper;
@Value("${dialogflow.default-language-code:es}") @Value("${dialogflow.default-language-code:es}")
private String defaultLanguageCode; private String defaultLanguageCode;
@@ -55,6 +58,7 @@ public class NotificationManagerService {
ExternalNotRequestMapper externalNotRequestMapper, ExternalNotRequestMapper externalNotRequestMapper,
DataLossPrevention dataLossPrevention, DataLossPrevention dataLossPrevention,
ConversationEntryMapper conversationEntryMapper,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) { @Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowClientService = dialogflowClientService; this.dialogflowClientService = dialogflowClientService;
@@ -65,6 +69,7 @@ public class NotificationManagerService {
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow; this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
this.memoryStoreConversationService = memoryStoreConversationService; this.memoryStoreConversationService = memoryStoreConversationService;
this.firestoreConversationService = firestoreConversationService; this.firestoreConversationService = firestoreConversationService;
this.conversationEntryMapper = conversationEntryMapper;
} }
public Mono<DetectIntentResponseDTO> processNotification(ExternalNotRequestDTO externalRequest) { public Mono<DetectIntentResponseDTO> processNotification(ExternalNotRequestDTO externalRequest) {
@@ -115,7 +120,7 @@ public class NotificationManagerService {
} }
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters); prefixedParameters);
return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono) return persistConversationTurn(session, systemEntry)
.thenReturn(session); .thenReturn(session);
}) })
.switchIfEmpty(Mono.defer(() -> { .switchIfEmpty(Mono.defer(() -> {
@@ -130,8 +135,9 @@ public class NotificationManagerService {
} }
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(), ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters); prefixedParameters);
return persistConversationTurn(userId, newSessionId, systemEntry, telefono) ConversationSessionDTO newSession = ConversationSessionDTO.create(newSessionId, userId, telefono);
.then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono))); return persistConversationTurn(newSession, systemEntry)
.then(Mono.just(newSession));
})); }));
return persistenceMono.then(sessionMono) return persistenceMono.then(sessionMono)
@@ -149,27 +155,30 @@ public class NotificationManagerService {
}); });
} }
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, private Mono<Void> persistConversationTurn(ConversationSessionDTO session, ConversationEntryDTO entry) {
String userPhoneNumber) { logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", session.sessionId(),
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name()); entry.type().name());
ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(entry);
ConversationSessionDTO updatedSession = session.withLastMessage(message.text());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) return memoryStoreConversationService.saveSession(updatedSession)
.then(memoryStoreConversationService.saveMessage(session.sessionId(), message))
.doOnSuccess(v -> { .doOnSuccess(v -> {
logger.info( logger.info(
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name()); session.sessionId(), entry.type().name());
firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber) firestoreConversationService.saveSession(updatedSession)
.then(firestoreConversationService.saveMessage(session.sessionId(), message))
.subscribe( .subscribe(
fsVoid -> logger.debug( fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.", "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()), session.sessionId(), entry.type().name()),
fsError -> logger.error( fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}", "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)); session.sessionId(), entry.type().name(), fsError.getMessage(), fsError));
}) })
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", session.sessionId(),
entry.type().name(), e.getMessage(), e)); entry.type().name(), e.getMessage(), e));
} }
} }

View File

@@ -6,7 +6,9 @@
package com.example.service.quickreplies; package com.example.service.quickreplies;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.mapper.conversation.ConversationEntryMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -21,22 +23,30 @@ public class MemoryStoreQRService {
private static final Logger logger = LoggerFactory.getLogger(MemoryStoreQRService.class); private static final Logger logger = LoggerFactory.getLogger(MemoryStoreQRService.class);
private static final String SESSION_KEY_PREFIX = "qr:session:"; private static final String SESSION_KEY_PREFIX = "qr:session:";
private static final String PHONE_TO_SESSION_KEY_PREFIX = "qr:phone_to_session:"; private static final String PHONE_TO_SESSION_KEY_PREFIX = "qr:phone_to_session:";
private static final String MESSAGES_KEY_PREFIX = "qr:messages:";
private static final Duration SESSION_TTL = Duration.ofHours(24); private static final Duration SESSION_TTL = Duration.ofHours(24);
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate; private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate; private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
private final ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate;
private final ConversationEntryMapper conversationEntryMapper;
@Autowired @Autowired
public MemoryStoreQRService( public MemoryStoreQRService(
ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate, ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate,
ReactiveRedisTemplate<String, String> stringRedisTemplate) { ReactiveRedisTemplate<String, String> stringRedisTemplate,
ReactiveRedisTemplate<String, ConversationMessageDTO> messageRedisTemplate,
ConversationEntryMapper conversationEntryMapper) {
this.redisTemplate = redisTemplate; this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate; this.stringRedisTemplate = stringRedisTemplate;
this.messageRedisTemplate = messageRedisTemplate;
this.conversationEntryMapper = conversationEntryMapper;
} }
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry,
String userPhoneNumber) { String userPhoneNumber) {
String sessionKey = SESSION_KEY_PREFIX + sessionId; String sessionKey = SESSION_KEY_PREFIX + sessionId;
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber; String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber;
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
logger.info("Attempting to save entry to Redis for quick reply session {}. Entity: {}", sessionId, logger.info("Attempting to save entry to Redis for quick reply session {}. Entity: {}", sessionId,
newEntry.entity().name()); newEntry.entity().name());
@@ -45,13 +55,15 @@ public class MemoryStoreQRService {
.defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber)) .defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber))
.flatMap(session -> { .flatMap(session -> {
ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber); ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber);
ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withAddedEntry(newEntry); ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withLastMessage(newEntry.text());
ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(newEntry);
logger.info("Attempting to set updated quick reply session {} with new entry entity {} in Redis.", logger.info("Attempting to set updated quick reply session {} with new entry entity {} in Redis.",
sessionId, newEntry.entity().name()); sessionId, newEntry.entity().name());
return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL) return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL)) .then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL))
.then(messageRedisTemplate.opsForList().rightPush(messagesKey, message))
.then(); .then();
}) })
.doOnSuccess(success -> { .doOnSuccess(success -> {

View File

@@ -6,13 +6,11 @@
package com.example.service.quickreplies; package com.example.service.quickreplies;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.*;
import com.example.dto.dialogflow.conversation.ConversationEntryEntity;
import com.example.dto.dialogflow.conversation.ConversationEntryType;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import com.example.dto.quickreplies.QuickReplyScreenRequestDTO; import com.example.dto.quickreplies.QuickReplyScreenRequestDTO;
import com.example.dto.quickreplies.QuestionDTO; import com.example.dto.quickreplies.QuestionDTO;
import com.example.dto.quickreplies.QuickReplyDTO; import com.example.dto.quickreplies.QuickReplyDTO;
import com.example.mapper.conversation.ConversationEntryMapper;
import com.example.service.conversation.FirestoreConversationService; import com.example.service.conversation.FirestoreConversationService;
import com.example.service.conversation.MemoryStoreConversationService; import com.example.service.conversation.MemoryStoreConversationService;
import com.example.util.SessionIdGenerator; import com.example.util.SessionIdGenerator;
@@ -36,16 +34,19 @@ public class QuickRepliesManagerService {
private final FirestoreConversationService firestoreConversationService; private final FirestoreConversationService firestoreConversationService;
private final QuickReplyContentService quickReplyContentService; private final QuickReplyContentService quickReplyContentService;
private final ConversationManagerService conversationManagerService; private final ConversationManagerService conversationManagerService;
private final ConversationEntryMapper conversationEntryMapper;
public QuickRepliesManagerService( public QuickRepliesManagerService(
@Lazy ConversationManagerService conversationManagerService, @Lazy ConversationManagerService conversationManagerService,
MemoryStoreConversationService memoryStoreConversationService, MemoryStoreConversationService memoryStoreConversationService,
FirestoreConversationService firestoreConversationService, FirestoreConversationService firestoreConversationService,
QuickReplyContentService quickReplyContentService) { QuickReplyContentService quickReplyContentService,
ConversationEntryMapper conversationEntryMapper) {
this.conversationManagerService = conversationManagerService; this.conversationManagerService = conversationManagerService;
this.memoryStoreConversationService = memoryStoreConversationService; this.memoryStoreConversationService = memoryStoreConversationService;
this.firestoreConversationService = firestoreConversationService; this.firestoreConversationService = firestoreConversationService;
this.quickReplyContentService = quickReplyContentService; this.quickReplyContentService = quickReplyContentService;
this.conversationEntryMapper = conversationEntryMapper;
} }
public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) { public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) {
@@ -67,8 +68,8 @@ public class QuickRepliesManagerService {
"Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :", "Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :",
null, null,
null); null);
return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber, ConversationSessionDTO newSession = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber).withPantallaContexto(externalRequest.pantallaContexto());
externalRequest.pantallaContexto()) return persistConversationTurn(newSession, systemEntry)
.then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto())) .then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto()))
.map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); .map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
}); });
@@ -86,100 +87,96 @@ public class QuickRepliesManagerService {
.switchIfEmpty(Mono.error( .switchIfEmpty(Mono.error(
new IllegalStateException("No quick reply session found for phone number"))) new IllegalStateException("No quick reply session found for phone number")))
.flatMap(session -> { .flatMap(session -> {
String userId = session.userId();
String sessionId = session.sessionId();
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message()); return memoryStoreConversationService.getMessages(session.sessionId()).collectList().flatMap(messages -> {
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message());
List<ConversationEntryDTO> entries = session.entries(); int lastInitIndex = IntStream.range(0, messages.size())
int lastInitIndex = IntStream.range(0, entries.size()) .map(i -> messages.size() - 1 - i)
.map(i -> entries.size() - 1 - i) .filter(i -> {
.filter(i -> { ConversationMessageDTO message = messages.get(i);
ConversationEntryDTO entry = entries.get(i); return message.type() == MessageType.SYSTEM;
return entry.entity() == ConversationEntryEntity.SISTEMA })
&& entry.type() == ConversationEntryType.INICIO; .findFirst()
}) .orElse(-1);
.findFirst()
.orElse(-1);
long userMessagesCount; long userMessagesCount;
if (lastInitIndex != -1) { if (lastInitIndex != -1) {
userMessagesCount = entries.subList(lastInitIndex + 1, entries.size()).stream() userMessagesCount = messages.subList(lastInitIndex + 1, messages.size()).stream()
.filter(e -> e.entity() == ConversationEntryEntity.USUARIO) .filter(e -> e.type() == MessageType.USER)
.count(); .count();
} else { } else {
userMessagesCount = 0; userMessagesCount = 0;
} }
if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow
// This is the second message of the flow. Return the full list. // This is the second message of the flow. Return the full list.
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, return persistConversationTurn(session, userEntry)
session.pantallaContexto()) .then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) .flatMap(quickReplyDTO -> {
.flatMap(quickReplyDTO -> { ConversationEntryDTO agentEntry = ConversationEntryDTO
ConversationEntryDTO agentEntry = ConversationEntryDTO .forAgentWithMessage(quickReplyDTO.toString());
.forAgentWithMessage(quickReplyDTO.toString()); return persistConversationTurn(session, agentEntry)
return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber, .thenReturn(new DetectIntentResponseDTO(session.sessionId(), null, quickReplyDTO));
session.pantallaContexto()) });
.thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO)); } else if (userMessagesCount == 1) { // Is the second user message in the QR flow
}); // This is the third message of the flow. Filter and end.
} else if (userMessagesCount == 1) { // Is the second user message in the QR flow return persistConversationTurn(session, userEntry)
// This is the third message of the flow. Filter and end. .then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, .flatMap(quickReplyDTO -> {
session.pantallaContexto()) List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto())) .filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim()))
.flatMap(quickReplyDTO -> { .toList();
List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
.filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim()))
.toList();
if (!matchedPreguntas.isEmpty()) { if (!matchedPreguntas.isEmpty()) {
// Matched question, return the answer // Matched question, return the answer
String respuesta = matchedPreguntas.get(0).respuesta(); String respuesta = matchedPreguntas.get(0).respuesta();
QueryResultDTO queryResult = new QueryResultDTO(respuesta, null); QueryResultDTO queryResult = new QueryResultDTO(respuesta, null);
DetectIntentResponseDTO response = new DetectIntentResponseDTO(sessionId, DetectIntentResponseDTO response = new DetectIntentResponseDTO(session.sessionId(),
queryResult, null); queryResult, null);
return memoryStoreConversationService return memoryStoreConversationService
.updateSession(session.withPantallaContexto(null)) .updateSession(session.withPantallaContexto(null))
.then(persistConversationTurn(userId, sessionId, .then(persistConversationTurn(session,
ConversationEntryDTO.forAgentWithMessage(respuesta), ConversationEntryDTO.forAgentWithMessage(respuesta)))
userPhoneNumber, null)) .thenReturn(response);
.thenReturn(response); } else {
} else { // No match, delegate to Dialogflow
// No match, delegate to Dialogflow return memoryStoreConversationService
return memoryStoreConversationService .updateSession(session.withPantallaContexto(null))
.updateSession(session.withPantallaContexto(null)) .then(conversationManagerService.manageConversation(externalRequest));
.then(conversationManagerService.manageConversation(externalRequest)); }
} });
}); } else {
} else { // Should not happen. End the flow.
// Should not happen. End the flow. return memoryStoreConversationService.updateSession(session.withPantallaContexto(null))
return memoryStoreConversationService.updateSession(session.withPantallaContexto(null)) .then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null,
.then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null, new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList()))));
new QuickReplyDTO("Flow Error", null, null, null, Collections.emptyList())))); }
} });
}); });
} }
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, private Mono<Void> persistConversationTurn(ConversationSessionDTO session, ConversationEntryDTO entry) {
String userPhoneNumber, String pantallaContexto) {
logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.", logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.",
sessionId, entry.type().name()); session.sessionId(), entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) ConversationMessageDTO message = conversationEntryMapper.toConversationMessageDTO(entry);
ConversationSessionDTO updatedSession = session.withLastMessage(message.text());
return memoryStoreConversationService.saveSession(updatedSession)
.then(memoryStoreConversationService.saveMessage(session.sessionId(), message))
.doOnSuccess(v -> logger.info( .doOnSuccess(v -> logger.info(
"Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.", "Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name())) session.sessionId(), entry.type().name()))
.then(firestoreConversationService .then(firestoreConversationService.saveSession(updatedSession)
.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto) .then(firestoreConversationService.saveMessage(session.sessionId(), message))
.doOnSuccess(fsVoid -> logger.debug( .doOnSuccess(fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.", "Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.",
sessionId, entry.type().name())) session.sessionId(), entry.type().name()))
.doOnError(fsError -> logger.error( .doOnError(fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}", "Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError))) session.sessionId(), entry.type().name(), fsError.getMessage(), fsError)))
.doOnError( .doOnError(
e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}", e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}",
sessionId, entry.type().name(), e.getMessage(), e)); session.sessionId(), entry.type().name(), e.getMessage(), e));
} }
} }

View File

@@ -7,10 +7,9 @@ package com.example.service.conversation;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationContext; import com.example.dto.dialogflow.conversation.*;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.conversation.ConversationEntryMapper;
import com.example.mapper.conversation.ExternalConvRequestMapper; import com.example.mapper.conversation.ExternalConvRequestMapper;
import com.example.mapper.messagefilter.ConversationContextMapper; import com.example.mapper.messagefilter.ConversationContextMapper;
import com.example.mapper.messagefilter.NotificationContextMapper; import com.example.mapper.messagefilter.NotificationContextMapper;
@@ -25,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@@ -62,6 +62,8 @@ public class ConversationManagerServiceTest {
private NotificationContextResolver notificationContextResolver; private NotificationContextResolver notificationContextResolver;
@Mock @Mock
private LlmResponseTunerService llmResponseTunerService; private LlmResponseTunerService llmResponseTunerService;
@Mock
private ConversationEntryMapper conversationEntryMapper;
@InjectMocks @InjectMocks
private ConversationManagerService conversationManagerService; private ConversationManagerService conversationManagerService;
@@ -80,13 +82,19 @@ public class ConversationManagerServiceTest {
NotificationDTO notification = new NotificationDTO("1", "1234567890", Instant.now(), "test text", "test_event", "es", Collections.emptyMap(), "active"); NotificationDTO notification = new NotificationDTO("1", "1234567890", Instant.now(), "test text", "test_event", "es", Collections.emptyMap(), "active");
ConversationSessionDTO session = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber); ConversationSessionDTO session = ConversationSessionDTO.create(sessionId, userId, userPhoneNumber);
when(memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)).thenReturn(Mono.just(session)); when(memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)).thenReturn(Mono.just(session));
when(conversationContextMapper.toTextWithLimits(session)).thenReturn("history"); when(memoryStoreConversationService.getMessages(anyString())).thenReturn(Flux.empty());
when(conversationContextMapper.toTextFromMessages(any())).thenReturn("history");
when(notificationContextMapper.toText(notification)).thenReturn("notification text"); when(notificationContextMapper.toText(notification)).thenReturn("notification text");
when(notificationContextResolver.resolveContext(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString())) when(notificationContextResolver.resolveContext(anyString(), anyString(), anyString(), anyString(), anyString(), anyString(), anyString()))
.thenReturn(resolvedContext); .thenReturn(resolvedContext);
when(llmResponseTunerService.setValue(anyString(), anyString())).thenReturn(Mono.empty()); when(llmResponseTunerService.setValue(anyString(), anyString())).thenReturn(Mono.empty());
when(memoryStoreNotificationService.saveEntry(anyString(), anyString(), any(ConversationEntryDTO.class), anyString())).thenReturn(Mono.empty()); when(memoryStoreConversationService.saveSession(any(ConversationSessionDTO.class))).thenReturn(Mono.empty());
when(memoryStoreConversationService.saveMessage(anyString(), any(ConversationMessageDTO.class))).thenReturn(Mono.empty());
when(firestoreConversationService.saveSession(any(ConversationSessionDTO.class))).thenReturn(Mono.empty());
when(firestoreConversationService.saveMessage(anyString(), any(ConversationMessageDTO.class))).thenReturn(Mono.empty());
when(conversationEntryMapper.toConversationMessageDTO(any(ConversationEntryDTO.class))).thenReturn(new ConversationMessageDTO(MessageType.USER, Instant.now(), "text", null, null));
when(dialogflowServiceClient.detectIntent(anyString(), any(DetectIntentRequestDTO.class))).thenReturn(Mono.just(new DetectIntentResponseDTO(sessionId, new QueryResultDTO(resolvedContext, null), null)));
// When // When
Mono<DetectIntentResponseDTO> result = conversationManagerService.startNotificationConversation(context, request, notification); Mono<DetectIntentResponseDTO> result = conversationManagerService.startNotificationConversation(context, request, notification);