"""Notification management for VAia agent.""" from __future__ import annotations import logging import time from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable if TYPE_CHECKING: from google.cloud.firestore_v1.async_client import AsyncClient logger = logging.getLogger(__name__) @runtime_checkable class NotificationBackend(Protocol): """Backend-agnostic interface for notification storage.""" async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: """Return recent notifications for *phone_number*.""" ... async def mark_as_notified( self, phone_number: str, notification_ids: list[str] ) -> bool: """Mark the given notification IDs as notified. Return success.""" ... class FirestoreNotificationBackend: """Firestore-backed notification backend (read-only). Reads notifications from a Firestore document keyed by phone number. Filters by a configurable time window instead of tracking read/unread state — the agent is awareness-only; delivery happens in the app. """ def __init__( self, *, db: AsyncClient, collection_path: str, max_to_notify: int = 5, window_hours: float = 48, ) -> None: """Initialize with Firestore client and collection path.""" self._db = db self._collection_path = collection_path self._max_to_notify = max_to_notify self._window_hours = window_hours async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: """Get recent notifications for a user. Retrieves notifications created within the configured time window, ordered by timestamp (most recent first), limited to max_to_notify. Args: phone_number: User's phone number (used as document ID) Returns: List of notification dictionaries with structure: { "id_notificacion": str, "texto": str, "status": str, "timestamp_creacion": timestamp, "parametros": {...} } """ try: doc_ref = self._db.collection(self._collection_path).document(phone_number) doc = await doc_ref.get() if not doc.exists: logger.info( "No notification document found for phone: %s", phone_number ) return [] data = doc.to_dict() or {} all_notifications = data.get("notificaciones", []) if not all_notifications: logger.info("No notifications in array for phone: %s", phone_number) return [] cutoff = time.time() - (self._window_hours * 3600) def _ts(n: dict[str, Any]) -> Any: return n.get( "timestamp_creacion", n.get("timestampCreacion", 0), ) recent = [n for n in all_notifications if _ts(n) >= cutoff] if not recent: logger.info( "No notifications within the last %.0fh for phone: %s", self._window_hours, phone_number, ) return [] recent.sort(key=_ts, reverse=True) result = recent[: self._max_to_notify] logger.info( "Found %d recent notifications for phone: %s (returning top %d)", len(recent), phone_number, len(result), ) except Exception: logger.exception( "Failed to fetch notifications for phone: %s", phone_number ) return [] else: return result async def mark_as_notified( self, phone_number: str, # noqa: ARG002 notification_ids: list[str], # noqa: ARG002 ) -> bool: """No-op — the agent is not the delivery mechanism.""" return True class RedisNotificationBackend: """Redis-backed notification backend (read-only).""" def __init__( self, *, host: str = "127.0.0.1", port: int = 6379, max_to_notify: int = 5, window_hours: float = 48, ) -> None: """Initialize with Redis connection parameters.""" import redis.asyncio as aioredis # noqa: PLC0415 self._client = aioredis.Redis( host=host, port=port, decode_responses=True, socket_connect_timeout=5, ) self._max_to_notify = max_to_notify self._window_hours = window_hours async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: """Get recent notifications for a user from Redis. Reads from the ``notification:{phone}`` key, parses the JSON payload, and returns notifications created within the configured time window, sorted by creation timestamp (most recent first), limited to *max_to_notify*. """ import json # noqa: PLC0415 try: raw = await self._client.get(f"notification:{phone_number}") if not raw: logger.info( "No notification data in Redis for phone: %s", phone_number, ) return [] data = json.loads(raw) all_notifications: list[dict[str, Any]] = data.get("notificaciones", []) if not all_notifications: logger.info( "No notifications in array for phone: %s", phone_number, ) return [] cutoff = time.time() - (self._window_hours * 3600) def _ts(n: dict[str, Any]) -> Any: return n.get( "timestamp_creacion", n.get("timestampCreacion", 0), ) recent = [n for n in all_notifications if _ts(n) >= cutoff] if not recent: logger.info( "No notifications within the last %.0fh for phone: %s", self._window_hours, phone_number, ) return [] recent.sort(key=_ts, reverse=True) result = recent[: self._max_to_notify] logger.info( "Found %d recent notifications for phone: %s (returning top %d)", len(recent), phone_number, len(result), ) except Exception: logger.exception( "Failed to fetch notifications from Redis for phone: %s", phone_number, ) return [] else: return result async def mark_as_notified( self, phone_number: str, # noqa: ARG002 notification_ids: list[str], # noqa: ARG002 ) -> bool: """No-op — the agent is not the delivery mechanism.""" return True