# /// script # requires-python = ">=3.12" # dependencies = ["google-cloud-firestore>=2.0", "pyyaml>=6.0"] # /// """Register a new notification in Firestore for a given phone number. Usage: uv run utils/register_notification_firestore.py Reads project/database/collection settings from config.yaml. """ import random import sys import time import uuid import yaml from google.cloud.firestore import Client NOTIFICATION_TEMPLATES = [ { "texto": "Se detectó un cargo de $1,500 en tu cuenta", "parametros": { "notification_po_transaction_id": "TXN15367", "notification_po_amount": 5814, }, }, { "texto": ( "💡 Recuerda que puedes obtener tu Adelanto de Nómina en" " cualquier momento, sólo tienes que seleccionar Solicitud" " adelanto de Nómina en tu app." ), "parametros": { "notification_po_Categoria": "Adelanto de Nómina solicitud", "notification_po_caption": "Adelanto de Nómina", }, }, { "texto": ( "Estás a un clic de Programa de Lealtad, entra a tu app y" " finaliza Tu contratación en instantes. ⏱ 🤳" ), "parametros": { "notification_po_Categoria": "Tarjeta de Crédito Contratación", "notification_po_caption": "Tarjeta de Crédito", }, }, { "texto": ( "🚀 ¿Listo para obtener tu Cápsula Plus? Continúa en tu app" " y termina al instante. Conoce más en: va.app" ), "parametros": {}, }, ] def main() -> None: if len(sys.argv) < 2: print(f"Usage: {sys.argv[0]} ") sys.exit(1) phone = sys.argv[1] with open("config.yaml") as f: cfg = yaml.safe_load(f) db = Client( project=cfg["google_cloud_project"], database=cfg["firestore_db"], ) collection_path = cfg["notifications_collection_path"] doc_ref = db.collection(collection_path).document(phone) template = random.choice(NOTIFICATION_TEMPLATES) notification = { "id_notificacion": str(uuid.uuid4()), "telefono": phone, "timestamp_creacion": time.time(), "texto": template["texto"], "nombre_evento_dialogflow": "notificacion", "codigo_idioma_dialogflow": "es", "parametros": template["parametros"], "status": "active", } doc = doc_ref.get() if doc.exists: data = doc.to_dict() or {} notifications = data.get("notificaciones", []) notifications.append(notification) doc_ref.update({"notificaciones": notifications}) else: doc_ref.set({"notificaciones": [notification]}) total = len(doc_ref.get().to_dict().get("notificaciones", [])) print(f"✅ Registered notification for {phone}") print(f" ID: {notification['id_notificacion']}") print(f" Text: {template['texto'][:80]}...") print(f" Collection: {collection_path}") print(f" Total notifications for this phone: {total}") if __name__ == "__main__": main()