From 7926d9881c1c2317534b562eb80c98624105a9a1 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Tue, 10 Mar 2026 23:47:11 +0000 Subject: [PATCH 1/2] Add notification model --- src/va_agent/config.py | 7 ++ src/va_agent/dynamic_instruction.py | 14 +-- src/va_agent/notifications.py | 130 +++++++++++++++++++--------- 3 files changed, 102 insertions(+), 49 deletions(-) diff --git a/src/va_agent/config.py b/src/va_agent/config.py index a4e202e..49192d3 100644 --- a/src/va_agent/config.py +++ b/src/va_agent/config.py @@ -1,5 +1,6 @@ """Configuration helper for ADK agent.""" +import logging import os from pydantic_settings import ( @@ -37,6 +38,9 @@ class AgentSettings(BaseSettings): mcp_audience: str mcp_remote_url: str + # Logging + log_level: str = "INFO" + model_config = SettingsConfigDict( yaml_file=CONFIG_FILE_PATH, extra="ignore", # Ignore extra fields from config.yaml @@ -60,3 +64,6 @@ class AgentSettings(BaseSettings): settings = AgentSettings.model_validate({}) + +logging.basicConfig() +logging.getLogger("va_agent").setLevel(settings.log_level.upper()) diff --git a/src/va_agent/dynamic_instruction.py b/src/va_agent/dynamic_instruction.py index 664bad4..0cb4267 100644 --- a/src/va_agent/dynamic_instruction.py +++ b/src/va_agent/dynamic_instruction.py @@ -84,23 +84,16 @@ async def provide_dynamic_instruction( return "" # Build dynamic instruction with notification details - notification_ids = [ - nid - for n in recent_notifications - if (nid := n.get("id_notificacion")) is not None - ] + notification_ids = [n.id_notificacion for n in recent_notifications] count = len(recent_notifications) # Format notification details for the agent (most recent first) now = time.time() notification_details = [] for i, notif in enumerate(recent_notifications, 1): - evento = notif.get("nombre_evento_dialogflow", "notificacion") - texto = notif.get("texto", "Sin texto") - ts = notif.get("timestamp_creacion", notif.get("timestampCreacion", 0)) - ago = _format_time_ago(now, ts) + ago = _format_time_ago(now, notif.timestamp_creacion) notification_details.append( - f" {i}. [{ago}] Evento: {evento} | Texto: {texto}" + f" {i}. [{ago}] Evento: {notif.nombre_evento} | Texto: {notif.texto}" ) details_text = "\n".join(notification_details) @@ -123,6 +116,7 @@ async def provide_dynamic_instruction( count, phone_number, ) + logger.debug("Dynamic instruction content:\n%s", instruction) except Exception: logger.exception( diff --git a/src/va_agent/notifications.py b/src/va_agent/notifications.py index 918823b..4177937 100644 --- a/src/va_agent/notifications.py +++ b/src/va_agent/notifications.py @@ -7,32 +7,80 @@ import time from datetime import datetime from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable +from pydantic import AliasChoices, BaseModel, Field, field_validator + if TYPE_CHECKING: from google.cloud.firestore_v1.async_client import AsyncClient logger = logging.getLogger(__name__) -def _extract_ts(n: dict[str, Any]) -> float: - """Return the creation timestamp of a notification as epoch seconds.""" - raw = n.get("timestamp_creacion", n.get("timestampCreacion", 0)) - if isinstance(raw, (int, float)): - return float(raw) - if isinstance(raw, datetime): - return raw.timestamp() - if isinstance(raw, str): - try: - return float(raw) - except ValueError: - return 0.0 - return 0.0 +class Notification(BaseModel): + """A single notification, normalised from either schema. + + Handles snake_case (``id_notificacion``), camelCase + (``idNotificacion``), and English short names (``notificationId``) + transparently via ``AliasChoices``. + """ + + id_notificacion: str = Field( + validation_alias=AliasChoices( + "id_notificacion", "idNotificacion", "notificationId" + ), + ) + texto: str = Field( + default="Sin texto", + validation_alias=AliasChoices("texto", "text"), + ) + nombre_evento: str = Field( + default="notificacion", + validation_alias=AliasChoices( + "nombre_evento_dialogflow", "nombreEventoDialogflow", "event" + ), + ) + timestamp_creacion: float = Field( + default=0.0, + validation_alias=AliasChoices("timestamp_creacion", "timestampCreacion"), + ) + status: str = "active" + parametros: dict[str, Any] = Field( + default_factory=dict, + validation_alias=AliasChoices("parametros", "parameters"), + ) + + @field_validator("timestamp_creacion", mode="before") + @classmethod + def _coerce_timestamp(cls, v: Any) -> float: + """Normalise Firestore timestamps (float, str, datetime) to float.""" + if isinstance(v, (int, float)): + return float(v) + if isinstance(v, datetime): + return v.timestamp() + if isinstance(v, str): + try: + return float(v) + except ValueError: + return 0.0 + return 0.0 + + +class NotificationDocument(BaseModel): + """Top-level Firestore / Redis document that wraps a list of notifications. + + Mirrors the schema used by ``utils/check_notifications.py`` + (``NotificationSession``) but keeps only what the agent needs. + """ + + notificaciones: list[Notification] = Field(default_factory=list) @runtime_checkable class NotificationBackend(Protocol): """Backend-agnostic interface for notification storage.""" - async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: + async def get_recent_notifications( + self, phone_number: str + ) -> list[Notification]: """Return recent notifications for *phone_number*.""" ... @@ -65,7 +113,9 @@ class FirestoreNotificationBackend: self._max_to_notify = max_to_notify self._window_hours = window_hours - async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: + async def get_recent_notifications( + self, phone_number: str + ) -> list[Notification]: """Get recent notifications for a user. Retrieves notifications created within the configured time window, @@ -75,14 +125,7 @@ class FirestoreNotificationBackend: phone_number: User's phone number (used as document ID) Returns: - List of notification dictionaries with structure: - { - "id_notificacion": str, - "texto": str, - "status": str, - "timestamp_creacion": timestamp, - "parametros": {...} - } + List of validated :class:`Notification` instances. """ try: @@ -96,17 +139,21 @@ class FirestoreNotificationBackend: return [] data = doc.to_dict() or {} - all_notifications = data.get("notificaciones", []) + document = NotificationDocument.model_validate(data) - if not all_notifications: + if not document.notificaciones: logger.info("No notifications in array for phone: %s", phone_number) return [] cutoff = time.time() - (self._window_hours * 3600) - recent = [n for n in all_notifications if _extract_ts(n) >= cutoff] + parsed = [ + n + for n in document.notificaciones + if n.timestamp_creacion >= cutoff + ] - if not recent: + if not parsed: logger.info( "No notifications within the last %.0fh for phone: %s", self._window_hours, @@ -114,13 +161,13 @@ class FirestoreNotificationBackend: ) return [] - recent.sort(key=_extract_ts, reverse=True) + parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True) - result = recent[: self._max_to_notify] + result = parsed[: self._max_to_notify] logger.info( "Found %d recent notifications for phone: %s (returning top %d)", - len(recent), + len(parsed), phone_number, len(result), ) @@ -165,7 +212,9 @@ class RedisNotificationBackend: self._max_to_notify = max_to_notify self._window_hours = window_hours - async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: + async def get_recent_notifications( + self, phone_number: str + ) -> list[Notification]: """Get recent notifications for a user from Redis. Reads from the ``notification:{phone}`` key, parses the JSON @@ -185,10 +234,9 @@ class RedisNotificationBackend: ) return [] - data = json.loads(raw) - all_notifications: list[dict[str, Any]] = data.get("notificaciones", []) + document = NotificationDocument.model_validate(json.loads(raw)) - if not all_notifications: + if not document.notificaciones: logger.info( "No notifications in array for phone: %s", phone_number, @@ -197,9 +245,13 @@ class RedisNotificationBackend: cutoff = time.time() - (self._window_hours * 3600) - recent = [n for n in all_notifications if _extract_ts(n) >= cutoff] + parsed = [ + n + for n in document.notificaciones + if n.timestamp_creacion >= cutoff + ] - if not recent: + if not parsed: logger.info( "No notifications within the last %.0fh for phone: %s", self._window_hours, @@ -207,13 +259,13 @@ class RedisNotificationBackend: ) return [] - recent.sort(key=_extract_ts, reverse=True) + parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True) - result = recent[: self._max_to_notify] + result = parsed[: self._max_to_notify] logger.info( "Found %d recent notifications for phone: %s (returning top %d)", - len(recent), + len(parsed), phone_number, len(result), ) -- 2.49.1 From 014ceb9da6b8b111b6f1cc0d3a6ad9734f4f9850 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Tue, 10 Mar 2026 23:49:24 +0000 Subject: [PATCH 2/2] formatting --- src/va_agent/notifications.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/src/va_agent/notifications.py b/src/va_agent/notifications.py index 4177937..2e12eda 100644 --- a/src/va_agent/notifications.py +++ b/src/va_agent/notifications.py @@ -78,9 +78,7 @@ class NotificationDocument(BaseModel): class NotificationBackend(Protocol): """Backend-agnostic interface for notification storage.""" - async def get_recent_notifications( - self, phone_number: str - ) -> list[Notification]: + async def get_recent_notifications(self, phone_number: str) -> list[Notification]: """Return recent notifications for *phone_number*.""" ... @@ -113,9 +111,7 @@ class FirestoreNotificationBackend: self._max_to_notify = max_to_notify self._window_hours = window_hours - async def get_recent_notifications( - self, phone_number: str - ) -> list[Notification]: + async def get_recent_notifications(self, phone_number: str) -> list[Notification]: """Get recent notifications for a user. Retrieves notifications created within the configured time window, @@ -148,9 +144,7 @@ class FirestoreNotificationBackend: cutoff = time.time() - (self._window_hours * 3600) parsed = [ - n - for n in document.notificaciones - if n.timestamp_creacion >= cutoff + n for n in document.notificaciones if n.timestamp_creacion >= cutoff ] if not parsed: @@ -212,9 +206,7 @@ class RedisNotificationBackend: self._max_to_notify = max_to_notify self._window_hours = window_hours - async def get_recent_notifications( - self, phone_number: str - ) -> list[Notification]: + async def get_recent_notifications(self, phone_number: str) -> list[Notification]: """Get recent notifications for a user from Redis. Reads from the ``notification:{phone}`` key, parses the JSON @@ -246,9 +238,7 @@ class RedisNotificationBackend: cutoff = time.time() - (self._window_hours * 3600) parsed = [ - n - for n in document.notificaciones - if n.timestamp_creacion >= cutoff + n for n in document.notificaciones if n.timestamp_creacion >= cutoff ] if not parsed: -- 2.49.1