Files
int-layer/src/main/java/com/example/service/conversation/FirestoreConversationService.java
Anibal Angulo da95a64fb7 Initial commit
2026-02-18 19:29:54 +00:00

137 lines
7.0 KiB
Java

/*
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
* Your use of it is subject to your agreement with Google.
*/
package com.example.service.conversation;
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.exception.FirestorePersistenceException;
import com.example.mapper.conversation.ConversationMessageMapper;
import com.example.mapper.conversation.FirestoreConversationMapper;
import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@Service
public class FirestoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class);
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations";
private static final String MESSAGES_SUBCOLLECTION = "mensajes";
private final FirestoreBaseRepository firestoreBaseRepository;
private final FirestoreConversationMapper firestoreConversationMapper;
private final ConversationMessageMapper conversationMessageMapper;
public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper, ConversationMessageMapper conversationMessageMapper) {
this.firestoreBaseRepository = firestoreBaseRepository;
this.firestoreConversationMapper = firestoreConversationMapper;
this.conversationMessageMapper = conversationMessageMapper;
}
public Mono<Void> saveSession(ConversationSessionDTO session) {
return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(session.sessionId());
try {
firestoreBaseRepository.setDocument(sessionDocRef, firestoreConversationMapper.createSessionMap(session));
} catch (ExecutionException | InterruptedException e) {
handleException(e, session.sessionId());
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
public Mono<Void> saveMessage(String sessionId, ConversationMessageDTO message) {
return Mono.fromRunnable(() -> {
DocumentReference messageDocRef = getSessionDocumentReference(sessionId).collection(MESSAGES_SUBCOLLECTION).document();
try {
firestoreBaseRepository.setDocument(messageDocRef, conversationMessageMapper.toMap(message));
} catch (ExecutionException | InterruptedException e) {
handleException(e, sessionId);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
public Flux<ConversationMessageDTO> getMessages(String sessionId) {
String messagesPath = getConversationCollectionPath() + "/" + sessionId + "/" + MESSAGES_SUBCOLLECTION;
return firestoreBaseRepository.getDocuments(messagesPath)
.map(documentSnapshot -> {
if (documentSnapshot != null && documentSnapshot.exists()) {
return conversationMessageMapper.fromMap(documentSnapshot.getData());
}
return null;
})
.filter(Objects::nonNull);
}
public Mono<ConversationSessionDTO> getConversationSession(String sessionId) {
logger.info("Attempting to retrieve conversation session for session {}.", sessionId);
return Mono.fromCallable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try {
DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef);
if (documentSnapshot != null && documentSnapshot.exists()) {
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionId);
return sessionDTO;
}
logger.info("Conversation session not found for session {}.", sessionId);
return null;
} catch (InterruptedException | ExecutionException e) {
handleException(e, sessionId);
return null;
}
}).subscribeOn(Schedulers.boundedElastic());
}
public Mono<ConversationSessionDTO> getSessionByTelefono(String userPhoneNumber) {
return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "telefono", userPhoneNumber)
.map(documentSnapshot -> {
if (documentSnapshot != null && documentSnapshot.exists()) {
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionDTO.sessionId());
return sessionDTO;
}
return null;
});
}
public Mono<Void> deleteSession(String sessionId) {
logger.info("Attempting to delete conversation session for session {}.", sessionId);
return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try {
firestoreBaseRepository.deleteDocumentAndSubcollections(sessionDocRef, MESSAGES_SUBCOLLECTION);
logger.info("Successfully deleted conversation session for session {}.", sessionId);
} catch (InterruptedException | ExecutionException e) {
handleException(e, sessionId);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
private String getConversationCollectionPath() {
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
}
private DocumentReference getSessionDocumentReference(String sessionId) {
String collectionPath = getConversationCollectionPath();
return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId);
}
private void handleException(Exception e, String sessionId) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.error("Error processing Firestore operation for session {}: {}", sessionId, e.getMessage(), e);
throw new FirestorePersistenceException("Failed to process Firestore operation for session " + sessionId, e);
}
}