diff --git a/src/capa_de_integracion/config.py b/src/capa_de_integracion/config.py index 766cdb8..9d4fa6e 100644 --- a/src/capa_de_integracion/config.py +++ b/src/capa_de_integracion/config.py @@ -17,6 +17,9 @@ class Settings(BaseSettings): # RAG rag_endpoint_url: str + rag_echo_enabled: bool = Field( + default=False, alias="RAG_ECHO_ENABLED", + ) # Firestore firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID") diff --git a/src/capa_de_integracion/dependencies.py b/src/capa_de_integracion/dependencies.py index effbff4..286d93a 100644 --- a/src/capa_de_integracion/dependencies.py +++ b/src/capa_de_integracion/dependencies.py @@ -2,7 +2,11 @@ from functools import lru_cache -from rag_client import HTTPRAGService, RAGServiceBase +from capa_de_integracion.services.rag import ( + EchoRAGService, + HTTPRAGService, + RAGServiceBase, +) from .config import Settings, settings from .services import ( @@ -53,6 +57,8 @@ def get_notification_manager() -> NotificationManagerService: @lru_cache(maxsize=1) def get_rag_service() -> RAGServiceBase: """Get RAG service instance.""" + if settings.rag_echo_enabled: + return EchoRAGService() return HTTPRAGService( endpoint_url=settings.rag_endpoint_url, max_connections=100, diff --git a/src/capa_de_integracion/services/conversation_manager.py b/src/capa_de_integracion/services/conversation_manager.py index 1eaff93..7354326 100644 --- a/src/capa_de_integracion/services/conversation_manager.py +++ b/src/capa_de_integracion/services/conversation_manager.py @@ -5,8 +5,6 @@ import re from datetime import UTC, datetime, timedelta from uuid import uuid4 -from rag_client import RAGServiceBase - from capa_de_integracion.config import Settings from capa_de_integracion.models import ( ConversationEntry, @@ -16,6 +14,7 @@ from capa_de_integracion.models import ( QueryResult, ) from capa_de_integracion.models.notification import NotificationSession +from capa_de_integracion.services.rag import RAGServiceBase from .dlp_service import DLPService from .firestore_service import FirestoreService diff --git a/src/capa_de_integracion/services/firestore_service.py b/src/capa_de_integracion/services/firestore_service.py index 7c94652..960a76e 100644 --- a/src/capa_de_integracion/services/firestore_service.py +++ b/src/capa_de_integracion/services/firestore_service.py @@ -4,6 +4,7 @@ import logging from datetime import UTC, datetime from google.cloud import firestore +from google.cloud.firestore_v1.base_query import FieldFilter from capa_de_integracion.config import Settings from capa_de_integracion.models import ConversationEntry, ConversationSession @@ -78,7 +79,7 @@ class FirestoreService: try: query = ( self.db.collection(self.conversations_collection) - .where("telefono", "==", telefono) + .where(filter=FieldFilter("telefono", "==", telefono)) .order_by("lastModified", direction=firestore.Query.DESCENDING) .limit(1) ) diff --git a/src/capa_de_integracion/services/rag/__init__.py b/src/capa_de_integracion/services/rag/__init__.py new file mode 100644 index 0000000..c54cbbc --- /dev/null +++ b/src/capa_de_integracion/services/rag/__init__.py @@ -0,0 +1,19 @@ +"""RAG service implementations.""" + +from capa_de_integracion.services.rag.base import ( + Message, + RAGRequest, + RAGResponse, + RAGServiceBase, +) +from capa_de_integracion.services.rag.echo import EchoRAGService +from capa_de_integracion.services.rag.http import HTTPRAGService + +__all__ = [ + "EchoRAGService", + "HTTPRAGService", + "Message", + "RAGRequest", + "RAGResponse", + "RAGServiceBase", +] diff --git a/src/capa_de_integracion/services/rag/base.py b/src/capa_de_integracion/services/rag/base.py new file mode 100644 index 0000000..8cb62eb --- /dev/null +++ b/src/capa_de_integracion/services/rag/base.py @@ -0,0 +1,69 @@ +"""Base RAG service interface.""" + +from abc import ABC, abstractmethod +from types import TracebackType +from typing import Self + +from pydantic import BaseModel, Field + + +class Message(BaseModel): + """OpenAI-style message format.""" + + role: str = Field(..., description="Role: system, user, or assistant") + content: str = Field(..., description="Message content") + + +class RAGRequest(BaseModel): + """Request model for RAG endpoint.""" + + messages: list[Message] = Field(..., description="Conversation history") + + +class RAGResponse(BaseModel): + """Response model from RAG endpoint.""" + + response: str = Field(..., description="Generated response from RAG") + + +class RAGServiceBase(ABC): + """Abstract base class for RAG service implementations. + + Provides a common interface for different RAG service backends + (HTTP, mock, echo, etc.). + """ + + @abstractmethod + async def query(self, messages: list[dict[str, str]]) -> str: + """Send conversation history to RAG endpoint and get response. + + Args: + messages: OpenAI-style conversation history + e.g., [{"role": "user", "content": "Hello"}, ...] + + Returns: + Response string from RAG endpoint + + Raises: + Exception: Implementation-specific exceptions + + """ + ... + + @abstractmethod + async def close(self) -> None: + """Close the service and release resources.""" + ... + + async def __aenter__(self) -> Self: + """Async context manager entry.""" + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Async context manager exit.""" + await self.close() diff --git a/src/capa_de_integracion/services/rag/echo.py b/src/capa_de_integracion/services/rag/echo.py new file mode 100644 index 0000000..0991556 --- /dev/null +++ b/src/capa_de_integracion/services/rag/echo.py @@ -0,0 +1,64 @@ +"""Echo RAG service implementation for testing.""" + +import logging + +from capa_de_integracion.services.rag.base import RAGServiceBase + +logger = logging.getLogger(__name__) + +# Error messages +_ERR_NO_MESSAGES = "No messages provided" +_ERR_NO_USER_MESSAGE = "No user message found in conversation history" + + +class EchoRAGService(RAGServiceBase): + """Echo RAG service that returns the last user message. + + Useful for testing and development without needing a real RAG endpoint. + Simply echoes back the content of the last user message with an optional prefix. + """ + + def __init__(self, prefix: str = "Echo: ") -> None: + """Initialize Echo RAG service. + + Args: + prefix: Prefix to add to echoed messages (default: "Echo: ") + + """ + self.prefix = prefix + logger.info("EchoRAGService initialized with prefix: %r", prefix) + + async def query(self, messages: list[dict[str, str]]) -> str: + """Echo back the last user message with a prefix. + + Args: + messages: OpenAI-style conversation history + e.g., [{"role": "user", "content": "Hello"}, ...] + + Returns: + The last user message content with prefix + + Raises: + ValueError: If no messages or no user messages found + + """ + if not messages: + raise ValueError(_ERR_NO_MESSAGES) + + # Find the last user message + last_user_message = None + for msg in reversed(messages): + if msg.get("role") == "user": + last_user_message = msg.get("content", "") + break + + if last_user_message is None: + raise ValueError(_ERR_NO_USER_MESSAGE) + + response = f"{self.prefix}{last_user_message}" + logger.debug("Echo response: %s", response) + return response + + async def close(self) -> None: + """Close the service (no-op for echo service).""" + logger.info("EchoRAGService closed") diff --git a/src/capa_de_integracion/services/rag/http.py b/src/capa_de_integracion/services/rag/http.py new file mode 100644 index 0000000..376c085 --- /dev/null +++ b/src/capa_de_integracion/services/rag/http.py @@ -0,0 +1,120 @@ +"""HTTP-based RAG service implementation.""" + +import logging + +import httpx + +from capa_de_integracion.services.rag.base import ( + Message, + RAGRequest, + RAGResponse, + RAGServiceBase, +) + +logger = logging.getLogger(__name__) + + +class HTTPRAGService(RAGServiceBase): + """HTTP-based RAG service with high concurrency support. + + Uses httpx AsyncClient with connection pooling for optimal performance + when handling multiple concurrent requests. + """ + + def __init__( + self, + endpoint_url: str, + max_connections: int = 100, + max_keepalive_connections: int = 20, + timeout: float = 30.0, + ) -> None: + """Initialize HTTP RAG service with connection pooling. + + Args: + endpoint_url: URL of the RAG endpoint + max_connections: Maximum number of concurrent connections + max_keepalive_connections: Maximum number of idle connections to keep alive + timeout: Request timeout in seconds + + """ + self.endpoint_url = endpoint_url + self.timeout = timeout + + # Configure connection limits for high concurrency + limits = httpx.Limits( + max_connections=max_connections, + max_keepalive_connections=max_keepalive_connections, + ) + + # Create async client with connection pooling + self._client = httpx.AsyncClient( + limits=limits, + timeout=httpx.Timeout(timeout), + http2=True, # Enable HTTP/2 for better performance + ) + + logger.info( + "HTTPRAGService initialized with endpoint: %s, " + "max_connections: %s, timeout: %ss", + self.endpoint_url, + max_connections, + timeout, + ) + + async def query(self, messages: list[dict[str, str]]) -> str: + """Send conversation history to RAG endpoint and get response. + + Args: + messages: OpenAI-style conversation history + e.g., [{"role": "user", "content": "Hello"}, ...] + + Returns: + Response string from RAG endpoint + + Raises: + httpx.HTTPError: If HTTP request fails + ValueError: If response format is invalid + + """ + try: + # Validate and construct request + message_objects = [Message(**msg) for msg in messages] + request = RAGRequest(messages=message_objects) + + # Make async HTTP POST request + logger.debug("Sending RAG request with %s messages", len(messages)) + + response = await self._client.post( + self.endpoint_url, + json=request.model_dump(), + headers={"Content-Type": "application/json"}, + ) + + # Raise exception for HTTP errors + response.raise_for_status() + + # Parse response + response_data = response.json() + rag_response = RAGResponse(**response_data) + + logger.debug("RAG response received: %s chars", len(rag_response.response)) + except httpx.HTTPStatusError as e: + logger.exception( + "HTTP error calling RAG endpoint: %s - %s", + e.response.status_code, + e.response.text, + ) + raise + except httpx.RequestError: + logger.exception("Request error calling RAG endpoint:") + raise + except Exception: + logger.exception("Unexpected error calling RAG endpoint") + raise + else: + return rag_response.response + + async def close(self) -> None: + """Close the HTTP client and release connections.""" + await self._client.aclose() + logger.info("HTTPRAGService client closed") diff --git a/src/capa_de_integracion/services/redis_service.py b/src/capa_de_integracion/services/redis_service.py index 0f86a0c..5d6f828 100644 --- a/src/capa_de_integracion/services/redis_service.py +++ b/src/capa_de_integracion/services/redis_service.py @@ -41,7 +41,7 @@ class RedisService: async def close(self) -> None: """Close Redis connection.""" if self.redis: - await self.redis.close() + await self.redis.aclose() logger.info("Redis connection closed") def _session_key(self, session_id: str) -> str: