feat: refactor compaction module
This commit is contained in:
213
src/va_agent/compaction.py
Normal file
213
src/va_agent/compaction.py
Normal file
@@ -0,0 +1,213 @@
|
||||
"""Session compaction utilities for managing conversation history."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from google.adk.events.event import Event
|
||||
from google.cloud.firestore_v1.async_transaction import async_transactional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from google import genai
|
||||
from google.adk.sessions.session import Session
|
||||
from google.cloud.firestore_v1.async_client import AsyncClient
|
||||
|
||||
# Module logger, namespaced under the ADK logging hierarchy so it inherits
# the google_adk logging configuration.
logger = logging.getLogger("google_adk." + __name__)

# Seconds after which a held Firestore compaction lock is considered stale
# and may be reclaimed by another instance (guards against crashed holders).
_COMPACTION_LOCK_TTL = 300  # seconds
|
||||
|
||||
|
||||
@async_transactional
async def _try_claim_compaction_txn(transaction: Any, session_ref: Any) -> bool:
    """Atomically claim the compaction lock if it is free or stale.

    Runs inside a Firestore transaction so two instances cannot both observe
    a free lock and claim it simultaneously.

    Args:
        transaction: The active Firestore transaction.
        session_ref: Document reference for the session that holds the
            ``compaction_lock`` field.

    Returns:
        True if the lock was claimed; False if the session document does not
        exist or the lock is held and not yet stale.
    """
    snap = await session_ref.get(transaction=transaction)
    if not snap.exists:
        return False

    held_since = (snap.to_dict() or {}).get("compaction_lock")
    # A lock is honoured only while younger than the TTL; a stale timestamp
    # (e.g. from a crashed holder) is treated as free.
    if held_since and (time.time() - held_since) < _COMPACTION_LOCK_TTL:
        return False

    transaction.update(session_ref, {"compaction_lock": time.time()})
    return True
|
||||
|
||||
|
||||
class SessionCompactor:
    """Handles conversation history compaction for Firestore sessions.

    This class manages the automatic summarization and archival of older
    conversation events to keep token counts manageable while preserving
    context through AI-generated summaries.
    """

    def __init__(
        self,
        *,
        db: AsyncClient,
        genai_client: genai.Client | None = None,
        compaction_model: str = "gemini-2.5-flash",
        compaction_keep_recent: int = 10,
    ) -> None:
        """Initialize SessionCompactor.

        Args:
            db: Firestore async client.
            genai_client: GenAI client for generating summaries. Required at
                compaction time; ``_generate_summary`` raises RuntimeError if
                it is missing.
            compaction_model: Model to use for summarization.
            compaction_keep_recent: Number of recent events to keep
                uncompacted. A value of 0 compacts the entire history.

        """
        self._db = db
        self._genai_client = genai_client
        self._compaction_model = compaction_model
        self._compaction_keep_recent = compaction_keep_recent
        # Per-session asyncio locks preventing concurrent compaction within
        # this process. NOTE(review): entries are never evicted, so this dict
        # grows with the number of distinct sessions seen by this instance —
        # worth bounding if session churn is high.
        self._compaction_locks: dict[str, asyncio.Lock] = {}

    @staticmethod
    def _events_to_text(events: list[Event]) -> str:
        """Convert a list of events to a readable conversation text format.

        Events without content, without parts, or whose parts contain no text
        are skipped. Any author other than ``"user"`` is rendered as
        ``Assistant``.
        """
        lines: list[str] = []
        for event in events:
            if event.content and event.content.parts:
                text = "".join(p.text or "" for p in event.content.parts)
                if text:
                    role = "User" if event.author == "user" else "Assistant"
                    lines.append(f"{role}: {text}")
        return "\n\n".join(lines)

    async def _generate_summary(
        self, existing_summary: str, events: list[Event]
    ) -> str:
        """Generate or update a conversation summary using the GenAI model.

        Args:
            existing_summary: Prior summary to fold into the new one; may be
                empty.
            events: Events to summarize.

        Returns:
            The model-produced summary text (empty string if the model
            returned no text).

        Raises:
            RuntimeError: If no GenAI client was provided at construction.

        """
        # Fail fast before doing any prompt-building work.
        if self._genai_client is None:
            msg = "genai_client is required for compaction"
            raise RuntimeError(msg)

        conversation_text = self._events_to_text(events)
        previous = (
            f"Previous summary of earlier conversation:\n{existing_summary}\n\n"
            if existing_summary
            else ""
        )
        prompt = (
            "Summarize the following conversation between a user and an "
            "assistant. Preserve:\n"
            "- Key decisions and conclusions\n"
            "- User preferences and requirements\n"
            "- Important facts, names, and numbers\n"
            "- The overall topic and direction of the conversation\n"
            "- Any pending tasks or open questions\n\n"
            f"{previous}"
            f"Conversation:\n{conversation_text}\n\n"
            "Provide a clear, comprehensive summary."
        )
        response = await self._genai_client.aio.models.generate_content(
            model=self._compaction_model,
            contents=prompt,
        )
        return response.text or ""

    async def _compact_session(
        self,
        session: Session,
        events_col_ref: Any,
        session_ref: Any,
    ) -> None:
        """Perform the actual compaction: summarize old events and delete them.

        Args:
            session: The session to compact
            events_col_ref: Firestore collection reference for events
            session_ref: Firestore document reference for the session

        """
        query = events_col_ref.order_by("timestamp")
        event_docs = await query.get()

        keep = self._compaction_keep_recent
        if len(event_docs) <= keep:
            return

        # Use an explicit cutoff index rather than a negative slice bound:
        # `[:-keep]` silently yields an EMPTY list when keep == 0, which would
        # overwrite the stored summary with a summary of nothing and delete
        # no events. The cutoff form is identical for keep > 0 and correctly
        # compacts everything for keep == 0.
        cutoff = len(event_docs) - keep

        all_events = [Event.model_validate(doc.to_dict()) for doc in event_docs]
        events_to_summarize = all_events[:cutoff]

        session_snap = await session_ref.get()
        existing_summary = (session_snap.to_dict() or {}).get(
            "conversation_summary", ""
        )

        try:
            summary = await self._generate_summary(
                existing_summary, events_to_summarize
            )
        except Exception:
            # Best-effort: a failed summary must never destroy events.
            logger.exception("Compaction summary generation failed; skipping.")
            return

        # Write summary BEFORE deleting events so a crash between the two
        # steps leaves safe duplication rather than data loss.
        await session_ref.update({"conversation_summary": summary})

        docs_to_delete = event_docs[:cutoff]
        # Firestore batched writes are capped at 500 operations per commit.
        for i in range(0, len(docs_to_delete), 500):
            batch = self._db.batch()
            for doc in docs_to_delete[i : i + 500]:
                batch.delete(doc.reference)
            await batch.commit()

        logger.info(
            "Compacted session %s: summarised %d events, kept %d.",
            session.id,
            len(docs_to_delete),
            keep,
        )

    async def guarded_compact(
        self,
        session: Session,
        events_col_ref: Any,
        session_ref: Any,
    ) -> None:
        """Run compaction in the background with per-session locking.

        This method ensures that only one compaction process runs at a time
        for a given session, both locally (using asyncio locks) and across
        multiple instances (using Firestore-backed locks).

        Args:
            session: The session to compact
            events_col_ref: Firestore collection reference for events
            session_ref: Firestore document reference for the session

        """
        key = f"{session.app_name}__{session.user_id}__{session.id}"
        lock = self._compaction_locks.setdefault(key, asyncio.Lock())

        # Skip rather than queue: a concurrent local run means compaction is
        # already happening; waiting would just repeat the work.
        if lock.locked():
            logger.debug("Compaction already running locally for %s; skipping.", key)
            return

        async with lock:
            try:
                transaction = self._db.transaction()
                claimed = await _try_claim_compaction_txn(transaction, session_ref)
            except Exception:
                logger.exception("Failed to claim compaction lock for %s", key)
                return

            if not claimed:
                logger.debug(
                    "Compaction lock held by another instance for %s; skipping.",
                    key,
                )
                return

            try:
                await self._compact_session(session, events_col_ref, session_ref)
            except Exception:
                logger.exception("Background compaction failed for %s", key)
            finally:
                # Always release the distributed lock, even on failure, so a
                # later run is not blocked until the TTL expires.
                try:
                    await session_ref.update({"compaction_lock": None})
                except Exception:
                    logger.exception("Failed to release compaction lock for %s", key)
|
||||
Reference in New Issue
Block a user