/* * Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. * Your use of it is subject to your agreement with Google. */ package com.example.service.notification; import com.example.dto.dialogflow.notification.NotificationDTO; import com.example.exception.FirestorePersistenceException; import com.example.mapper.notification.FirestoreNotificationMapper; import com.example.repository.FirestoreBaseRepository; import com.google.cloud.Timestamp; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.FieldValue; import java.time.Instant; import java.util.Collections; import java.util.Map; import java.util.List; import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @Service public class FirestoreNotificationService { private static final Logger logger = LoggerFactory.getLogger(FirestoreNotificationService.class); private static final String NOTIFICATION_COLLECTION_PATH_FORMAT = "artifacts/%s/notifications"; private static final String FIELD_MESSAGES = "notificaciones"; private static final String FIELD_LAST_UPDATED = "ultimaActualizacion"; private static final String FIELD_PHONE_NUMBER = "telefono"; private static final String FIELD_NOTIFICATION_ID = "sessionId"; private final FirestoreBaseRepository firestoreBaseRepository; private final FirestoreNotificationMapper firestoreNotificationMapper; public FirestoreNotificationService( FirestoreBaseRepository firestoreBaseRepository, FirestoreNotificationMapper firestoreNotificationMapper, MemoryStoreNotificationService memoryStoreNotificationService) { this.firestoreBaseRepository = firestoreBaseRepository; this.firestoreNotificationMapper = firestoreNotificationMapper; } public Mono saveOrAppendNotificationEntry(NotificationDTO newEntry) { return Mono.fromRunnable( () -> { String phoneNumber = newEntry.telefono(); if (phoneNumber == null || phoneNumber.isBlank()) { throw new IllegalArgumentException( "Phone number is required to manage notification entries."); } // Use the phone number as the document ID for the session. String notificationSessionId = phoneNumber; // Synchronize on the notification session ID to prevent race conditions when // creating a new session. synchronized (notificationSessionId.intern()) { DocumentReference notificationDocRef = getNotificationDocumentReference(notificationSessionId); Map entryMap = firestoreNotificationMapper.mapNotificationDTOToMap(newEntry); try { // Check if the session document exists. boolean docExists = firestoreBaseRepository.documentExists(notificationDocRef); if (docExists) { // If the document exists, append the new entry to the 'notificaciones' array. Map updates = Map.of( FIELD_MESSAGES, FieldValue.arrayUnion(entryMap), FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now()))); firestoreBaseRepository.updateDocument(notificationDocRef, updates); logger.info( "Successfully appended new entry to notification session {} in Firestore.", notificationSessionId); } else { // If the document does not exist, create a new session document. Map newSessionData = Map.of( FIELD_NOTIFICATION_ID, notificationSessionId, FIELD_PHONE_NUMBER, phoneNumber, "fechaCreacion", Timestamp.of(java.util.Date.from(Instant.now())), FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now())), FIELD_MESSAGES, Collections.singletonList(entryMap)); firestoreBaseRepository.setDocument(notificationDocRef, newSessionData); logger.info( "Successfully created a new notification session {} in Firestore.", notificationSessionId); } } catch (ExecutionException e) { logger.error( "Error saving notification to Firestore for phone: {}", e.getMessage(), e); throw new FirestorePersistenceException( "Failed to save notification to Firestore for phone ", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error( "Thread interrupted while saving notification to Firestore for phone {}: {}", phoneNumber, e.getMessage(), e); throw new FirestorePersistenceException( "Saving notification was interrupted for phone ", e); } } }) .subscribeOn(Schedulers.boundedElastic()) .then(); } private String getNotificationCollectionPath() { return String.format(NOTIFICATION_COLLECTION_PATH_FORMAT, firestoreBaseRepository.getAppId()); } private DocumentReference getNotificationDocumentReference(String notificationId) { String collectionPath = getNotificationCollectionPath(); return firestoreBaseRepository.getDocumentReference(collectionPath, notificationId); } @SuppressWarnings("unchecked") public Mono updateNotificationStatus(String sessionId, String status) { return Mono.fromRunnable(() -> { DocumentReference notificationDocRef = getNotificationDocumentReference(sessionId); try { Map sessionData = firestoreBaseRepository.getDocument(notificationDocRef, Map.class); if (sessionData != null) { List> notifications = (List>) sessionData .get(FIELD_MESSAGES); if (notifications != null) { List> updatedNotifications = new ArrayList<>(); for (Map notification : notifications) { Map updatedNotification = new HashMap<>(notification); updatedNotification.put("status", status); updatedNotifications.add(updatedNotification); } Map updates = new HashMap<>(); updates.put(FIELD_MESSAGES, updatedNotifications); updates.put(FIELD_LAST_UPDATED, Timestamp.of(java.util.Date.from(Instant.now()))); firestoreBaseRepository.updateDocument(notificationDocRef, updates); logger.info("Successfully updated notification status to '{}' for session {} in Firestore.", status, sessionId); } } else { logger.warn("Notification session {} not found in Firestore. Cannot update status.", sessionId); } } catch (ExecutionException e) { logger.error("Error updating notification status in Firestore for session {}: {}", sessionId, e.getMessage(), e); throw new FirestorePersistenceException( "Failed to update notification status in Firestore for session " + sessionId, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Thread interrupted while updating notification status in Firestore for session {}: {}", sessionId, e.getMessage(), e); throw new FirestorePersistenceException( "Updating notification status was interrupted for session " + sessionId, e); } }) .subscribeOn(Schedulers.boundedElastic()) .then(); } public Mono deleteNotification(String notificationId) { logger.info("Attempting to delete notification session {} from Firestore.", notificationId); return Mono.fromRunnable(() -> { try { DocumentReference notificationDocRef = getNotificationDocumentReference(notificationId); firestoreBaseRepository.deleteDocument(notificationDocRef); logger.info("Successfully deleted notification session {} from Firestore.", notificationId); } catch (ExecutionException e) { logger.error("Error deleting notification session {} from Firestore: {}", notificationId, e.getMessage(), e); throw new FirestorePersistenceException("Failed to delete notification session " + notificationId, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Thread interrupted while deleting notification session {} from Firestore: {}", notificationId, e.getMessage(), e); throw new FirestorePersistenceException("Deleting notification session was interrupted for " + notificationId, e); } }).subscribeOn(Schedulers.boundedElastic()).then(); } }