137 lines
7.0 KiB
Java
137 lines
7.0 KiB
Java
/*
 * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
 * Your use of it is subject to your agreement with Google.
 */
|
|
|
|
package com.example.service.conversation;
|
|
|
|
import com.example.dto.dialogflow.conversation.ConversationMessageDTO;
|
|
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
|
|
import com.example.exception.FirestorePersistenceException;
|
|
import com.example.mapper.conversation.ConversationMessageMapper;
|
|
import com.example.mapper.conversation.FirestoreConversationMapper;
|
|
import com.example.repository.FirestoreBaseRepository;
|
|
import com.google.cloud.firestore.DocumentReference;
|
|
import com.google.cloud.firestore.DocumentSnapshot;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.stereotype.Service;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
import java.util.Objects;
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
@Service
|
|
public class FirestoreConversationService {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class);
|
|
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations";
|
|
private static final String MESSAGES_SUBCOLLECTION = "mensajes";
|
|
private final FirestoreBaseRepository firestoreBaseRepository;
|
|
private final FirestoreConversationMapper firestoreConversationMapper;
|
|
private final ConversationMessageMapper conversationMessageMapper;
|
|
|
|
public FirestoreConversationService(FirestoreBaseRepository firestoreBaseRepository, FirestoreConversationMapper firestoreConversationMapper, ConversationMessageMapper conversationMessageMapper) {
|
|
this.firestoreBaseRepository = firestoreBaseRepository;
|
|
this.firestoreConversationMapper = firestoreConversationMapper;
|
|
this.conversationMessageMapper = conversationMessageMapper;
|
|
}
|
|
|
|
public Mono<Void> saveSession(ConversationSessionDTO session) {
|
|
return Mono.fromRunnable(() -> {
|
|
DocumentReference sessionDocRef = getSessionDocumentReference(session.sessionId());
|
|
try {
|
|
firestoreBaseRepository.setDocument(sessionDocRef, firestoreConversationMapper.createSessionMap(session));
|
|
} catch (ExecutionException | InterruptedException e) {
|
|
handleException(e, session.sessionId());
|
|
}
|
|
}).subscribeOn(Schedulers.boundedElastic()).then();
|
|
}
|
|
|
|
public Mono<Void> saveMessage(String sessionId, ConversationMessageDTO message) {
|
|
return Mono.fromRunnable(() -> {
|
|
DocumentReference messageDocRef = getSessionDocumentReference(sessionId).collection(MESSAGES_SUBCOLLECTION).document();
|
|
try {
|
|
firestoreBaseRepository.setDocument(messageDocRef, conversationMessageMapper.toMap(message));
|
|
} catch (ExecutionException | InterruptedException e) {
|
|
handleException(e, sessionId);
|
|
}
|
|
}).subscribeOn(Schedulers.boundedElastic()).then();
|
|
}
|
|
|
|
public Flux<ConversationMessageDTO> getMessages(String sessionId) {
|
|
String messagesPath = getConversationCollectionPath() + "/" + sessionId + "/" + MESSAGES_SUBCOLLECTION;
|
|
return firestoreBaseRepository.getDocuments(messagesPath)
|
|
.map(documentSnapshot -> {
|
|
if (documentSnapshot != null && documentSnapshot.exists()) {
|
|
return conversationMessageMapper.fromMap(documentSnapshot.getData());
|
|
}
|
|
return null;
|
|
})
|
|
.filter(Objects::nonNull);
|
|
}
|
|
|
|
public Mono<ConversationSessionDTO> getConversationSession(String sessionId) {
|
|
logger.info("Attempting to retrieve conversation session for session {}.", sessionId);
|
|
return Mono.fromCallable(() -> {
|
|
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
|
|
try {
|
|
DocumentSnapshot documentSnapshot = firestoreBaseRepository.getDocumentSnapshot(sessionDocRef);
|
|
if (documentSnapshot != null && documentSnapshot.exists()) {
|
|
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
|
|
logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionId);
|
|
return sessionDTO;
|
|
}
|
|
logger.info("Conversation session not found for session {}.", sessionId);
|
|
return null;
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
handleException(e, sessionId);
|
|
return null;
|
|
}
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
|
}
|
|
|
|
public Mono<ConversationSessionDTO> getSessionByTelefono(String userPhoneNumber) {
|
|
return firestoreBaseRepository.getDocumentsByField(getConversationCollectionPath(), "telefono", userPhoneNumber)
|
|
.map(documentSnapshot -> {
|
|
if (documentSnapshot != null && documentSnapshot.exists()) {
|
|
ConversationSessionDTO sessionDTO = firestoreConversationMapper.mapFirestoreDocumentToConversationSessionDTO(documentSnapshot);
|
|
logger.info("Successfully retrieved and mapped conversation session for session {}.", sessionDTO.sessionId());
|
|
return sessionDTO;
|
|
}
|
|
return null;
|
|
});
|
|
}
|
|
|
|
public Mono<Void> deleteSession(String sessionId) {
|
|
logger.info("Attempting to delete conversation session for session {}.", sessionId);
|
|
return Mono.fromRunnable(() -> {
|
|
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
|
|
try {
|
|
firestoreBaseRepository.deleteDocumentAndSubcollections(sessionDocRef, MESSAGES_SUBCOLLECTION);
|
|
logger.info("Successfully deleted conversation session for session {}.", sessionId);
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
handleException(e, sessionId);
|
|
}
|
|
}).subscribeOn(Schedulers.boundedElastic()).then();
|
|
}
|
|
|
|
private String getConversationCollectionPath() {
|
|
return String.format(CONVERSATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId());
|
|
}
|
|
|
|
private DocumentReference getSessionDocumentReference(String sessionId) {
|
|
String collectionPath = getConversationCollectionPath();
|
|
return firestoreBaseRepository.getDocumentReference(collectionPath, sessionId);
|
|
}
|
|
|
|
private void handleException(Exception e, String sessionId) {
|
|
if (e instanceof InterruptedException) {
|
|
Thread.currentThread().interrupt();
|
|
}
|
|
logger.error("Error processing Firestore operation for session {}: {}", sessionId, e.getMessage(), e);
|
|
throw new FirestorePersistenceException("Failed to process Firestore operation for session " + sessionId, e);
|
|
}
|
|
} |