UPDATE FIX BUG_679

This commit is contained in:
PAVEL PALMA
2025-10-27 18:20:19 -06:00
parent 9fb1088f7d
commit 6d68ae16ef
9 changed files with 210 additions and 47 deletions

View File

@@ -1,106 +1,214 @@
package com.example.service.base;
import com.example.repository.FirestoreBaseRepository;
import com.google.cloud.firestore.CollectionReference;
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.QueryDocumentSnapshot;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Service
public class DataPurgeService {
private static final Logger logger = LoggerFactory.getLogger(DataPurgeService.class);
private final ReactiveRedisTemplate<String, ?> redisTemplate;
private final FirestoreBaseRepository firestoreBaseRepository;
private final Firestore firestore;
@Autowired
public DataPurgeService(
@Qualifier("reactiveRedisTemplate") ReactiveRedisTemplate<String, ?> redisTemplate,
FirestoreBaseRepository firestoreBaseRepository, Firestore firestore) {
this.redisTemplate = redisTemplate;
this.firestoreBaseRepository = firestoreBaseRepository;
this.firestore = firestore;
}
public Mono<Void> purgeAllData() {
return purgeRedis()
.then(purgeFirestore());
}
private Mono<Void> purgeRedis() {
logger.info("Starting Redis data purge.");
return redisTemplate.getConnectionFactory().getReactiveConnection().serverCommands().flushAll()
.doOnSuccess(v -> logger.info("Successfully purged all data from Redis."))
.doOnError(e -> logger.error("Error purging data from Redis.", e))
.then();
}
private Mono<Void> purgeFirestore() {
logger.info("Starting Firestore data purge.");
return Mono.fromRunnable(() -> {
try {
String appId = firestoreBaseRepository.getAppId();
String conversationsCollectionPath = String.format("artifacts/%s/conversations", appId);
String notificationsCollectionPath = String.format("artifacts/%s/notifications", appId);
// Delete 'messages' sub-collections in 'conversations'
logger.info("Deleting 'messages' sub-collections from '{}'", conversationsCollectionPath);
// Delete 'mensajes' sub-collections in 'conversations'
logger.info("Deleting 'mensajes' sub-collections from '{}'", conversationsCollectionPath);
try {
List<QueryDocumentSnapshot> conversationDocuments = firestore.collection(conversationsCollectionPath).get().get().getDocuments();
for (QueryDocumentSnapshot document : conversationDocuments) {
String messagesCollectionPath = document.getReference().getPath() + "/messages";
String messagesCollectionPath = document.getReference().getPath() + "/mensajes";
logger.info("Deleting sub-collection: {}", messagesCollectionPath);
firestoreBaseRepository.deleteCollection(messagesCollectionPath, 50);
}
} catch (Exception e) {
if (e.getMessage().contains("NOT_FOUND")) {
logger.warn("Collection '{}' not found, skipping.", conversationsCollectionPath);
} else {
throw e;
}
}
// Delete the 'conversations' collection
logger.info("Deleting collection: {}", conversationsCollectionPath);
try {
firestoreBaseRepository.deleteCollection(conversationsCollectionPath, 50);
} catch (Exception e) {
if (e.getMessage().contains("NOT_FOUND")) {
logger.warn("Collection '{}' not found, skipping.", conversationsCollectionPath);
} else {
throw e;
}
else {
throw e;
}
}
// Delete the 'notifications' collection
logger.info("Deleting collection: {}", notificationsCollectionPath);
try {
firestoreBaseRepository.deleteCollection(notificationsCollectionPath, 50);
} catch (Exception e) {
if (e.getMessage().contains("NOT_FOUND")) {
logger.warn("Collection '{}' not found, skipping.", notificationsCollectionPath);
} else {
throw e;
}
}
logger.info("Successfully purged Firestore collections.");
} catch (Exception e) {
logger.error("Error purging Firestore collections.", e);
throw new RuntimeException("Failed to purge Firestore collections.", e);
}
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}

View File

@@ -7,6 +7,7 @@ package com.example.service.base;
import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
import com.example.service.conversation.FirestoreConversationService;
import com.example.service.conversation.MemoryStoreConversationService;
import com.example.service.notification.FirestoreNotificationService;
import com.example.service.notification.MemoryStoreNotificationService;
import org.slf4j.Logger;
@@ -21,10 +22,9 @@ public class SessionPurgeService {
private static final Logger logger = LoggerFactory.getLogger(SessionPurgeService.class);
private static final String SESSION_KEY_PREFIX = "conversation:session:";
private static final String PHONE_TO_SESSION_KEY_PREFIX = "conversation:phone_to_session:";
private final ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate;
private final ReactiveRedisTemplate<String, String> stringRedisTemplate;
private final MemoryStoreConversationService memoryStoreConversationService;
private final FirestoreConversationService firestoreConversationService;
private final MemoryStoreNotificationService memoryStoreNotificationService;
private final FirestoreNotificationService firestoreNotificationService;
@@ -32,12 +32,12 @@ public class SessionPurgeService {
@Autowired
public SessionPurgeService(
ReactiveRedisTemplate<String, ConversationSessionDTO> redisTemplate,
ReactiveRedisTemplate<String, String> stringRedisTemplate,
MemoryStoreConversationService memoryStoreConversationService,
FirestoreConversationService firestoreConversationService,
MemoryStoreNotificationService memoryStoreNotificationService,
FirestoreNotificationService firestoreNotificationService) {
this.redisTemplate = redisTemplate;
this.stringRedisTemplate = stringRedisTemplate;
this.memoryStoreConversationService = memoryStoreConversationService;
this.firestoreConversationService = firestoreConversationService;
this.memoryStoreNotificationService = memoryStoreNotificationService;
this.firestoreNotificationService = firestoreNotificationService;
@@ -50,14 +50,12 @@ public class SessionPurgeService {
return redisTemplate.opsForValue().get(sessionKey)
.flatMap(session -> {
if (session != null && session.telefono() != null) {
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono();
return redisTemplate.opsForValue().delete(sessionKey)
.then(stringRedisTemplate.opsForValue().delete(phoneToSessionKey))
return memoryStoreConversationService.deleteSession(sessionId)
.then(firestoreConversationService.deleteSession(sessionId))
.then(memoryStoreNotificationService.deleteNotificationSession(session.telefono()))
.then(firestoreNotificationService.deleteNotification(session.telefono()));
} else {
return redisTemplate.opsForValue().delete(sessionKey)
return memoryStoreConversationService.deleteSession(sessionId)
.then(firestoreConversationService.deleteSession(sessionId));
}
});

View File

@@ -28,7 +28,7 @@ public class FirestoreConversationService {
private static final Logger logger = LoggerFactory.getLogger(FirestoreConversationService.class);
private static final String CONVERSATION_COLLECTION_PATH_FORMAT = "artifacts/%s/conversations";
private static final String MESSAGES_SUBCOLLECTION = "messages";
private static final String MESSAGES_SUBCOLLECTION = "mensajes";
private final FirestoreBaseRepository firestoreBaseRepository;
private final FirestoreConversationMapper firestoreConversationMapper;
private final ConversationMessageMapper conversationMessageMapper;
@@ -110,7 +110,7 @@ public class FirestoreConversationService {
return Mono.fromRunnable(() -> {
DocumentReference sessionDocRef = getSessionDocumentReference(sessionId);
try {
firestoreBaseRepository.deleteDocument(sessionDocRef);
firestoreBaseRepository.deleteDocumentAndSubcollections(sessionDocRef, MESSAGES_SUBCOLLECTION);
logger.info("Successfully deleted conversation session for session {}.", sessionId);
} catch (InterruptedException | ExecutionException e) {
handleException(e, sessionId);

View File

@@ -84,7 +84,18 @@ public class MemoryStoreConversationService {
String sessionKey = SESSION_KEY_PREFIX + sessionId;
String messagesKey = MESSAGES_KEY_PREFIX + sessionId;
logger.info("Deleting session {} from Memorystore.", sessionId);
return redisTemplate.opsForValue().delete(sessionKey)
.then(messageRedisTemplate.opsForList().delete(messagesKey)).then();
return redisTemplate.opsForValue().get(sessionKey)
.flatMap(session -> {
if (session != null && session.telefono() != null) {
String phoneToSessionKey = PHONE_TO_SESSION_KEY_PREFIX + session.telefono();
return redisTemplate.opsForValue().delete(sessionKey)
.then(stringRedisTemplate.opsForValue().delete(phoneToSessionKey))
.then(messageRedisTemplate.opsForList().delete(messagesKey));
} else {
return redisTemplate.opsForValue().delete(sessionKey)
.then(messageRedisTemplate.opsForList().delete(messagesKey));
}
}).then();
}
}