"""Firestore service for conversation and notification persistence."""

import logging
from datetime import UTC, datetime

from google.cloud import firestore
from google.cloud.firestore_v1.base_query import FieldFilter

from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationEntry, ConversationSession
from capa_de_integracion.models.notification import Notification

logger = logging.getLogger(__name__)


class FirestoreService:
    """Service for Firestore operations on conversations and notifications.

    Conversation sessions live under ``artifacts/<project>/conversations``
    with their entries in a ``mensajes`` subcollection; notification
    sessions live under ``artifacts/<project>/notifications``.
    """

    def __init__(self, settings: Settings) -> None:
        """Initialize the Firestore async client and collection paths.

        Args:
            settings: Application settings providing ``gcp_project_id`` and
                ``firestore_database_id``.

        """
        self.settings = settings
        self.db = firestore.AsyncClient(
            project=settings.gcp_project_id,
            database=settings.firestore_database_id,
        )
        self.conversations_collection = (
            f"artifacts/{settings.gcp_project_id}/conversations"
        )
        self.entries_subcollection = "mensajes"
        self.notifications_collection = (
            f"artifacts/{settings.gcp_project_id}/notifications"
        )
        logger.info(
            "Firestore client initialized for project: %s",
            settings.gcp_project_id,
        )

    async def close(self) -> None:
        """Close Firestore client."""
        # NOTE(review): the client's close() is invoked without ``await``;
        # confirm that it is synchronous for AsyncClient in the pinned
        # google-cloud-firestore version.
        self.db.close()
        logger.info("Firestore client closed")

    def _session_ref(self, session_id: str) -> firestore.AsyncDocumentReference:
        """Get Firestore document reference for session."""
        return self.db.collection(self.conversations_collection).document(session_id)

    async def get_session(self, session_id: str) -> ConversationSession | None:
        """Retrieve conversation session from Firestore by session ID.

        Args:
            session_id: Session document ID.

        Returns:
            The validated session, or None when the document does not exist
            or when any error occurs (errors are logged, not raised).

        """
        try:
            doc_ref = self._session_ref(session_id)
            doc = await doc_ref.get()

            if not doc.exists:
                logger.debug("Session not found in Firestore: %s", session_id)
                return None

            data = doc.to_dict()
            session = ConversationSession.model_validate(data)
            logger.debug("Retrieved session from Firestore: %s", session_id)
        except Exception:
            logger.exception(
                "Error retrieving session %s from Firestore:",
                session_id,
            )
            return None
        else:
            return session

    async def get_session_by_phone(self, telefono: str) -> ConversationSession | None:
        """Retrieve a conversation session from Firestore by phone number.

        The query filters on ``telefono`` and takes a single document. No
        ordering is applied, so when several sessions share the phone number
        the one returned is whichever Firestore yields first; it is not
        guaranteed to be the most recent.

        Args:
            telefono: User phone number

        Returns:
            A session for this phone, or None if not found or on error
            (errors are logged, not raised).

        """
        try:
            query = (
                self.db.collection(self.conversations_collection)
                .where(filter=FieldFilter("telefono", "==", telefono))
                .limit(1)
            )

            # At most one document is streamed because of limit(1).
            async for doc in query.stream():
                session = ConversationSession.model_validate(doc.to_dict())
                logger.debug(
                    "Retrieved session from Firestore for phone %s: %s",
                    telefono,
                    session.session_id,
                )
                return session

            logger.debug("No session found in Firestore for phone: %s", telefono)
        except Exception:
            logger.exception(
                "Error querying session by phone %s from Firestore:",
                telefono,
            )
        return None

    async def save_session(self, session: ConversationSession) -> bool:
        """Save conversation session to Firestore.

        The session is written with ``merge=True``, so fields already on the
        document that are absent from the dump are left in place.

        Args:
            session: Session to persist.

        Returns:
            True on success, False if an error occurred (logged, not raised).

        """
        try:
            doc_ref = self._session_ref(session.session_id)
            data = session.model_dump()
            await doc_ref.set(data, merge=True)
            logger.debug("Saved session to Firestore: %s", session.session_id)
        except Exception:
            logger.exception(
                "Error saving session %s to Firestore:",
                session.session_id,
            )
            return False
        else:
            return True

    async def create_session(
        self,
        session_id: str,
        user_id: str,
        telefono: str,
        pantalla_contexto: str | None = None,
        last_message: str | None = None,
    ) -> ConversationSession:
        """Create and save a new conversation session to Firestore.

        Unlike ``save_session``, errors are not caught here and propagate to
        the caller.

        Args:
            session_id: Unique session identifier
            user_id: User identifier
            telefono: User phone number
            pantalla_contexto: Optional screen context for the conversation
            last_message: Optional last message in the conversation

        Returns:
            The created session

        Raises:
            Exception: If session creation or save fails

        """
        session = ConversationSession.create(
            session_id=session_id,
            user_id=user_id,
            telefono=telefono,
            pantalla_contexto=pantalla_contexto,
            last_message=last_message,
        )

        doc_ref = self._session_ref(session.session_id)
        data = session.model_dump()
        await doc_ref.set(data, merge=True)

        logger.info("Created new session in Firestore: %s", session_id)
        return session

    async def save_entry(self, session_id: str, entry: ConversationEntry) -> bool:
        """Save conversation entry to Firestore subcollection.

        Args:
            session_id: ID of the session that owns the entry.
            entry: Entry to persist.

        Returns:
            True on success, False if an error occurred (logged, not raised).

        """
        try:
            doc_ref = self._session_ref(session_id)
            entries_ref = doc_ref.collection(self.entries_subcollection)

            # Use timestamp as document ID for chronological ordering.
            # Two entries with an identical timestamp map to the same
            # document, so the later write replaces the earlier one.
            entry_id = entry.timestamp.isoformat()
            entry_doc = entries_ref.document(entry_id)

            data = entry.model_dump()
            await entry_doc.set(data)
            logger.debug("Saved entry to Firestore for session: %s", session_id)
        except Exception:
            logger.exception(
                "Error saving entry for session %s to Firestore:",
                session_id,
            )
            return False
        else:
            return True

    async def get_entries(
        self,
        session_id: str,
        limit: int = 10,
    ) -> list[ConversationEntry]:
        """Retrieve recent conversation entries from Firestore.

        Args:
            session_id: ID of the session whose entries are read.
            limit: Maximum number of entries to fetch.

        Returns:
            Up to ``limit`` of the newest entries in chronological (oldest
            first) order, or an empty list on error (logged, not raised).

        """
        try:
            doc_ref = self._session_ref(session_id)
            entries_ref = doc_ref.collection(self.entries_subcollection)

            # Get entries ordered by timestamp descending
            query = entries_ref.order_by(
                "timestamp",
                direction=firestore.Query.DESCENDING,
            ).limit(limit)

            entries = [
                ConversationEntry.model_validate(doc.to_dict())
                async for doc in query.stream()
            ]

            # Reverse to get chronological order
            entries.reverse()
            logger.debug(
                "Retrieved %s entries for session: %s",
                len(entries),
                session_id,
            )
        except Exception:
            logger.exception(
                "Error retrieving entries for session %s from Firestore:",
                session_id,
            )
            return []
        else:
            return entries

    async def delete_session(self, session_id: str) -> bool:
        """Delete conversation session and all entries from Firestore.

        Args:
            session_id: ID of the session to delete.

        Returns:
            True on success, False if an error occurred (logged, not raised).
            On failure some entries may already have been deleted.

        """
        try:
            doc_ref = self._session_ref(session_id)

            # Delete all entries first
            entries_ref = doc_ref.collection(self.entries_subcollection)
            async for doc in entries_ref.stream():
                await doc.reference.delete()

            # Delete session document
            await doc_ref.delete()
            logger.debug("Deleted session from Firestore: %s", session_id)
        except Exception:
            logger.exception(
                "Error deleting session %s from Firestore:",
                session_id,
            )
            return False
        else:
            return True

    async def update_pantalla_contexto(
        self,
        session_id: str,
        pantalla_contexto: str | None,
    ) -> bool:
        """Update the pantallaContexto field for a conversation session.

        Also sets ``lastModified`` to the current UTC time.

        Args:
            session_id: Session ID to update
            pantalla_contexto: New pantalla contexto value

        Returns:
            True if update was successful, False if the session does not
            exist or an error occurred (logged, not raised)

        """
        try:
            doc_ref = self._session_ref(session_id)
            doc = await doc_ref.get()

            if not doc.exists:
                logger.warning(
                    "Session %s not found in Firestore. Cannot update pantallaContexto",
                    session_id,
                )
                return False

            await doc_ref.update(
                {
                    "pantallaContexto": pantalla_contexto,
                    "lastModified": datetime.now(UTC),
                },
            )

            logger.debug(
                "Updated pantallaContexto for session %s in Firestore",
                session_id,
            )
        except Exception:
            logger.exception(
                "Error updating pantallaContexto for session %s in Firestore:",
                session_id,
            )
            return False
        else:
            return True

    # ====== Notification Methods ======

    def _notification_ref(
        self,
        notification_id: str,
    ) -> firestore.AsyncDocumentReference:
        """Get Firestore document reference for notification."""
        return self.db.collection(self.notifications_collection).document(
            notification_id,
        )

    async def save_or_append_notification(self, new_entry: Notification) -> None:
        """Save or append notification entry to Firestore.

        The notification session document is keyed by the entry's phone
        number. If the document exists the entry is appended to its
        ``notificaciones`` array; otherwise a new document is created.

        Args:
            new_entry: Notification entry to save

        Raises:
            ValueError: If phone number is missing
            Exception: Any Firestore error is logged and re-raised

        """
        phone_number = new_entry.telefono
        if not phone_number or not phone_number.strip():
            msg = "Phone number is required to manage notification entries"
            raise ValueError(msg)

        # Use phone number as document ID
        notification_session_id = phone_number

        try:
            doc_ref = self._notification_ref(notification_session_id)
            doc = await doc_ref.get()

            entry_dict = new_entry.model_dump()
            # Single timestamp so that, on creation, fecha_creacion and
            # ultima_actualizacion hold exactly the same value.
            now = datetime.now(UTC)

            if doc.exists:
                # Append to existing session.
                # NOTE(review): ArrayUnion only adds elements not already
                # present, so an entry identical to a stored one would not
                # be appended again - confirm this is intended.
                await doc_ref.update(
                    {
                        "notificaciones": firestore.ArrayUnion([entry_dict]),
                        "ultima_actualizacion": now,
                    },
                )
                logger.info(
                    "Successfully appended notification entry "
                    "to session %s in Firestore",
                    notification_session_id,
                )
            else:
                # Create new notification session
                new_session_data = {
                    "session_id": notification_session_id,
                    "telefono": phone_number,
                    "fecha_creacion": now,
                    "ultima_actualizacion": now,
                    "notificaciones": [entry_dict],
                }
                await doc_ref.set(new_session_data)
                logger.info(
                    "Successfully created new notification session %s in Firestore",
                    notification_session_id,
                )

        except Exception:
            logger.exception(
                "Error saving notification to Firestore for phone %s",
                phone_number,
            )
            raise

    async def update_notification_status(self, session_id: str, status: str) -> None:
        """Update the status of all notifications in a session.

        Returns without changes (after logging a warning) when the session
        document does not exist or has no data.

        Args:
            session_id: Notification session ID (phone number)
            status: New status value

        Raises:
            Exception: Any Firestore error is logged and re-raised

        """
        try:
            doc_ref = self._notification_ref(session_id)
            doc = await doc_ref.get()

            if not doc.exists:
                logger.warning(
                    "Notification session %s not found in Firestore. "
                    "Cannot update status",
                    session_id,
                )
                return

            session_data = doc.to_dict()
            if not session_data:
                logger.warning(
                    "Notification session %s has no data in Firestore",
                    session_id,
                )
                return

            # ``or []`` also covers a field that is present but stored as
            # null, which a plain ``.get(key, [])`` default would not.
            notifications = session_data.get("notificaciones") or []

            # Update status for all notifications
            updated_notifications = [
                {**notif, "status": status} for notif in notifications
            ]

            await doc_ref.update(
                {
                    "notificaciones": updated_notifications,
                    "ultima_actualizacion": datetime.now(UTC),
                },
            )

            logger.info(
                "Successfully updated notification status to '%s' "
                "for session %s in Firestore",
                status,
                session_id,
            )

        except Exception:
            logger.exception(
                "Error updating notification status in Firestore for session %s",
                session_id,
            )
            raise

    async def delete_notification(self, notification_id: str) -> bool:
        """Delete notification session from Firestore.

        Args:
            notification_id: ID of the notification session document.

        Returns:
            True on success, False if an error occurred (logged, not raised).

        """
        try:
            logger.info(
                "Deleting notification session %s from Firestore",
                notification_id,
            )
            doc_ref = self._notification_ref(notification_id)
            await doc_ref.delete()
            logger.info(
                "Successfully deleted notification session %s from Firestore",
                notification_id,
            )
        except Exception:
            logger.exception(
                "Error deleting notification session %s from Firestore",
                notification_id,
            )
            return False
        else:
            return True