UPDATE 12-ago-2025

This commit is contained in:
PAVEL PALMA
2025-08-12 16:09:32 -06:00
parent 55fcf3b7d6
commit 849095374f
74 changed files with 2656 additions and 669 deletions

View File

@@ -0,0 +1,197 @@
/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.conversation;
import com.example.mapper.conversation.ExternalConvRequestMapper;
import com.example.service.base.DialogflowClientService;
import com.example.util.SessionIdGenerator;
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
import com.example.dto.dialogflow.conversation.ConversationContext;
import com.example.dto.dialogflow.conversation.ConversationEntryDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.dto.dialogflow.conversation.ExternalConvRequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
/**
* Service for orchestrating the end-to-end conversation flow.
* It manages user sessions, creating new ones or reusing existing ones
* based on a session reset threshold. The service handles the entire
* conversation turn, from mapping an external request to calling Dialogflow,
* and then persists both user and agent messages using a write-back strategy
* to a primary cache (Redis) and an asynchronous write to Firestore.
*/
@Service
public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
private static final long SESSION_RESET_THRESHOLD_HOURS = 24;
private static final String CURRENT_PAGE_PARAM = "currentPage";
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreConversationService memoryStoreConversationService;
public ConversationManagerService(
DialogflowClientService dialogflowServiceClient,
FirestoreConversationService firestoreConversationService,
MemoryStoreConversationService memoryStoreConversationService,
ExternalConvRequestMapper externalRequestToDialogflowMapper) {
this.dialogflowServiceClient = dialogflowServiceClient;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreConversationService = memoryStoreConversationService;
this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper;
}
public Mono<DetectIntentResponseDTO>manageConversation(ExternalConvRequestDTO Externalrequest) {
final DetectIntentRequestDTO request;
try {
request = externalRequestToDialogflowMapper.mapExternalRequestToDetectIntentRequest(Externalrequest);
logger.debug("Successfully pre-mapped ExternalRequestDTO to DetectIntentRequestDTO");
} catch (IllegalArgumentException e) {
logger.error("Error during pre-mapping: {}", e.getMessage());
return Mono.error(new IllegalArgumentException("Failed to process external request due to mapping error: " + e.getMessage(), e));
}
final ConversationContext context;
try {
context = resolveAndValidateRequest(request);
} catch (IllegalArgumentException e) {
logger.error("Validation error for incoming request: {}", e.getMessage());
return Mono.error(e);
}
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
Mono<ConversationSessionDTO> sessionMono;
if (userPhoneNumber != null && !userPhoneNumber.isBlank()) {
logger.info("Checking for existing session for phone number: {}", userPhoneNumber);
sessionMono = memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.doOnNext(session -> logger.info("Found existing session {} for phone number {}", session.sessionId(), userPhoneNumber))
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = SessionIdGenerator.generateStandardSessionId();
logger.info("No existing session found for phone number {}. Creating new session: {}", userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}));
} else {
logger.warn("No phone number provided in request. Cannot manage conversation session without it.");
return Mono.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
return sessionMono.flatMap(session -> {
final String finalSessionId = session.sessionId();
logger.info("Managing conversation for resolved session: {}", finalSessionId);
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
final DetectIntentRequestDTO requestToDialogflow;
Instant currentInteractionTimestamp = userEntry.timestamp();
if (session.lastModified() != null &&
Duration.between(session.lastModified(), currentInteractionTimestamp).toHours() >= SESSION_RESET_THRESHOLD_HOURS) {
logger.info("Session {} (last modified: {}) is older than {} hours. Adding '{}' parameter to Dialogflow request.",
session.sessionId(), session.lastModified(), SESSION_RESET_THRESHOLD_HOURS, CURRENT_PAGE_PARAM);
requestToDialogflow = request.withParameter(CURRENT_PAGE_PARAM, true);
} else {
requestToDialogflow = request;
}
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
.doOnSuccess(v -> logger.debug("User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId))
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, e.getMessage(), e))
.then(Mono.defer(() -> {
return dialogflowServiceClient.detectIntent(finalSessionId, requestToDialogflow)
.doOnSuccess(response -> {
logger.debug("Received Dialogflow CX response for session {}. Initiating agent response persistence.", finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
this.persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber).subscribe(
v -> logger.debug("Background: Agent entry persistence initiated for session {}.", finalSessionId),
e -> logger.error("Background: Error during agent entry persistence for session {}: {}", finalSessionId, e.getMessage(), e)
);
})
.doOnError(error -> logger.error("Overall error during conversation management for session {}: {}", finalSessionId, error.getMessage(), error));
}));
})
.onErrorResume(e -> {
logger.error("Overall error handling conversation in ConversationManagerService: {}", e.getMessage(), e);
return Mono.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
})
.subscribeOn(Schedulers.boundedElastic());
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> {
logger.info("Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", sessionId, entry.type().name());
firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.subscribe(
fsVoid -> logger.debug("Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()),
fsError -> logger.error("Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)
);
})
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e));
}
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
Map<String, Object> params = Optional.ofNullable(request.queryParams())
.map(queryParamsDTO -> queryParamsDTO.parameters())
.orElse(Collections.emptyMap());
String primaryPhoneNumber = null;
Object telefonoObj = params.get("telefono"); // Get from map
if (telefonoObj instanceof String) {
primaryPhoneNumber = (String) telefonoObj;
} else if (telefonoObj != null) {
logger.warn("Parameter 'telefono' in queryParams is not a String (type: {}). Expected String.", telefonoObj.getClass().getName());
}
if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) {
throw new IllegalArgumentException("Phone number (telefono) is required in query parameters for conversation management.");
}
String resolvedUserId = null;
Object userIdObj = params.get("usuario_id");
if (userIdObj instanceof String) {
resolvedUserId = (String) userIdObj;
} else if (userIdObj != null) {
logger.warn("Parameter 'userId' in queryParams is not a String (type: {}). Expected String.", userIdObj.getClass().getName());
}
if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) {
resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", "");
logger.warn("User ID not provided in query parameters. Using derived ID from phone number: {}", resolvedUserId);
}
if (request.queryInput() == null || request.queryInput().text() == null ||
request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) {
throw new IllegalArgumentException("Dialogflow query input text is required.");
}
String userMessageText = request.queryInput().text().text();
return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber);
}
}