From ad7a94fc149e444a3c157d5cfbb1574b2be47c38 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Mon, 9 Mar 2026 07:33:28 +0000 Subject: [PATCH] Change dynamic from 'pending' to 'recent' --- src/va_agent/dynamic_instruction.py | 89 ++++++++++--------- src/va_agent/notifications.py | 8 +- utils/check_notifications_firestore.py | 107 ++++++++++++++++++++++ utils/register_notification_firestore.py | 108 +++++++++++++++++++++++ 4 files changed, 266 insertions(+), 46 deletions(-) create mode 100644 utils/check_notifications_firestore.py create mode 100644 utils/register_notification_firestore.py diff --git a/src/va_agent/dynamic_instruction.py b/src/va_agent/dynamic_instruction.py index 4854480..664bad4 100644 --- a/src/va_agent/dynamic_instruction.py +++ b/src/va_agent/dynamic_instruction.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -13,17 +14,38 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +_SECONDS_PER_MINUTE = 60 +_SECONDS_PER_HOUR = 3600 +_MINUTES_PER_HOUR = 60 +_HOURS_PER_DAY = 24 + + +def _format_time_ago(now: float, ts: float) -> str: + """Return a human-readable Spanish label like 'hace 3 horas'.""" + diff = max(now - ts, 0) + minutes = int(diff // _SECONDS_PER_MINUTE) + hours = int(diff // _SECONDS_PER_HOUR) + + if minutes < 1: + return "justo ahora" + if minutes < _MINUTES_PER_HOUR: + return f"hace {minutes} min" + if hours < _HOURS_PER_DAY: + return f"hace {hours}h" + days = hours // _HOURS_PER_DAY + return f"hace {days}d" + + async def provide_dynamic_instruction( notification_service: NotificationBackend, ctx: ReadonlyContext | None = None, ) -> str: - """Provide dynamic instructions based on pending notifications. + """Provide dynamic instructions based on recent notifications. This function is called by the ADK agent on each message. It: - 1. Checks if this is the first message in the session (< 2 events) - 2. Queries Firestore for pending notifications - 3. Marks them as notified - 4. Returns a dynamic instruction for the agent to mention them + 1. Queries Firestore for recent notifications + 2. Marks them as notified + 3. Returns a dynamic instruction for the agent to mention them Args: notification_service: Service for fetching/marking notifications @@ -43,71 +65,54 @@ async def provide_dynamic_instruction( logger.debug("No session available for dynamic instruction") return "" - # FOR TESTING: Always check for notifications - # (comment out to enable first-message-only) - # Only check on first message (when events list is empty - # or has only 1-2 events) - # Events include both user and agent messages, so < 2 means first interaction - # event_count = len(session.events) if session.events else 0 - # - # if event_count >= 2: - # logger.debug( - # "Skipping notification check: not first message (event_count=%d)", - # event_count, - # ) - # return "" - # Extract phone number from user_id (they are the same in this implementation) phone_number = session.user_id logger.info( - "First message detected for user %s, checking for pending notifications", + "Checking recent notifications for user %s", phone_number, ) try: - # Fetch pending notifications - pending_notifications = await notification_service.get_pending_notifications( + # Fetch recent notifications + recent_notifications = await notification_service.get_recent_notifications( phone_number ) - if not pending_notifications: - logger.info("No pending notifications for user %s", phone_number) + if not recent_notifications: + logger.info("No recent notifications for user %s", phone_number) return "" # Build dynamic instruction with notification details notification_ids = [ nid - for n in pending_notifications + for n in recent_notifications if (nid := n.get("id_notificacion")) is not None ] - count = len(pending_notifications) + count = len(recent_notifications) - # Format notification details for the agent + # Format notification details for the agent (most recent first) + now = time.time() notification_details = [] - for notif in pending_notifications: + for i, notif in enumerate(recent_notifications, 1): evento = notif.get("nombre_evento_dialogflow", "notificacion") texto = notif.get("texto", "Sin texto") - notification_details.append(f" - Evento: {evento} | Texto: {texto}") + ts = notif.get("timestamp_creacion", notif.get("timestampCreacion", 0)) + ago = _format_time_ago(now, ts) + notification_details.append( + f" {i}. [{ago}] Evento: {evento} | Texto: {texto}" + ) details_text = "\n".join(notification_details) + header = ( + f"Estas son {count} notificación(es) reciente(s)" + " de las cuales el usuario podría preguntar más:" + ) instruction = f""" -IMPORTANTE - NOTIFICACIONES PENDIENTES: - -El usuario tiene {count} notificación(es) sin leer: +{header} {details_text} - -INSTRUCCIONES: -- Menciona estas notificaciones de forma natural en tu respuesta inicial -- No necesitas leerlas todas literalmente, solo hazle saber que las tiene -- Sé breve y directo según tu personalidad (directo y cálido) -- Si el usuario pregunta algo específico, prioriza responder eso primero\ - y luego menciona las notificaciones - -Ejemplo: "¡Hola! 👋 Tienes {count} notificación(es)\ - pendiente(s). ¿Te gustaría revisarlas?" """ # Mark notifications as notified in Firestore diff --git a/src/va_agent/notifications.py b/src/va_agent/notifications.py index 68ccd7b..fde23a2 100644 --- a/src/va_agent/notifications.py +++ b/src/va_agent/notifications.py @@ -16,10 +16,10 @@ logger = logging.getLogger(__name__) class NotificationBackend(Protocol): """Backend-agnostic interface for notification storage.""" - async def get_pending_notifications( + async def get_recent_notifications( self, phone_number: str ) -> list[dict[str, Any]]: - """Return pending (unread) notifications for *phone_number*.""" + """Return recent notifications for *phone_number*.""" ... async def mark_as_notified( @@ -51,7 +51,7 @@ class FirestoreNotificationBackend: self._max_to_notify = max_to_notify self._window_hours = window_hours - async def get_pending_notifications( + async def get_recent_notifications( self, phone_number: str ) -> list[dict[str, Any]]: """Get recent notifications for a user. @@ -159,7 +159,7 @@ class RedisNotificationBackend: self._max_to_notify = max_to_notify self._window_hours = window_hours - async def get_pending_notifications( + async def get_recent_notifications( self, phone_number: str ) -> list[dict[str, Any]]: """Get recent notifications for a user from Redis. diff --git a/utils/check_notifications_firestore.py b/utils/check_notifications_firestore.py new file mode 100644 index 0000000..1408943 --- /dev/null +++ b/utils/check_notifications_firestore.py @@ -0,0 +1,107 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = ["google-cloud-firestore>=2.0", "pyyaml>=6.0"] +# /// +"""Check recent notifications in Firestore for a phone number. + +Usage: + uv run utils/check_notifications_firestore.py + uv run utils/check_notifications_firestore.py --hours 24 +""" + +import sys +import time + +import yaml +from google.cloud.firestore import Client + +_SECONDS_PER_HOUR = 3600 +_DEFAULT_WINDOW_HOURS = 48 + + +def main() -> None: + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} [--hours N]") + sys.exit(1) + + phone = sys.argv[1] + window_hours = _DEFAULT_WINDOW_HOURS + if "--hours" in sys.argv: + idx = sys.argv.index("--hours") + window_hours = float(sys.argv[idx + 1]) + + with open("config.yaml") as f: + cfg = yaml.safe_load(f) + + db = Client( + project=cfg["google_cloud_project"], + database=cfg["firestore_db"], + ) + + collection_path = cfg["notifications_collection_path"] + doc_ref = db.collection(collection_path).document(phone) + doc = doc_ref.get() + + if not doc.exists: + print(f"📭 No notifications found for {phone}") + sys.exit(0) + + data = doc.to_dict() or {} + all_notifications = data.get("notificaciones", []) + + if not all_notifications: + print(f"📭 No notifications found for {phone}") + sys.exit(0) + + cutoff = time.time() - (window_hours * _SECONDS_PER_HOUR) + + def _ts(n: dict) -> float: + return n.get("timestamp_creacion", n.get("timestampCreacion", 0)) + + recent = [n for n in all_notifications if _ts(n) >= cutoff] + recent.sort(key=_ts, reverse=True) + + if not recent: + print( + f"📭 No notifications within the last" + f" {window_hours:.0f}h for {phone}" + ) + sys.exit(0) + + print( + f"🔔 {len(recent)} notification(s) for {phone}" + f" (last {window_hours:.0f}h)\n" + ) + now = time.time() + for i, n in enumerate(recent, 1): + ts = _ts(n) + ago = _format_time_ago(now, ts) + categoria = n.get("parametros", {}).get( + "notification_po_Categoria", "" + ) + texto = n.get("texto", "") + print(f" [{i}] {ago}") + print(f" ID: {n.get('id_notificacion', '?')}") + if categoria: + print(f" Category: {categoria}") + print(f" {texto[:120]}{'…' if len(texto) > 120 else ''}") + print() + + +def _format_time_ago(now: float, ts: float) -> str: + diff = max(now - ts, 0) + minutes = int(diff // 60) + hours = int(diff // _SECONDS_PER_HOUR) + + if minutes < 1: + return "justo ahora" + if minutes < 60: + return f"hace {minutes} min" + if hours < 24: + return f"hace {hours}h" + days = hours // 24 + return f"hace {days}d" + + +if __name__ == "__main__": + main() diff --git a/utils/register_notification_firestore.py b/utils/register_notification_firestore.py new file mode 100644 index 0000000..c5f01dc --- /dev/null +++ b/utils/register_notification_firestore.py @@ -0,0 +1,108 @@ +# /// script +# requires-python = ">=3.12" +# dependencies = ["google-cloud-firestore>=2.0", "pyyaml>=6.0"] +# /// +"""Register a new notification in Firestore for a given phone number. + +Usage: + uv run utils/register_notification_firestore.py + +Reads project/database/collection settings from config.yaml. +""" + +import random +import sys +import time +import uuid + +import yaml +from google.cloud.firestore import Client + +NOTIFICATION_TEMPLATES = [ + { + "texto": "Se detectó un cargo de $1,500 en tu cuenta", + "parametros": { + "notification_po_transaction_id": "TXN15367", + "notification_po_amount": 5814, + }, + }, + { + "texto": ( + "💡 Recuerda que puedes obtener tu Adelanto de Nómina en" + " cualquier momento, sólo tienes que seleccionar Solicitud" + " adelanto de Nómina en tu app." + ), + "parametros": { + "notification_po_Categoria": "Adelanto de Nómina solicitud", + "notification_po_caption": "Adelanto de Nómina", + }, + }, + { + "texto": ( + "Estás a un clic de Programa de Lealtad, entra a tu app y" + " finaliza Tu contratación en instantes. ⏱ 🤳" + ), + "parametros": { + "notification_po_Categoria": "Tarjeta de Crédito Contratación", + "notification_po_caption": "Tarjeta de Crédito", + }, + }, + { + "texto": ( + "🚀 ¿Listo para obtener tu Cápsula Plus? Continúa en tu app" + " y termina al instante. Conoce más en: va.app" + ), + "parametros": {}, + }, +] + + +def main() -> None: + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + + phone = sys.argv[1] + + with open("config.yaml") as f: + cfg = yaml.safe_load(f) + + db = Client( + project=cfg["google_cloud_project"], + database=cfg["firestore_db"], + ) + + collection_path = cfg["notifications_collection_path"] + doc_ref = db.collection(collection_path).document(phone) + + template = random.choice(NOTIFICATION_TEMPLATES) + notification = { + "id_notificacion": str(uuid.uuid4()), + "telefono": phone, + "timestamp_creacion": time.time(), + "texto": template["texto"], + "nombre_evento_dialogflow": "notificacion", + "codigo_idioma_dialogflow": "es", + "parametros": template["parametros"], + "status": "active", + } + + doc = doc_ref.get() + if doc.exists: + data = doc.to_dict() or {} + notifications = data.get("notificaciones", []) + notifications.append(notification) + doc_ref.update({"notificaciones": notifications}) + else: + doc_ref.set({"notificaciones": [notification]}) + + total = len(doc_ref.get().to_dict().get("notificaciones", [])) + print(f"✅ Registered notification for {phone}") + print(f" ID: {notification['id_notificacion']}") + print(f" Text: {template['texto'][:80]}...") + print(f" Collection: {collection_path}") + print(f" Total notifications for this phone: {total}") + + +if __name__ == "__main__": + main()