UPDATE 04-Sept

This commit is contained in:
PAVEL PALMA
2025-09-05 11:32:38 -06:00
parent 499bc002ae
commit 37c1b31b7c
9 changed files with 323 additions and 258 deletions

View File

@@ -62,4 +62,15 @@ public record ConversationEntryDTO(
null null
); );
} }
public static ConversationEntryDTO forSystem(String text, Map<String, Object> parameters) {
return new ConversationEntryDTO(
ConversationEntryEntity.SISTEMA,
ConversationEntryType.CONVERSACION,
Instant.now(),
text,
parameters,
null
);
}
} }

View File

@@ -57,12 +57,22 @@ public class ConversationContextMapper {
case AGENTE: case AGENTE:
prefix = "Agent: "; prefix = "Agent: ";
break; break;
case SISTEMA:
prefix = "System: ";
break;
case USUARIO: case USUARIO:
default: default:
prefix = "User: "; prefix = "User: ";
break; break;
} }
} }
return prefix + entry.text();
String text = prefix + entry.text();
if (entry.parameters() != null && !entry.parameters().isEmpty()) {
text += " " + entry.parameters().toString();
}
return text;
} }
} }

View File

@@ -5,11 +5,12 @@
package com.example.mapper.notification; package com.example.mapper.notification;
import com.example.dto.dialogflow.notification.EventInputDTO;
import com.example.dto.dialogflow.notification.ExternalNotRequestDTO; import com.example.dto.dialogflow.notification.ExternalNotRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO; import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.conversation.QueryInputDTO; import com.example.dto.dialogflow.conversation.QueryInputDTO;
import com.example.dto.dialogflow.conversation.QueryParamsDTO; import com.example.dto.dialogflow.conversation.QueryParamsDTO;
import com.example.dto.dialogflow.conversation.TextInputDTO;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -26,9 +27,12 @@ import java.util.Objects;
*/ */
@Component @Component
public class ExternalNotRequestMapper { public class ExternalNotRequestMapper {
private static final String EVENT_NAME = "notificacion";
private static final String LANGUAGE_CODE = "es"; private static final String LANGUAGE_CODE = "es";
private static final String TELEPHONE_PARAM_NAME = "telefono"; private static final String TELEPHONE_PARAM_NAME = "telefono";
private static final String NOTIFICATION_TEXT_PARAM = "notification_text";
private static final String PREFIX_PO_PARAM = "notification_po_";
public DetectIntentRequestDTO map(ExternalNotRequestDTO request) { public DetectIntentRequestDTO map(ExternalNotRequestDTO request) {
Objects.requireNonNull(request, "NotificationRequestDTO cannot be null for mapping."); Objects.requireNonNull(request, "NotificationRequestDTO cannot be null for mapping.");
@@ -37,16 +41,22 @@ public class ExternalNotRequestMapper {
throw new IllegalArgumentException("List of 'telefonos' (phone numbers) is required and cannot be empty in NotificationRequestDTO."); throw new IllegalArgumentException("List of 'telefonos' (phone numbers) is required and cannot be empty in NotificationRequestDTO.");
} }
String phoneNumber = request.phoneNumber(); String phoneNumber = request.phoneNumber();
EventInputDTO eventInput = new EventInputDTO(EVENT_NAME);
QueryInputDTO queryInput = new QueryInputDTO(null,eventInput, LANGUAGE_CODE);
Map<String, Object> parameters = new HashMap<>(); Map<String, Object> parameters = new HashMap<>();
parameters.put(TELEPHONE_PARAM_NAME, phoneNumber); parameters.put(TELEPHONE_PARAM_NAME, phoneNumber);
parameters.put(NOTIFICATION_TEXT_PARAM, request.text());
if (request.text() != null && !request.text().trim().isEmpty()) {
parameters.put("notification_text", request.text()); if (request.hiddenParameters() != null && !request.hiddenParameters().isEmpty()) {
request.hiddenParameters().forEach((key, value) -> {
parameters.put(PREFIX_PO_PARAM + key, value);
});
} }
TextInputDTO textInput = new TextInputDTO(request.text());
QueryInputDTO queryInput = new QueryInputDTO(textInput, null, LANGUAGE_CODE);
QueryParamsDTO queryParams = new QueryParamsDTO(parameters); QueryParamsDTO queryParams = new QueryParamsDTO(parameters);
return new DetectIntentRequestDTO(queryInput, queryParams); return new DetectIntentRequestDTO(queryInput, queryParams);

View File

@@ -7,8 +7,6 @@ import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationContext; import com.example.dto.dialogflow.conversation.ConversationContext;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryEntity;
import com.example.dto.dialogflow.conversation.ConversationEntryType;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO; import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationDTO;
@@ -31,6 +29,7 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@Service @Service
public class ConversationManagerService { public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class); private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
@@ -244,6 +243,7 @@ public class ConversationManagerService {
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, .doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e)); entry.type().name(), e.getMessage(), e));
} }
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) { private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
Map<String, Object> params = Optional.ofNullable(request.queryParams()) Map<String, Object> params = Optional.ofNullable(request.queryParams())
.map(queryParamsDTO -> queryParamsDTO.parameters()) .map(queryParamsDTO -> queryParamsDTO.parameters())

View File

@@ -28,141 +28,144 @@ import reactor.core.scheduler.Schedulers;
@Service @Service
public class FirestoreNotificationService { public class FirestoreNotificationService {
private static final Logger logger = LoggerFactory.getLogger(FirestoreNotificationService.class); private static final Logger logger = LoggerFactory.getLogger(FirestoreNotificationService.class);
private static final String NOTIFICATION_COLLECTION_PATH_FORMAT = "artifacts/%s/notifications"; private static final String NOTIFICATION_COLLECTION_PATH_FORMAT = "artifacts/%s/notifications";
private static final String FIELD_MESSAGES = "notificaciones"; private static final String FIELD_MESSAGES = "notificaciones";
private static final String FIELD_LAST_UPDATED = "ultimaActualizacion"; private static final String FIELD_LAST_UPDATED = "ultimaActualizacion";
private static final String FIELD_PHONE_NUMBER = "telefono"; private static final String FIELD_PHONE_NUMBER = "telefono";
private static final String FIELD_NOTIFICATION_ID = "sessionId"; private static final String FIELD_NOTIFICATION_ID = "sessionId";
private final FirestoreBaseRepository firestoreBaseRepository; private final FirestoreBaseRepository firestoreBaseRepository;
private final FirestoreNotificationMapper firestoreNotificationMapper; private final FirestoreNotificationMapper firestoreNotificationMapper;
public FirestoreNotificationService( public FirestoreNotificationService(
FirestoreBaseRepository firestoreBaseRepository, FirestoreBaseRepository firestoreBaseRepository,
FirestoreNotificationMapper firestoreNotificationMapper, FirestoreNotificationMapper firestoreNotificationMapper,
MemoryStoreNotificationService memoryStoreNotificationService) { MemoryStoreNotificationService memoryStoreNotificationService) {
this.firestoreBaseRepository = firestoreBaseRepository; this.firestoreBaseRepository = firestoreBaseRepository;
this.firestoreNotificationMapper = firestoreNotificationMapper; this.firestoreNotificationMapper = firestoreNotificationMapper;
} }
public Mono<Void> saveOrAppendNotificationEntry(NotificationDTO newEntry) { public Mono<Void> saveOrAppendNotificationEntry(NotificationDTO newEntry) {
return Mono.fromRunnable( return Mono.fromRunnable(
() -> { () -> {
String phoneNumber = newEntry.telefono(); String phoneNumber = newEntry.telefono();
if (phoneNumber == null || phoneNumber.isBlank()) { if (phoneNumber == null || phoneNumber.isBlank()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Phone number is required to manage notification entries."); "Phone number is required to manage notification entries.");
}
// Use the phone number as the document ID for the session.
String notificationSessionId = phoneNumber;
// Synchronize on the notification session ID to prevent race conditions when creating a new session.
synchronized (notificationSessionId.intern()) {
DocumentReference notificationDocRef =
getNotificationDocumentReference(notificationSessionId);
Map<String, Object> entryMap =
firestoreNotificationMapper.mapNotificationDTOToMap(newEntry);
try {
// Check if the session document exists.
boolean docExists = firestoreBaseRepository.documentExists(notificationDocRef);
if (docExists) {
// If the document exists, append the new entry to the 'notificaciones' array.
Map<String, Object> updates =
Map.of(
FIELD_MESSAGES, FieldValue.arrayUnion(entryMap),
FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now())));
firestoreBaseRepository.updateDocument(notificationDocRef, updates);
logger.info(
"Successfully appended new entry to notification session {} in Firestore.",
notificationSessionId);
} else {
// If the document does not exist, create a new session document.
Map<String, Object> newSessionData =
Map.of(
FIELD_NOTIFICATION_ID,
notificationSessionId,
FIELD_PHONE_NUMBER,
phoneNumber,
"fechaCreacion",
Timestamp.of(java.util.Date.from(Instant.now())),
FIELD_LAST_UPDATED,
Timestamp.of(java.util.Date.from(Instant.now())),
FIELD_MESSAGES,
Collections.singletonList(entryMap));
firestoreBaseRepository.setDocument(notificationDocRef, newSessionData);
logger.info(
"Successfully created a new notification session {} in Firestore.",
notificationSessionId);
}
} catch (ExecutionException e) {
logger.error(
"Error saving notification to Firestore for phone {}: {}",
phoneNumber,
e.getMessage(),
e);
throw new FirestorePersistenceException(
"Failed to save notification to Firestore for phone " + phoneNumber, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(
"Thread interrupted while saving notification to Firestore for phone {}: {}",
phoneNumber,
e.getMessage(),
e);
throw new FirestorePersistenceException(
"Saving notification was interrupted for phone " + phoneNumber, e);
}
}
})
.subscribeOn(Schedulers.boundedElastic())
.then();
}
private String getNotificationCollectionPath() {
return String.format(NOTIFICATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
}
private DocumentReference getNotificationDocumentReference(String notificationId) {
String collectionPath = getNotificationCollectionPath();
return firestoreBaseRepository.getDocumentReference(collectionPath, notificationId);
}
@SuppressWarnings("unchecked")
public Mono<Void> updateNotificationStatus(String sessionId, String status) {
return Mono.fromRunnable(() -> {
DocumentReference notificationDocRef = getNotificationDocumentReference(sessionId);
try {
Map<String, Object> sessionData = firestoreBaseRepository.getDocument(notificationDocRef, Map.class);
if (sessionData != null) {
List<Map<String, Object>> notifications = (List<Map<String, Object>>) sessionData.get(FIELD_MESSAGES);
if (notifications != null) {
List<Map<String, Object>> updatedNotifications = new ArrayList<>();
for (Map<String, Object> notification : notifications) {
Map<String, Object> updatedNotification = new HashMap<>(notification);
updatedNotification.put("status", status);
updatedNotifications.add(updatedNotification);
} }
Map<String, Object> updates = new HashMap<>(); // Use the phone number as the document ID for the session.
updates.put(FIELD_MESSAGES, updatedNotifications); String notificationSessionId = phoneNumber;
updates.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now())));
firestoreBaseRepository.updateDocument(notificationDocRef, updates); // Synchronize on the notification session ID to prevent race conditions when
logger.info("Successfully updated notification status to '{}' for session {} in Firestore.", status, sessionId); // creating a new session.
synchronized (notificationSessionId.intern()) {
DocumentReference notificationDocRef = getNotificationDocumentReference(notificationSessionId);
Map<String, Object> entryMap = firestoreNotificationMapper.mapNotificationDTOToMap(newEntry);
try {
// Check if the session document exists.
boolean docExists = firestoreBaseRepository.documentExists(notificationDocRef);
if (docExists) {
// If the document exists, append the new entry to the 'notificaciones' array.
Map<String, Object> updates = Map.of(
FIELD_MESSAGES, FieldValue.arrayUnion(entryMap),
FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now())));
firestoreBaseRepository.updateDocument(notificationDocRef, updates);
logger.info(
"Successfully appended new entry to notification session {} in Firestore.",
notificationSessionId);
} else {
// If the document does not exist, create a new session document.
Map<String, Object> newSessionData = Map.of(
FIELD_NOTIFICATION_ID,
notificationSessionId,
FIELD_PHONE_NUMBER,
phoneNumber,
"fechaCreacion",
Timestamp.of(java.util.Date.from(Instant.now())),
FIELD_LAST_UPDATED,
Timestamp.of(java.util.Date.from(Instant.now())),
FIELD_MESSAGES,
Collections.singletonList(entryMap));
firestoreBaseRepository.setDocument(notificationDocRef, newSessionData);
logger.info(
"Successfully created a new notification session {} in Firestore.",
notificationSessionId);
}
} catch (ExecutionException e) {
logger.error(
"Error saving notification to Firestore for phone {}: {}",
phoneNumber,
e.getMessage(),
e);
throw new FirestorePersistenceException(
"Failed to save notification to Firestore for phone " + phoneNumber, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error(
"Thread interrupted while saving notification to Firestore for phone {}: {}",
phoneNumber,
e.getMessage(),
e);
throw new FirestorePersistenceException(
"Saving notification was interrupted for phone " + phoneNumber, e);
}
}
})
.subscribeOn(Schedulers.boundedElastic())
.then();
}
private String getNotificationCollectionPath() {
return String.format(NOTIFICATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
}
private DocumentReference getNotificationDocumentReference(String notificationId) {
String collectionPath = getNotificationCollectionPath();
return firestoreBaseRepository.getDocumentReference(collectionPath, notificationId);
}
@SuppressWarnings("unchecked")
public Mono<Void> updateNotificationStatus(String sessionId, String status) {
return Mono.fromRunnable(() -> {
DocumentReference notificationDocRef = getNotificationDocumentReference(sessionId);
try {
Map<String, Object> sessionData = firestoreBaseRepository.getDocument(notificationDocRef, Map.class);
if (sessionData != null) {
List<Map<String, Object>> notifications = (List<Map<String, Object>>) sessionData
.get(FIELD_MESSAGES);
if (notifications != null) {
List<Map<String, Object>> updatedNotifications = new ArrayList<>();
for (Map<String, Object> notification : notifications) {
Map<String, Object> updatedNotification = new HashMap<>(notification);
updatedNotification.put("status", status);
updatedNotifications.add(updatedNotification);
}
Map<String, Object> updates = new HashMap<>();
updates.put(FIELD_MESSAGES, updatedNotifications);
updates.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now())));
firestoreBaseRepository.updateDocument(notificationDocRef, updates);
logger.info("Successfully updated notification status to '{}' for session {} in Firestore.",
status, sessionId);
}
} else {
logger.warn("Notification session {} not found in Firestore. Cannot update status.", sessionId);
} }
} else { } catch (ExecutionException e) {
logger.warn("Notification session {} not found in Firestore. Cannot update status.", sessionId); logger.error("Error updating notification status in Firestore for session {}: {}", sessionId,
e.getMessage(), e);
throw new FirestorePersistenceException(
"Failed to update notification status in Firestore for session " + sessionId, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Thread interrupted while updating notification status in Firestore for session {}: {}",
sessionId, e.getMessage(), e);
throw new FirestorePersistenceException(
"Updating notification status was interrupted for session " + sessionId, e);
} }
} catch (ExecutionException e) { })
logger.error("Error updating notification status in Firestore for session {}: {}", sessionId, e.getMessage(), e); .subscribeOn(Schedulers.boundedElastic())
throw new FirestorePersistenceException("Failed to update notification status in Firestore for session " + sessionId, e); .then();
} catch (InterruptedException e) { }
Thread.currentThread().interrupt();
logger.error("Thread interrupted while updating notification status in Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Updating notification status was interrupted for session " + sessionId, e);
}
})
.subscribeOn(Schedulers.boundedElastic())
.then();
}
} }

View File

@@ -10,10 +10,10 @@ import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO; import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO; import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.conversation.QueryInputDTO;
import com.example.dto.dialogflow.conversation.QueryParamsDTO;
import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.dto.dialogflow.notification.NotificationDTO;
import com.example.mapper.notification.ExternalNotRequestMapper;
import com.example.service.base.DialogflowClientService; import com.example.service.base.DialogflowClientService;
import com.example.service.conversation.DataLossPrevention;
import com.example.util.SessionIdGenerator; import com.example.util.SessionIdGenerator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -22,131 +22,152 @@ import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Instant; import java.time.Instant;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import com.example.dto.dialogflow.conversation.TextInputDTO;
@Service @Service
public class NotificationManagerService { public class NotificationManagerService {
private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class); private static final Logger logger = LoggerFactory.getLogger(NotificationManagerService.class);
private static final String NOTIFICATION_TEXT_PARAM = "notificationText"; private static final String eventName = "notificacion";
private static final String eventName = "notificacion"; private static final String PREFIX_PO_PARAM = "notification_po_";
private final DialogflowClientService dialogflowClientService; private final DialogflowClientService dialogflowClientService;
private final FirestoreNotificationService firestoreNotificationService; private final FirestoreNotificationService firestoreNotificationService;
private final MemoryStoreNotificationService memoryStoreNotificationService; private final MemoryStoreNotificationService memoryStoreNotificationService;
private final FirestoreNotificationConvService firestoreConversationService; private final FirestoreNotificationConvService firestoreConversationService;
private final ExternalNotRequestMapper externalNotRequestMapper;
@Value("${dialogflow.default-language-code:es}") private final DataLossPrevention dataLossPrevention;
private String defaultLanguageCode; private final String dlpTemplateCompleteFlow;
public NotificationManagerService( @Value("${dialogflow.default-language-code:es}")
DialogflowClientService dialogflowClientService, private String defaultLanguageCode;
FirestoreNotificationService firestoreNotificationService,
MemoryStoreNotificationService memoryStoreNotificationService,
FirestoreNotificationConvService firestoreConversationService) {
this.dialogflowClientService = dialogflowClientService;
this.firestoreNotificationService = firestoreNotificationService;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.firestoreConversationService = firestoreConversationService;
}
public Mono<DetectIntentResponseDTO> processNotification(ExternalNotRequestDTO externalRequest) { public NotificationManagerService(
Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null."); DialogflowClientService dialogflowClientService,
FirestoreNotificationService firestoreNotificationService,
MemoryStoreNotificationService memoryStoreNotificationService,
FirestoreNotificationConvService firestoreConversationService,
ExternalNotRequestMapper externalNotRequestMapper,
DataLossPrevention dataLossPrevention,
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
this.dialogflowClientService = dialogflowClientService;
this.firestoreNotificationService = firestoreNotificationService;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.firestoreConversationService = firestoreConversationService;
this.externalNotRequestMapper = externalNotRequestMapper;
this.dataLossPrevention = dataLossPrevention;
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
}
String telefono = externalRequest.phoneNumber(); public Mono<DetectIntentResponseDTO> processNotification(ExternalNotRequestDTO externalRequest) {
if (telefono == null || telefono.isBlank()) { Objects.requireNonNull(externalRequest, "ExternalNotRequestDTO cannot be null.");
logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification.");
return Mono.error(new IllegalArgumentException("Phone number is required."));
}
// 1. Persist the incoming notification entry String telefono = externalRequest.phoneNumber();
String newNotificationId = SessionIdGenerator.generateStandardSessionId(); if (telefono == null || telefono.isBlank()) {
NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(), logger.warn("No phone number provided in ExternalNotRequestDTO. Cannot process notification.");
externalRequest.text(), eventName, defaultLanguageCode, Collections.emptyMap(), "active"); return Mono.error(new IllegalArgumentException("Phone number is required."));
Mono<Void> persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) }
.doOnSuccess(v -> {
logger.info("Notification for phone {} cached. Kicking off async Firestore write-back.", telefono); return dataLossPrevention.getObfuscatedString(externalRequest.text(), dlpTemplateCompleteFlow)
firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry) .flatMap(obfuscatedMessage -> {
.subscribe( ExternalNotRequestDTO obfuscatedRequest = new ExternalNotRequestDTO(
ignored -> logger.debug( obfuscatedMessage,
"Background: Notification entry persistence initiated for phone {} in Firestore.",telefono), externalRequest.phoneNumber(),
e -> logger.error( externalRequest.hiddenParameters()
"Background: Error during notification entry persistence for phone {} in Firestore: {}", );
telefono, e.getMessage(), e)); // 1. Persist the incoming notification entry
String newNotificationId = SessionIdGenerator.generateStandardSessionId();
Map<String, Object> parameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters().forEach((key, value) -> parameters.put(PREFIX_PO_PARAM + key, value));
}
NotificationDTO newNotificationEntry = new NotificationDTO(newNotificationId, telefono, Instant.now(),
obfuscatedRequest.text(), eventName, defaultLanguageCode, parameters, "active");
Mono<Void> persistenceMono = memoryStoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry)
.doOnSuccess(v -> {
logger.info("Notification for phone {} cached. Kicking off async Firestore write-back.", telefono);
firestoreNotificationService.saveOrAppendNotificationEntry(newNotificationEntry)
.subscribe(
ignored -> logger.debug(
"Background: Notification entry persistence initiated for phone {} in Firestore.",
telefono),
e -> logger.error(
"Background: Error during notification entry persistence for phone {} in Firestore: {}",
telefono, e.getMessage(), e));
});
// 2. Resolve or create a conversation session
Mono<ConversationSessionDTO> sessionMono = memoryStoreNotificationService.getSessionByTelefono(telefono)
.doOnNext(session -> logger.info("Found existing conversation session {} for phone number {}",
session.sessionId(), telefono))
.flatMap(session -> {
Map<String, Object> prefixedParameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters()
.forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value));
}
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters);
return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono)
.thenReturn(session);
})
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing conversation session found for phone number {}. Creating new session: {}",
telefono, newSessionId);
String userId = "user_by_phone_" + telefono;
Map<String, Object> prefixedParameters = new HashMap<>();
if (obfuscatedRequest.hiddenParameters() != null) {
obfuscatedRequest.hiddenParameters()
.forEach((key, value) -> prefixedParameters.put(PREFIX_PO_PARAM + key, value));
}
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(obfuscatedRequest.text(),
prefixedParameters);
return persistConversationTurn(userId, newSessionId, systemEntry, telefono)
.then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono)));
}));
// 3. Send notification text to Dialogflow using the resolved conversation
// session
return persistenceMono.then(sessionMono)
.flatMap(session -> {
final String sessionId = session.sessionId();
logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId);
DetectIntentRequestDTO detectIntentRequest = externalNotRequestMapper.map(obfuscatedRequest);
return dialogflowClientService.detectIntent(sessionId, detectIntentRequest);
})
.doOnSuccess(response -> logger
.info("Finished processing notification. Dialogflow response received for phone {}.", telefono))
.doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e));
}); });
}
// 2. Resolve or create a conversation session private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
Mono<ConversationSessionDTO> sessionMono = memoryStoreNotificationService.getSessionByTelefono(telefono) String userPhoneNumber) {
.doOnNext(session -> logger.info("Found existing conversation session {} for phone number {}", logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
session.sessionId(), telefono)) entry.type().name());
.flatMap(session -> {
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(externalRequest.text());
return persistConversationTurn(session.userId(), session.sessionId(), systemEntry, telefono)
.thenReturn(session);
})
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing conversation session found for phone number {}. Creating new session: {}",telefono, newSessionId);
String userId = "user_by_phone_" + telefono;
ConversationEntryDTO systemEntry = ConversationEntryDTO.forSystem(externalRequest.text());
return persistConversationTurn(userId, newSessionId, systemEntry, telefono)
.then(Mono.just(ConversationSessionDTO.create(newSessionId, userId, telefono)));
}));
// 3. Send notification text to Dialogflow using the resolved conversation return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
// session .doOnSuccess(v -> {
return persistenceMono.then(sessionMono) logger.info(
.flatMap(session -> { "Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
final String sessionId = session.sessionId(); sessionId, entry.type().name());
logger.info("Sending notification text to Dialogflow using conversation session: {}", sessionId); firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.subscribe(
Map<String, Object> parameters = new HashMap<>(); fsVoid -> logger.debug(
parameters.put("telefono", telefono); "Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
parameters.put(NOTIFICATION_TEXT_PARAM, newNotificationEntry.texto()); sessionId, entry.type().name()),
fsError -> logger.error(
if (externalRequest.hiddenParameters() != null && !externalRequest.hiddenParameters().isEmpty()) { "Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
parameters.putAll(externalRequest.hiddenParameters()); sessionId, entry.type().name(), fsError.getMessage(), fsError));
} })
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
// Use a TextInputDTO to correctly build the QueryInputDTO entry.type().name(), e.getMessage(), e));
TextInputDTO textInput = new TextInputDTO(newNotificationEntry.texto()); }
QueryInputDTO queryInput = new QueryInputDTO(textInput, null, defaultLanguageCode);
DetectIntentRequestDTO detectIntentRequest = new DetectIntentRequestDTO(
queryInput,
new QueryParamsDTO(parameters));
return dialogflowClientService.detectIntent(sessionId, detectIntentRequest);
})
.doOnSuccess(response -> logger
.info("Finished processing notification. Dialogflow response received for phone {}.", telefono))
.doOnError(e -> logger.error("Overall error in NotificationManagerService: {}", e.getMessage(), e));
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
entry.type().name());
return memoryStoreNotificationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> {
logger.info(
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
sessionId, entry.type().name());
firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.subscribe(
fsVoid -> logger.debug(
"Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()),
fsError -> logger.error(
"Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError));
})
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
entry.type().name(), e.getMessage(), e));
}
} }

View File

@@ -9,7 +9,7 @@
# Best Practices: # Best Practices:
# - Use Spring Profiles (e.g., application-dev.properties, application-prod.properties) # - Use Spring Profiles (e.g., application-dev.properties, application-prod.properties)
# to manage environment-specific settings. # to manage environment-specific settings.
# - Do not store in PROD sensitive information (e.g., API keys, passwords) directly here. # - Do not store in PROD sensitive information directly here.
# Use environment variables or a configuration server for production environments. # Use environment variables or a configuration server for production environments.
# - This template can be adapted for logging configuration, database connections, # - This template can be adapted for logging configuration, database connections,
# and other external service settings. # and other external service settings.

View File

@@ -9,7 +9,7 @@
# Best Practices: # Best Practices:
# - Use Spring Profiles (e.g., application-dev.properties, application-prod.properties) # - Use Spring Profiles (e.g., application-dev.properties, application-prod.properties)
# to manage environment-specific settings. # to manage environment-specific settings.
# - Do not store in PROD sensitive information (e.g., API keys, passwords) directly here. # - Do not store in PROD sensitive information directly here.
# Use environment variables or a configuration server for production environments. # Use environment variables or a configuration server for production environments.
# - This template can be adapted for logging configuration, database connections, # - This template can be adapted for logging configuration, database connections,
# and other external service settings. # and other external service settings.
@@ -66,4 +66,9 @@ google.cloud.dlp.dlpTemplatePersistFlow=${DLP_TEMPLATE_PERSIST_FLOW}
# ========================================================= # =========================================================
# Quick-replies Preset-data # Quick-replies Preset-data
# ========================================================= # =========================================================
firestore.data.importer.enabled=true firestore.data.importer.enabled=true
# =========================================================
# LOGGING Configuration
# =========================================================
logging.level.root=${LOGGING_LEVEL_ROOT:INFO}
logging.level.com.example=${LOGGING_LEVEL_COM_EXAMPLE:INFO}

View File

@@ -9,7 +9,7 @@
# Best Practices: # Best Practices:
# - Use Spring Profiles (e.g., application-dev.properties, application-prod.properties) # - Use Spring Profiles (e.g., application-dev.properties, application-prod.properties)
# to manage environment-specific settings. # to manage environment-specific settings.
# - Do not store in PROD sensitive information (e.g., API keys, passwords) directly here. # - Do not store in PROD sensitive information directly here.
# Use environment variables or a configuration server for production environments. # Use environment variables or a configuration server for production environments.
# - This template can be adapted for logging configuration, database connections, # - This template can be adapted for logging configuration, database connections,
# and other external service settings. # and other external service settings.
@@ -66,4 +66,9 @@ google.cloud.dlp.dlpTemplatePersistFlow=${DLP_TEMPLATE_PERSIST_FLOW}
# ========================================================= # =========================================================
# Quick-replies Preset-data # Quick-replies Preset-data
# ========================================================= # =========================================================
firestore.data.importer.enabled=true firestore.data.importer.enabled=true
# =========================================================
# LOGGING Configuration
# =========================================================
logging.level.root=${LOGGING_LEVEL_ROOT:INFO}
logging.level.com.example=${LOGGING_LEVEL_COM_EXAMPLE:DEBUG}