Create rag-client package

This commit is contained in:
2026-02-20 05:11:24 +00:00
committed by Anibal Angulo
parent f4eae2e2b5
commit f0adce2c3c
16 changed files with 1325 additions and 5 deletions

View File

View File

@@ -0,0 +1,17 @@
[project]
name = "rag-client"
version = "0.1.0"
description = "RAG client library with HTTP and Echo implementations"
readme = "README.md"
authors = [
{ name = "A8065384", email = "anibal.angulo.cardoza@banorte.com" }
]
requires-python = ">=3.12"
dependencies = [
"httpx>=0.27.0",
"pydantic>=2.0.0",
]
[build-system]
requires = ["uv_build>=0.9.22,<0.10.0"]
build-backend = "uv_build"

View File

@@ -0,0 +1,19 @@
"""RAG client package for interacting with RAG services."""
from rag_client.base import (
Message,
RAGRequest,
RAGResponse,
RAGServiceBase,
)
from rag_client.echo import EchoRAGService
from rag_client.http import HTTPRAGService
__all__ = [
"EchoRAGService",
"HTTPRAGService",
"Message",
"RAGRequest",
"RAGResponse",
"RAGServiceBase",
]

View File

@@ -0,0 +1,69 @@
"""Base RAG service interface."""
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Self
from pydantic import BaseModel, Field
class Message(BaseModel):
"""OpenAI-style message format."""
role: str = Field(..., description="Role: system, user, or assistant")
content: str = Field(..., description="Message content")
class RAGRequest(BaseModel):
"""Request model for RAG endpoint."""
messages: list[Message] = Field(..., description="Conversation history")
class RAGResponse(BaseModel):
"""Response model from RAG endpoint."""
response: str = Field(..., description="Generated response from RAG")
class RAGServiceBase(ABC):
"""Abstract base class for RAG service implementations.
Provides a common interface for different RAG service backends
(HTTP, mock, echo, etc.).
"""
@abstractmethod
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
Response string from RAG endpoint
Raises:
Exception: Implementation-specific exceptions
"""
...
@abstractmethod
async def close(self) -> None:
"""Close the service and release resources."""
...
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Async context manager exit."""
await self.close()

View File

@@ -0,0 +1,64 @@
"""Echo RAG service implementation for testing."""
import logging
from rag_client.base import RAGServiceBase
logger = logging.getLogger(__name__)
# Error messages
_ERR_NO_MESSAGES = "No messages provided"
_ERR_NO_USER_MESSAGE = "No user message found in conversation history"
class EchoRAGService(RAGServiceBase):
"""Echo RAG service that returns the last user message.
Useful for testing and development without needing a real RAG endpoint.
Simply echoes back the content of the last user message with an optional prefix.
"""
def __init__(self, prefix: str = "Echo: ") -> None:
"""Initialize Echo RAG service.
Args:
prefix: Prefix to add to echoed messages (default: "Echo: ")
"""
self.prefix = prefix
logger.info("EchoRAGService initialized with prefix: %r", prefix)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Echo back the last user message with a prefix.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
The last user message content with prefix
Raises:
ValueError: If no messages or no user messages found
"""
if not messages:
raise ValueError(_ERR_NO_MESSAGES)
# Find the last user message
last_user_message = None
for msg in reversed(messages):
if msg.get("role") == "user":
last_user_message = msg.get("content", "")
break
if last_user_message is None:
raise ValueError(_ERR_NO_USER_MESSAGE)
response = f"{self.prefix}{last_user_message}"
logger.debug("Echo response: %s", response)
return response
async def close(self) -> None:
"""Close the service (no-op for echo service)."""
logger.info("EchoRAGService closed")

View File

@@ -0,0 +1,115 @@
"""HTTP-based RAG service implementation."""
import logging
import httpx
from rag_client.base import Message, RAGRequest, RAGResponse, RAGServiceBase
logger = logging.getLogger(__name__)
class HTTPRAGService(RAGServiceBase):
"""HTTP-based RAG service with high concurrency support.
Uses httpx AsyncClient with connection pooling for optimal performance
when handling multiple concurrent requests.
"""
def __init__(
self,
endpoint_url: str,
max_connections: int = 100,
max_keepalive_connections: int = 20,
timeout: float = 30.0,
) -> None:
"""Initialize HTTP RAG service with connection pooling.
Args:
endpoint_url: URL of the RAG endpoint
max_connections: Maximum number of concurrent connections
max_keepalive_connections: Maximum number of idle connections to keep alive
timeout: Request timeout in seconds
"""
self.endpoint_url = endpoint_url
self.timeout = timeout
# Configure connection limits for high concurrency
limits = httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
)
# Create async client with connection pooling
self._client = httpx.AsyncClient(
limits=limits,
timeout=httpx.Timeout(timeout),
http2=True, # Enable HTTP/2 for better performance
)
logger.info(
"HTTPRAGService initialized with endpoint: %s, "
"max_connections: %s, timeout: %ss",
self.endpoint_url,
max_connections,
timeout,
)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
Response string from RAG endpoint
Raises:
httpx.HTTPError: If HTTP request fails
ValueError: If response format is invalid
"""
try:
# Validate and construct request
message_objects = [Message(**msg) for msg in messages]
request = RAGRequest(messages=message_objects)
# Make async HTTP POST request
logger.debug("Sending RAG request with %s messages", len(messages))
response = await self._client.post(
self.endpoint_url,
json=request.model_dump(),
headers={"Content-Type": "application/json"},
)
# Raise exception for HTTP errors
response.raise_for_status()
# Parse response
response_data = response.json()
rag_response = RAGResponse(**response_data)
logger.debug("RAG response received: %s chars", len(rag_response.response))
except httpx.HTTPStatusError as e:
logger.exception(
"HTTP error calling RAG endpoint: %s - %s",
e.response.status_code,
e.response.text,
)
raise
except httpx.RequestError:
logger.exception("Request error calling RAG endpoint:")
raise
except Exception:
logger.exception("Unexpected error calling RAG endpoint")
raise
else:
return rag_response.response
async def close(self) -> None:
"""Close the HTTP client and release connections."""
await self._client.aclose()
logger.info("HTTPRAGService client closed")