Create rag-client package

This commit is contained in:
2026-02-20 05:11:24 +00:00
parent d663394106
commit b86dfe7373
16 changed files with 1325 additions and 5 deletions

View File

View File

@@ -0,0 +1,17 @@
[project]
name = "rag-client"
version = "0.1.0"
description = "RAG client library with HTTP and Echo implementations"
readme = "README.md"
authors = [
{ name = "A8065384", email = "anibal.angulo.cardoza@banorte.com" }
]
requires-python = ">=3.12"
dependencies = [
"httpx>=0.27.0",
"pydantic>=2.0.0",
]
[build-system]
requires = ["uv_build>=0.9.22,<0.10.0"]
build-backend = "uv_build"

View File

@@ -0,0 +1,19 @@
"""RAG client package for interacting with RAG services."""
from rag_client.base import (
Message,
RAGRequest,
RAGResponse,
RAGServiceBase,
)
from rag_client.echo import EchoRAGService
from rag_client.http import HTTPRAGService
__all__ = [
"EchoRAGService",
"HTTPRAGService",
"Message",
"RAGRequest",
"RAGResponse",
"RAGServiceBase",
]

View File

@@ -0,0 +1,69 @@
"""Base RAG service interface."""
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Self
from pydantic import BaseModel, Field
class Message(BaseModel):
"""OpenAI-style message format."""
role: str = Field(..., description="Role: system, user, or assistant")
content: str = Field(..., description="Message content")
class RAGRequest(BaseModel):
"""Request model for RAG endpoint."""
messages: list[Message] = Field(..., description="Conversation history")
class RAGResponse(BaseModel):
"""Response model from RAG endpoint."""
response: str = Field(..., description="Generated response from RAG")
class RAGServiceBase(ABC):
"""Abstract base class for RAG service implementations.
Provides a common interface for different RAG service backends
(HTTP, mock, echo, etc.).
"""
@abstractmethod
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
Response string from RAG endpoint
Raises:
Exception: Implementation-specific exceptions
"""
...
@abstractmethod
async def close(self) -> None:
"""Close the service and release resources."""
...
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Async context manager exit."""
await self.close()

View File

@@ -0,0 +1,64 @@
"""Echo RAG service implementation for testing."""
import logging
from rag_client.base import RAGServiceBase
logger = logging.getLogger(__name__)
# Error messages
_ERR_NO_MESSAGES = "No messages provided"
_ERR_NO_USER_MESSAGE = "No user message found in conversation history"
class EchoRAGService(RAGServiceBase):
"""Echo RAG service that returns the last user message.
Useful for testing and development without needing a real RAG endpoint.
Simply echoes back the content of the last user message with an optional prefix.
"""
def __init__(self, prefix: str = "Echo: ") -> None:
"""Initialize Echo RAG service.
Args:
prefix: Prefix to add to echoed messages (default: "Echo: ")
"""
self.prefix = prefix
logger.info("EchoRAGService initialized with prefix: %r", prefix)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Echo back the last user message with a prefix.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
The last user message content with prefix
Raises:
ValueError: If no messages or no user messages found
"""
if not messages:
raise ValueError(_ERR_NO_MESSAGES)
# Find the last user message
last_user_message = None
for msg in reversed(messages):
if msg.get("role") == "user":
last_user_message = msg.get("content", "")
break
if last_user_message is None:
raise ValueError(_ERR_NO_USER_MESSAGE)
response = f"{self.prefix}{last_user_message}"
logger.debug("Echo response: %s", response)
return response
async def close(self) -> None:
"""Close the service (no-op for echo service)."""
logger.info("EchoRAGService closed")

View File

@@ -0,0 +1,115 @@
"""HTTP-based RAG service implementation."""
import logging
import httpx
from rag_client.base import Message, RAGRequest, RAGResponse, RAGServiceBase
logger = logging.getLogger(__name__)
class HTTPRAGService(RAGServiceBase):
"""HTTP-based RAG service with high concurrency support.
Uses httpx AsyncClient with connection pooling for optimal performance
when handling multiple concurrent requests.
"""
def __init__(
self,
endpoint_url: str,
max_connections: int = 100,
max_keepalive_connections: int = 20,
timeout: float = 30.0,
) -> None:
"""Initialize HTTP RAG service with connection pooling.
Args:
endpoint_url: URL of the RAG endpoint
max_connections: Maximum number of concurrent connections
max_keepalive_connections: Maximum number of idle connections to keep alive
timeout: Request timeout in seconds
"""
self.endpoint_url = endpoint_url
self.timeout = timeout
# Configure connection limits for high concurrency
limits = httpx.Limits(
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
)
# Create async client with connection pooling
self._client = httpx.AsyncClient(
limits=limits,
timeout=httpx.Timeout(timeout),
http2=True, # Enable HTTP/2 for better performance
)
logger.info(
"HTTPRAGService initialized with endpoint: %s, "
"max_connections: %s, timeout: %ss",
self.endpoint_url,
max_connections,
timeout,
)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
e.g., [{"role": "user", "content": "Hello"}, ...]
Returns:
Response string from RAG endpoint
Raises:
httpx.HTTPError: If HTTP request fails
ValueError: If response format is invalid
"""
try:
# Validate and construct request
message_objects = [Message(**msg) for msg in messages]
request = RAGRequest(messages=message_objects)
# Make async HTTP POST request
logger.debug("Sending RAG request with %s messages", len(messages))
response = await self._client.post(
self.endpoint_url,
json=request.model_dump(),
headers={"Content-Type": "application/json"},
)
# Raise exception for HTTP errors
response.raise_for_status()
# Parse response
response_data = response.json()
rag_response = RAGResponse(**response_data)
logger.debug("RAG response received: %s chars", len(rag_response.response))
except httpx.HTTPStatusError as e:
logger.exception(
"HTTP error calling RAG endpoint: %s - %s",
e.response.status_code,
e.response.text,
)
raise
except httpx.RequestError:
logger.exception("Request error calling RAG endpoint:")
raise
except Exception:
logger.exception("Unexpected error calling RAG endpoint")
raise
else:
return rag_response.response
async def close(self) -> None:
"""Close the HTTP client and release connections."""
await self._client.aclose()
logger.info("HTTPRAGService client closed")

View File

@@ -20,6 +20,7 @@ dependencies = [
"redis[hiredis]>=5.2.0", "redis[hiredis]>=5.2.0",
"tenacity>=9.0.0", "tenacity>=9.0.0",
"python-multipart>=0.0.12", "python-multipart>=0.0.12",
"rag-client",
] ]
[project.scripts] [project.scripts]
@@ -42,3 +43,11 @@ dev = [
[tool.ruff.lint] [tool.ruff.lint]
select = ['ALL'] select = ['ALL']
ignore = ['D203', 'D213'] ignore = ['D203', 'D213']
[tool.uv.sources]
rag-client = { workspace = true }
[tool.uv.workspace]
members = [
"packages/rag-client",
]

View File

@@ -2,6 +2,8 @@
from functools import lru_cache from functools import lru_cache
from rag_client import HTTPRAGService, RAGServiceBase
from .config import Settings, settings from .config import Settings, settings
from .services import ( from .services import (
ConversationManagerService, ConversationManagerService,
@@ -10,7 +12,6 @@ from .services import (
QuickReplyContentService, QuickReplyContentService,
) )
from .services.firestore_service import FirestoreService from .services.firestore_service import FirestoreService
from .services.rag_service import RAGService
from .services.redis_service import RedisService from .services.redis_service import RedisService
@@ -50,9 +51,14 @@ def get_notification_manager() -> NotificationManagerService:
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_rag_service() -> RAGService: def get_rag_service() -> RAGServiceBase:
"""Get RAG service instance.""" """Get RAG service instance."""
return RAGService(settings) return HTTPRAGService(
endpoint_url=settings.rag_endpoint_url,
max_connections=100,
max_keepalive_connections=20,
timeout=30.0,
)
@lru_cache(maxsize=1) @lru_cache(maxsize=1)

View File

@@ -5,6 +5,8 @@ import re
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from uuid import uuid4 from uuid import uuid4
from rag_client import RAGServiceBase
from capa_de_integracion.config import Settings from capa_de_integracion.config import Settings
from capa_de_integracion.models import ( from capa_de_integracion.models import (
ConversationEntry, ConversationEntry,
@@ -18,7 +20,6 @@ from capa_de_integracion.models.notification import NotificationSession
from .dlp_service import DLPService from .dlp_service import DLPService
from .firestore_service import FirestoreService from .firestore_service import FirestoreService
from .quick_reply_content import QuickReplyContentService from .quick_reply_content import QuickReplyContentService
from .rag_service import RAGService
from .redis_service import RedisService from .redis_service import RedisService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -35,7 +36,7 @@ class ConversationManagerService:
def __init__( def __init__(
self, self,
settings: Settings, settings: Settings,
rag_service: RAGService, rag_service: RAGServiceBase,
redis_service: RedisService, redis_service: RedisService,
firestore_service: FirestoreService, firestore_service: FirestoreService,
dlp_service: DLPService, dlp_service: DLPService,

147
tests/README.md Normal file
View File

@@ -0,0 +1,147 @@
# Tests
This directory contains the test suite for the capa-de-integracion application.
## Test Dependencies
- **pytest** - Test framework
- **pytest-asyncio** - Async test support
- **pytest-cov** - Coverage reporting
- **pytest-env** - Environment variable configuration (cleaner than manual setup)
- **pytest-recording** - HTTP recording (configured but not used for gRPC Firestore)
- **inline-snapshot** - Snapshot testing support
## Running Tests
```bash
# Run all tests
uv run pytest
# Run specific test file
uv run pytest tests/services/test_firestore_service.py
# Run with coverage
uv run pytest --cov=capa_de_integracion
# Run with verbose output
uv run pytest -v
```
## Firestore Service Tests
The Firestore service tests require the Firestore emulator to be running.
### Prerequisites
1. **Start the Firestore Emulator**:
```bash
gcloud firestore emulators start --host-port=[::1]:8911
```
Or if using Firebase CLI:
```bash
firebase emulators:start --only firestore
```
2. **Configure Indexes** (optional, for advanced queries):
The `firestore.indexes.json` file defines composite indexes needed for some queries.
The emulator should automatically apply these when started in the project directory.
### Test Structure
- `test_firestore_service.py` - Comprehensive tests for FirestoreService
- `TestSessionManagement` - Conversation session CRUD operations
- `TestEntryManagement` - Conversation entry operations
- `TestNotificationManagement` - Notification operations
- `TestEdgeCases` - Edge cases and error handling
### Important Notes
#### Why No pytest-recording Cassettes?
While pytest-recording is configured in the project, **cassettes are not generated** for Firestore tests. This is because:
- **Firestore uses gRPC protocol**, not HTTP
- **pytest-recording/vcrpy only supports HTTP** requests
- The Firestore Python client communicates via gRPC, which cannot be recorded by vcrpy
**Solution**: Tests run directly against the Firestore emulator. This provides:
- ✅ Real integration testing with actual Firestore behavior
- ✅ No mocking - tests verify actual data operations
- ✅ Fast execution (emulator is local)
- ❌ Requires emulator to be running
If you need offline/recorded tests, consider:
1. Using the emulator's export/import feature for test data
2. Implementing a mock FirestoreService for unit tests
3. Using snapshot testing with inline-snapshot for assertions
### Composite Index Requirements
Some tests require composite indexes (e.g., queries with both `where` and `order_by`):
- `test_get_session_by_phone_existing` - Currently **skipped**
- Requires index on: `telefono` (ASC) + `lastModified` (DESC)
- To enable: Configure index in `firestore.indexes.json` and restart emulator
### Environment Variables
Tests automatically configure environment variables via **pytest-env** plugin.
Configuration is in `pytest.ini`:
```ini
[pytest]
env =
FIRESTORE_EMULATOR_HOST=[::1]:8911
GCP_PROJECT_ID=test-project
GCP_LOCATION=us-central1
GCP_FIRESTORE_DATABASE_ID=(default)
RAG_ENDPOINT_URL=http://localhost:8000/rag
REDIS_HOST=localhost
REDIS_PORT=6379
DLP_TEMPLATE_COMPLETE_FLOW=projects/test/dlpJobTriggers/test
```
These are automatically loaded before any test runs, ensuring consistent test environment setup.
## Fixtures
### `emulator_settings`
Session-scoped fixture providing Settings configured for emulator testing.
### `firestore_service`
Function-scoped fixture providing a FirestoreService instance connected to the emulator.
Automatically closes the service after each test.
### `clean_firestore`
Function-scoped fixture providing a FirestoreService with cleaned collections before and after each test.
Use this fixture to ensure test isolation.
## Adding New Tests
When adding new Firestore tests:
1. Use `clean_firestore` fixture for test isolation
2. Use the actual model classes (`ConversationSession`, `ConversationEntry`, `Notification`)
3. Use model creation methods (e.g., `ConversationSession.create()`, `Notification.create()`)
4. Assert on actual data structures, not mocked values
5. Consider adding inline-snapshot assertions for complex data validation
Example:
```python
async def test_new_feature(clean_firestore: FirestoreService) -> None:
"""Test description."""
# Create test data
session = await clean_firestore.create_session(
session_id="test-id",
user_id="user-123",
telefono="+1234567890",
)
# Test the feature
result = await clean_firestore.some_method(session.session_id)
# Assert results
assert result is not None
assert result.some_field == "expected_value"
```

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests for capa-de-integracion."""

74
tests/conftest.py Normal file
View File

@@ -0,0 +1,74 @@
"""Shared pytest fixtures and configuration.
Environment variables for testing are configured in pytest.ini via pytest-env plugin.
"""
from collections.abc import AsyncGenerator
import pytest
import pytest_asyncio
from capa_de_integracion.config import Settings
from capa_de_integracion.services.firestore_service import FirestoreService
# Configure pytest-asyncio
pytest_plugins = ("pytest_asyncio",)
@pytest.fixture(scope="session")
def emulator_settings() -> Settings:
"""Create settings configured for Firestore emulator."""
# Settings will be loaded from environment variables set at module level
return Settings.model_validate({})
@pytest_asyncio.fixture
async def firestore_service(
emulator_settings: Settings,
) -> AsyncGenerator[FirestoreService, None]:
"""Create FirestoreService instance connected to emulator."""
service = FirestoreService(emulator_settings)
yield service
# Cleanup: Close the service
await service.close()
@pytest_asyncio.fixture
async def clean_firestore(firestore_service: FirestoreService) -> AsyncGenerator[FirestoreService, None]:
"""Provide a clean Firestore service and cleanup after test."""
# Cleanup before test
await _cleanup_collections(firestore_service)
yield firestore_service
# Cleanup after test
await _cleanup_collections(firestore_service)
async def _cleanup_collections(service: FirestoreService) -> None:
"""Delete all documents from test collections."""
# Clean conversations collection
conversations_ref = service.db.collection(service.conversations_collection)
async for doc in conversations_ref.stream():
# Delete subcollection entries first
entries_ref = doc.reference.collection(service.entries_subcollection)
async for entry_doc in entries_ref.stream():
await entry_doc.reference.delete()
# Delete session document
await doc.reference.delete()
# Clean notifications collection
notifications_ref = service.db.collection(service.notifications_collection)
async for doc in notifications_ref.stream():
await doc.reference.delete()
def pytest_recording_configure(config, vcr):
"""Configure pytest-recording for Firestore emulator."""
# Don't filter requests to the emulator
vcr.ignore_hosts = []
# Match on method and path for emulator requests
vcr.match_on = ["method", "path", "body"]

View File

@@ -0,0 +1 @@
"""Service tests."""

View File

@@ -0,0 +1,774 @@
"""Tests for FirestoreService."""
from datetime import UTC, datetime
import pytest
from inline_snapshot import snapshot
from capa_de_integracion.models import ConversationEntry, ConversationSession
from capa_de_integracion.models.notification import Notification
from capa_de_integracion.services.firestore_service import FirestoreService
@pytest.mark.vcr
class TestSessionManagement:
"""Tests for conversation session management."""
async def test_create_session(self, clean_firestore: FirestoreService) -> None:
"""Test creating a new conversation session."""
session = await clean_firestore.create_session(
session_id="test-session-1",
user_id="user-123",
telefono="+1234567890",
pantalla_contexto="home_screen",
last_message="Hello",
)
assert session.session_id == "test-session-1"
assert session.user_id == "user-123"
assert session.telefono == "+1234567890"
assert session.pantalla_contexto == "home_screen"
assert session.last_message == "Hello"
assert isinstance(session.last_modified, datetime)
async def test_get_session_existing(self, clean_firestore: FirestoreService) -> None:
"""Test retrieving an existing session."""
# Create a session first
created_session = await clean_firestore.create_session(
session_id="test-session-2",
user_id="user-456",
telefono="+9876543210",
)
# Retrieve it
retrieved_session = await clean_firestore.get_session("test-session-2")
assert retrieved_session is not None
assert retrieved_session.session_id == created_session.session_id
assert retrieved_session.user_id == created_session.user_id
assert retrieved_session.telefono == created_session.telefono
async def test_get_session_not_found(self, clean_firestore: FirestoreService) -> None:
"""Test retrieving a non-existent session returns None."""
session = await clean_firestore.get_session("nonexistent-session")
assert session is None
@pytest.mark.skip(reason="Requires composite index in Firestore emulator. See firestore.indexes.json")
async def test_get_session_by_phone_existing(
self, clean_firestore: FirestoreService,
) -> None:
"""Test retrieving session by phone number.
Note: This test requires a composite index (telefono + lastModified)
which must be configured in the Firestore emulator. To enable this test,
restart the emulator with: firebase emulators:start --only firestore --import=./
"""
phone = "+1111111111"
# Create multiple sessions for same phone
await clean_firestore.create_session(
session_id="session-1",
user_id="user-1",
telefono=phone,
)
# Wait a bit to ensure different timestamps
import asyncio
await asyncio.sleep(0.1)
session_2 = await clean_firestore.create_session(
session_id="session-2",
user_id="user-1",
telefono=phone,
)
# Should retrieve the most recent one
retrieved = await clean_firestore.get_session_by_phone(phone)
assert retrieved is not None
assert retrieved.session_id == session_2.session_id
async def test_get_session_by_phone_not_found(
self, clean_firestore: FirestoreService,
) -> None:
"""Test retrieving session by phone when none exists."""
session = await clean_firestore.get_session_by_phone("+9999999999")
assert session is None
async def test_get_session_by_phone_found(
self, clean_firestore: FirestoreService,
) -> None:
"""Test retrieving session by phone when it exists (mocked)."""
from unittest.mock import MagicMock
phone = "+1111111111"
expected_session = ConversationSession.create(
session_id="session-1",
user_id="user-1",
telefono=phone,
)
# Mock the query to return a session
mock_doc = MagicMock()
mock_doc.to_dict.return_value = expected_session.model_dump()
async def mock_stream():
yield mock_doc
mock_query = MagicMock()
mock_query.stream.return_value = mock_stream()
mock_collection = MagicMock()
mock_where = MagicMock()
mock_order = MagicMock()
mock_collection.where.return_value = mock_where
mock_where.order_by.return_value = mock_order
mock_order.limit.return_value = mock_query
original_collection = clean_firestore.db.collection
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
try:
result = await clean_firestore.get_session_by_phone(phone)
assert result is not None
assert result.session_id == "session-1"
assert result.telefono == phone
finally:
clean_firestore.db.collection = original_collection
async def test_save_session(self, clean_firestore: FirestoreService) -> None:
"""Test saving an updated session."""
# Create initial session
session = await clean_firestore.create_session(
session_id="test-session-3",
user_id="user-789",
telefono="+5555555555",
)
# Update session
session.last_message = "Updated message"
session.pantalla_contexto = "new_screen"
# Save updated session
success = await clean_firestore.save_session(session)
assert success is True
# Retrieve and verify
retrieved = await clean_firestore.get_session("test-session-3")
assert retrieved is not None
assert retrieved.last_message == "Updated message"
assert retrieved.pantalla_contexto == "new_screen"
async def test_update_pantalla_contexto(
self, clean_firestore: FirestoreService,
) -> None:
"""Test updating pantalla_contexto field."""
# Create session
await clean_firestore.create_session(
session_id="test-session-4",
user_id="user-101",
telefono="+2222222222",
pantalla_contexto="initial_screen",
)
# Update pantalla_contexto
success = await clean_firestore.update_pantalla_contexto(
"test-session-4",
"updated_screen",
)
assert success is True
# Verify update
session = await clean_firestore.get_session("test-session-4")
assert session is not None
assert session.pantalla_contexto == "updated_screen"
async def test_update_pantalla_contexto_nonexistent_session(
self, clean_firestore: FirestoreService,
) -> None:
"""Test updating pantalla_contexto for non-existent session."""
success = await clean_firestore.update_pantalla_contexto(
"nonexistent-session",
"some_screen",
)
assert success is False
async def test_delete_session(self, clean_firestore: FirestoreService) -> None:
"""Test deleting a session."""
# Create session
await clean_firestore.create_session(
session_id="test-session-5",
user_id="user-202",
telefono="+3333333333",
)
# Delete session
success = await clean_firestore.delete_session("test-session-5")
assert success is True
# Verify deletion
session = await clean_firestore.get_session("test-session-5")
assert session is None
@pytest.mark.vcr
class TestEntryManagement:
"""Tests for conversation entry management."""
async def test_save_and_get_entries(self, clean_firestore: FirestoreService) -> None:
"""Test saving and retrieving conversation entries."""
# Create session
await clean_firestore.create_session(
session_id="test-session-6",
user_id="user-303",
telefono="+4444444444",
)
# Create entries
entry1 = ConversationEntry(
timestamp=datetime.now(UTC),
entity="user",
type="CONVERSACION",
text="First message",
)
entry2 = ConversationEntry(
timestamp=datetime.now(UTC),
entity="assistant",
type="CONVERSACION",
text="First response",
)
# Save entries
success1 = await clean_firestore.save_entry("test-session-6", entry1)
success2 = await clean_firestore.save_entry("test-session-6", entry2)
assert success1 is True
assert success2 is True
# Retrieve entries
entries = await clean_firestore.get_entries("test-session-6")
assert len(entries) == 2
assert entries[0].entity == "user"
assert entries[0].text == "First message"
assert entries[1].entity == "assistant"
assert entries[1].text == "First response"
async def test_get_entries_with_limit(self, clean_firestore: FirestoreService) -> None:
"""Test retrieving entries with limit."""
# Create session
await clean_firestore.create_session(
session_id="test-session-7",
user_id="user-404",
telefono="+5555555555",
)
# Create multiple entries
for i in range(5):
entry = ConversationEntry(
timestamp=datetime.now(UTC),
entity="user" if i % 2 == 0 else "assistant",
type="CONVERSACION",
text=f"Message {i}",
)
await clean_firestore.save_entry("test-session-7", entry)
# Retrieve with limit
entries = await clean_firestore.get_entries("test-session-7", limit=3)
assert len(entries) == 3
# Should get the most recent 3 in chronological order
assert entries[-1].text == "Message 4"
async def test_get_entries_empty_session(
self, clean_firestore: FirestoreService,
) -> None:
"""Test retrieving entries from session with no entries."""
# Create session without entries
await clean_firestore.create_session(
session_id="test-session-8",
user_id="user-505",
telefono="+6666666666",
)
entries = await clean_firestore.get_entries("test-session-8")
assert entries == []
async def test_delete_session_with_entries(
self, clean_firestore: FirestoreService,
) -> None:
"""Test deleting session also deletes all entries."""
# Create session with entries
await clean_firestore.create_session(
session_id="test-session-9",
user_id="user-606",
telefono="+7777777777",
)
entry = ConversationEntry(
timestamp=datetime.now(UTC),
entity="user",
type="CONVERSACION",
text="Test message",
)
await clean_firestore.save_entry("test-session-9", entry)
# Delete session
success = await clean_firestore.delete_session("test-session-9")
assert success is True
# Verify entries are also deleted
entries = await clean_firestore.get_entries("test-session-9")
assert entries == []
@pytest.mark.vcr
class TestNotificationManagement:
"""Tests for notification management."""
async def test_save_new_notification(self, clean_firestore: FirestoreService) -> None:
"""Test saving a new notification creates new session."""
notification = Notification.create(
id_notificacion="notif-1",
telefono="+8888888888",
texto="Test notification",
)
await clean_firestore.save_or_append_notification(notification)
# Verify notification was saved
doc_ref = clean_firestore._notification_ref("+8888888888")
doc = await doc_ref.get()
assert doc.exists
data = doc.to_dict()
assert data is not None
assert data["telefono"] == "+8888888888"
assert data["session_id"] == "+8888888888"
assert len(data["notificaciones"]) == 1
assert data["notificaciones"][0]["texto"] == "Test notification"
async def test_append_to_existing_notification_session(
self, clean_firestore: FirestoreService,
) -> None:
"""Test appending notification to existing session."""
phone = "+9999999999"
# Create first notification
notification1 = Notification.create(
id_notificacion="notif-2",
telefono=phone,
texto="First notification",
)
await clean_firestore.save_or_append_notification(notification1)
# Append second notification
notification2 = Notification.create(
id_notificacion="notif-3",
telefono=phone,
texto="Second notification",
)
await clean_firestore.save_or_append_notification(notification2)
# Verify both notifications exist
doc_ref = clean_firestore._notification_ref(phone)
doc = await doc_ref.get()
data = doc.to_dict()
assert data is not None
assert len(data["notificaciones"]) == 2
assert data["notificaciones"][0]["texto"] == "First notification"
assert data["notificaciones"][1]["texto"] == "Second notification"
async def test_save_notification_without_phone_raises_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test saving notification without phone number raises ValueError."""
notification = Notification.create(
id_notificacion="notif-4",
telefono="",
texto="Test",
)
with pytest.raises(ValueError, match="Phone number is required"):
await clean_firestore.save_or_append_notification(notification)
async def test_update_notification_status(
self, clean_firestore: FirestoreService,
) -> None:
"""Test updating status of all notifications in session."""
phone = "+1010101010"
# Create notifications
notification1 = Notification.create(
id_notificacion="notif-5",
telefono=phone,
texto="Notification 1",
status="pending",
)
notification2 = Notification.create(
id_notificacion="notif-6",
telefono=phone,
texto="Notification 2",
status="pending",
)
await clean_firestore.save_or_append_notification(notification1)
await clean_firestore.save_or_append_notification(notification2)
# Update status
await clean_firestore.update_notification_status(phone, "sent")
# Verify status update
doc_ref = clean_firestore._notification_ref(phone)
doc = await doc_ref.get()
data = doc.to_dict()
assert data is not None
assert all(notif["status"] == "sent" for notif in data["notificaciones"])
async def test_update_notification_status_nonexistent_session(
self, clean_firestore: FirestoreService,
) -> None:
"""Test updating status for non-existent session logs warning."""
# Should not raise exception, just log warning
await clean_firestore.update_notification_status("+0000000000", "sent")
async def test_delete_notification(self, clean_firestore: FirestoreService) -> None:
"""Test deleting notification session."""
phone = "+1212121212"
# Create notification
notification = Notification.create(
id_notificacion="notif-7",
telefono=phone,
texto="Test",
)
await clean_firestore.save_or_append_notification(notification)
# Delete notification session
success = await clean_firestore.delete_notification(phone)
assert success is True
# Verify deletion
doc_ref = clean_firestore._notification_ref(phone)
doc = await doc_ref.get()
assert not doc.exists
@pytest.mark.vcr
class TestEdgeCases:
"""Tests for edge cases and error handling."""
async def test_concurrent_session_updates(
self, clean_firestore: FirestoreService,
) -> None:
"""Test concurrent updates to same session."""
import asyncio
# Create session
session = await clean_firestore.create_session(
session_id="test-concurrent",
user_id="user-999",
telefono="+1313131313",
)
# Update session fields concurrently
session.last_message = "Message 1"
task1 = clean_firestore.save_session(session)
session.last_message = "Message 2"
task2 = clean_firestore.save_session(session)
results = await asyncio.gather(task1, task2)
assert all(results)
# Verify final state
final_session = await clean_firestore.get_session("test-concurrent")
assert final_session is not None
# Last write wins
assert final_session.last_message in ["Message 1", "Message 2"]
async def test_special_characters_in_data(
self, clean_firestore: FirestoreService,
) -> None:
"""Test handling special characters in session data."""
session = await clean_firestore.create_session(
session_id="test-special-chars",
user_id="user-special",
telefono="+1414141414",
pantalla_contexto="screen/with/slashes",
last_message="Message with emoji 🎉 and special chars: <>&\"'",
)
# Retrieve and verify
retrieved = await clean_firestore.get_session("test-special-chars")
assert retrieved is not None
assert retrieved.pantalla_contexto == "screen/with/slashes"
assert "🎉" in retrieved.last_message
assert "<>&\"'" in retrieved.last_message
@pytest.mark.vcr
class TestErrorHandling:
"""Tests for error handling in Firestore operations."""
async def test_get_session_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test get_session handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
mock_doc_ref = AsyncMock()
mock_doc_ref.get.side_effect = Exception("Database error")
original_session_ref = clean_firestore._session_ref
clean_firestore._session_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.get_session("error-session")
assert result is None
finally:
clean_firestore._session_ref = original_session_ref
async def test_get_session_by_phone_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test get_session_by_phone handles database errors gracefully."""
from unittest.mock import MagicMock
mock_collection = MagicMock()
mock_collection.where.side_effect = Exception("Database error")
original_collection = clean_firestore.db.collection
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
try:
result = await clean_firestore.get_session_by_phone("+0000000000")
assert result is None
finally:
clean_firestore.db.collection = original_collection
async def test_save_session_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test save_session handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
session = ConversationSession.create(
session_id="error-session",
user_id="user-error",
telefono="+0000000000",
)
mock_doc_ref = AsyncMock()
mock_doc_ref.set.side_effect = Exception("Database error")
original_session_ref = clean_firestore._session_ref
clean_firestore._session_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.save_session(session)
assert result is False
finally:
clean_firestore._session_ref = original_session_ref
async def test_save_entry_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test save_entry handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
entry = ConversationEntry(
timestamp=datetime.now(UTC),
entity="user",
type="CONVERSACION",
text="Test",
)
mock_entry_doc = AsyncMock()
mock_entry_doc.set.side_effect = Exception("Database error")
mock_collection = MagicMock()
mock_collection.document.return_value = mock_entry_doc
mock_doc_ref = MagicMock()
mock_doc_ref.collection.return_value = mock_collection
original_session_ref = clean_firestore._session_ref
clean_firestore._session_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.save_entry("error-session", entry)
assert result is False
finally:
clean_firestore._session_ref = original_session_ref
async def test_get_entries_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test get_entries handles database errors gracefully."""
from unittest.mock import MagicMock
mock_collection = MagicMock()
mock_collection.order_by.side_effect = Exception("Database error")
mock_doc_ref = MagicMock()
mock_doc_ref.collection.return_value = mock_collection
original_session_ref = clean_firestore._session_ref
clean_firestore._session_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.get_entries("error-session")
assert result == []
finally:
clean_firestore._session_ref = original_session_ref
async def test_delete_session_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test delete_session handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
mock_collection = MagicMock()
async def mock_stream():
mock_entry = MagicMock()
mock_entry.reference = AsyncMock()
yield mock_entry
mock_collection.stream.return_value = mock_stream()
mock_doc_ref = AsyncMock()
mock_doc_ref.collection.return_value = mock_collection
mock_doc_ref.delete.side_effect = Exception("Database error")
original_session_ref = clean_firestore._session_ref
clean_firestore._session_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.delete_session("error-session")
assert result is False
finally:
clean_firestore._session_ref = original_session_ref
async def test_update_pantalla_contexto_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test update_pantalla_contexto handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
mock_doc = MagicMock()
mock_doc.exists = True
mock_doc_ref = AsyncMock()
async def mock_get():
return mock_doc
mock_doc_ref.get = mock_get
mock_doc_ref.update.side_effect = Exception("Database error")
original_session_ref = clean_firestore._session_ref
clean_firestore._session_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.update_pantalla_contexto("error-session", "screen")
assert result is False
finally:
clean_firestore._session_ref = original_session_ref
async def test_save_or_append_notification_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test save_or_append_notification handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
notification = Notification.create(
id_notificacion="notif-error",
telefono="+0000000000",
texto="Test",
)
mock_doc_ref = AsyncMock()
mock_doc_ref.get.side_effect = Exception("Database error")
original_notification_ref = clean_firestore._notification_ref
clean_firestore._notification_ref = MagicMock(return_value=mock_doc_ref)
try:
with pytest.raises(Exception):
await clean_firestore.save_or_append_notification(notification)
finally:
clean_firestore._notification_ref = original_notification_ref
async def test_update_notification_status_with_empty_data(
self, clean_firestore: FirestoreService,
) -> None:
"""Test update_notification_status handles empty session data."""
from unittest.mock import AsyncMock, MagicMock
mock_doc_ref = AsyncMock()
mock_doc = MagicMock()
mock_doc.exists = True
mock_doc.to_dict.return_value = None
async def mock_get():
return mock_doc
mock_doc_ref.get = mock_get
original_notification_ref = clean_firestore._notification_ref
clean_firestore._notification_ref = MagicMock(return_value=mock_doc_ref)
try:
# Should not raise, just log warning
await clean_firestore.update_notification_status("+0000000000", "sent")
finally:
clean_firestore._notification_ref = original_notification_ref
async def test_update_notification_status_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test update_notification_status handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
mock_doc = MagicMock()
mock_doc.exists = True
mock_doc.to_dict.return_value = {"notificaciones": [{"status": "pending"}]}
mock_doc_ref = AsyncMock()
async def mock_get():
return mock_doc
mock_doc_ref.get = mock_get
mock_doc_ref.update.side_effect = Exception("Database error")
original_notification_ref = clean_firestore._notification_ref
clean_firestore._notification_ref = MagicMock(return_value=mock_doc_ref)
try:
with pytest.raises(Exception):
await clean_firestore.update_notification_status("+0000000000", "sent")
finally:
clean_firestore._notification_ref = original_notification_ref
async def test_delete_notification_with_db_error(
self, clean_firestore: FirestoreService,
) -> None:
"""Test delete_notification handles database errors gracefully."""
from unittest.mock import AsyncMock, MagicMock
mock_doc_ref = AsyncMock()
mock_doc_ref.delete.side_effect = Exception("Database error")
original_notification_ref = clean_firestore._notification_ref
clean_firestore._notification_ref = MagicMock(return_value=mock_doc_ref)
try:
result = await clean_firestore.delete_notification("+0000000000")
assert result is False
finally:
clean_firestore._notification_ref = original_notification_ref

23
uv.lock generated
View File

@@ -7,6 +7,12 @@ resolution-markers = [
"python_full_version < '3.13'", "python_full_version < '3.13'",
] ]
[manifest]
members = [
"capa-de-integracion",
"rag-client",
]
[[package]] [[package]]
name = "aiohappyeyeballs" name = "aiohappyeyeballs"
version = "2.6.1" version = "2.6.1"
@@ -177,6 +183,7 @@ dependencies = [
{ name = "pydantic" }, { name = "pydantic" },
{ name = "pydantic-settings" }, { name = "pydantic-settings" },
{ name = "python-multipart" }, { name = "python-multipart" },
{ name = "rag-client" },
{ name = "redis", extra = ["hiredis"] }, { name = "redis", extra = ["hiredis"] },
{ name = "tenacity" }, { name = "tenacity" },
{ name = "uvicorn", extra = ["standard"] }, { name = "uvicorn", extra = ["standard"] },
@@ -203,6 +210,7 @@ requires-dist = [
{ name = "pydantic", specifier = ">=2.10.0" }, { name = "pydantic", specifier = ">=2.10.0" },
{ name = "pydantic-settings", specifier = ">=2.6.0" }, { name = "pydantic-settings", specifier = ">=2.6.0" },
{ name = "python-multipart", specifier = ">=0.0.12" }, { name = "python-multipart", specifier = ">=0.0.12" },
{ name = "rag-client", editable = "packages/rag-client" },
{ name = "redis", extras = ["hiredis"], specifier = ">=5.2.0" }, { name = "redis", extras = ["hiredis"], specifier = ">=5.2.0" },
{ name = "tenacity", specifier = ">=9.0.0" }, { name = "tenacity", specifier = ">=9.0.0" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.32.0" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.32.0" },
@@ -1700,6 +1708,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" }, { url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
] ]
[[package]]
name = "rag-client"
version = "0.1.0"
source = { editable = "packages/rag-client" }
dependencies = [
{ name = "httpx" },
{ name = "pydantic" },
]
[package.metadata]
requires-dist = [
{ name = "httpx", specifier = ">=0.27.0" },
{ name = "pydantic", specifier = ">=2.0.0" },
]
[[package]] [[package]]
name = "redis" name = "redis"
version = "7.2.0" version = "7.2.0"