/* * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ package com.example.service.conversation; import com.example.dto.dialogflow.conversation.ConversationMessageDTO; import com.example.dto.dialogflow.conversation.ConversationSessionDTO; import com.example.exception.FirestorePersistenceException; import com.example.mapper.conversation.ConversationMessageMapper; import com.example.mapper.conversation.FirestoreConversationMapper; import com.example.repository.FirestoreBaseRepository; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.util.Objects; import java.util.concurrent.ExecutionException; @Service public class FirestoreConversationService { private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class); private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations"; private static final String MESSAGES_SUBCOLLECTION = "mensajes"; private final FirestoreBaseRepository firestoreBaseRepository; private final FirestoreConversationMapper firestoreConversationMapper; private final ConversationMessageMapper conversationMessageMapper; public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper, ConversationMessageMapper conversationMessageMapper) { this.firestoreBaseRepository = firestoreBaseRepository; this.firestoreConversationMapper = firestoreConversationMapper; this.conversationMessageMapper = conversationMessageMapper; } public Mono saveSession(ConversationSessionDTO session) { return Mono.fromRunnable(() -> { DocumentReference sessionDocRef = getSessionDocumentReference(session.sessionId()); try { firestoreBaseRepository.setDocument(sessionDocRef, firestoreConversationMapper.createSessionMap(session)); } catch (ExecutionException | InterruptedException e) { handleException(e, session.sessionId()); } }).subscribeOn(Schedulers.boundedElastic()).then(); } public Mono saveMessage(String sessionId, ConversationMessageDTO message) { return Mono.fromRunnable(() -> { DocumentReference messageDocRef = getSessionDocumentReference(sessionId).collection(MESSAGES_SUBCOLLECTION).document(); try { firestoreBaseRepository.setDocument(messageDocRef, conversationMessageMapper.toMap(message)); } catch (ExecutionException | InterruptedException e) { handleException(e, sessionId); } }).subscribeOn(Schedulers.boundedElastic()).then(); } public Flux getMessages(String sessionId) { String messagesPath = getConversationCollectionPath() + "/" + sessionId + "/" + MESSAGES_SUBCOLLECTION; return firestoreBaseRepository.getDocuments(messagesPath) .map(documentSnapshot -> { if (documentSnapshot != null && documentSnapshot.exists()) { return conversationMessageMapper.fromMap(documentSnapshot.getData()); } return null; }) .filter(Objects::nonNull); } public Mono getConversationSession(String sessionId) { logger.info("Attempting to retrieve conversation session for session {}.", sessionId); return Mono.fromCallable(() -> { DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); try { DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef); if (documentSnapshot != null && documentSnapshot.exists()) { ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot); logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionId); return sessionDTO; } logger.info("Conversation session not found for session {}.", sessionId); return null; } catch (InterruptedException | ExecutionException e) { handleException(e, sessionId); return null; } }).subscribeOn(Schedulers.boundedElastic()); } public Mono getSessionByTelefono(String userPhoneNumber) { return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "telefono", userPhoneNumber) .map(documentSnapshot -> { if (documentSnapshot != null && documentSnapshot.exists()) { ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot); logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionDTO.sessionId()); return sessionDTO; } return null; }); } public Mono deleteSession(String sessionId) { logger.info("Attempting to delete conversation session for session {}.", sessionId); return Mono.fromRunnable(() -> { DocumentReference sessionDocRef = getSessionDocumentReference(sessionId); try { firestoreBaseRepository.deleteDocumentAndSubcollections(sessionDocRef, MESSAGES_SUBCOLLECTION); logger.info("Successfully deleted conversation session for session {}.", sessionId); } catch (InterruptedException | ExecutionException e) { handleException(e, sessionId); } }).subscribeOn(Schedulers.boundedElastic()).then(); } private String getConversationCollectionPath() { return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId()); } private DocumentReference getSessionDocumentReference(String sessionId) { String collectionPath = getConversationCollectionPath(); return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId); } private void handleException(Exception e, String sessionId) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } logger.error("Error processing Firestore operation for session {}: {}", sessionId, e.getMessage(), e); throw new FirestorePersistenceException("Failed to process Firestore operation for session " + sessionId, e); } }