Compare commits

7 Commits

Author SHA1 Message Date
f3afdff515 Merge branch 'main' into issue/session
Some checks failed
CI / ci (pull_request) Failing after 12s
2026-03-11 23:09:59 +00:00
8826d84e59 Remove redudant session_id from document path
Some checks failed
CI / ci (pull_request) Failing after 12s
2026-03-11 17:28:45 +00:00
ac27d12ed3 Add notification model (#31)
All checks were successful
CI / ci (push) Successful in 21s
Co-authored-by: Anibal Angulo <a8065384@banorte.com>
Reviewed-on: #31
2026-03-10 23:50:41 +00:00
a264276a5d Merge pull request 'refactor: timestamp compatible with Firestore' (#30) from refactor/timestamp-to-date into main
Some checks failed
CI / ci (push) Failing after 12s
Reviewed-on: #30
2026-03-10 23:47:48 +00:00
70a3f618bd Merge branch 'main' into refactor/timestamp-to-date
All checks were successful
CI / ci (pull_request) Successful in 20s
2026-03-10 22:56:55 +00:00
f3515ee71c fix(session): use datetime UTC and tighten timestamp logging
All checks were successful
CI / ci (pull_request) Successful in 19s
2026-03-10 21:24:11 +00:00
93c870c8d6 fix(session): normalize firestore timestamps 2026-03-10 21:19:19 +00:00
4 changed files with 128 additions and 58 deletions

View File

@@ -1,5 +1,6 @@
"""Configuration helper for ADK agent.""" """Configuration helper for ADK agent."""
import logging
import os import os
from pydantic_settings import ( from pydantic_settings import (
@@ -37,6 +38,9 @@ class AgentSettings(BaseSettings):
mcp_audience: str mcp_audience: str
mcp_remote_url: str mcp_remote_url: str
# Logging
log_level: str = "INFO"
model_config = SettingsConfigDict( model_config = SettingsConfigDict(
yaml_file=CONFIG_FILE_PATH, yaml_file=CONFIG_FILE_PATH,
extra="ignore", # Ignore extra fields from config.yaml extra="ignore", # Ignore extra fields from config.yaml
@@ -60,3 +64,6 @@ class AgentSettings(BaseSettings):
settings = AgentSettings.model_validate({}) settings = AgentSettings.model_validate({})
logging.basicConfig()
logging.getLogger("va_agent").setLevel(settings.log_level.upper())

View File

@@ -84,23 +84,16 @@ async def provide_dynamic_instruction(
return "" return ""
# Build dynamic instruction with notification details # Build dynamic instruction with notification details
notification_ids = [ notification_ids = [n.id_notificacion for n in recent_notifications]
nid
for n in recent_notifications
if (nid := n.get("id_notificacion")) is not None
]
count = len(recent_notifications) count = len(recent_notifications)
# Format notification details for the agent (most recent first) # Format notification details for the agent (most recent first)
now = time.time() now = time.time()
notification_details = [] notification_details = []
for i, notif in enumerate(recent_notifications, 1): for i, notif in enumerate(recent_notifications, 1):
evento = notif.get("nombre_evento_dialogflow", "notificacion") ago = _format_time_ago(now, notif.timestamp_creacion)
texto = notif.get("texto", "Sin texto")
ts = notif.get("timestamp_creacion", notif.get("timestampCreacion", 0))
ago = _format_time_ago(now, ts)
notification_details.append( notification_details.append(
f" {i}. [{ago}] Evento: {evento} | Texto: {texto}" f" {i}. [{ago}] Evento: {notif.nombre_evento} | Texto: {notif.texto}"
) )
details_text = "\n".join(notification_details) details_text = "\n".join(notification_details)
@@ -123,6 +116,7 @@ async def provide_dynamic_instruction(
count, count,
phone_number, phone_number,
) )
logger.debug("Dynamic instruction content:\n%s", instruction)
except Exception: except Exception:
logger.exception( logger.exception(

View File

@@ -7,32 +7,78 @@ import time
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from pydantic import AliasChoices, BaseModel, Field, field_validator
if TYPE_CHECKING: if TYPE_CHECKING:
from google.cloud.firestore_v1.async_client import AsyncClient from google.cloud.firestore_v1.async_client import AsyncClient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _extract_ts(n: dict[str, Any]) -> float: class Notification(BaseModel):
"""Return the creation timestamp of a notification as epoch seconds.""" """A single notification, normalised from either schema.
raw = n.get("timestamp_creacion", n.get("timestampCreacion", 0))
if isinstance(raw, (int, float)): Handles snake_case (``id_notificacion``), camelCase
return float(raw) (``idNotificacion``), and English short names (``notificationId``)
if isinstance(raw, datetime): transparently via ``AliasChoices``.
return raw.timestamp() """
if isinstance(raw, str):
id_notificacion: str = Field(
validation_alias=AliasChoices(
"id_notificacion", "idNotificacion", "notificationId"
),
)
texto: str = Field(
default="Sin texto",
validation_alias=AliasChoices("texto", "text"),
)
nombre_evento: str = Field(
default="notificacion",
validation_alias=AliasChoices(
"nombre_evento_dialogflow", "nombreEventoDialogflow", "event"
),
)
timestamp_creacion: float = Field(
default=0.0,
validation_alias=AliasChoices("timestamp_creacion", "timestampCreacion"),
)
status: str = "active"
parametros: dict[str, Any] = Field(
default_factory=dict,
validation_alias=AliasChoices("parametros", "parameters"),
)
@field_validator("timestamp_creacion", mode="before")
@classmethod
def _coerce_timestamp(cls, v: Any) -> float:
"""Normalise Firestore timestamps (float, str, datetime) to float."""
if isinstance(v, (int, float)):
return float(v)
if isinstance(v, datetime):
return v.timestamp()
if isinstance(v, str):
try: try:
return float(raw) return float(v)
except ValueError: except ValueError:
return 0.0 return 0.0
return 0.0 return 0.0
class NotificationDocument(BaseModel):
"""Top-level Firestore / Redis document that wraps a list of notifications.
Mirrors the schema used by ``utils/check_notifications.py``
(``NotificationSession``) but keeps only what the agent needs.
"""
notificaciones: list[Notification] = Field(default_factory=list)
@runtime_checkable @runtime_checkable
class NotificationBackend(Protocol): class NotificationBackend(Protocol):
"""Backend-agnostic interface for notification storage.""" """Backend-agnostic interface for notification storage."""
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
"""Return recent notifications for *phone_number*.""" """Return recent notifications for *phone_number*."""
... ...
@@ -65,7 +111,7 @@ class FirestoreNotificationBackend:
self._max_to_notify = max_to_notify self._max_to_notify = max_to_notify
self._window_hours = window_hours self._window_hours = window_hours
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
"""Get recent notifications for a user. """Get recent notifications for a user.
Retrieves notifications created within the configured time window, Retrieves notifications created within the configured time window,
@@ -75,14 +121,7 @@ class FirestoreNotificationBackend:
phone_number: User's phone number (used as document ID) phone_number: User's phone number (used as document ID)
Returns: Returns:
List of notification dictionaries with structure: List of validated :class:`Notification` instances.
{
"id_notificacion": str,
"texto": str,
"status": str,
"timestamp_creacion": timestamp,
"parametros": {...}
}
""" """
try: try:
@@ -96,17 +135,19 @@ class FirestoreNotificationBackend:
return [] return []
data = doc.to_dict() or {} data = doc.to_dict() or {}
all_notifications = data.get("notificaciones", []) document = NotificationDocument.model_validate(data)
if not all_notifications: if not document.notificaciones:
logger.info("No notifications in array for phone: %s", phone_number) logger.info("No notifications in array for phone: %s", phone_number)
return [] return []
cutoff = time.time() - (self._window_hours * 3600) cutoff = time.time() - (self._window_hours * 3600)
recent = [n for n in all_notifications if _extract_ts(n) >= cutoff] parsed = [
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
]
if not recent: if not parsed:
logger.info( logger.info(
"No notifications within the last %.0fh for phone: %s", "No notifications within the last %.0fh for phone: %s",
self._window_hours, self._window_hours,
@@ -114,13 +155,13 @@ class FirestoreNotificationBackend:
) )
return [] return []
recent.sort(key=_extract_ts, reverse=True) parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True)
result = recent[: self._max_to_notify] result = parsed[: self._max_to_notify]
logger.info( logger.info(
"Found %d recent notifications for phone: %s (returning top %d)", "Found %d recent notifications for phone: %s (returning top %d)",
len(recent), len(parsed),
phone_number, phone_number,
len(result), len(result),
) )
@@ -165,7 +206,7 @@ class RedisNotificationBackend:
self._max_to_notify = max_to_notify self._max_to_notify = max_to_notify
self._window_hours = window_hours self._window_hours = window_hours
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]: async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
"""Get recent notifications for a user from Redis. """Get recent notifications for a user from Redis.
Reads from the ``notification:{phone}`` key, parses the JSON Reads from the ``notification:{phone}`` key, parses the JSON
@@ -185,10 +226,9 @@ class RedisNotificationBackend:
) )
return [] return []
data = json.loads(raw) document = NotificationDocument.model_validate(json.loads(raw))
all_notifications: list[dict[str, Any]] = data.get("notificaciones", [])
if not all_notifications: if not document.notificaciones:
logger.info( logger.info(
"No notifications in array for phone: %s", "No notifications in array for phone: %s",
phone_number, phone_number,
@@ -197,9 +237,11 @@ class RedisNotificationBackend:
cutoff = time.time() - (self._window_hours * 3600) cutoff = time.time() - (self._window_hours * 3600)
recent = [n for n in all_notifications if _extract_ts(n) >= cutoff] parsed = [
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
]
if not recent: if not parsed:
logger.info( logger.info(
"No notifications within the last %.0fh for phone: %s", "No notifications within the last %.0fh for phone: %s",
self._window_hours, self._window_hours,
@@ -207,13 +249,13 @@ class RedisNotificationBackend:
) )
return [] return []
recent.sort(key=_extract_ts, reverse=True) parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True)
result = recent[: self._max_to_notify] result = parsed[: self._max_to_notify]
logger.info( logger.info(
"Found %d recent notifications for phone: %s (returning top %d)", "Found %d recent notifications for phone: %s (returning top %d)",
len(recent), len(parsed),
phone_number, phone_number,
len(result), len(result),
) )

View File

@@ -6,6 +6,7 @@ import asyncio
import logging import logging
import time import time
import uuid import uuid
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, override from typing import TYPE_CHECKING, Any, override
from google.adk.errors.already_exists_error import AlreadyExistsError from google.adk.errors.already_exists_error import AlreadyExistsError
@@ -42,8 +43,9 @@ class FirestoreSessionService(BaseSessionService):
adk_user_states/{app_name}__{user_id} adk_user_states/{app_name}__{user_id}
→ user-scoped state key/values → user-scoped state key/values
adk_sessions/{app_name}__{user_id}__{session_id} adk_sessions/{app_name}__{user_id}
{app_name, user_id, session_id, state: {…}, last_update_time} {app_name, user_id, session_id, state: {…}, last_update_time}
→ Single continuous session per user (session_id is ignored)
└─ events/{event_id} → serialised Event └─ events/{event_id} → serialised Event
""" """
@@ -95,13 +97,32 @@ class FirestoreSessionService(BaseSessionService):
) )
def _session_ref(self, app_name: str, user_id: str, session_id: str) -> Any: def _session_ref(self, app_name: str, user_id: str, session_id: str) -> Any:
# Single continuous session per user: use only user_id, ignore session_id
return self._db.collection(f"{self._prefix}_sessions").document( return self._db.collection(f"{self._prefix}_sessions").document(
f"{app_name}__{user_id}__{session_id}" f"{app_name}__{user_id}"
) )
def _events_col(self, app_name: str, user_id: str, session_id: str) -> Any: def _events_col(self, app_name: str, user_id: str, session_id: str) -> Any:
return self._session_ref(app_name, user_id, session_id).collection("events") return self._session_ref(app_name, user_id, session_id).collection("events")
@staticmethod
def _timestamp_to_float(value: Any, default: float = 0.0) -> float:
if value is None:
return default
if isinstance(value, (int, float)):
return float(value)
if hasattr(value, "timestamp"):
try:
return float(value.timestamp())
except (
TypeError,
ValueError,
OSError,
OverflowError,
) as exc: # pragma: no cover
logger.debug("Failed to convert timestamp %r: %s", value, exc)
return default
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# State helpers # State helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -171,7 +192,7 @@ class FirestoreSessionService(BaseSessionService):
) )
) )
now = time.time() now = datetime.now(UTC)
write_coros.append( write_coros.append(
self._session_ref(app_name, user_id, session_id).set( self._session_ref(app_name, user_id, session_id).set(
{ {
@@ -196,7 +217,7 @@ class FirestoreSessionService(BaseSessionService):
user_id=user_id, user_id=user_id,
id=session_id, id=session_id,
state=merged, state=merged,
last_update_time=now, last_update_time=now.timestamp(),
) )
@override @override
@@ -283,7 +304,9 @@ class FirestoreSessionService(BaseSessionService):
id=session_id, id=session_id,
state=merged, state=merged,
events=events, events=events,
last_update_time=session_data.get("last_update_time", 0.0), last_update_time=self._timestamp_to_float(
session_data.get("last_update_time"), 0.0
),
) )
@override @override
@@ -326,7 +349,9 @@ class FirestoreSessionService(BaseSessionService):
id=data["session_id"], id=data["session_id"],
state=merged, state=merged,
events=[], events=[],
last_update_time=data.get("last_update_time", 0.0), last_update_time=self._timestamp_to_float(
data.get("last_update_time"), 0.0
),
) )
) )
@@ -366,6 +391,8 @@ class FirestoreSessionService(BaseSessionService):
# Persist state deltas # Persist state deltas
session_ref = self._session_ref(app_name, user_id, session_id) session_ref = self._session_ref(app_name, user_id, session_id)
last_update_dt = datetime.fromtimestamp(event.timestamp, UTC)
if event.actions and event.actions.state_delta: if event.actions and event.actions.state_delta:
state_deltas = _session_util.extract_state_delta(event.actions.state_delta) state_deltas = _session_util.extract_state_delta(event.actions.state_delta)
@@ -386,16 +413,16 @@ class FirestoreSessionService(BaseSessionService):
FieldPath("state", k).to_api_repr(): v FieldPath("state", k).to_api_repr(): v
for k, v in state_deltas["session"].items() for k, v in state_deltas["session"].items()
} }
field_updates["last_update_time"] = event.timestamp field_updates["last_update_time"] = last_update_dt
write_coros.append(session_ref.update(field_updates)) write_coros.append(session_ref.update(field_updates))
else: else:
write_coros.append( write_coros.append(
session_ref.update({"last_update_time": event.timestamp}) session_ref.update({"last_update_time": last_update_dt})
) )
await asyncio.gather(*write_coros) await asyncio.gather(*write_coros)
else: else:
await session_ref.update({"last_update_time": event.timestamp}) await session_ref.update({"last_update_time": last_update_dt})
# Log token usage # Log token usage
if event.usage_metadata: if event.usage_metadata: