From 5a353f9582e7b46c3d765688b5066359b3a63f32 Mon Sep 17 00:00:00 2001 From: A8072846 Date: Tue, 3 Mar 2026 22:24:31 +0000 Subject: [PATCH] Add notification service using Google ADK --- config.yaml | 4 + src/va_agent/agent.py | 37 ++++- src/va_agent/config.py | 6 + src/va_agent/dynamic_instruction.py | 120 ++++++++++++++ src/va_agent/notifications.py | 232 ++++++++++++++++++++++++++++ 5 files changed, 392 insertions(+), 7 deletions(-) create mode 100644 src/va_agent/dynamic_instruction.py create mode 100644 src/va_agent/notifications.py diff --git a/config.yaml b/config.yaml index f8be685..2af4060 100644 --- a/config.yaml +++ b/config.yaml @@ -3,6 +3,10 @@ google_cloud_location: us-central1 firestore_db: bnt-orquestador-cognitivo-firestore-bdo-dev +# Notifications configuration +notifications_collection_path: "artifacts/bnt-orquestador-cognitivo-dev/notifications" +notifications_max_to_notify: 5 + mcp_remote_url: "https://ap01194-orq-cog-rag-connector-1007577023101.us-central1.run.app/mcp" # audience sin la ruta, para emitir el ID Token: mcp_audience: "https://ap01194-orq-cog-rag-connector-1007577023101.us-central1.run.app" diff --git a/src/va_agent/agent.py b/src/va_agent/agent.py index 3ffcc13..8846292 100644 --- a/src/va_agent/agent.py +++ b/src/va_agent/agent.py @@ -1,34 +1,57 @@ """ADK agent with vector search RAG tool.""" +from functools import partial + from google import genai from google.adk.agents.llm_agent import Agent from google.adk.runners import Runner from google.adk.tools.mcp_tool import McpToolset from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams from google.cloud.firestore_v1.async_client import AsyncClient +from google.genai.types import Content, Part from va_agent.auth import auth_headers_provider from va_agent.config import settings +from va_agent.dynamic_instruction import provide_dynamic_instruction +from va_agent.notifications import NotificationService from va_agent.session import FirestoreSessionService +# MCP Toolset for RAG knowledge search toolset = McpToolset( connection_params=StreamableHTTPConnectionParams(url=settings.mcp_remote_url), header_provider=auth_headers_provider, ) -agent = Agent( - model=settings.agent_model, - name=settings.agent_name, - instruction=settings.agent_instructions, - tools=[toolset], -) +# Shared Firestore client for session service and notifications +firestore_db = AsyncClient(database=settings.firestore_db) +# Session service with compaction session_service = FirestoreSessionService( - db=AsyncClient(database=settings.firestore_db), + db=firestore_db, compaction_token_threshold=10_000, genai_client=genai.Client(), ) +# Notification service +notification_service = NotificationService( + db=firestore_db, + collection_path=settings.notifications_collection_path, + max_to_notify=settings.notifications_max_to_notify, +) + +# Agent with static and dynamic instructions +agent = Agent( + model=settings.agent_model, + name=settings.agent_name, + static_instruction=Content( + role="user", + parts=[Part(text=settings.agent_instructions)], + ), + instruction=partial(provide_dynamic_instruction, notification_service), + tools=[toolset], +) + +# Runner runner = Runner( app_name="va_agent", agent=agent, diff --git a/src/va_agent/config.py b/src/va_agent/config.py index 5e112b1..f3502b5 100644 --- a/src/va_agent/config.py +++ b/src/va_agent/config.py @@ -26,6 +26,12 @@ class AgentSettings(BaseSettings): # Firestore configuration firestore_db: str + # Notifications configuration + notifications_collection_path: str = ( + "artifacts/bnt-orquestador-cognitivo-dev/notifications" + ) + notifications_max_to_notify: int = 5 + # MCP configuration mcp_audience: str mcp_remote_url: str diff --git a/src/va_agent/dynamic_instruction.py b/src/va_agent/dynamic_instruction.py new file mode 100644 index 0000000..86f190d --- /dev/null +++ b/src/va_agent/dynamic_instruction.py @@ -0,0 +1,120 @@ +"""Dynamic instruction provider for VAia agent.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from google.adk.agents.readonly_context import ReadonlyContext + + from va_agent.notifications import NotificationService + +logger = logging.getLogger(__name__) + + +async def provide_dynamic_instruction( + notification_service: NotificationService, + ctx: ReadonlyContext | None = None, +) -> str: + """Provide dynamic instructions based on pending notifications. + + This function is called by the ADK agent on each message. It: + 1. Checks if this is the first message in the session (< 2 events) + 2. Queries Firestore for pending notifications + 3. Marks them as notified + 4. Returns a dynamic instruction for the agent to mention them + + Args: + notification_service: Service for fetching/marking notifications + ctx: Agent context containing session information + + Returns: + Dynamic instruction string (empty if no notifications or not first message) + + """ + # Only check notifications on the first message + if not ctx or not ctx._invocation_context: + logger.debug("No context available for dynamic instruction") + return "" + + session = ctx._invocation_context.session + if not session: + logger.debug("No session available for dynamic instruction") + return "" + + # FOR TESTING: Always check for notifications (comment out to enable first-message-only) + # Only check on first message (when events list is empty or has only 1-2 events) + # Events include both user and agent messages, so < 2 means first interaction + # event_count = len(session.events) if session.events else 0 + # + # if event_count >= 2: + # logger.debug( + # "Skipping notification check: not first message (event_count=%d)", + # event_count, + # ) + # return "" + + # Extract phone number from user_id (they are the same in this implementation) + phone_number = session.user_id + + logger.info( + "First message detected for user %s, checking for pending notifications", + phone_number, + ) + + try: + # Fetch pending notifications + pending_notifications = await notification_service.get_pending_notifications( + phone_number + ) + + if not pending_notifications: + logger.info("No pending notifications for user %s", phone_number) + return "" + + # Build dynamic instruction with notification details + notification_ids = [n.get("id_notificacion") for n in pending_notifications] + count = len(pending_notifications) + + # Format notification details for the agent + notification_details = [] + for notif in pending_notifications: + evento = notif.get("nombre_evento_dialogflow", "notificacion") + texto = notif.get("texto", "Sin texto") + notification_details.append(f" - Evento: {evento} | Texto: {texto}") + + details_text = "\n".join(notification_details) + + instruction = f""" +IMPORTANTE - NOTIFICACIONES PENDIENTES: + +El usuario tiene {count} notificación(es) sin leer: + +{details_text} + +INSTRUCCIONES: +- Menciona estas notificaciones de forma natural en tu respuesta inicial +- No necesitas leerlas todas literalmente, solo hazle saber que las tiene +- Sé breve y directo según tu personalidad (directo y cálido) +- Si el usuario pregunta algo específico, prioriza responder eso primero y luego menciona las notificaciones + +Ejemplo: "¡Hola! 👋 Antes de empezar, veo que tienes {count} notificación(es) pendiente(s) en tu cuenta. ¿Te gustaría revisarlas o prefieres que te ayude con algo más?" +""" + + # Mark notifications as notified in Firestore + await notification_service.mark_as_notified(phone_number, notification_ids) + + logger.info( + "Returning dynamic instruction with %d notification(s) for user %s", + count, + phone_number, + ) + + return instruction + + except Exception: + logger.exception( + "Error building dynamic instruction for user %s", phone_number + ) + return "" diff --git a/src/va_agent/notifications.py b/src/va_agent/notifications.py new file mode 100644 index 0000000..e7cc57a --- /dev/null +++ b/src/va_agent/notifications.py @@ -0,0 +1,232 @@ +"""Notification management for VAia agent.""" + +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from google.cloud.firestore_v1.async_client import AsyncClient + +logger = logging.getLogger(__name__) + + +class NotificationService: + """Service for fetching and managing user notifications from Firestore.""" + + def __init__( + self, + *, + db: AsyncClient, + collection_path: str, + max_to_notify: int = 5, + ) -> None: + """Initialize NotificationService. + + Args: + db: Firestore async client + collection_path: Path to notifications collection + max_to_notify: Maximum number of notifications to return + + """ + self._db = db + self._collection_path = collection_path + self._max_to_notify = max_to_notify + + async def get_pending_notifications( + self, phone_number: str + ) -> list[dict[str, Any]]: + """Get pending notifications for a user. + + Retrieves notifications that have not been notified by the agent yet, + ordered by timestamp (most recent first), limited to max_to_notify. + + Args: + phone_number: User's phone number (used as document ID) + + Returns: + List of notification dictionaries with structure: + { + "id_notificacion": str, + "texto": str, + "status": str, + "timestamp_creacion": timestamp, + "parametros": {...} + } + + """ + try: + # Query Firestore document by phone number + doc_ref = self._db.collection(self._collection_path).document( + phone_number + ) + doc = await doc_ref.get() + + if not doc.exists: + logger.info( + "No notification document found for phone: %s", phone_number + ) + return [] + + data = doc.to_dict() or {} + all_notifications = data.get("notificaciones", []) + + if not all_notifications: + logger.info("No notifications in array for phone: %s", phone_number) + return [] + + # Filter notifications that have NOT been notified by the agent + pending = [ + n + for n in all_notifications + if not n.get("notified_by_agent", False) + ] + + if not pending: + logger.info( + "All notifications already notified for phone: %s", phone_number + ) + return [] + + # Sort by timestamp_creacion (most recent first) + pending.sort( + key=lambda n: n.get("timestamp_creacion", 0), reverse=True + ) + + # Return top N most recent + result = pending[: self._max_to_notify] + + logger.info( + "Found %d pending notifications for phone: %s (returning top %d)", + len(pending), + phone_number, + len(result), + ) + + return result + + except Exception: + logger.exception( + "Failed to fetch notifications for phone: %s", phone_number + ) + return [] + + async def mark_as_notified( + self, phone_number: str, notification_ids: list[str] + ) -> bool: + """Mark notifications as notified by the agent. + + Updates the notifications in Firestore by adding: + - notified_by_agent: true + - notified_at: current timestamp + + Args: + phone_number: User's phone number (document ID) + notification_ids: List of id_notificacion values to mark + + Returns: + True if update was successful, False otherwise + + """ + if not notification_ids: + return True + + try: + doc_ref = self._db.collection(self._collection_path).document( + phone_number + ) + doc = await doc_ref.get() + + if not doc.exists: + logger.warning( + "Cannot mark notifications as notified: document not found for %s", + phone_number, + ) + return False + + data = doc.to_dict() or {} + notificaciones = data.get("notificaciones", []) + + if not notificaciones: + logger.warning( + "Cannot mark notifications: empty array for %s", phone_number + ) + return False + + # Update matching notifications + now = time.time() + updated_count = 0 + + for notif in notificaciones: + if notif.get("id_notificacion") in notification_ids: + notif["notified_by_agent"] = True + notif["notified_at"] = now + updated_count += 1 + + if updated_count == 0: + logger.warning( + "No notifications matched IDs for phone: %s", phone_number + ) + return False + + # Save back to Firestore + await doc_ref.update( + { + "notificaciones": notificaciones, + "ultima_actualizacion": now, + } + ) + + logger.info( + "Marked %d notification(s) as notified for phone: %s", + updated_count, + phone_number, + ) + + return True + + except Exception: + logger.exception( + "Failed to mark notifications as notified for phone: %s", + phone_number, + ) + return False + + def format_notification_summary( + self, notifications: list[dict[str, Any]] + ) -> str: + """Format notifications into a human-readable summary. + + Args: + notifications: List of notification dictionaries + + Returns: + Formatted string summarizing the notifications + + """ + if not notifications: + return "" + + count = len(notifications) + summary_lines = [ + f"El usuario tiene {count} notificación(es) pendiente(s):" + ] + + for i, notif in enumerate(notifications, 1): + texto = notif.get("texto", "Sin texto") + params = notif.get("parametros", {}) + + # Extract key parameters if available + amount = params.get("notification_po_amount") + tx_id = params.get("notification_po_transaction_id") + + line = f"{i}. {texto}" + if amount: + line += f" (monto: ${amount})" + if tx_id: + line += f" [ID: {tx_id}]" + + summary_lines.append(line) + + return "\n".join(summary_lines)