Initial commit

This commit is contained in:
PAVEL PALMA
2025-07-16 13:43:46 -06:00
parent 54cb86ab65
commit fac3550287
46 changed files with 2062 additions and 88 deletions

View File

@@ -0,0 +1,136 @@
package com.example.service;
import com.example.dto.dialogflow.DetectIntentRequestDTO;
import com.example.dto.dialogflow.DetectIntentResponseDTO;
import com.example.dto.base.ConversationContext;
import com.example.dto.dialogflow.ConversationEntryDTO;
import com.example.dto.dialogflow.ConversationSessionDTO;
import com.example.dto.base.UsuarioDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.UUID;
import java.util.Optional;
@Service
public class ConversationManagerService {
private static final Logger logger = LoggerFactory.getLogger(ConversationManagerService.class);
private final DialogflowClientService dialogflowServiceClient;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreConversationService memoryStoreConversationService;
public ConversationManagerService(
DialogflowClientService dialogflowServiceClient,
FirestoreConversationService firestoreConversationService,
MemoryStoreConversationService memoryStoreConversationService) {
this.dialogflowServiceClient = dialogflowServiceClient;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreConversationService = memoryStoreConversationService;
}
public Mono<DetectIntentResponseDTO> manageConversation(DetectIntentRequestDTO request) {
final ConversationContext context;
try {
context = resolveAndValidateRequest(request);
} catch (IllegalArgumentException e) {
logger.error("Validation error for incoming request: {}", e.getMessage());
return Mono.error(e);
}
final String userId = context.userId();
final String userMessageText = context.userMessageText();
final String userPhoneNumber = context.primaryPhoneNumber();
Mono<ConversationSessionDTO> sessionMono;
if (userPhoneNumber != null && !userPhoneNumber.trim().isEmpty()) {
logger.info("Checking for existing session for phone number: {}", userPhoneNumber);
sessionMono = memoryStoreConversationService.getSessionByTelefono(userPhoneNumber)
.doOnNext(session -> logger.info("Found existing session {} for phone number {}", session.sessionId(), userPhoneNumber))
.switchIfEmpty(Mono.defer(() -> {
String newSessionId = UUID.randomUUID().toString();
logger.info("No existing session found for phone number {}. Creating new session: {}", userPhoneNumber, newSessionId);
return Mono.just(ConversationSessionDTO.create(newSessionId, userId, userPhoneNumber));
}));
} else {
String newSessionId = UUID.randomUUID().toString();
logger.warn("No phone number provided in request. Creating new session: {}", newSessionId);
return Mono.error(new IllegalArgumentException("Phone number is required to manage conversation sessions."));
}
return sessionMono.flatMap(session -> {
final String finalSessionId = session.sessionId();
logger.info("Managing conversation for resolved session: {}", finalSessionId);
ConversationEntryDTO userEntry = ConversationEntryDTO.forUser(userMessageText);
DetectIntentRequestDTO updatedRequest = request.withSessionId(finalSessionId);
return this.persistConversationTurn(userId, finalSessionId, userEntry, userPhoneNumber)
.doOnSuccess(v -> logger.debug("User entry successfully persisted for session {}. Proceeding to Dialogflow...", finalSessionId))
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId, e.getMessage(), e))
// After user entry persistence is complete (Mono<Void> emits 'onComplete'),
// then proceed to call Dialogflow.
.then(Mono.defer(() -> { // Use Mono.defer to ensure Dialogflow call is subscribed AFTER persistence
// Call Dialogflow.
return dialogflowServiceClient.detectIntent(finalSessionId, updatedRequest)
.doOnSuccess(response -> {
logger.debug("Received Dialogflow CX response for session {}. Initiating agent response persistence.", finalSessionId);
ConversationEntryDTO agentEntry = ConversationEntryDTO.forAgent(response.queryResult());
// Agent entry persistence can still be backgrounded via .subscribe()
// if its completion isn't strictly required before returning the Dialogflow response.
this.persistConversationTurn(userId, finalSessionId, agentEntry, userPhoneNumber).subscribe(
v -> logger.debug("Background: Agent entry persistence initiated for session {}.", finalSessionId),
e -> logger.error("Background: Error during agent entry persistence for session {}: {}", finalSessionId, e.getMessage(), e)
);
})
.doOnError(error -> logger.error("Overall error during conversation management for session {}: {}", finalSessionId, error.getMessage(), error));
}));
})
.onErrorResume(e -> {
logger.error("Overall error handling conversation in ConversationManagerService: {}", e.getMessage(), e);
return Mono.error(new RuntimeException("Failed to process conversation due to an internal error.", e));
})
.subscribeOn(Schedulers.boundedElastic());
}
private Mono<Void> persistConversationTurn(String userId, String sessionId, ConversationEntryDTO entry, String userPhoneNumber) {
logger.debug("Starting Write-Back persistence for session {}. Type: {}. Writing to Redis first.", sessionId, entry.type().name());
return memoryStoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.doOnSuccess(v -> {
logger.info("Entry saved to Redis for session {}. Type: {}. Kicking off async Firestore write-back.", sessionId, entry.type().name());
firestoreConversationService.saveEntry(userId, sessionId, entry, userPhoneNumber)
.subscribe(
fsVoid -> logger.debug("Asynchronously (Write-Back): Entry successfully saved to Firestore for session {}. Type: {}.",
sessionId, entry.type().name()),
fsError -> logger.error("Asynchronously (Write-Back): Failed to save entry to Firestore for session {}. Type: {}: {}",
sessionId, entry.type().name(), fsError.getMessage(), fsError)
);
})
.doOnError(e -> logger.error("Error during primary Redis write for session {}. Type: {}: {}", sessionId, entry.type().name(), e.getMessage(), e));
}
private ConversationContext resolveAndValidateRequest(DetectIntentRequestDTO request) {
String primaryPhoneNumber = Optional.ofNullable(request.usuario())
.map(UsuarioDTO::telefono)
.orElse(null);
if (primaryPhoneNumber == null || primaryPhoneNumber.trim().isEmpty()) {
throw new IllegalArgumentException("Phone number (telefono) is required in the 'usuario' field for conversation management.");
}
String resolvedUserId = request.userId();
if (resolvedUserId == null || resolvedUserId.trim().isEmpty()) {
resolvedUserId = "user_by_phone_" + primaryPhoneNumber.replaceAll("[^0-9]", ""); // Derive from phone number
logger.warn("User ID not provided in request. Using derived ID from phone number: {}", resolvedUserId);
}
if (request.queryInput() == null || request.queryInput().text() == null || request.queryInput().text().text() == null || request.queryInput().text().text().trim().isEmpty()) {
throw new IllegalArgumentException("Dialogflow query input text is required.");
}
String userMessageText = request.queryInput().text().text();
return new ConversationContext(resolvedUserId, null, userMessageText, primaryPhoneNumber);
}
}

View File

@@ -0,0 +1,121 @@
package com.example.service;
import com.example.dto.gemini.ConversationSummaryRequest;
import com.example.dto.gemini.ConversationSummaryResponse;
import com.example.dto.gemini.ConversationSessionSummaryDTO;
import com.example.dto.gemini.ConversationEntrySummaryDTO;
import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.DocumentReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@Service
public class ConversationSummaryService {
private static final Logger logger = LoggerFactory.getLogger(ConversationSummaryService.class);
private final GeminiClientService geminiService;
private final FirestoreBaseRepository firestoreBaseRepository;
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations";
private static final String DEFAULT_GEMINI_MODEL_NAME = "gemini-2.0-flash-001";
private static final Float DEFAULT_TEMPERATURE = 0.7f;
private static final Integer DEFAULT_MAX_OUTPUT_TOKENS = 800;
public ConversationSummaryService(GeminiClientService geminiService, FirestoreBaseRepository firestoreBaseRepository) {
this.geminiService = geminiService;
this.firestoreBaseRepository = firestoreBaseRepository;
}
public ConversationSummaryResponse summarizeConversation(ConversationSummaryRequest request) {
if (request == null) {
logger.warn("Summarization request is null.");
return new ConversationSummaryResponse("Request cannot be null.");
}
if (request.sessionId() == null || request.sessionId().isBlank()) {
logger.warn("Session ID is missing in the summarization request.");
return new ConversationSummaryResponse("Session ID is required.");
}
if (request.prompt() == null || request.prompt().isBlank()) {
logger.warn("Prompt for summarization is missing in the request.");
return new ConversationSummaryResponse("Prompt for summarization is required.");
}
String sessionId = request.sessionId();
String summarizationPromptInstruction = request.prompt();
String actualModelName = (request.modelName() != null && !request.modelName().isBlank())
? request.modelName() : DEFAULT_GEMINI_MODEL_NAME;
Float actualTemperature = (request.temperature() != null)
? request.temperature() : DEFAULT_TEMPERATURE;
Integer actualMaxOutputTokens = (request.maxOutputTokens() != null)
? request.maxOutputTokens() : DEFAULT_MAX_OUTPUT_TOKENS;
String collectionPath = String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
String documentId = sessionId;
logger.info("Fetching conversation from Firestore: Collection='{}', Document='{}'", collectionPath, documentId);
ConversationSessionSummaryDTO sessionSummary;
try {
DocumentReference docRef = firestoreBaseRepository.getDocumentReference(collectionPath, documentId);
sessionSummary = firestoreBaseRepository.getDocument(docRef, ConversationSessionSummaryDTO.class);
logger.debug("Retrieved ConversationSessionSummaryDTO after Firestore fetch: sessionId={}, entries size={}",
sessionSummary != null ? sessionSummary.sessionId() : "null",
sessionSummary != null && sessionSummary.entries() != null ? sessionSummary.entries().size() : "N/A (entries list is null)");
if (sessionSummary == null) {
logger.warn("Firestore document not found or could not be mapped: {}/{}", collectionPath, documentId);
return new ConversationSummaryResponse("Conversation document not found for session ID: " + sessionId);
}
List<ConversationEntrySummaryDTO> entries = sessionSummary.entries();
if (entries == null || entries.isEmpty()) {
logger.warn("No conversation entries found in document {}/{} for session ID: {}",
collectionPath, documentId, sessionId);
return new ConversationSummaryResponse("No conversation messages found in the document for session ID: " + sessionId);
}
List<String> conversationMessages = entries.stream()
.map(entry -> {
String type = entry.type().map(t -> t.name()).orElse("UNKNOWN_TYPE");
String timestampString = entry.timestamp() != null ? entry.timestamp().toDate().toInstant().toString() : "UNKNOWN_TIMESTAMP";
return String.format("[%s - %s] %s", type, timestampString, entry.text());
})
.collect(Collectors.toList());
String formattedConversation = String.join("\n", conversationMessages);
String fullPromptForGemini = summarizationPromptInstruction + "\n\n" + formattedConversation;
logger.info("Sending summarization request to Gemini with custom prompt (first 200 chars): \n{}",
fullPromptForGemini.substring(0, Math.min(fullPromptForGemini.length(), 200)) + "...");
String summaryText = geminiService.generateContent(
fullPromptForGemini,
actualTemperature,
actualMaxOutputTokens,
actualModelName
);
if (summaryText == null || summaryText.trim().isEmpty()) {
logger.warn("Gemini returned an empty or null summary for the conversation.");
return new ConversationSummaryResponse("Could not generate a summary. The model returned no text.");
}
logger.info("Successfully generated summary for session ID: {}", sessionId);
return new ConversationSummaryResponse(summaryText);
} catch (InterruptedException | ExecutionException e) {
logger.error("Error accessing Firestore for session ID {}: {}", sessionId, e.getMessage(), e);
Thread.currentThread().interrupt();
return new ConversationSummaryResponse("Error accessing conversation data: " + e.getMessage());
} catch (Exception e) {
logger.error("An unexpected error occurred during summarization for session ID {}: {}", sessionId, e.getMessage(), e);
return new ConversationSummaryResponse("An unexpected error occurred during summarization: " + e.getMessage());
}
}
}

View File

@@ -0,0 +1,120 @@
package com.example.service;
import com.example.dto.dialogflow.DetectIntentRequestDTO;
import com.example.dto.dialogflow.DetectIntentResponseDTO;
import com.example.mapper.DialogflowRequestMapper;
import com.example.mapper.DialogflowResponseMapper;
import com.example.exception.DialogflowClientException;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.dialogflow.cx.v3.DetectIntentRequest;
import com.google.cloud.dialogflow.cx.v3.SessionsClient;
import com.google.cloud.dialogflow.cx.v3.SessionName;
import com.google.cloud.dialogflow.cx.v3.SessionsSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
@Service
public class DialogflowClientService {
private static final Logger logger = LoggerFactory.getLogger(DialogflowClientService.class);
private final String dialogflowCxProjectId;
private final String dialogflowCxLocation;
private final String dialogflowCxAgentId;
private final DialogflowRequestMapper dialogflowRequestMapper;
private final DialogflowResponseMapper dialogflowResponseMapper;
private SessionsClient sessionsClient;
public DialogflowClientService(
@org.springframework.beans.factory.annotation.Value("${dialogflow.cx.project-id}") String dialogflowCxProjectId,
@org.springframework.beans.factory.annotation.Value("${dialogflow.cx.location}") String dialogflowCxLocation,
@org.springframework.beans.factory.annotation.Value("${dialogflow.cx.agent-id}") String dialogflowCxAgentId,
DialogflowRequestMapper dialogflowRequestMapper,
DialogflowResponseMapper dialogflowResponseMapper)
throws IOException {
this.dialogflowCxProjectId = dialogflowCxProjectId;
this.dialogflowCxLocation = dialogflowCxLocation;
this.dialogflowCxAgentId = dialogflowCxAgentId;
this.dialogflowRequestMapper = dialogflowRequestMapper;
this.dialogflowResponseMapper = dialogflowResponseMapper;
try {
String regionalEndpoint = String.format("%s-dialogflow.googleapis.com:443", dialogflowCxLocation);
SessionsSettings sessionsSettings = SessionsSettings.newBuilder()
.setEndpoint(regionalEndpoint)
.build();
this.sessionsClient = SessionsClient.create(sessionsSettings);
logger.info("Dialogflow CX SessionsClient initialized successfully for endpoint: {}", regionalEndpoint);
} catch (IOException e) {
logger.error("Failed to create Dialogflow CX SessionsClient: {}", e.getMessage(), e);
throw e;
}
}
@PreDestroy
public void closeSessionsClient() {
if (sessionsClient != null) {
sessionsClient.close();
logger.info("Dialogflow CX SessionsClient closed.");
}
}
public Mono<DetectIntentResponseDTO> detectIntent(
String sessionId,
DetectIntentRequestDTO request) {
Objects.requireNonNull(sessionId, "Dialogflow session ID cannot be null.");
Objects.requireNonNull(request, "Dialogflow request DTO cannot be null.");
logger.info("Initiating detectIntent for session: {}", sessionId);
DetectIntentRequest.Builder detectIntentRequestBuilder;
try {
detectIntentRequestBuilder = dialogflowRequestMapper.mapToDetectIntentRequestBuilder(request);
logger.debug("Obtained partial DetectIntentRequest.Builder from mapper for session: {}", sessionId);
} catch (IllegalArgumentException e) {
logger.error(" Failed to map DTO to partial Protobuf request for session {}: {}", sessionId, e.getMessage());
return Mono.error(new IllegalArgumentException("Invalid Dialogflow request input: " + e.getMessage()));
}
SessionName sessionName = SessionName.newBuilder()
.setProject(dialogflowCxProjectId)
.setLocation(dialogflowCxLocation)
.setAgent(dialogflowCxAgentId)
.setSession(sessionId)
.build();
detectIntentRequestBuilder.setSession(sessionName.toString());
logger.debug("Set session path {} on the request builder for session: {}", sessionName.toString(), sessionId);
// Build the final DetectIntentRequest Protobuf object
DetectIntentRequest detectIntentRequest = detectIntentRequestBuilder.build();
return Mono.fromCallable(() -> {
logger.debug("Calling Dialogflow CX detectIntent for session: {}", sessionId);
return sessionsClient.detectIntent(detectIntentRequest);
})
.onErrorMap(ApiException.class, e -> {
logger.error("Dialogflow CX API error for session {}: status={}, message={}",
sessionId, e.getStatusCode().getCode(), e.getMessage(), e);
return new DialogflowClientException(
"Dialogflow CX API error: " + e.getStatusCode().getCode() + " - " + e.getMessage(), e);
})
.onErrorMap(IOException.class, e -> {
logger.error("IO error when calling Dialogflow CX for session {}: {}", sessionId, e.getMessage(),e);
return new RuntimeException("IO error with Dialogflow CX API: " + e.getMessage(), e);
})
.map(dfResponse -> this.dialogflowResponseMapper.mapFromDialogflowResponse(dfResponse, sessionId));
}
}

View File

@@ -0,0 +1,93 @@
package com.example.service;
import com.example.dto.dialogflow.ConversationEntryDTO;
import com.example.dto.dialogflow.ConversationSessionDTO;
import com.example.exception.FirestorePersistenceException;
import com.example.mapper.FirestoreConversationMapper;
import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.cloud.firestore.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@Service
public class FirestoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class);
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations";
private final FirestoreBaseRepository firestoreBaseRepository;
private final FirestoreConversationMapper firestoreConversationMapper;
public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper) {
this.firestoreBaseRepository = firestoreBaseRepository;
this.firestoreConversationMapper = firestoreConversationMapper;
}
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) {
logger.info("Attempting to save conversation entry to Firestore for session {}. Type: {}", sessionId, newEntry.type().name());
return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
WriteBatch batch = firestoreBaseRepository.createBatch();
try {
if (firestoreBaseRepository.documentExists(sessionDocRef)) {
// Update: Append the new entry using arrayUnion and update lastModified
Map<String, Object> updates = firestoreConversationMapper.createUpdateMapForSingleEntry(newEntry);
batch.update(sessionDocRef, updates);
logger.info("Appending entry to existing conversation session for user {} and session {}. Type: {}", userId, sessionId, newEntry.type().name());
} else {
// Create: Start a new session with the first entry.
// Pass userId and userPhoneNumber to the mapper to be stored as fields in the document.
Map<String, Object> newSessionMap = firestoreConversationMapper.createNewSessionMapForSingleEntry(sessionId, userId, userPhoneNumber, newEntry);
batch.set(sessionDocRef, newSessionMap);
logger.info("Creating new conversation session with first entry for user {} and session {}. Type: {}", userId, sessionId, newEntry.type().name());
}
firestoreBaseRepository.commitBatch(batch);
logger.info("Successfully committed batch for session {} to Firestore.", sessionId);
} catch (ExecutionException e) {
logger.error("Error saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to save conversation entry to Firestore for session " + sessionId, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Thread interrupted while saving conversation entry to Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Saving conversation entry was interrupted for session " + sessionId, e);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
public Mono<ConversationSessionDTO> getConversationSession(String userId, String sessionId) {
logger.info("Attempting to retrieve conversation session for session {} (user ID {} for context).", sessionId, userId);
return Mono.fromCallable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try {
DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef);
if (documentSnapshot != null && documentSnapshot.exists()) {
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionId);
return sessionDTO;
}
logger.info("Conversation session not found for session {}.", sessionId);
return null; // Or Mono.empty() if this method returned Mono<Optional<ConversationSessionDTO>>
} catch (InterruptedException | ExecutionException e) {
logger.error("Error retrieving conversation session from Firestore for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to retrieve conversation session from Firestore for session " + sessionId, e);
}
}).subscribeOn(Schedulers.boundedElastic());
}
private String getConversationCollectionPath() {
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
}
private DocumentReference getSessionDocumentReference(String sessionId) {
String collectionPath = getConversationCollectionPath();
return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId);
}
}

View File

@@ -0,0 +1,45 @@
package com.example.service;
import com.google.genai.Client;
import com.google.genai.errors.GenAiIOException;
import com.google.genai.types.Content;
import com.google.genai.types.GenerateContentConfig;
import com.google.genai.types.GenerateContentResponse;
import com.google.genai.types.Part;
import org.springframework.stereotype.Service;
@Service
public class GeminiClientService {
private final Client geminiClient;
public GeminiClientService(Client geminiClient) {
this.geminiClient = geminiClient;
}
public String generateContent(String prompt, Float temperature, Integer maxOutputTokens, String modelName) {
try {
Content content = Content.fromParts(Part.fromText(prompt));
GenerateContentConfig config = GenerateContentConfig.builder()
.temperature(temperature)
.maxOutputTokens(maxOutputTokens)
.build();
GenerateContentResponse response = geminiClient.models.generateContent(modelName, content, config);
if (response != null && response.text() != null) {
return response.text();
} else {
return "No content generated or unexpected response structure.";
}
} catch (GenAiIOException e) {
System.err.println("Gemini API I/O error: " + e.getMessage());
e.printStackTrace();
return "Error: An API communication issue occurred: " + e.getMessage();
} catch (Exception e) {
System.err.println("An unexpected error occurred during Gemini content generation: " + e.getMessage());
e.printStackTrace();
return "Error: An unexpected issue occurred during content generation.";
}
}
}

View File

@@ -0,0 +1,73 @@
package com.example.service;
import com.example.dto.dialogflow.ConversationEntryDTO;
import com.example.dto.dialogflow.ConversationSessionDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Service
public class MemoryStoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(MemoryStoreConversationService.class);
private static final String SESSION_KEY_PREFIX = "conversation:session:";
private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:";
private static final Duration SESSION_TTL = Duration.ofHours(24);
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
@Autowired
public MemoryStoreConversationService(
ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate,
ReactiveRedisTemplate<String, String> stringRedisTemplate) {
this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate;
}
public Mono<Void> saveEntry(String userId, String sessionId, ConversationEntryDTO newEntry, String userPhoneNumber) {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + userPhoneNumber;
logger.info("Attempting to save entry to Redis for session {}. Type: {}", sessionId, newEntry.type().name());
return redisTemplate.opsForValue().get(sessionKey)
.defaultIfEmpty(ConversationSessionDTO.create(sessionId, userId, userPhoneNumber))
.flatMap(session -> {
ConversationSessionDTO sessionWithUpdatedTelefono = session.withTelefono(userPhoneNumber);
ConversationSessionDTO updatedSession = sessionWithUpdatedTelefono.withAddedEntry(newEntry);
logger.info("Attempting to set updated session {} with new entry type {} in Redis.", sessionId, newEntry.type().name());
return redisTemplate.opsForValue().set(sessionKey, updatedSession, SESSION_TTL)
.then(stringRedisTemplate.opsForValue().set(phoneToSessionKey, sessionId, SESSION_TTL));
})
.doOnSuccess(success -> logger.info("Successfully saved updated session and phone mapping to Redis for session {}. Entry Type: {}", sessionId, newEntry.type().name()))
.doOnError(e -> logger.error("Error appending entry to Redis for session {}: {}", sessionId, e.getMessage(), e))
.then();
}
public Mono<ConversationSessionDTO> getSessionByTelefono(String telefono) {
if (telefono == null || telefono.trim().isEmpty()) {
return Mono.empty();
}
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + telefono;
logger.debug("Attempting to retrieve session ID for phone number {} from Redis.", telefono);
return stringRedisTemplate.opsForValue().get(phoneToSessionKey)
.flatMap(sessionId -> {
logger.debug("Found session ID {} for phone number {}. Retrieving session data.", sessionId, telefono);
return redisTemplate.opsForValue().get(SESSION_KEY_PREFIX + sessionId);
})
.doOnSuccess(session -> {
if (session != null) {
logger.info("Successfully retrieved session {} by phone number {}.", session.sessionId(), telefono);
} else {
logger.info("No session found in Redis for phone number {}.", telefono);
}
})
.doOnError(e -> logger.error("Error retrieving session by phone number {}: {}", telefono, e.getMessage(), e));
}
}