Compare commits
7 Commits
7926d9881c
...
issue/sess
| Author | SHA1 | Date | |
|---|---|---|---|
| f3afdff515 | |||
| 8826d84e59 | |||
| ac27d12ed3 | |||
| a264276a5d | |||
| 70a3f618bd | |||
| f3515ee71c | |||
| 93c870c8d6 |
@@ -78,9 +78,7 @@ class NotificationDocument(BaseModel):
|
||||
class NotificationBackend(Protocol):
|
||||
"""Backend-agnostic interface for notification storage."""
|
||||
|
||||
async def get_recent_notifications(
|
||||
self, phone_number: str
|
||||
) -> list[Notification]:
|
||||
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
|
||||
"""Return recent notifications for *phone_number*."""
|
||||
...
|
||||
|
||||
@@ -113,9 +111,7 @@ class FirestoreNotificationBackend:
|
||||
self._max_to_notify = max_to_notify
|
||||
self._window_hours = window_hours
|
||||
|
||||
async def get_recent_notifications(
|
||||
self, phone_number: str
|
||||
) -> list[Notification]:
|
||||
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
|
||||
"""Get recent notifications for a user.
|
||||
|
||||
Retrieves notifications created within the configured time window,
|
||||
@@ -148,9 +144,7 @@ class FirestoreNotificationBackend:
|
||||
cutoff = time.time() - (self._window_hours * 3600)
|
||||
|
||||
parsed = [
|
||||
n
|
||||
for n in document.notificaciones
|
||||
if n.timestamp_creacion >= cutoff
|
||||
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
|
||||
]
|
||||
|
||||
if not parsed:
|
||||
@@ -212,9 +206,7 @@ class RedisNotificationBackend:
|
||||
self._max_to_notify = max_to_notify
|
||||
self._window_hours = window_hours
|
||||
|
||||
async def get_recent_notifications(
|
||||
self, phone_number: str
|
||||
) -> list[Notification]:
|
||||
async def get_recent_notifications(self, phone_number: str) -> list[Notification]:
|
||||
"""Get recent notifications for a user from Redis.
|
||||
|
||||
Reads from the ``notification:{phone}`` key, parses the JSON
|
||||
@@ -246,9 +238,7 @@ class RedisNotificationBackend:
|
||||
cutoff = time.time() - (self._window_hours * 3600)
|
||||
|
||||
parsed = [
|
||||
n
|
||||
for n in document.notificaciones
|
||||
if n.timestamp_creacion >= cutoff
|
||||
n for n in document.notificaciones if n.timestamp_creacion >= cutoff
|
||||
]
|
||||
|
||||
if not parsed:
|
||||
|
||||
@@ -6,6 +6,7 @@ import asyncio
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from typing import TYPE_CHECKING, Any, override
|
||||
|
||||
from google.adk.errors.already_exists_error import AlreadyExistsError
|
||||
@@ -42,8 +43,9 @@ class FirestoreSessionService(BaseSessionService):
|
||||
adk_user_states/{app_name}__{user_id}
|
||||
→ user-scoped state key/values
|
||||
|
||||
adk_sessions/{app_name}__{user_id}__{session_id}
|
||||
adk_sessions/{app_name}__{user_id}
|
||||
→ {app_name, user_id, session_id, state: {…}, last_update_time}
|
||||
→ Single continuous session per user (session_id is ignored)
|
||||
└─ events/{event_id} → serialised Event
|
||||
"""
|
||||
|
||||
@@ -95,13 +97,32 @@ class FirestoreSessionService(BaseSessionService):
|
||||
)
|
||||
|
||||
def _session_ref(self, app_name: str, user_id: str, session_id: str) -> Any:
|
||||
# Single continuous session per user: use only user_id, ignore session_id
|
||||
return self._db.collection(f"{self._prefix}_sessions").document(
|
||||
f"{app_name}__{user_id}__{session_id}"
|
||||
f"{app_name}__{user_id}"
|
||||
)
|
||||
|
||||
def _events_col(self, app_name: str, user_id: str, session_id: str) -> Any:
|
||||
return self._session_ref(app_name, user_id, session_id).collection("events")
|
||||
|
||||
@staticmethod
|
||||
def _timestamp_to_float(value: Any, default: float = 0.0) -> float:
|
||||
if value is None:
|
||||
return default
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
if hasattr(value, "timestamp"):
|
||||
try:
|
||||
return float(value.timestamp())
|
||||
except (
|
||||
TypeError,
|
||||
ValueError,
|
||||
OSError,
|
||||
OverflowError,
|
||||
) as exc: # pragma: no cover
|
||||
logger.debug("Failed to convert timestamp %r: %s", value, exc)
|
||||
return default
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# State helpers
|
||||
# ------------------------------------------------------------------
|
||||
@@ -171,7 +192,7 @@ class FirestoreSessionService(BaseSessionService):
|
||||
)
|
||||
)
|
||||
|
||||
now = time.time()
|
||||
now = datetime.now(UTC)
|
||||
write_coros.append(
|
||||
self._session_ref(app_name, user_id, session_id).set(
|
||||
{
|
||||
@@ -196,7 +217,7 @@ class FirestoreSessionService(BaseSessionService):
|
||||
user_id=user_id,
|
||||
id=session_id,
|
||||
state=merged,
|
||||
last_update_time=now,
|
||||
last_update_time=now.timestamp(),
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -283,7 +304,9 @@ class FirestoreSessionService(BaseSessionService):
|
||||
id=session_id,
|
||||
state=merged,
|
||||
events=events,
|
||||
last_update_time=session_data.get("last_update_time", 0.0),
|
||||
last_update_time=self._timestamp_to_float(
|
||||
session_data.get("last_update_time"), 0.0
|
||||
),
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -326,7 +349,9 @@ class FirestoreSessionService(BaseSessionService):
|
||||
id=data["session_id"],
|
||||
state=merged,
|
||||
events=[],
|
||||
last_update_time=data.get("last_update_time", 0.0),
|
||||
last_update_time=self._timestamp_to_float(
|
||||
data.get("last_update_time"), 0.0
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
@@ -366,6 +391,8 @@ class FirestoreSessionService(BaseSessionService):
|
||||
# Persist state deltas
|
||||
session_ref = self._session_ref(app_name, user_id, session_id)
|
||||
|
||||
last_update_dt = datetime.fromtimestamp(event.timestamp, UTC)
|
||||
|
||||
if event.actions and event.actions.state_delta:
|
||||
state_deltas = _session_util.extract_state_delta(event.actions.state_delta)
|
||||
|
||||
@@ -386,16 +413,16 @@ class FirestoreSessionService(BaseSessionService):
|
||||
FieldPath("state", k).to_api_repr(): v
|
||||
for k, v in state_deltas["session"].items()
|
||||
}
|
||||
field_updates["last_update_time"] = event.timestamp
|
||||
field_updates["last_update_time"] = last_update_dt
|
||||
write_coros.append(session_ref.update(field_updates))
|
||||
else:
|
||||
write_coros.append(
|
||||
session_ref.update({"last_update_time": event.timestamp})
|
||||
session_ref.update({"last_update_time": last_update_dt})
|
||||
)
|
||||
|
||||
await asyncio.gather(*write_coros)
|
||||
else:
|
||||
await session_ref.update({"last_update_time": event.timestamp})
|
||||
await session_ref.update({"last_update_time": last_update_dt})
|
||||
|
||||
# Log token usage
|
||||
if event.usage_metadata:
|
||||
|
||||
Reference in New Issue
Block a user