# /// script # requires-python = ">=3.12" # dependencies = ["redis>=5.0"] # /// """Register a new notification in Redis for a given phone number. Usage: REDIS_HOST=10.33.22.4 uv run utils/register_notification.py The notification content is randomly picked from a predefined set based on existing entries in Memorystore. """ import json import os import random import sys import uuid from datetime import UTC, datetime import redis HOST = os.environ.get("REDIS_HOST", "127.0.0.1") PORT = int(os.environ.get("REDIS_PORT", "6379")) TTL_SECONDS = 18 * 24 * 3600 # ~18 days, matching existing keys NOTIFICATION_TEMPLATES = [ { "texto": ( "Se detectó un cargo de $1,500 en tu cuenta" ), "parametros": { "notification_po_transaction_id": "TXN15367", "notification_po_amount": 5814, }, }, { "texto": ( "💡 Recuerda que puedes obtener tu Adelanto de Nómina en cualquier" " momento, sólo tienes que seleccionar Solicitud adelanto de Nómina" " en tu app." ), "parametros": { "notification_po_Categoria": "Adelanto de Nómina solicitud", "notification_po_caption": "Adelanto de Nómina", "notification_po_CTA": "Realiza la solicitud desde tu app", "notification_po_Descripcion": ( "Notificación para incentivar la solicitud de Adelanto de" " Nómina desde la APP" ), "notification_po_link": ( "https://public-media.yalochat.com/banorte/" "1764025754-10e06fb8-b4e6-484c-ad0b-7f677429380e-03-ADN-Toque-1.jpg" ), "notification_po_Beneficios": ( "Tasa de interés de 0%: Solicita tu Adelanto sin preocuparte" " por los intereses, así de fácil. No requiere garantías o aval." ), "notification_po_Requisitos": ( "Tener Cuenta Digital o Cuenta Digital Ilimitada con dispersión" " de Nómina No tener otro Adelanto vigente Ingreso neto mensual" " mayor a $2,000" ), }, }, { "texto": ( "Estás a un clic de Programa de Lealtad, entra a tu app y finaliza" " Tu contratación en instantes. ⏱ 🤳" ), "parametros": { "notification_po_Categoria": "Tarjeta de Crédito Contratación", "notification_po_caption": "Tarjeta de Crédito", "notification_po_CTA": "Entra a tu app y contrata en instantes", "notification_po_Descripcion": ( "Notificación para terminar el proceso de contratación de la" " Tarjeta de Crédito, desde la app" ), "notification_po_link": ( "https://public-media.yalochat.com/banorte/" "1764363798-05dadc23-6e47-447c-8e38-0346f25e31c0-15-TDC-Toque-1.jpg" ), "notification_po_Beneficios": ( "Acceso al Programa de Lealtad: Cada compra suma, gana" " experiencias exclusivas" ), "notification_po_Requisitos": ( "Ser persona física o física con actividad empresarial." " Ingresos mínimos de $2,000 pesos mensuales. Sin historial de" " crédito o con buró positivo" ), }, }, { "texto": ( "🚀 ¿Listo para obtener tu Cápsula Plus? Continúa en tu app y" " termina al instante. Conoce más en: va.app" ), "parametros": {}, }, { "texto": ( "🚀 ¿Listo para obtener tu Cuenta Digital ilimitada? Continúa en" " tu app y termina al instante. Conoce más en: va.app" ), "parametros": {}, }, ] def main() -> None: if len(sys.argv) < 2: print(f"Usage: {sys.argv[0]} ") sys.exit(1) phone = sys.argv[1] r = redis.Redis(host=HOST, port=PORT, decode_responses=True, socket_connect_timeout=5) now = datetime.now(UTC).isoformat() template = random.choice(NOTIFICATION_TEMPLATES) notification = { "id_notificacion": str(uuid.uuid4()), "telefono": phone, "timestamp_creacion": now, "texto": template["texto"], "nombre_evento_dialogflow": "notificacion", "codigo_idioma_dialogflow": "es", "parametros": template["parametros"], "status": "active", } session_key = f"notification:{phone}" existing = r.get(session_key) if existing: session = json.loads(existing) session["ultima_actualizacion"] = now session["notificaciones"].append(notification) else: session = { "session_id": phone, "telefono": phone, "fecha_creacion": now, "ultima_actualizacion": now, "notificaciones": [notification], } r.set(session_key, json.dumps(session, ensure_ascii=False), ex=TTL_SECONDS) r.set(f"notification:phone_to_notification:{phone}", phone, ex=TTL_SECONDS) total = len(session["notificaciones"]) print(f"✅ Registered notification for {phone}") print(f" ID: {notification['id_notificacion']}") print(f" Text: {template['texto'][:80]}...") print(f" Total notifications for this phone: {total}") if __name__ == "__main__": main()