Add notification model (#31)
All checks were successful
CI / ci (push) Successful in 21s

Co-authored-by: Anibal Angulo <a8065384@banorte.com>
Reviewed-on: #31
This commit was merged in pull request #31.
This commit is contained in:
2026-03-10 23:50:41 +00:00
parent a264276a5d
commit ac27d12ed3
3 changed files with 92 additions and 49 deletions

View File

@@ -7,32 +7,78 @@ import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from pydantic import AliasChoices, BaseModel, Field, field_validator
if TYPE_CHECKING:
from google.cloud.firestore_v1.async_client import AsyncClient
logger = logging.getLogger(__name__)
def _extract_ts(n: dict[str, Any]) -> float:
"""Return the creation timestamp of a notification as epoch seconds."""
raw = n.get("timestamp_creacion", n.get("timestampCreacion", 0))
if isinstance(raw, (int, float)):
return float(raw)
if isinstance(raw, datetime):
return raw.timestamp()
if isinstance(raw, str):
try:
return float(raw)
except ValueError:
return 0.0
return 0.0
class Notification(BaseModel):
"""A single notification, normalised from either schema.
Handles snake_case (``id_notificacion``), camelCase
(``idNotificacion``), and English short names (``notificationId``)
transparently via ``AliasChoices``.
"""
id_notificacion: str = Field(
validation_alias=AliasChoices(
"id_notificacion", "idNotificacion", "notificationId"
),
)
texto: str = Field(
default="Sin texto",
validation_alias=AliasChoices("texto", "text"),
)
nombre_evento: str = Field(
default="notificacion",
validation_alias=AliasChoices(
"nombre_evento_dialogflow", "nombreEventoDialogflow", "event"
),
)
timestamp_creacion: float = Field(
default=0.0,
validation_alias=AliasChoices("timestamp_creacion", "timestampCreacion"),
)
status: str = "active"
parametros: dict[str, Any] = Field(
default_factory=dict,
validation_alias=AliasChoices("parametros", "parameters"),
)
@field_validator("timestamp_creacion", mode="before")
@classmethod
def _coerce_timestamp(cls, v: Any) -> float:
"""Normalise Firestore timestamps (float, str, datetime) to float."""
if isinstance(v, (int, float)):
return float(v)
if isinstance(v, datetime):
return v.timestamp()
if isinstance(v, str):
try:
return float(v)
except ValueError:
return 0.0
return 0.0
class NotificationDocument(BaseModel):
"""Top-level Firestore / Redis document that wraps a list of notifications.
Mirrors the schema used by ``utils/check_notifications.py``
(``NotificationSession``) but keeps only what the agent needs.
"""
notificaciones: list[Notification] = Field(default_factory=list)
@runtime_checkable
class NotificationBackend(Protocol):
"""Backend-agnostic interface for notification storage."""
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]:
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
"""Return recent notifications for *phone_number*."""
...
@@ -65,7 +111,7 @@ class FirestoreNotificationBackend:
self._max_to_notify = max_to_notify
self._window_hours = window_hours
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]:
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
"""Get recent notifications for a user.
Retrieves notifications created within the configured time window,
@@ -75,14 +121,7 @@ class FirestoreNotificationBackend:
phone_number: User's phone number (used as document ID)
Returns:
List of notification dictionaries with structure:
{
"id_notificacion": str,
"texto": str,
"status": str,
"timestamp_creacion": timestamp,
"parametros": {...}
}
List of validated :class:`Notification` instances.
"""
try:
@@ -96,17 +135,19 @@ class FirestoreNotificationBackend:
return []
data = doc.to_dict() or {}
all_notifications = data.get("notificaciones", [])
document = NotificationDocument.model_validate(data)
if not all_notifications:
if not document.notificaciones:
logger.info("No notifications in array for phone: %s", phone_number)
return []
cutoff = time.time() - (self._window_hours * 3600)
recent = [n for n in all_notifications if _extract_ts(n) >= cutoff]
parsed = [
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
]
if not recent:
if not parsed:
logger.info(
"No notifications within the last %.0fh for phone: %s",
self._window_hours,
@@ -114,13 +155,13 @@ class FirestoreNotificationBackend:
)
return []
recent.sort(key=_extract_ts, reverse=True)
parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True)
result = recent[: self._max_to_notify]
result = parsed[: self._max_to_notify]
logger.info(
"Found %d recent notifications for phone: %s (returning top %d)",
len(recent),
len(parsed),
phone_number,
len(result),
)
@@ -165,7 +206,7 @@ class RedisNotificationBackend:
self._max_to_notify = max_to_notify
self._window_hours = window_hours
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]:
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
"""Get recent notifications for a user from Redis.
Reads from the ``notification:{phone}`` key, parses the JSON
@@ -185,10 +226,9 @@ class RedisNotificationBackend:
)
return []
data = json.loads(raw)
all_notifications: list[dict[str, Any]] = data.get("notificaciones", [])
document = NotificationDocument.model_validate(json.loads(raw))
if not all_notifications:
if not document.notificaciones:
logger.info(
"No notifications in array for phone: %s",
phone_number,
@@ -197,9 +237,11 @@ class RedisNotificationBackend:
cutoff = time.time() - (self._window_hours * 3600)
recent = [n for n in all_notifications if _extract_ts(n) >= cutoff]
parsed = [
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
]
if not recent:
if not parsed:
logger.info(
"No notifications within the last %.0fh for phone: %s",
self._window_hours,
@@ -207,13 +249,13 @@ class RedisNotificationBackend:
)
return []
recent.sort(key=_extract_ts, reverse=True)
parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True)
result = recent[: self._max_to_notify]
result = parsed[: self._max_to_notify]
logger.info(
"Found %d recent notifications for phone: %s (returning top %d)",
len(recent),
len(parsed),
phone_number,
len(result),
)