Compare commits

..

11 Commits

Author SHA1 Message Date
383efed319 Optimization 2026-02-20 20:38:59 +00:00
Anibal Angulo
ade4689ab7 Add locustfile 2026-02-20 20:38:59 +00:00
3a796dd966 Fix critical bugs 2026-02-20 20:38:59 +00:00
e5ff673a54 Misc improvements 2026-02-20 20:38:59 +00:00
fd6b698077 Improve coverage 2026-02-20 20:38:59 +00:00
f848bbf0f2 Add echo client to app 2026-02-20 20:38:59 +00:00
e03747f526 Add test coverage 2026-02-20 20:38:59 +00:00
b86dfe7373 Create rag-client package 2026-02-20 20:38:59 +00:00
d663394106 Fix type errors 2026-02-20 20:38:58 +00:00
595abd6cd3 Fix lint errors 2026-02-20 20:38:58 +00:00
faa04a0d01 Initial Python rewrite 2026-02-20 20:38:58 +00:00
10 changed files with 73 additions and 18 deletions

View File

@@ -74,7 +74,7 @@ filterwarnings = [
]
env = [
"FIRESTORE_EMULATOR_HOST=[::1]:8488",
"FIRESTORE_EMULATOR_HOST=[::1]:8462",
"GCP_PROJECT_ID=test-project",
"GCP_LOCATION=us-central1",
"GCP_FIRESTORE_DATABASE_ID=(default)",

View File

@@ -1,5 +1,7 @@
"""Dependency injection and service lifecycle management."""
import asyncio
import logging
from functools import lru_cache
from capa_de_integracion.services.rag import (
@@ -16,8 +18,12 @@ from .services import (
QuickReplyContentService,
QuickReplySessionService,
)
from .services.conversation import get_background_tasks as conv_bg_tasks
from .services.notifications import get_background_tasks as notif_bg_tasks
from .services.storage import FirestoreService, RedisService
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_redis_service() -> RedisService:
@@ -106,6 +112,12 @@ async def startup_services() -> None:
async def shutdown_services() -> None:
"""Close all service connections on shutdown."""
# Drain in-flight background tasks before closing connections
all_tasks = conv_bg_tasks() | notif_bg_tasks()
if all_tasks:
logger.info("Draining %d background tasks before shutdown…", len(all_tasks))
await asyncio.gather(*all_tasks, return_exceptions=True)
# Close Redis
redis = get_redis_service()
await redis.close()

View File

@@ -26,6 +26,11 @@ logger = logging.getLogger(__name__)
# Keep references to background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[None]] = set()
def get_background_tasks() -> set[asyncio.Task[None]]:
"""Return the set of pending background tasks (for graceful shutdown)."""
return _background_tasks
MSG_EMPTY_MESSAGE = "Message cannot be empty"
@@ -171,11 +176,15 @@ class ConversationManagerService:
canal: Communication channel
"""
# Save user and assistant entries in parallel
# Save user and assistant entries in parallel.
# Use a single timestamp for both, but offset the assistant entry by 1µs
# to avoid Firestore document ID collision (save_entry uses isoformat()
# as the document ID).
now = datetime.now(UTC)
user_entry = ConversationEntry(
entity="user",
type=entry_type,
timestamp=datetime.now(UTC),
timestamp=now,
text=user_text,
parameters=None,
canal=canal,
@@ -183,7 +192,7 @@ class ConversationManagerService:
assistant_entry = ConversationEntry(
entity="assistant",
type=entry_type,
timestamp=datetime.now(UTC),
timestamp=now + timedelta(microseconds=1),
text=assistant_text,
parameters=None,
canal=canal,

View File

@@ -21,6 +21,11 @@ PREFIX_PO_PARAM = "notification_po_"
_background_tasks: set[asyncio.Task] = set()
def get_background_tasks() -> set[asyncio.Task]:
"""Return the set of pending background tasks (for graceful shutdown)."""
return _background_tasks
class NotificationManagerService:
"""Manages notification processing and integration with conversations.

View File

@@ -149,13 +149,8 @@ class QuickReplyContentService:
if quick_reply is None:
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
return QuickReplyScreen(
header=None,
body=None,
button=None,
header_section=None,
preguntas=[],
)
msg = f"Quick reply not found for screen_id: {screen_id}"
raise ValueError(msg)
logger.info(
"Retrieved %s quick replies for screen: %s from cache",

View File

@@ -1,6 +1,7 @@
"""Quick reply session service for managing FAQ sessions."""
import logging
from datetime import UTC, datetime
from uuid import uuid4
from capa_de_integracion.models.quick_replies import QuickReplyScreen
@@ -99,6 +100,7 @@ class QuickReplySessionService:
pantalla_contexto,
)
session.pantalla_contexto = pantalla_contexto
session.last_modified = datetime.now(UTC)
else:
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"

View File

@@ -95,14 +95,13 @@ class FirestoreService:
return session
logger.debug("No session found in Firestore for phone: %s", telefono)
return None
except Exception:
logger.exception(
"Error querying session by phone %s from Firestore:",
telefono,
)
return None
else:
return None
async def save_session(self, session: ConversationSession) -> bool:
"""Save conversation session to Firestore."""

View File

@@ -120,10 +120,8 @@ class TestSessionManagement:
mock_collection = MagicMock()
mock_where = MagicMock()
mock_order = MagicMock()
mock_collection.where.return_value = mock_where
mock_where.order_by.return_value = mock_order
mock_order.limit.return_value = mock_query
mock_where.limit.return_value = mock_query
original_collection = clean_firestore.db.collection
clean_firestore.db.collection = MagicMock(return_value=mock_collection)

View File

@@ -1,5 +1,6 @@
"""Tests for QuickReplySessionService."""
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, Mock
from uuid import uuid4
@@ -160,6 +161,39 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis,
mock_content.get_quick_replies.assert_called_once_with("pagos")
@pytest.mark.asyncio
async def test_start_session_updates_last_modified_on_existing(
service, mock_firestore, mock_redis, mock_content
):
"""Test that last_modified is refreshed when updating pantalla_contexto.
Ensures quick reply context won't be incorrectly marked as stale
when the session was idle before the user opened a quick reply screen.
"""
stale_time = datetime.now(UTC) - timedelta(minutes=20)
test_session = ConversationSession.create(
session_id="session-123",
user_id="user_by_phone_5551234",
telefono="555-1234",
pantalla_contexto=None,
)
test_session.last_modified = stale_time
mock_redis.get_session.return_value = test_session
mock_content.get_quick_replies.return_value = QuickReplyScreen(
header="H", body=None, button=None, header_section=None, preguntas=[]
)
await service.start_quick_reply_session(
telefono="555-1234",
_nombre="John",
pantalla_contexto="pagos",
)
saved_session = mock_redis.save_session.call_args[0][0]
assert saved_session.last_modified > stale_time
@pytest.mark.asyncio
async def test_start_session_invalid_phone(service):
"""Test starting session with invalid phone number."""

View File

@@ -47,14 +47,15 @@ def test_app_has_routers():
def test_main_entry_point():
"""Test main entry point calls uvicorn.run."""
with patch("capa_de_integracion.main.uvicorn.run") as mock_run:
with patch("capa_de_integracion.main.uvicorn.run") as mock_run, \
patch("sys.argv", ["capa-de-integracion"]):
main()
mock_run.assert_called_once()
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["host"] == "0.0.0.0"
assert call_kwargs["port"] == 8080
assert call_kwargs["reload"] is True
assert call_kwargs["workers"] == 1
@pytest.mark.asyncio