# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx", "rich"]
# ///
"""Send a message to the local RAG agent server.

Usage:
    uv run utils/send_query.py "Hola, ¿cómo estás?"
    uv run utils/send_query.py --phone 5551234 "¿Qué servicios ofrecen?"
    uv run utils/send_query.py --base-url http://localhost:8080 "Hola"
    uv run utils/send_query.py -i   # interactive chat mode
"""

from __future__ import annotations

import argparse

import httpx
from rich import print as rprint
from rich.console import Console
from rich.markup import escape

console = Console()

# Path of the query endpoint, appended to the --base-url value.
QUERY_ENDPOINT = "/api/v1/query"

# Seconds to wait for the server before httpx raises a timeout error.
REQUEST_TIMEOUT_SECONDS = 120


def send_message(url: str, phone: str, text: str) -> dict:
    """POST one message to the query endpoint and return the decoded JSON body.

    Args:
        url: Full URL of the query endpoint.
        phone: Value sent as ``phone_number`` in the payload.
        text: Message text sent as ``text`` in the payload.

    Returns:
        The response body parsed as JSON.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        httpx.RequestError: If the request cannot be completed (connection
            failure, timeout, ...).
    """
    payload = {
        "phone_number": phone,
        "text": text,
        "type": "conversation",
        "language_code": "es",
    }
    resp = httpx.post(url, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)
    resp.raise_for_status()
    return resp.json()


def one_shot(url: str, phone: str, text: str) -> None:
    """Send a single message and print the request summary and the response.

    Errors raised by ``send_message`` are not caught here; they propagate to
    the caller.

    Args:
        url: Full URL of the query endpoint.
        phone: Value sent as ``phone_number`` in the payload.
        text: Message text to send.
    """
    # Everything that is not our own styling is escaped (or printed with
    # markup disabled) so that square brackets in user or server text are
    # shown literally instead of being parsed as Rich markup tags.
    rprint(f"[bold]POST[/bold] {escape(url)}")
    preview = repr({"phone_number": phone, "text": text})
    rprint(f"[dim]{escape(preview)}[/dim]\n")

    data = send_message(url, phone, text)

    response_id = escape(str(data["response_id"]))
    rprint(f"[green bold]Response ([/green bold]{response_id}[green bold]):[/green bold]")
    console.print(data["response_text"], markup=False)


def interactive(url: str, phone: str) -> None:
    """Run a read-send-print chat loop until the user quits.

    The loop ends on ``/quit``, ``/exit``, ``/q``, end-of-input or Ctrl-C.
    HTTP and transport errors are reported and the loop continues, so one
    failed request does not end the session.

    Args:
        url: Full URL of the query endpoint.
        phone: Value sent as ``phone_number`` with every message.
    """
    rprint(
        f"[bold cyan]Interactive chat[/bold cyan] → {escape(url)} "
        f"(session: {escape(phone)})"
    )
    rprint("[dim]Type /quit or Ctrl-C to exit[/dim]\n")

    while True:
        try:
            text = console.input("[bold yellow]You>[/bold yellow] ").strip()
        except (EOFError, KeyboardInterrupt):
            rprint("\n[dim]Bye![/dim]")
            break

        if not text:
            continue
        if text.lower() in {"/quit", "/exit", "/q"}:
            rprint("[dim]Bye![/dim]")
            break

        try:
            data = send_message(url, phone, text)
        except httpx.HTTPStatusError as exc:
            body = escape(exc.response.text)
            rprint(f"[red bold]Error {exc.response.status_code}:[/red bold] {body}\n")
        except httpx.ConnectError:
            rprint("[red bold]Connection error:[/red bold] could not reach the server\n")
        except httpx.RequestError as exc:
            # Timeouts and other transport failures: report and keep chatting.
            detail = escape(f"{type(exc).__name__}: {exc}")
            rprint(f"[red bold]Request failed:[/red bold] {detail}\n")
        else:
            answer = escape(str(data["response_text"]))
            rprint(f"[green bold]Agent>[/green bold] {answer}\n")


def main() -> None:
    """Parse command-line arguments and run one-shot or interactive mode.

    Interactive mode is used when ``-i``/``--interactive`` is given or when no
    message text is supplied; otherwise the text is sent once.
    """
    parser = argparse.ArgumentParser(description="Send a query to the RAG agent")
    parser.add_argument(
        "text",
        nargs="?",
        default=None,
        help="Message to send (omit for interactive mode)",
    )
    parser.add_argument(
        "-i",
        "--interactive",
        action="store_true",
        help="Start interactive chat session",
    )
    parser.add_argument("--phone", default="test-user", help="Phone number / session id")
    parser.add_argument("--base-url", default="http://localhost:8000", help="Server base URL")
    args = parser.parse_args()

    # Drop trailing slashes so "http://host:8000/" does not yield "//api/...".
    url = f"{args.base_url.rstrip('/')}{QUERY_ENDPOINT}"

    if args.interactive or args.text is None:
        interactive(url, args.phone)
    else:
        one_shot(url, args.phone, args.text)


if __name__ == "__main__":
    main()