Add echo client to app

This commit is contained in:
2026-02-20 05:42:50 +00:00
committed by Anibal Angulo
parent 825c4a8249
commit 94226ba913
9 changed files with 286 additions and 5 deletions

View File

@@ -17,6 +17,9 @@ class Settings(BaseSettings):
# RAG
rag_endpoint_url: str
rag_echo_enabled: bool = Field(
default=False, alias="RAG_ECHO_ENABLED",
)
# Firestore
firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID")

View File

@@ -2,7 +2,11 @@
from functools import lru_cache
from rag_client import HTTPRAGService, RAGServiceBase
from capa_de_integracion.services.rag import (
EchoRAGService,
HTTPRAGService,
RAGServiceBase,
)
from .config import Settings, settings
from .services import (
@@ -53,6 +57,8 @@ def get_notification_manager() -> NotificationManagerService:
@lru_cache(maxsize=1)
def get_rag_service() -> RAGServiceBase:
"""Get RAG service instance."""
if settings.rag_echo_enabled:
return EchoRAGService()
return HTTPRAGService(
endpoint_url=settings.rag_endpoint_url,
max_connections=100,

View File

@@ -5,8 +5,6 @@ import re
from datetime import UTC, datetime, timedelta
from uuid import uuid4
from rag_client import RAGServiceBase
from capa_de_integracion.config import Settings
from capa_de_integracion.models import (
ConversationEntry,
@@ -16,6 +14,7 @@ from capa_de_integracion.models import (
QueryResult,
)
from capa_de_integracion.models.notification import NotificationSession
from capa_de_integracion.services.rag import RAGServiceBase
from .dlp_service import DLPService
from .firestore_service import FirestoreService

View File

@@ -4,6 +4,7 @@ import logging
from datetime import UTC, datetime
from google.cloud import firestore
from google.cloud.firestore_v1.base_query import FieldFilter
from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationEntry, ConversationSession
@@ -78,7 +79,7 @@ class FirestoreService:
try:
query = (
self.db.collection(self.conversations_collection)
.where("telefono", "==", telefono)
.where(filter=FieldFilter("telefono", "==", telefono))
.order_by("lastModified", direction=firestore.Query.DESCENDING)
.limit(1)
)

View File

@@ -0,0 +1,19 @@
"""RAG service implementations."""
from capa_de_integracion.services.rag.base import (
Message,
RAGRequest,
RAGResponse,
RAGServiceBase,
)
from capa_de_integracion.services.rag.echo import EchoRAGService
from capa_de_integracion.services.rag.http import HTTPRAGService
__all__ = [
"EchoRAGService",
"HTTPRAGService",
"Message",
"RAGRequest",
"RAGResponse",
"RAGServiceBase",
]

View File

@@ -0,0 +1,69 @@
"""Base RAG service interface."""
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Self
from pydantic import BaseModel, Field
class Message(BaseModel):
"""OpenAI-style message format."""
role: str = Field(..., description="Role: system, user, or assistant")
content: str = Field(..., description="Message content")
class RAGRequest(BaseModel):
"""Request model for RAG endpoint."""
messages: list[Message] = Field(..., description="Conversation history")
class RAGResponse(BaseModel):
"""Response model from RAG endpoint."""
response: str = Field(..., description="Generated response from RAG")
class RAGServiceBase(ABC):
"""Abstract base class for RAG service implementations.
Provides a common interface for different RAG service backends
(HTTP, mock, echo, etc.).
"""
@abstractmethod
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
Response string from RAG endpoint
Raises:
Exception: Implementation-specific exceptions
"""
...
@abstractmethod
async def close(self) -> None:
"""Close the service and release resources."""
...
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Async context manager exit."""
await self.close()

View File

@@ -0,0 +1,64 @@
"""Echo RAG service implementation for testing."""
import logging
from capa_de_integracion.services.rag.base import RAGServiceBase
logger = logging.getLogger(__name__)
# Error messages
_ERR_NO_MESSAGES = "No messages provided"
_ERR_NO_USER_MESSAGE = "No user message found in conversation history"
class EchoRAGService(RAGServiceBase):
"""Echo RAG service that returns the last user message.
Useful for testing and development without needing a real RAG endpoint.
Simply echoes back the content of the last user message with an optional prefix.
"""
def __init__(self, prefix: str = "Echo: ") -> None:
"""Initialize Echo RAG service.
Args:
prefix: Prefix to add to echoed messages (default: "Echo: ")
"""
self.prefix = prefix
logger.info("EchoRAGService initialized with prefix: %r", prefix)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Echo back the last user message with a prefix.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
The last user message content with prefix
Raises:
ValueError: If no messages or no user messages found
"""
if not messages:
raise ValueError(_ERR_NO_MESSAGES)
# Find the last user message
last_user_message = None
for msg in reversed(messages):
if msg.get("role") == "user":
last_user_message = msg.get("content", "")
break
if last_user_message is None:
raise ValueError(_ERR_NO_USER_MESSAGE)
response = f"{self.prefix}{last_user_message}"
logger.debug("Echo response: %s", response)
return response
async def close(self) -> None:
"""Close the service (no-op for echo service)."""
logger.info("EchoRAGService closed")

View File

@@ -0,0 +1,120 @@
"""HTTP-based RAG service implementation."""
import logging
import httpx
from capa_de_integracion.services.rag.base import (
Message,
RAGRequest,
RAGResponse,
RAGServiceBase,
)
logger = logging.getLogger(__name__)
class HTTPRAGService(RAGServiceBase):
"""HTTP-based RAG service with high concurrency support.
Uses httpx AsyncClient with connection pooling for optimal performance
when handling multiple concurrent requests.
"""
def __init__(
self,
endpoint_url: str,
max_connections: int = 100,
max_keepalive_connections: int = 20,
timeout: float = 30.0,
) -> None:
"""Initialize HTTP RAG service with connection pooling.
Args:
endpoint_url: URL of the RAG endpoint
max_connections: Maximum number of concurrent connections
max_keepalive_connections: Maximum number of idle connections to keep alive
timeout: Request timeout in seconds
"""
self.endpoint_url = endpoint_url
self.timeout = timeout
# Configure connection limits for high concurrency
limits = httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
)
# Create async client with connection pooling
self._client = httpx.AsyncClient(
limits=limits,
timeout=httpx.Timeout(timeout),
http2=True, # Enable HTTP/2 for better performance
)
logger.info(
"HTTPRAGService initialized with endpoint: %s, "
"max_connections: %s, timeout: %ss",
self.endpoint_url,
max_connections,
timeout,
)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
Response string from RAG endpoint
Raises:
httpx.HTTPError: If HTTP request fails
ValueError: If response format is invalid
"""
try:
# Validate and construct request
message_objects = [Message(**msg) for msg in messages]
request = RAGRequest(messages=message_objects)
# Make async HTTP POST request
logger.debug("Sending RAG request with %s messages", len(messages))
response = await self._client.post(
self.endpoint_url,
json=request.model_dump(),
headers={"Content-Type": "application/json"},
)
# Raise exception for HTTP errors
response.raise_for_status()
# Parse response
response_data = response.json()
rag_response = RAGResponse(**response_data)
logger.debug("RAG response received: %s chars", len(rag_response.response))
except httpx.HTTPStatusError as e:
logger.exception(
"HTTP error calling RAG endpoint: %s - %s",
e.response.status_code,
e.response.text,
)
raise
except httpx.RequestError:
logger.exception("Request error calling RAG endpoint:")
raise
except Exception:
logger.exception("Unexpected error calling RAG endpoint")
raise
else:
return rag_response.response
async def close(self) -> None:
"""Close the HTTP client and release connections."""
await self._client.aclose()
logger.info("HTTPRAGService client closed")

View File

@@ -41,7 +41,7 @@ class RedisService:
async def close(self) -> None:
"""Close Redis connection."""
if self.redis:
await self.redis.close()
await self.redis.aclose()
logger.info("Redis connection closed")
def _session_key(self, session_id: str) -> str: