Files
agent/utils/send_query.py
Anibal Angulo 2f9d2020c0 Testing prompt
2026-02-23 23:31:50 +00:00

86 lines
2.9 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx", "rich"]
# ///
"""Send a message to the local RAG agent server.
Usage:
uv run utils/send_query.py "Hola, ¿cómo estás?"
uv run utils/send_query.py --phone 5551234 "¿Qué servicios ofrecen?"
uv run utils/send_query.py --base-url http://localhost:8080 "Hola"
uv run utils/send_query.py -i # interactive chat mode
"""
from __future__ import annotations
import argparse
import httpx
from rich import print as rprint
from rich.console import Console
console = Console()
def send_message(url: str, phone: str, text: str) -> dict:
payload = {
"phone_number": phone,
"text": text,
"type": "conversation",
"language_code": "es",
}
resp = httpx.post(url, json=payload, timeout=120)
resp.raise_for_status()
return resp.json()
def one_shot(url: str, phone: str, text: str) -> None:
rprint(f"[bold]POST[/bold] {url}")
rprint(f"[dim]{{'phone_number': {phone!r}, 'text': {text!r}}}[/dim]\n")
data = send_message(url, phone, text)
rprint(f"[green bold]Response ([/green bold]{data['response_id']}[green bold]):[/green bold]")
rprint(data["response_text"])
def interactive(url: str, phone: str) -> None:
rprint(f"[bold cyan]Interactive chat[/bold cyan] → {url} (session: {phone})")
rprint("[dim]Type /quit or Ctrl-C to exit[/dim]\n")
while True:
try:
text = console.input("[bold yellow]You>[/bold yellow] ").strip()
except (EOFError, KeyboardInterrupt):
rprint("\n[dim]Bye![/dim]")
break
if not text:
continue
if text.lower() in {"/quit", "/exit", "/q"}:
rprint("[dim]Bye![/dim]")
break
try:
data = send_message(url, phone, text)
rprint(f"[green bold]Agent>[/green bold] {data['response_text']}\n")
except httpx.HTTPStatusError as exc:
rprint(f"[red bold]Error {exc.response.status_code}:[/red bold] {exc.response.text}\n")
except httpx.ConnectError:
rprint("[red bold]Connection error:[/red bold] could not reach the server\n")
def main() -> None:
parser = argparse.ArgumentParser(description="Send a query to the RAG agent")
parser.add_argument("text", nargs="?", default=None, help="Message to send (omit for interactive mode)")
parser.add_argument("-i", "--interactive", action="store_true", help="Start interactive chat session")
parser.add_argument("--phone", default="test-user", help="Phone number / session id")
parser.add_argument("--base-url", default="http://localhost:8000", help="Server base URL")
args = parser.parse_args()
url = f"{args.base_url}/api/v1/query"
if args.interactive or args.text is None:
interactive(url, args.phone)
else:
one_shot(url, args.phone, args.text)
if __name__ == "__main__":
main()