Compare commits
5 Commits
8627901543
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| ac27d12ed3 | |||
| a264276a5d | |||
| 70a3f618bd | |||
| f3515ee71c | |||
| 93c870c8d6 |
@@ -1,5 +1,6 @@
|
|||||||
"""Configuration helper for ADK agent."""
|
"""Configuration helper for ADK agent."""
|
||||||
|
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from pydantic_settings import (
|
from pydantic_settings import (
|
||||||
@@ -37,6 +38,9 @@ class AgentSettings(BaseSettings):
|
|||||||
mcp_audience: str
|
mcp_audience: str
|
||||||
mcp_remote_url: str
|
mcp_remote_url: str
|
||||||
|
|
||||||
|
# Logging
|
||||||
|
log_level: str = "INFO"
|
||||||
|
|
||||||
model_config = SettingsConfigDict(
|
model_config = SettingsConfigDict(
|
||||||
yaml_file=CONFIG_FILE_PATH,
|
yaml_file=CONFIG_FILE_PATH,
|
||||||
extra="ignore", # Ignore extra fields from config.yaml
|
extra="ignore", # Ignore extra fields from config.yaml
|
||||||
@@ -60,3 +64,6 @@ class AgentSettings(BaseSettings):
|
|||||||
|
|
||||||
|
|
||||||
settings = AgentSettings.model_validate({})
|
settings = AgentSettings.model_validate({})
|
||||||
|
|
||||||
|
logging.basicConfig()
|
||||||
|
logging.getLogger("va_agent").setLevel(settings.log_level.upper())
|
||||||
|
|||||||
@@ -84,23 +84,16 @@ async def provide_dynamic_instruction(
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
# Build dynamic instruction with notification details
|
# Build dynamic instruction with notification details
|
||||||
notification_ids = [
|
notification_ids = [n.id_notificacion for n in recent_notifications]
|
||||||
nid
|
|
||||||
for n in recent_notifications
|
|
||||||
if (nid := n.get("id_notificacion")) is not None
|
|
||||||
]
|
|
||||||
count = len(recent_notifications)
|
count = len(recent_notifications)
|
||||||
|
|
||||||
# Format notification details for the agent (most recent first)
|
# Format notification details for the agent (most recent first)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
notification_details = []
|
notification_details = []
|
||||||
for i, notif in enumerate(recent_notifications, 1):
|
for i, notif in enumerate(recent_notifications, 1):
|
||||||
evento = notif.get("nombre_evento_dialogflow", "notificacion")
|
ago = _format_time_ago(now, notif.timestamp_creacion)
|
||||||
texto = notif.get("texto", "Sin texto")
|
|
||||||
ts = notif.get("timestamp_creacion", notif.get("timestampCreacion", 0))
|
|
||||||
ago = _format_time_ago(now, ts)
|
|
||||||
notification_details.append(
|
notification_details.append(
|
||||||
f" {i}. [{ago}] Evento: {evento} | Texto: {texto}"
|
f" {i}. [{ago}] Evento: {notif.nombre_evento} | Texto: {notif.texto}"
|
||||||
)
|
)
|
||||||
|
|
||||||
details_text = "\n".join(notification_details)
|
details_text = "\n".join(notification_details)
|
||||||
@@ -123,6 +116,7 @@ async def provide_dynamic_instruction(
|
|||||||
count,
|
count,
|
||||||
phone_number,
|
phone_number,
|
||||||
)
|
)
|
||||||
|
logger.debug("Dynamic instruction content:\n%s", instruction)
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
|
|||||||
@@ -7,32 +7,78 @@ import time
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
|
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
|
||||||
|
|
||||||
|
from pydantic import AliasChoices, BaseModel, Field, field_validator
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from google.cloud.firestore_v1.async_client import AsyncClient
|
from google.cloud.firestore_v1.async_client import AsyncClient
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _extract_ts(n: dict[str, Any]) -> float:
|
class Notification(BaseModel):
|
||||||
"""Return the creation timestamp of a notification as epoch seconds."""
|
"""A single notification, normalised from either schema.
|
||||||
raw = n.get("timestamp_creacion", n.get("timestampCreacion", 0))
|
|
||||||
if isinstance(raw, (int, float)):
|
Handles snake_case (``id_notificacion``), camelCase
|
||||||
return float(raw)
|
(``idNotificacion``), and English short names (``notificationId``)
|
||||||
if isinstance(raw, datetime):
|
transparently via ``AliasChoices``.
|
||||||
return raw.timestamp()
|
"""
|
||||||
if isinstance(raw, str):
|
|
||||||
try:
|
id_notificacion: str = Field(
|
||||||
return float(raw)
|
validation_alias=AliasChoices(
|
||||||
except ValueError:
|
"id_notificacion", "idNotificacion", "notificationId"
|
||||||
return 0.0
|
),
|
||||||
return 0.0
|
)
|
||||||
|
texto: str = Field(
|
||||||
|
default="Sin texto",
|
||||||
|
validation_alias=AliasChoices("texto", "text"),
|
||||||
|
)
|
||||||
|
nombre_evento: str = Field(
|
||||||
|
default="notificacion",
|
||||||
|
validation_alias=AliasChoices(
|
||||||
|
"nombre_evento_dialogflow", "nombreEventoDialogflow", "event"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
timestamp_creacion: float = Field(
|
||||||
|
default=0.0,
|
||||||
|
validation_alias=AliasChoices("timestamp_creacion", "timestampCreacion"),
|
||||||
|
)
|
||||||
|
status: str = "active"
|
||||||
|
parametros: dict[str, Any] = Field(
|
||||||
|
default_factory=dict,
|
||||||
|
validation_alias=AliasChoices("parametros", "parameters"),
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator("timestamp_creacion", mode="before")
|
||||||
|
@classmethod
|
||||||
|
def _coerce_timestamp(cls, v: Any) -> float:
|
||||||
|
"""Normalise Firestore timestamps (float, str, datetime) to float."""
|
||||||
|
if isinstance(v, (int, float)):
|
||||||
|
return float(v)
|
||||||
|
if isinstance(v, datetime):
|
||||||
|
return v.timestamp()
|
||||||
|
if isinstance(v, str):
|
||||||
|
try:
|
||||||
|
return float(v)
|
||||||
|
except ValueError:
|
||||||
|
return 0.0
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationDocument(BaseModel):
|
||||||
|
"""Top-level Firestore / Redis document that wraps a list of notifications.
|
||||||
|
|
||||||
|
Mirrors the schema used by ``utils/check_notifications.py``
|
||||||
|
(``NotificationSession``) but keeps only what the agent needs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
notificaciones: list[Notification] = Field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
@runtime_checkable
|
@runtime_checkable
|
||||||
class NotificationBackend(Protocol):
|
class NotificationBackend(Protocol):
|
||||||
"""Backend-agnostic interface for notification storage."""
|
"""Backend-agnostic interface for notification storage."""
|
||||||
|
|
||||||
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]:
|
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
|
||||||
"""Return recent notifications for *phone_number*."""
|
"""Return recent notifications for *phone_number*."""
|
||||||
...
|
...
|
||||||
|
|
||||||
@@ -65,7 +111,7 @@ class FirestoreNotificationBackend:
|
|||||||
self._max_to_notify = max_to_notify
|
self._max_to_notify = max_to_notify
|
||||||
self._window_hours = window_hours
|
self._window_hours = window_hours
|
||||||
|
|
||||||
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]:
|
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
|
||||||
"""Get recent notifications for a user.
|
"""Get recent notifications for a user.
|
||||||
|
|
||||||
Retrieves notifications created within the configured time window,
|
Retrieves notifications created within the configured time window,
|
||||||
@@ -75,14 +121,7 @@ class FirestoreNotificationBackend:
|
|||||||
phone_number: User's phone number (used as document ID)
|
phone_number: User's phone number (used as document ID)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of notification dictionaries with structure:
|
List of validated :class:`Notification` instances.
|
||||||
{
|
|
||||||
"id_notificacion": str,
|
|
||||||
"texto": str,
|
|
||||||
"status": str,
|
|
||||||
"timestamp_creacion": timestamp,
|
|
||||||
"parametros": {...}
|
|
||||||
}
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
@@ -96,17 +135,19 @@ class FirestoreNotificationBackend:
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
data = doc.to_dict() or {}
|
data = doc.to_dict() or {}
|
||||||
all_notifications = data.get("notificaciones", [])
|
document = NotificationDocument.model_validate(data)
|
||||||
|
|
||||||
if not all_notifications:
|
if not document.notificaciones:
|
||||||
logger.info("No notifications in array for phone: %s", phone_number)
|
logger.info("No notifications in array for phone: %s", phone_number)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
cutoff = time.time() - (self._window_hours * 3600)
|
cutoff = time.time() - (self._window_hours * 3600)
|
||||||
|
|
||||||
recent = [n for n in all_notifications if _extract_ts(n) >= cutoff]
|
parsed = [
|
||||||
|
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
|
||||||
|
]
|
||||||
|
|
||||||
if not recent:
|
if not parsed:
|
||||||
logger.info(
|
logger.info(
|
||||||
"No notifications within the last %.0fh for phone: %s",
|
"No notifications within the last %.0fh for phone: %s",
|
||||||
self._window_hours,
|
self._window_hours,
|
||||||
@@ -114,13 +155,13 @@ class FirestoreNotificationBackend:
|
|||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
recent.sort(key=_extract_ts, reverse=True)
|
parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True)
|
||||||
|
|
||||||
result = recent[: self._max_to_notify]
|
result = parsed[: self._max_to_notify]
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Found %d recent notifications for phone: %s (returning top %d)",
|
"Found %d recent notifications for phone: %s (returning top %d)",
|
||||||
len(recent),
|
len(parsed),
|
||||||
phone_number,
|
phone_number,
|
||||||
len(result),
|
len(result),
|
||||||
)
|
)
|
||||||
@@ -165,7 +206,7 @@ class RedisNotificationBackend:
|
|||||||
self._max_to_notify = max_to_notify
|
self._max_to_notify = max_to_notify
|
||||||
self._window_hours = window_hours
|
self._window_hours = window_hours
|
||||||
|
|
||||||
async def get_recent_notifications(self, phone_number: str) -> list[dict[str, Any]]:
|
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
|
||||||
"""Get recent notifications for a user from Redis.
|
"""Get recent notifications for a user from Redis.
|
||||||
|
|
||||||
Reads from the ``notification:{phone}`` key, parses the JSON
|
Reads from the ``notification:{phone}`` key, parses the JSON
|
||||||
@@ -185,10 +226,9 @@ class RedisNotificationBackend:
|
|||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
data = json.loads(raw)
|
document = NotificationDocument.model_validate(json.loads(raw))
|
||||||
all_notifications: list[dict[str, Any]] = data.get("notificaciones", [])
|
|
||||||
|
|
||||||
if not all_notifications:
|
if not document.notificaciones:
|
||||||
logger.info(
|
logger.info(
|
||||||
"No notifications in array for phone: %s",
|
"No notifications in array for phone: %s",
|
||||||
phone_number,
|
phone_number,
|
||||||
@@ -197,9 +237,11 @@ class RedisNotificationBackend:
|
|||||||
|
|
||||||
cutoff = time.time() - (self._window_hours * 3600)
|
cutoff = time.time() - (self._window_hours * 3600)
|
||||||
|
|
||||||
recent = [n for n in all_notifications if _extract_ts(n) >= cutoff]
|
parsed = [
|
||||||
|
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
|
||||||
|
]
|
||||||
|
|
||||||
if not recent:
|
if not parsed:
|
||||||
logger.info(
|
logger.info(
|
||||||
"No notifications within the last %.0fh for phone: %s",
|
"No notifications within the last %.0fh for phone: %s",
|
||||||
self._window_hours,
|
self._window_hours,
|
||||||
@@ -207,13 +249,13 @@ class RedisNotificationBackend:
|
|||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
recent.sort(key=_extract_ts, reverse=True)
|
parsed.sort(key=lambda n: n.timestamp_creacion, reverse=True)
|
||||||
|
|
||||||
result = recent[: self._max_to_notify]
|
result = parsed[: self._max_to_notify]
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Found %d recent notifications for phone: %s (returning top %d)",
|
"Found %d recent notifications for phone: %s (returning top %d)",
|
||||||
len(recent),
|
len(parsed),
|
||||||
phone_number,
|
phone_number,
|
||||||
len(result),
|
len(result),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import asyncio
|
|||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
from datetime import UTC, datetime
|
||||||
from typing import TYPE_CHECKING, Any, override
|
from typing import TYPE_CHECKING, Any, override
|
||||||
|
|
||||||
from google.adk.errors.already_exists_error import AlreadyExistsError
|
from google.adk.errors.already_exists_error import AlreadyExistsError
|
||||||
@@ -102,6 +103,24 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
def _events_col(self, app_name: str, user_id: str, session_id: str) -> Any:
|
def _events_col(self, app_name: str, user_id: str, session_id: str) -> Any:
|
||||||
return self._session_ref(app_name, user_id, session_id).collection("events")
|
return self._session_ref(app_name, user_id, session_id).collection("events")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _timestamp_to_float(value: Any, default: float = 0.0) -> float:
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
return float(value)
|
||||||
|
if hasattr(value, "timestamp"):
|
||||||
|
try:
|
||||||
|
return float(value.timestamp())
|
||||||
|
except (
|
||||||
|
TypeError,
|
||||||
|
ValueError,
|
||||||
|
OSError,
|
||||||
|
OverflowError,
|
||||||
|
) as exc: # pragma: no cover
|
||||||
|
logger.debug("Failed to convert timestamp %r: %s", value, exc)
|
||||||
|
return default
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
# State helpers
|
# State helpers
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
@@ -171,7 +190,7 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
now = time.time()
|
now = datetime.now(UTC)
|
||||||
write_coros.append(
|
write_coros.append(
|
||||||
self._session_ref(app_name, user_id, session_id).set(
|
self._session_ref(app_name, user_id, session_id).set(
|
||||||
{
|
{
|
||||||
@@ -196,7 +215,7 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
id=session_id,
|
id=session_id,
|
||||||
state=merged,
|
state=merged,
|
||||||
last_update_time=now,
|
last_update_time=now.timestamp(),
|
||||||
)
|
)
|
||||||
|
|
||||||
@override
|
@override
|
||||||
@@ -283,7 +302,9 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
id=session_id,
|
id=session_id,
|
||||||
state=merged,
|
state=merged,
|
||||||
events=events,
|
events=events,
|
||||||
last_update_time=session_data.get("last_update_time", 0.0),
|
last_update_time=self._timestamp_to_float(
|
||||||
|
session_data.get("last_update_time"), 0.0
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
@override
|
@override
|
||||||
@@ -326,7 +347,9 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
id=data["session_id"],
|
id=data["session_id"],
|
||||||
state=merged,
|
state=merged,
|
||||||
events=[],
|
events=[],
|
||||||
last_update_time=data.get("last_update_time", 0.0),
|
last_update_time=self._timestamp_to_float(
|
||||||
|
data.get("last_update_time"), 0.0
|
||||||
|
),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -366,6 +389,8 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
# Persist state deltas
|
# Persist state deltas
|
||||||
session_ref = self._session_ref(app_name, user_id, session_id)
|
session_ref = self._session_ref(app_name, user_id, session_id)
|
||||||
|
|
||||||
|
last_update_dt = datetime.fromtimestamp(event.timestamp, UTC)
|
||||||
|
|
||||||
if event.actions and event.actions.state_delta:
|
if event.actions and event.actions.state_delta:
|
||||||
state_deltas = _session_util.extract_state_delta(event.actions.state_delta)
|
state_deltas = _session_util.extract_state_delta(event.actions.state_delta)
|
||||||
|
|
||||||
@@ -386,16 +411,16 @@ class FirestoreSessionService(BaseSessionService):
|
|||||||
FieldPath("state", k).to_api_repr(): v
|
FieldPath("state", k).to_api_repr(): v
|
||||||
for k, v in state_deltas["session"].items()
|
for k, v in state_deltas["session"].items()
|
||||||
}
|
}
|
||||||
field_updates["last_update_time"] = event.timestamp
|
field_updates["last_update_time"] = last_update_dt
|
||||||
write_coros.append(session_ref.update(field_updates))
|
write_coros.append(session_ref.update(field_updates))
|
||||||
else:
|
else:
|
||||||
write_coros.append(
|
write_coros.append(
|
||||||
session_ref.update({"last_update_time": event.timestamp})
|
session_ref.update({"last_update_time": last_update_dt})
|
||||||
)
|
)
|
||||||
|
|
||||||
await asyncio.gather(*write_coros)
|
await asyncio.gather(*write_coros)
|
||||||
else:
|
else:
|
||||||
await session_ref.update({"last_update_time": event.timestamp})
|
await session_ref.update({"last_update_time": last_update_dt})
|
||||||
|
|
||||||
# Log token usage
|
# Log token usage
|
||||||
if event.usage_metadata:
|
if event.usage_metadata:
|
||||||
|
|||||||
Reference in New Issue
Block a user