#!/usr/bin/env python3 """Minimal CLI chat agent to test FirestoreSessionService.""" import asyncio from google.adk.agents import LlmAgent from google.adk.runners import Runner from google.cloud.firestore_v1.async_client import AsyncClient from google.genai import types from adk_firestore_sessionmanager import FirestoreSessionService APP_NAME = "test_agent" USER_ID = "dev_user" root_agent = LlmAgent( name=APP_NAME, model="gemini-2.5-flash", instruction="You are a helpful conversational assistant.", ) async def main() -> None: db = AsyncClient() session_service = FirestoreSessionService(db=db) runner = Runner( app_name=APP_NAME, agent=root_agent, session_service=session_service, ) session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, ) print(f"Session {session.id} created. Type 'exit' to quit.\n") while True: try: user_input = input("You: ").strip() except (EOFError, KeyboardInterrupt): break if not user_input or user_input.lower() == "exit": break async for event in runner.run_async( user_id=USER_ID, session_id=session.id, new_message=types.Content( role="user", parts=[types.Part(text=user_input)] ), ): if event.content and event.content.parts and not event.partial: text = "".join(p.text or "" for p in event.content.parts) if text: print(f"Agent: {text}") await runner.close() print("\nGoodbye!") if __name__ == "__main__": asyncio.run(main())