"""Session compaction utilities for managing conversation history."""

from __future__ import annotations

import asyncio
import logging
import time
from typing import TYPE_CHECKING, Any

from google.adk.events.event import Event
from google.cloud.firestore_v1.async_transaction import async_transactional

if TYPE_CHECKING:
    from google import genai
    from google.adk.sessions.session import Session
    from google.cloud.firestore_v1.async_client import AsyncClient

logger = logging.getLogger("google_adk." + __name__)

_COMPACTION_LOCK_TTL = 300  # seconds

# Maximum number of event deletions grouped into one batch commit.
_DELETE_BATCH_SIZE = 500


@async_transactional
async def _try_claim_compaction_txn(
    transaction: Any,
    session_ref: Any,
    claimed_at: float | None = None,
) -> bool:
    """Atomically claim the compaction lock if it is free or stale.

    Args:
        transaction: Firestore async transaction the read/write runs in.
        session_ref: Firestore document reference for the session.
        claimed_at: Value (seconds since the epoch) to store in
            ``compaction_lock``. Defaults to the current time. Pass it
            explicitly when the caller needs to verify ownership on release.

    Returns:
        True if the lock was written. False if the session document does not
        exist, or if an existing lock is younger than ``_COMPACTION_LOCK_TTL``.
    """
    snapshot = await session_ref.get(transaction=transaction)
    if not snapshot.exists:
        return False
    data = snapshot.to_dict() or {}
    now = time.time() if claimed_at is None else claimed_at
    lock_time = data.get("compaction_lock")
    if lock_time and (now - lock_time) < _COMPACTION_LOCK_TTL:
        return False
    transaction.update(session_ref, {"compaction_lock": now})
    return True


@async_transactional
async def _release_compaction_lock_txn(
    transaction: Any,
    session_ref: Any,
    claimed_at: float,
) -> bool:
    """Clear the compaction lock only if it still holds our claim value.

    Args:
        transaction: Firestore async transaction the read/write runs in.
        session_ref: Firestore document reference for the session.
        claimed_at: The value this caller wrote when claiming the lock.

    Returns:
        True if the lock was cleared. False if the document is gone or the
        lock now holds a different value.
    """
    snapshot = await session_ref.get(transaction=transaction)
    if not snapshot.exists:
        return False
    data = snapshot.to_dict() or {}
    if data.get("compaction_lock") != claimed_at:
        # Our claim went stale and another instance re-claimed the lock;
        # clearing it would let yet another instance start concurrently.
        return False
    transaction.update(session_ref, {"compaction_lock": None})
    return True


class SessionCompactor:
    """Handles conversation history compaction for Firestore sessions.

    This class manages the automatic summarization and archival of older
    conversation events to keep token counts manageable while preserving
    context through AI-generated summaries.
    """

    def __init__(
        self,
        *,
        db: AsyncClient,
        genai_client: genai.Client | None = None,
        compaction_model: str = "gemini-2.5-flash",
        compaction_keep_recent: int = 10,
    ) -> None:
        """Initialize SessionCompactor.

        Args:
            db: Firestore async client
            genai_client: GenAI client for generating summaries
            compaction_model: Model to use for summarization
            compaction_keep_recent: Number of recent events to keep
                uncompacted (must be >= 0)

        Raises:
            ValueError: If ``compaction_keep_recent`` is negative.
        """
        if compaction_keep_recent < 0:
            msg = "compaction_keep_recent must be >= 0"
            raise ValueError(msg)
        self._db = db
        self._genai_client = genai_client
        self._compaction_model = compaction_model
        self._compaction_keep_recent = compaction_keep_recent
        # Per-session in-process locks, keyed by app/user/session. Entries are
        # removed by guarded_compact once idle so the dict stays bounded.
        self._compaction_locks: dict[str, asyncio.Lock] = {}

    @staticmethod
    def _events_to_text(events: list[Event]) -> str:
        """Convert a list of events to a readable conversation text format.

        Events without text parts are skipped. Events authored by ``"user"``
        are labelled ``User``; every other author is labelled ``Assistant``.
        """
        lines: list[str] = []
        for event in events:
            if event.content and event.content.parts:
                text = "".join(p.text or "" for p in event.content.parts)
                if text:
                    role = "User" if event.author == "user" else "Assistant"
                    lines.append(f"{role}: {text}")
        return "\n\n".join(lines)

    async def _generate_summary(
        self, existing_summary: str, events: list[Event]
    ) -> str:
        """Generate or update a conversation summary using the GenAI model.

        Args:
            existing_summary: Previously stored summary (may be empty); it is
                folded into the prompt so the new summary supersedes it.
            events: Events to summarise.

        Returns:
            The model's response text, or ``""`` if the response had no text.

        Raises:
            RuntimeError: If no ``genai_client`` was configured.
        """
        # Fail fast before doing any prompt-building work.
        if self._genai_client is None:
            msg = "genai_client is required for compaction"
            raise RuntimeError(msg)

        conversation_text = self._events_to_text(events)
        previous = (
            f"Previous summary of earlier conversation:\n{existing_summary}\n\n"
            if existing_summary
            else ""
        )
        prompt = (
            "Summarize the following conversation between a user and an "
            "assistant. Preserve:\n"
            "- Key decisions and conclusions\n"
            "- User preferences and requirements\n"
            "- Important facts, names, and numbers\n"
            "- The overall topic and direction of the conversation\n"
            "- Any pending tasks or open questions\n\n"
            f"{previous}"
            f"Conversation:\n{conversation_text}\n\n"
            "Provide a clear, comprehensive summary."
        )
        response = await self._genai_client.aio.models.generate_content(
            model=self._compaction_model,
            contents=prompt,
        )
        return response.text or ""

    async def _compact_session(
        self,
        session: Session,
        events_col_ref: Any,
        session_ref: Any,
    ) -> None:
        """Perform the actual compaction: summarize old events and delete them.

        Args:
            session: The session to compact
            events_col_ref: Firestore collection reference for events
            session_ref: Firestore document reference for the session
        """
        event_docs = await events_col_ref.order_by("timestamp").get()

        # Number of oldest events outside the "keep recent" window. Computed
        # explicitly because ``seq[:-0]`` is empty, which would turn
        # compaction into a summary-overwriting no-op when keep_recent == 0.
        cutoff = len(event_docs) - self._compaction_keep_recent
        if cutoff <= 0:
            return
        docs_to_delete = event_docs[:cutoff]
        # Only the events being summarised need to be parsed.
        events_to_summarize = [
            Event.model_validate(doc.to_dict()) for doc in docs_to_delete
        ]

        session_snap = await session_ref.get()
        existing_summary = (session_snap.to_dict() or {}).get(
            "conversation_summary", ""
        ) or ""

        try:
            summary = await self._generate_summary(
                existing_summary, events_to_summarize
            )
        except Exception:
            logger.exception("Compaction summary generation failed; skipping.")
            return

        # An empty summary would replace the stored one and the events would
        # then be deleted, losing the conversation context entirely.
        if not summary:
            logger.warning(
                "Compaction produced an empty summary for session %s; skipping.",
                session.id,
            )
            return

        # Write summary BEFORE deleting events so a crash between the two
        # steps leaves safe duplication rather than data loss.
        await session_ref.update({"conversation_summary": summary})

        for start in range(0, len(docs_to_delete), _DELETE_BATCH_SIZE):
            batch = self._db.batch()
            for doc in docs_to_delete[start : start + _DELETE_BATCH_SIZE]:
                batch.delete(doc.reference)
            await batch.commit()

        logger.info(
            "Compacted session %s: summarised %d events, kept %d.",
            session.id,
            len(docs_to_delete),
            self._compaction_keep_recent,
        )

    async def _compact_under_firestore_lock(
        self,
        key: str,
        session: Session,
        events_col_ref: Any,
        session_ref: Any,
    ) -> None:
        """Claim the Firestore lock, compact, then release the lock if still ours."""
        # The claim value doubles as an ownership token for the release step.
        claimed_at = time.time()
        try:
            claimed = await _try_claim_compaction_txn(
                self._db.transaction(), session_ref, claimed_at=claimed_at
            )
        except Exception:
            logger.exception("Failed to claim compaction lock for %s", key)
            return
        if not claimed:
            logger.debug(
                "Compaction lock held by another instance for %s; skipping.",
                key,
            )
            return

        try:
            await self._compact_session(session, events_col_ref, session_ref)
        except Exception:
            logger.exception("Background compaction failed for %s", key)
        finally:
            try:
                released = await _release_compaction_lock_txn(
                    self._db.transaction(), session_ref, claimed_at
                )
            except Exception:
                logger.exception("Failed to release compaction lock for %s", key)
            else:
                if not released:
                    logger.warning(
                        "Compaction lock for %s was not released: it is no "
                        "longer held by this instance.",
                        key,
                    )

    async def guarded_compact(
        self,
        session: Session,
        events_col_ref: Any,
        session_ref: Any,
    ) -> None:
        """Run compaction in the background with per-session locking.

        This method ensures that only one compaction process runs at a time
        for a given session, both locally (using asyncio locks) and across
        multiple instances (using Firestore-backed locks). Errors are logged,
        not raised.

        Args:
            session: The session to compact
            events_col_ref: Firestore collection reference for events
            session_ref: Firestore document reference for the session
        """
        key = f"{session.app_name}__{session.user_id}__{session.id}"
        lock = self._compaction_locks.get(key)
        if lock is None:
            lock = self._compaction_locks[key] = asyncio.Lock()
        elif lock.locked():
            logger.debug("Compaction already running locally for %s; skipping.", key)
            return

        try:
            async with lock:
                await self._compact_under_firestore_lock(
                    key, session, events_col_ref, session_ref
                )
        finally:
            # Callers that find the lock held return instead of waiting, so no
            # one is queued on it; drop the idle entry to bound memory.
            if not lock.locked() and self._compaction_locks.get(key) is lock:
                del self._compaction_locks[key]