UPDATE int-layer 25-Ago
This commit is contained in:
@@ -2,9 +2,7 @@
|
||||
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
|
||||
* Your use of it is subject to your agreement with Google.
|
||||
*/
|
||||
|
||||
package com.example.service.conversation;
|
||||
|
||||
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
|
||||
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
|
||||
import com.example.dto.dialogflow.conversation.ConversationContext;
|
||||
@@ -24,22 +22,20 @@ import com.example.service.quickreplies.QuickRepliesManagerService;
|
||||
import com.example.util.SessionIdGenerator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
@Service
|
||||
public class ConversationManagerService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
|
||||
private static final long SESSION_RESET_THRESHOLD_HOURS = 24;
|
||||
private static final String CURRENT_PAGE_PARAM = "currentPage";
|
||||
|
||||
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
|
||||
private final DialogflowClientService dialogflowServiceClient;
|
||||
private final FirestoreConversationService firestoreConversationService;
|
||||
@@ -49,6 +45,9 @@ public class ConversationManagerService {
|
||||
private final MemoryStoreNotificationService memoryStoreNotificationService;
|
||||
private final NotificationContextMapper notificationContextMapper;
|
||||
private final ConversationContextMapper conversationContextMapper;
|
||||
private final DataLossPrevention dataLossPrevention;
|
||||
private final String dlpTemplateCompleteFlow;
|
||||
private final String dlpTemplatePersistFlow;
|
||||
|
||||
public ConversationManagerService(
|
||||
DialogflowClientService dialogflowServiceClient,
|
||||
@@ -59,7 +58,10 @@ public class ConversationManagerService {
|
||||
MessageEntryFilter messageEntryFilter,
|
||||
MemoryStoreNotificationService memoryStoreNotificationService,
|
||||
NotificationContextMapper notificationContextMapper,
|
||||
ConversationContextMapper conversationContextMapper) {
|
||||
ConversationContextMapper conversationContextMapper,
|
||||
DataLossPrevention dataLossPrevention,
|
||||
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow,
|
||||
@Value("${google.cloud.dlp.dlpTemplatePersistFlow}") String dlpTemplatePersistFlow) {
|
||||
this.dialogflowServiceClient = dialogflowServiceClient;
|
||||
this.firestoreConversationService = firestoreConversationService;
|
||||
this.memoryStoreConversationService = memoryStoreConversationService;
|
||||
@@ -69,28 +71,31 @@ public class ConversationManagerService {
|
||||
this.memoryStoreNotificationService = memoryStoreNotificationService;
|
||||
this.notificationContextMapper = notificationContextMapper;
|
||||
this.conversationContextMapper = conversationContextMapper;
|
||||
this.dataLossPrevention = dataLossPrevention;
|
||||
this.dlpTemplateCompleteFlow = dlpTemplateCompleteFlow;
|
||||
this.dlpTemplatePersistFlow = dlpTemplatePersistFlow;
|
||||
}
|
||||
|
||||
public Mono<DetectIntentResponseDTO> manageConversation(ExternalConvRequestDTO externalrequest) {
|
||||
return dataLossPrevention.getObfuscatedString(externalrequest.message(), dlpTemplateCompleteFlow)
|
||||
.flatMap(obfuscatedMessage -> {
|
||||
ExternalConvRequestDTO obfuscatedRequest = new ExternalConvRequestDTO(
|
||||
obfuscatedMessage,
|
||||
externalrequest.user(),
|
||||
externalrequest.channel(),
|
||||
externalrequest.tipo(),
|
||||
externalrequest.pantallaContexto()
|
||||
);
|
||||
return memoryStoreConversationService.getSessionByTelefono(externalrequest.user().telefono())
|
||||
.flatMap(session -> {
|
||||
if (session != null && !session.entries().isEmpty()) {
|
||||
ConversationEntryDTO lastEntry = session.entries().get(session.entries().size() - 1);
|
||||
if (lastEntry.entity() == ConversationEntryEntity.SISTEMA && lastEntry.type() == ConversationEntryType.INICIO) {
|
||||
logger.info("Detected 'SISTEMA' and 'INICIO' values in last session entry. Delegating to QuickRepliesManagerService.");
|
||||
ExternalConvRequestDTO updatedRequest = new ExternalConvRequestDTO(
|
||||
externalrequest.message(),
|
||||
externalrequest.user(),
|
||||
externalrequest.channel(),
|
||||
externalrequest.tipo(),
|
||||
session.pantallaContexto()
|
||||
);
|
||||
return quickRepliesManagerService.manageConversation(updatedRequest);
|
||||
}
|
||||
|
||||
if (session != null && session.pantallaContexto() != null && !session.pantallaContexto().isBlank()) {
|
||||
logger.info("Detected 'pantallaContexto' in session. Delegating to QuickRepliesManagerService.");
|
||||
return quickRepliesManagerService.manageConversation(obfuscatedRequest);
|
||||
}
|
||||
return continueManagingConversation(externalrequest);
|
||||
return continueManagingConversation(obfuscatedRequest);
|
||||
})
|
||||
.switchIfEmpty(continueManagingConversation(externalrequest));
|
||||
.switchIfEmpty(continueManagingConversation(obfuscatedRequest));
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<DetectIntentResponseDTO> continueManagingConversation(ExternalConvRequestDTO externalrequest) {
|
||||
@@ -114,11 +119,9 @@ public class ConversationManagerService {
|
||||
|
||||
return handleMessageClassification(context, request);
|
||||
}
|
||||
|
||||
private Mono<DetectIntentResponseDTO> handleMessageClassification(ConversationContext context, DetectIntentRequestDTO request) {
|
||||
final String userPhoneNumber = context.primaryPhoneNumber();
|
||||
final String userMessageText = context.userMessageText();
|
||||
|
||||
return memoryStoreNotificationService.getNotificationIdForPhone(userPhoneNumber)
|
||||
.flatMap(notificationId -> memoryStoreNotificationService.getCachedNotificationSession(notificationId))
|
||||
.map(notificationSession -> notificationSession.notificaciones().stream()
|
||||
@@ -142,7 +145,6 @@ public class ConversationManagerService {
|
||||
})
|
||||
.switchIfEmpty(continueConversationFlow(context, request));
|
||||
}
|
||||
|
||||
private Mono<DetectIntentResponseDTO> continueConversationFlow(ConversationContext context, DetectIntentRequestDTO request) {
|
||||
final String userId = context.userId();
|
||||
final String userMessageText = context.userMessageText();
|
||||
@@ -190,7 +192,11 @@ public class ConversationManagerService {
|
||||
|
||||
private Mono<DetectIntentResponseDTO> processDialogflowRequest(ConversationSessionDTO session, DetectIntentRequestDTO request, String userId, String userMessageText, String userPhoneNumber, boolean newSession) {
|
||||
final String finalSessionId = session.sessionId();
|
||||
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
|
||||
|
||||
return dataLossPrevention.getObfuscatedString(userMessageText, dlpTemplatePersistFlow)
|
||||
.flatMap(obfuscatedUserMessageText -> {
|
||||
|
||||
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(obfuscatedUserMessageText);
|
||||
|
||||
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
|
||||
.doOnSuccess(v -> logger.debug("User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId))
|
||||
@@ -204,13 +210,12 @@ public class ConversationManagerService {
|
||||
})
|
||||
.doOnError(error -> logger.error("Overall error during conversation management for session {}: {}", finalSessionId, error.getMessage(), error))
|
||||
));
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<DetectIntentResponseDTO> startNotificationConversation(ConversationContext context, DetectIntentRequestDTO request, NotificationDTO notification) {
|
||||
final String userId = context.userId();
|
||||
final String userMessageText = context.userMessageText();
|
||||
final String userPhoneNumber = context.primaryPhoneNumber();
|
||||
|
||||
return memoryStoreNotificationService.getSessionByTelefono(userPhoneNumber)
|
||||
.switchIfEmpty(Mono.defer(() -> {
|
||||
String newSessionId = SessionIdGenerator.generateStandardSessionId();
|
||||
@@ -229,11 +234,9 @@ public class ConversationManagerService {
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,String userPhoneNumber) {
|
||||
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId,
|
||||
entry.type().name());
|
||||
|
||||
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
|
||||
.doOnSuccess(v -> logger.info(
|
||||
"Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.",
|
||||
@@ -248,12 +251,10 @@ public class ConversationManagerService {
|
||||
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId,
|
||||
entry.type().name(), e.getMessage(), e));
|
||||
}
|
||||
|
||||
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
|
||||
Map<String, Object> params = Optional.ofNullable(request.queryParams())
|
||||
.map(queryParamsDTO -> queryParamsDTO.parameters())
|
||||
.orElse(Collections.emptyMap());
|
||||
|
||||
String primaryPhoneNumber = null;
|
||||
Object telefonoObj = params.get("telefono"); // Get from map
|
||||
if (telefonoObj instanceof String) {
|
||||
@@ -262,12 +263,10 @@ public class ConversationManagerService {
|
||||
logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.",
|
||||
telefonoObj.getClass().getName());
|
||||
}
|
||||
|
||||
if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Phone number (telefono) is required in query parameters for conversation management.");
|
||||
}
|
||||
|
||||
String resolvedUserId = null;
|
||||
Object userIdObj = params.get("usuario_id");
|
||||
if (userIdObj instanceof String) {
|
||||
@@ -276,18 +275,15 @@ public class ConversationManagerService {
|
||||
logger.warn("Parameter 'userId' in query_params is not a String (type: {}). Expected String.",
|
||||
userIdObj.getClass().getName());
|
||||
}
|
||||
|
||||
if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) {
|
||||
resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", "");
|
||||
logger.warn("User ID not provided in query parameters. Using derived ID from phone number: {}",
|
||||
resolvedUserId);
|
||||
}
|
||||
|
||||
if (request.queryInput() == null || request.queryInput().text() == null ||
|
||||
request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) {
|
||||
throw new IllegalArgumentException("Dialogflow query input text is required.");
|
||||
}
|
||||
|
||||
String userMessageText = request.queryInput().text().text();
|
||||
return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.example.service.conversation;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface DataLossPrevention {
|
||||
Mono<String> getObfuscatedString(String textToInspect, String templateId);
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package com.example.service.conversation;
|
||||
|
||||
import com.google.api.core.ApiFuture;
|
||||
import com.google.api.core.ApiFutureCallback;
|
||||
import com.google.api.core.ApiFutures;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import com.google.cloud.dlp.v2.DlpServiceClient;
|
||||
import com.google.privacy.dlp.v2.ByteContentItem;
|
||||
import com.google.privacy.dlp.v2.ContentItem;
|
||||
import com.google.privacy.dlp.v2.InspectConfig;
|
||||
import com.google.privacy.dlp.v2.InspectContentRequest;
|
||||
import com.google.privacy.dlp.v2.InspectContentResponse;
|
||||
import com.google.privacy.dlp.v2.Likelihood;
|
||||
import com.google.privacy.dlp.v2.LocationName;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.example.util.TextObfuscator;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Service
|
||||
public class DataLossPreventionImpl implements DataLossPrevention {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DataLossPreventionImpl.class);
|
||||
|
||||
private final String projectId;
|
||||
private final String location;
|
||||
private final DlpServiceClient dlpServiceClient;
|
||||
|
||||
public DataLossPreventionImpl(
|
||||
DlpServiceClient dlpServiceClient,
|
||||
@Value("${google.cloud.project}") String projectId,
|
||||
@Value("${google.cloud.location}") String location) {
|
||||
this.dlpServiceClient = dlpServiceClient;
|
||||
this.projectId = projectId;
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<String> getObfuscatedString(String text, String templateId) {
|
||||
ByteContentItem byteContentItem = ByteContentItem.newBuilder()
|
||||
.setType(ByteContentItem.BytesType.TEXT_UTF8)
|
||||
.setData(ByteString.copyFromUtf8(text))
|
||||
.build();
|
||||
ContentItem contentItem = ContentItem.newBuilder().setByteItem(byteContentItem).build();
|
||||
|
||||
Likelihood minLikelihood = Likelihood.VERY_UNLIKELY;
|
||||
|
||||
InspectConfig.FindingLimits findingLimits = InspectConfig.FindingLimits.newBuilder().setMaxFindingsPerItem(0)
|
||||
.build();
|
||||
|
||||
InspectConfig inspectConfig = InspectConfig.newBuilder()
|
||||
.setMinLikelihood(minLikelihood)
|
||||
.setLimits(findingLimits)
|
||||
.setIncludeQuote(true)
|
||||
.build();
|
||||
|
||||
String inspectTemplateName = String.format("projects/%s/locations/%s/inspectTemplates/%s", projectId, location,
|
||||
templateId);
|
||||
InspectContentRequest request = InspectContentRequest.newBuilder()
|
||||
.setParent(LocationName.of(projectId, location).toString())
|
||||
.setInspectTemplateName(inspectTemplateName)
|
||||
.setInspectConfig(inspectConfig)
|
||||
.setItem(contentItem)
|
||||
.build();
|
||||
|
||||
ApiFuture<InspectContentResponse> futureResponse = dlpServiceClient.inspectContentCallable()
|
||||
.futureCall(request);
|
||||
|
||||
return Mono.<InspectContentResponse>create(
|
||||
sink -> ApiFutures.addCallback(
|
||||
futureResponse,
|
||||
new ApiFutureCallback<>() {
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
sink.error(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(InspectContentResponse result) {
|
||||
sink.success(result);
|
||||
}
|
||||
},
|
||||
Runnable::run))
|
||||
.map(response -> {
|
||||
logger.info("DLP {} Findings: {}", templateId, response.getResult().getFindingsCount());
|
||||
return response.getResult().getFindingsCount() > 0
|
||||
? TextObfuscator.obfuscate(response, text)
|
||||
: text;
|
||||
}).onErrorResume(e -> {
|
||||
e.printStackTrace();
|
||||
return Mono.just(text);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -115,6 +115,8 @@ public class FirestoreConversationService {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
private String getConversationCollectionPath() {
|
||||
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
|
||||
}
|
||||
|
||||
@@ -93,4 +93,10 @@ public class MemoryStoreConversationService {
|
||||
})
|
||||
.doOnError(e -> logger.error("Error retrieving session by phone number {}: {}", telefono, e.getMessage(), e));
|
||||
}
|
||||
|
||||
public Mono<Void> updateSession(ConversationSessionDTO session) {
|
||||
String sessionKey = SESSION_KEY_PREFIX + session.sessionId();
|
||||
logger.info("Attempting to update session {} in Redis.", session.sessionId());
|
||||
return redisTemplate.opsForValue().set(sessionKey, session, SESSION_TTL).then();
|
||||
}
|
||||
}
|
||||
@@ -1,88 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
|
||||
* Your use of it is subject to your agreement with Google.
|
||||
*/
|
||||
|
||||
package com.example.service.quickreplies;
|
||||
|
||||
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
|
||||
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
|
||||
import com.example.exception.FirestorePersistenceException;
|
||||
import com.example.mapper.quickreplies.FirestoreQuickReplyMapper;
|
||||
import com.example.repository.FirestoreBaseRepository;
|
||||
import com.google.cloud.firestore.DocumentReference;
|
||||
import com.google.cloud.firestore.WriteBatch;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@Service
|
||||
public class FirestoreQRService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FirestoreQRService.class);
|
||||
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/quick-replies-conversations";
|
||||
private final FirestoreBaseRepository firestoreBaseRepository;
|
||||
private final FirestoreQuickReplyMapper firestoreQuickReplyMapper;
|
||||
|
||||
public FirestoreQRService(FirestoreBaseRepository firestoreBaseRepository, FirestoreQuickReplyMapper firestoreQuickReplyMapper) {
|
||||
this.firestoreBaseRepository = firestoreBaseRepository;
|
||||
this.firestoreQuickReplyMapper = firestoreQuickReplyMapper;
|
||||
}
|
||||
|
||||
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) {
|
||||
logger.info("Attempting to save quick reply entry to Firestore for session {}. Entity: {}", sessionId, newEntry.entity().name());
|
||||
return Mono.fromRunnable(() -> {
|
||||
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
|
||||
WriteBatch batch = firestoreBaseRepository.createBatch();
|
||||
|
||||
try {
|
||||
if (firestoreBaseRepository.documentExists(sessionDocRef)) {
|
||||
Map<String, Object> updates = firestoreQuickReplyMapper.createUpdateMapForSingleEntry(newEntry);
|
||||
batch.update(sessionDocRef, updates);
|
||||
logger.info("Appending entry to existing quick reply session for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name());
|
||||
} else {
|
||||
Map<String, Object> newSessionMap = firestoreQuickReplyMapper.createNewSessionMapForSingleEntry(sessionId, userId, userPhoneNumber, newEntry);
|
||||
batch.set(sessionDocRef, newSessionMap);
|
||||
logger.info("Creating new quick reply session with first entry for user {} and session {}. Entity: {}", userId, sessionId, newEntry.entity().name());
|
||||
}
|
||||
firestoreBaseRepository.commitBatch(batch);
|
||||
logger.info("Successfully committed batch for session {} to Firestore.", sessionId);
|
||||
} catch (ExecutionException e) {
|
||||
logger.error("Error saving quick reply entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
|
||||
throw new FirestorePersistenceException("Failed to save quick reply entry to Firestore for session " + sessionId, e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
logger.error("Thread interrupted while saving quick reply entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
|
||||
throw new FirestorePersistenceException("Saving quick reply entry was interrupted for session " + sessionId, e);
|
||||
}
|
||||
}).subscribeOn(Schedulers.boundedElastic()).then();
|
||||
}
|
||||
|
||||
public Mono<ConversationSessionDTO> getSessionByTelefono(String userPhoneNumber) {
|
||||
logger.info("Attempting to retrieve quick reply session for phone number {}.", userPhoneNumber);
|
||||
return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "userPhoneNumber", userPhoneNumber)
|
||||
.map(documentSnapshot -> {
|
||||
if (documentSnapshot != null && documentSnapshot.exists()) {
|
||||
ConversationSessionDTO sessionDTO = firestoreQuickReplyMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
|
||||
logger.info("Successfully retrieved and mapped quick reply session for session {}.", sessionDTO.sessionId());
|
||||
return sessionDTO;
|
||||
}
|
||||
logger.info("Quick reply session not found for phone number {}.", userPhoneNumber);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private String getConversationCollectionPath() {
|
||||
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
|
||||
}
|
||||
|
||||
private DocumentReference getSessionDocumentReference(String sessionId) {
|
||||
String collectionPath = getConversationCollectionPath();
|
||||
return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId);
|
||||
}
|
||||
}
|
||||
@@ -33,11 +33,13 @@ public class MemoryStoreQRService {
|
||||
this.stringRedisTemplate = stringRedisTemplate;
|
||||
}
|
||||
|
||||
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) {
|
||||
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry,
|
||||
String userPhoneNumber) {
|
||||
String sessionKey = SESSION_KEY_PREFIX + sessionId;
|
||||
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber;
|
||||
|
||||
logger.info("Attempting to save entry to Redis for quick reply session {}. Entity: {}", sessionId, newEntry.entity().name());
|
||||
logger.info("Attempting to save entry to Redis for quick reply session {}. Entity: {}", sessionId,
|
||||
newEntry.entity().name());
|
||||
|
||||
return redisTemplate.opsForValue().get(sessionKey)
|
||||
.defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber))
|
||||
@@ -45,16 +47,20 @@ public class MemoryStoreQRService {
|
||||
ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber);
|
||||
ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withAddedEntry(newEntry);
|
||||
|
||||
logger.info("Attempting to set updated quick reply session {} with new entry entity {} in Redis.", sessionId, newEntry.entity().name());
|
||||
logger.info("Attempting to set updated quick reply session {} with new entry entity {} in Redis.",
|
||||
sessionId, newEntry.entity().name());
|
||||
|
||||
return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL)
|
||||
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL))
|
||||
.then();
|
||||
})
|
||||
.doOnSuccess(success -> {
|
||||
logger.info("Successfully saved updated quick reply session and phone mapping to Redis for session {}. Entity Type: {}", sessionId, newEntry.entity().name());
|
||||
logger.info(
|
||||
"Successfully saved updated quick reply session and phone mapping to Redis for session {}. Entity Type: {}",
|
||||
sessionId, newEntry.entity().name());
|
||||
})
|
||||
.doOnError(e -> logger.error("Error appending entry to Redis for quick reply session {}: {}", sessionId, e.getMessage(), e));
|
||||
.doOnError(e -> logger.error("Error appending entry to Redis for quick reply session {}: {}", sessionId,
|
||||
e.getMessage(), e));
|
||||
}
|
||||
|
||||
public Mono<ConversationSessionDTO> getSessionByTelefono(String telefono) {
|
||||
@@ -65,16 +71,19 @@ public class MemoryStoreQRService {
|
||||
logger.debug("Attempting to retrieve quick reply session ID for phone number {} from Redis.", telefono);
|
||||
return stringRedisTemplate.opsForValue().get(phoneToSessionKey)
|
||||
.flatMap(sessionId -> {
|
||||
logger.debug("Found quick reply session ID {} for phone number {}. Retrieving session data.", sessionId, telefono);
|
||||
logger.debug("Found quick reply session ID {} for phone number {}. Retrieving session data.",
|
||||
sessionId, telefono);
|
||||
return redisTemplate.opsForValue().get(SESSION_KEY_PREFIX + sessionId);
|
||||
})
|
||||
.doOnSuccess(session -> {
|
||||
if (session != null) {
|
||||
logger.info("Successfully retrieved quick reply session {} by phone number {}.", session.sessionId(), telefono);
|
||||
logger.info("Successfully retrieved quick reply session {} by phone number {}.",
|
||||
session.sessionId(), telefono);
|
||||
} else {
|
||||
logger.info("No quick reply session found in Redis for phone number {}.", telefono);
|
||||
}
|
||||
})
|
||||
.doOnError(e -> logger.error("Error retrieving quick reply session by phone number {}: {}", telefono, e.getMessage(), e));
|
||||
.doOnError(e -> logger.error("Error retrieving quick reply session by phone number {}: {}", telefono,
|
||||
e.getMessage(), e));
|
||||
}
|
||||
}
|
||||
@@ -11,42 +11,50 @@ import com.example.dto.dialogflow.conversation.ConversationEntryEntity;
|
||||
import com.example.dto.dialogflow.conversation.ConversationEntryType;
|
||||
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
|
||||
import com.example.dto.quickreplies.QuickReplyScreenRequestDTO;
|
||||
import com.example.service.conversation.DataLossPrevention;
|
||||
import com.example.dto.quickreplies.QuestionDTO;
|
||||
import com.example.dto.quickreplies.QuickReplyDTO;
|
||||
import com.example.service.conversation.FirestoreConversationService;
|
||||
import com.example.service.conversation.MemoryStoreConversationService;
|
||||
import com.example.util.SessionIdGenerator;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Service
|
||||
public class QuickRepliesManagerService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(QuickRepliesManagerService.class);
|
||||
|
||||
private final MemoryStoreConversationService memoryStoreConversationService;
|
||||
private final FirestoreConversationService firestoreConversationService;
|
||||
private final QuickReplyContentService quickReplyContentService;
|
||||
private final DataLossPrevention dataLossPrevention;
|
||||
private final String dlpTemplatePersistFlow;
|
||||
|
||||
public QuickRepliesManagerService(
|
||||
MemoryStoreConversationService memoryStoreConversationService,
|
||||
FirestoreConversationService firestoreConversationService,
|
||||
QuickReplyContentService quickReplyContentService) {
|
||||
QuickReplyContentService quickReplyContentService,
|
||||
DataLossPrevention dataLossPrevention,
|
||||
@Value("${google.cloud.dlp.dlpTemplatePersistFlow}") String dlpTemplatePersistFlow) {
|
||||
this.memoryStoreConversationService = memoryStoreConversationService;
|
||||
this.firestoreConversationService = firestoreConversationService;
|
||||
this.quickReplyContentService = quickReplyContentService;
|
||||
this.dataLossPrevention = dataLossPrevention;
|
||||
this.dlpTemplatePersistFlow = dlpTemplatePersistFlow;
|
||||
}
|
||||
|
||||
public Mono<DetectIntentResponseDTO> startQuickReplySession(QuickReplyScreenRequestDTO externalRequest) {
|
||||
String userPhoneNumber = externalRequest.user().telefono();
|
||||
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
|
||||
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
|
||||
return Mono.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
|
||||
return Mono
|
||||
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
|
||||
}
|
||||
|
||||
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
|
||||
.flatMap(session -> Mono.just(session.sessionId()))
|
||||
.switchIfEmpty(Mono.fromCallable(SessionIdGenerator::generateStandardSessionId))
|
||||
@@ -58,10 +66,9 @@ public class QuickRepliesManagerService {
|
||||
Instant.now(),
|
||||
"Pantalla :" + externalRequest.pantallaContexto() + " Agregada a la conversacion :",
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber, externalRequest.pantallaContexto())
|
||||
null);
|
||||
return persistConversationTurn(userId, sessionId, systemEntry, userPhoneNumber,
|
||||
externalRequest.pantallaContexto())
|
||||
.then(quickReplyContentService.getQuickReplies(externalRequest.pantallaContexto()))
|
||||
.map(quickReplyDTO -> new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
|
||||
});
|
||||
@@ -71,34 +78,93 @@ public class QuickRepliesManagerService {
|
||||
String userPhoneNumber = externalRequest.user().telefono();
|
||||
if (userPhoneNumber == null || userPhoneNumber.isBlank()) {
|
||||
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
|
||||
return Mono.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
|
||||
return Mono
|
||||
.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
|
||||
}
|
||||
|
||||
return memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
|
||||
.switchIfEmpty(Mono.error(new IllegalStateException("No quick reply session found for phone number: " + userPhoneNumber)))
|
||||
.switchIfEmpty(Mono.error(
|
||||
new IllegalStateException("No quick reply session found for phone number: " + userPhoneNumber)))
|
||||
.flatMap(session -> {
|
||||
String userId = session.userId();
|
||||
String sessionId = session.sessionId();
|
||||
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(externalRequest.message());
|
||||
return dataLossPrevention.getObfuscatedString(externalRequest.message(), dlpTemplatePersistFlow)
|
||||
.flatMap(obfuscatedMessage -> {
|
||||
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(obfuscatedMessage);
|
||||
|
||||
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber, session.pantallaContexto())
|
||||
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
|
||||
.flatMap(quickReplyDTO -> {
|
||||
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgentWithMessage(quickReplyDTO.toString());
|
||||
return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber, session.pantallaContexto())
|
||||
.thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
|
||||
});
|
||||
long userMessagesCount = session.entries().stream()
|
||||
.filter(e -> e.entity() == ConversationEntryEntity.USUARIO)
|
||||
.count();
|
||||
|
||||
if (userMessagesCount == 0) { // Is the first user message in the Quick-Replies flow
|
||||
// This is the second message of the flow. Return the full list.
|
||||
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
|
||||
session.pantallaContexto())
|
||||
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
|
||||
.flatMap(quickReplyDTO -> {
|
||||
ConversationEntryDTO agentEntry = ConversationEntryDTO
|
||||
.forAgentWithMessage(quickReplyDTO.toString());
|
||||
return persistConversationTurn(userId, sessionId, agentEntry, userPhoneNumber,
|
||||
session.pantallaContexto())
|
||||
.thenReturn(new DetectIntentResponseDTO(sessionId, null, quickReplyDTO));
|
||||
});
|
||||
} else if (userMessagesCount == 1) { // Is the second user message in the QR flow
|
||||
// This is the third message of the flow. Filter and end.
|
||||
return persistConversationTurn(userId, sessionId, userEntry, userPhoneNumber,
|
||||
session.pantallaContexto())
|
||||
.then(quickReplyContentService.getQuickReplies(session.pantallaContexto()))
|
||||
.flatMap(quickReplyDTO -> {
|
||||
List<QuestionDTO> matchedPreguntas = quickReplyDTO.preguntas().stream()
|
||||
.filter(p -> p.titulo().equalsIgnoreCase(externalRequest.message().trim()))
|
||||
.toList();
|
||||
|
||||
QuickReplyDTO responseQuickReplyDTO;
|
||||
if (!matchedPreguntas.isEmpty()) {
|
||||
responseQuickReplyDTO = new QuickReplyDTO(quickReplyDTO.header(),
|
||||
matchedPreguntas);
|
||||
} else {
|
||||
responseQuickReplyDTO = new QuickReplyDTO(quickReplyDTO.header(),
|
||||
Collections.emptyList());
|
||||
}
|
||||
|
||||
// End the quick reply flow by clearing the pantallaContexto
|
||||
return memoryStoreConversationService
|
||||
.updateSession(session.withPantallaContexto(null))
|
||||
.then(persistConversationTurn(userId, sessionId,
|
||||
ConversationEntryDTO.forAgentWithMessage(
|
||||
responseQuickReplyDTO.toString()),
|
||||
userPhoneNumber, null))
|
||||
.thenReturn(new DetectIntentResponseDTO(sessionId, null,
|
||||
responseQuickReplyDTO));
|
||||
});
|
||||
} else {
|
||||
// Should not happen. End the flow.
|
||||
return memoryStoreConversationService.updateSession(session.withPantallaContexto(null))
|
||||
.then(Mono.just(new DetectIntentResponseDTO(session.sessionId(), null,
|
||||
new QuickReplyDTO("Flow Error", Collections.emptyList()))));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber, String pantallaContexto) {
|
||||
logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.", sessionId, entry.type().name());
|
||||
|
||||
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry,
|
||||
String userPhoneNumber, String pantallaContexto) {
|
||||
logger.debug("Starting Write-Back persistence for quick reply session {}. Type: {}. Writing to Redis first.",
|
||||
sessionId, entry.type().name());
|
||||
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
|
||||
.doOnSuccess(v -> logger.info("Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.", sessionId, entry.type().name()))
|
||||
.then(firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
|
||||
.doOnSuccess(fsVoid -> logger.debug("Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.", sessionId, entry.type().name()))
|
||||
.doOnError(fsError -> logger.error("Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}", sessionId, entry.type().name(), fsError.getMessage(), fsError)))
|
||||
.doOnError(e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e));
|
||||
.doOnSuccess(v -> logger.info(
|
||||
"Entry saved to Redis for quick reply session {}. Type: {}. Kicking off async Firestore write-back.",
|
||||
sessionId, entry.type().name()))
|
||||
.then(firestoreConversationService
|
||||
.saveEntry(userId, sessionId, entry, userPhoneNumber, pantallaContexto)
|
||||
.doOnSuccess(fsVoid -> logger.debug(
|
||||
"Asynchronously (Write-Back): Entry successfully saved to Firestore for quick reply session {}. Type: {}.",
|
||||
sessionId, entry.type().name()))
|
||||
.doOnError(fsError -> logger.error(
|
||||
"Asynchronously (Write-Back): Failed to save entry to Firestore for quick reply session {}. Type: {}: {}",
|
||||
sessionId, entry.type().name(), fsError.getMessage(), fsError)))
|
||||
.doOnError(
|
||||
e -> logger.error("Error during primary Redis write for quick reply session {}. Type: {}: {}",
|
||||
sessionId, entry.type().name(), e.getMessage(), e));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,33 +9,27 @@ import com.example.dto.quickreplies.QuestionDTO;
|
||||
import com.example.dto.quickreplies.QuickReplyDTO;
|
||||
import com.google.cloud.firestore.DocumentSnapshot;
|
||||
import com.google.cloud.firestore.Firestore;
|
||||
import java.util.Map;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
public class QuickReplyContentService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(QuickReplyContentService.class);
|
||||
|
||||
private final Firestore firestore;
|
||||
|
||||
public QuickReplyContentService(Firestore firestore) {
|
||||
this.firestore = firestore;
|
||||
}
|
||||
|
||||
public Mono<QuickReplyDTO> getQuickReplies(String collectionId) {
|
||||
logger.info("Fetching quick replies from Firestore for document: {}", collectionId);
|
||||
|
||||
if (collectionId == null || collectionId.isBlank()) {
|
||||
logger.warn("collectionId is null or empty. Returning empty quick replies.");
|
||||
return Mono.just(new QuickReplyDTO("empty", Collections.emptyList()));
|
||||
}
|
||||
|
||||
return Mono.fromCallable(() -> {
|
||||
try {
|
||||
return firestore.collection("artifacts")
|
||||
@@ -50,8 +44,12 @@ public class QuickReplyContentService {
|
||||
})
|
||||
.filter(DocumentSnapshot::exists)
|
||||
.map(document -> {
|
||||
QuestionDTO pregunta = new QuestionDTO(document.getString("titulo"), document.getString("descripcion"));
|
||||
return new QuickReplyDTO("preguntas sobre " + collectionId, List.of(pregunta));
|
||||
String header = document.getString("header");
|
||||
List<Map<String, Object>> preguntasData = (List<Map<String, Object>>) document.get("preguntas");
|
||||
List<QuestionDTO> preguntas = preguntasData.stream()
|
||||
.map(p -> new QuestionDTO((String) p.get("titulo"), (String) p.get("descripcion"), (String) p.get("respuesta")))
|
||||
.toList();
|
||||
return new QuickReplyDTO(header, preguntas);
|
||||
})
|
||||
.doOnSuccess(quickReplyDTO -> {
|
||||
if (quickReplyDTO != null) {
|
||||
@@ -66,4 +64,4 @@ public class QuickReplyContentService {
|
||||
return Mono.empty();
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user