# /// script
# requires-python = ">=3.12"
# dependencies = ["google-cloud-firestore>=2.0", "pyyaml>=6.0"]
# ///
"""Register a new notification in Firestore for a given phone number.

Usage:
    uv run utils/register_notification_firestore.py <phone>

Reads project/database/collection settings from config.yaml.

The generated notification follows the latest English-camelCase schema
used in the production collection
(``artifacts/default-app-id/notifications``).
"""

import random
import sys
import uuid
from datetime import datetime, timezone

import yaml
from google.cloud.firestore import ArrayUnion, Client, SERVER_TIMESTAMP

# Maximum number of characters of the notification text echoed to the console.
PREVIEW_LENGTH = 80

# Sample notification payloads; one is picked at random on every run.
NOTIFICATION_TEMPLATES = [
    {
        # NOTE(review): the amount in the text ($1,500) does not match
        # notification_po_amount (5814) — confirm this sample data is intended.
        "text": "Se detectó un cargo de $1,500 en tu cuenta",
        "parameters": {
            "notification_po_transaction_id": "TXN15367",
            "notification_po_amount": 5814,
        },
    },
    {
        "text": (
            "💡 Recuerda que puedes obtener tu Adelanto de Nómina en"
            " cualquier momento, sólo tienes que seleccionar Solicitud"
            " adelanto de Nómina en tu app."
        ),
        "parameters": {
            "notification_po_Categoria": "Adelanto de Nómina solicitud",
            "notification_po_caption": "Adelanto de Nómina",
        },
    },
    {
        "text": (
            "Estás a un clic de Programa de Lealtad, entra a tu app y"
            " finaliza Tu contratación en instantes. ⏱ 🤳"
        ),
        "parameters": {
            "notification_po_Categoria": "Tarjeta de Crédito Contratación",
            "notification_po_caption": "Tarjeta de Crédito",
        },
    },
    {
        "text": (
            "🚀 ¿Listo para obtener tu Cápsula Plus? Continúa en tu app"
            " y termina al instante. \nConoce más en: va.app"
        ),
        "parameters": {},
    },
]


def _build_notification(phone: str) -> dict[str, object]:
    """Return a new notification payload for *phone* from a random template."""
    template = random.choice(NOTIFICATION_TEMPLATES)
    return {
        "notificationId": str(uuid.uuid4()),
        "telefono": phone,
        # A client-side timestamp is used here because Firestore does not
        # accept the SERVER_TIMESTAMP sentinel inside array elements.
        "timestampCreacion": datetime.now(tz=timezone.utc),
        "text": template["text"],
        "event": "notificacion",
        "languageCode": "es",
        "parameters": template["parameters"],
        "status": "active",
    }


def _preview(text: str, limit: int = PREVIEW_LENGTH) -> str:
    """Return *text* cut to *limit* characters, with "..." only if truncated."""
    if len(text) <= limit:
        return text
    return f"{text[:limit]}..."


def main() -> None:
    """Register one randomly chosen notification for the phone in ``argv[1]``.

    Reads ``google_cloud_project``, ``firestore_db`` and
    ``notifications_collection_path`` from ``config.yaml`` in the current
    working directory. The document ID is the phone number: an existing
    document gets the notification appended to ``notificaciones``; otherwise
    a new document is created with the notification as its first entry.

    Exits with status 1 (after printing usage to stderr) when no phone
    number is supplied.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <phone>", file=sys.stderr)
        sys.exit(1)

    phone = sys.argv[1]

    with open("config.yaml", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    db = Client(
        project=cfg["google_cloud_project"],
        database=cfg["firestore_db"],
    )
    collection_path = cfg["notifications_collection_path"]
    doc_ref = db.collection(collection_path).document(phone)

    notification = _build_notification(phone)

    if doc_ref.get().exists:
        # ArrayUnion appends server-side, so entries written concurrently by
        # other clients are not overwritten by a stale local copy of the array.
        doc_ref.update({
            "notificaciones": ArrayUnion([notification]),
            "ultimaActualizacion": SERVER_TIMESTAMP,
        })
    else:
        doc_ref.set({
            "sessionId": "",
            "telefono": phone,
            "fechaCreacion": SERVER_TIMESTAMP,
            "ultimaActualizacion": SERVER_TIMESTAMP,
            "notificaciones": [notification],
        })

    # Re-read the document so the reported total reflects what is stored;
    # to_dict() returns None if the document has vanished in the meantime.
    stored = doc_ref.get().to_dict() or {}
    total = len(stored.get("notificaciones") or [])

    print(f"✅ Registered notification for {phone}")
    print(f" ID: {notification['notificationId']}")
    print(f" Text: {_preview(notification['text'])}")
    print(f" Collection: {collection_path}")
    print(f" Total notifications for this phone: {total}")


if __name__ == "__main__":
    main()