Compare commits

..

11 Commits

Author SHA1 Message Date
383efed319 Optimization 2026-02-20 20:38:59 +00:00
Anibal Angulo
ade4689ab7 Add locustfile 2026-02-20 20:38:59 +00:00
3a796dd966 Fix critical bugs 2026-02-20 20:38:59 +00:00
e5ff673a54 Misc improvements 2026-02-20 20:38:59 +00:00
fd6b698077 Improve coverage 2026-02-20 20:38:59 +00:00
f848bbf0f2 Add echo client to app 2026-02-20 20:38:59 +00:00
e03747f526 Add test coverage 2026-02-20 20:38:59 +00:00
b86dfe7373 Create rag-client package 2026-02-20 20:38:59 +00:00
d663394106 Fix type errors 2026-02-20 20:38:58 +00:00
595abd6cd3 Fix lint errors 2026-02-20 20:38:58 +00:00
faa04a0d01 Initial Python rewrite 2026-02-20 20:38:58 +00:00
10 changed files with 73 additions and 18 deletions

View File

@@ -74,7 +74,7 @@ filterwarnings = [
] ]
env = [ env = [
"FIRESTORE_EMULATOR_HOST=[::1]:8488", "FIRESTORE_EMULATOR_HOST=[::1]:8462",
"GCP_PROJECT_ID=test-project", "GCP_PROJECT_ID=test-project",
"GCP_LOCATION=us-central1", "GCP_LOCATION=us-central1",
"GCP_FIRESTORE_DATABASE_ID=(default)", "GCP_FIRESTORE_DATABASE_ID=(default)",

View File

@@ -1,5 +1,7 @@
"""Dependency injection and service lifecycle management.""" """Dependency injection and service lifecycle management."""
import asyncio
import logging
from functools import lru_cache from functools import lru_cache
from capa_de_integracion.services.rag import ( from capa_de_integracion.services.rag import (
@@ -16,8 +18,12 @@ from .services import (
QuickReplyContentService, QuickReplyContentService,
QuickReplySessionService, QuickReplySessionService,
) )
from .services.conversation import get_background_tasks as conv_bg_tasks
from .services.notifications import get_background_tasks as notif_bg_tasks
from .services.storage import FirestoreService, RedisService from .services.storage import FirestoreService, RedisService
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_redis_service() -> RedisService: def get_redis_service() -> RedisService:
@@ -106,6 +112,12 @@ async def startup_services() -> None:
async def shutdown_services() -> None: async def shutdown_services() -> None:
"""Close all service connections on shutdown.""" """Close all service connections on shutdown."""
# Drain in-flight background tasks before closing connections
all_tasks = conv_bg_tasks() | notif_bg_tasks()
if all_tasks:
logger.info("Draining %d background tasks before shutdown…", len(all_tasks))
await asyncio.gather(*all_tasks, return_exceptions=True)
# Close Redis # Close Redis
redis = get_redis_service() redis = get_redis_service()
await redis.close() await redis.close()

View File

@@ -26,6 +26,11 @@ logger = logging.getLogger(__name__)
# Keep references to background tasks to prevent garbage collection # Keep references to background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[None]] = set() _background_tasks: set[asyncio.Task[None]] = set()
def get_background_tasks() -> set[asyncio.Task[None]]:
"""Return the set of pending background tasks (for graceful shutdown)."""
return _background_tasks
MSG_EMPTY_MESSAGE = "Message cannot be empty" MSG_EMPTY_MESSAGE = "Message cannot be empty"
@@ -171,11 +176,15 @@ class ConversationManagerService:
canal: Communication channel canal: Communication channel
""" """
# Save user and assistant entries in parallel # Save user and assistant entries in parallel.
# Use a single timestamp for both, but offset the assistant entry by 1µs
# to avoid Firestore document ID collision (save_entry uses isoformat()
# as the document ID).
now = datetime.now(UTC)
user_entry = ConversationEntry( user_entry = ConversationEntry(
entity="user", entity="user",
type=entry_type, type=entry_type,
timestamp=datetime.now(UTC), timestamp=now,
text=user_text, text=user_text,
parameters=None, parameters=None,
canal=canal, canal=canal,
@@ -183,7 +192,7 @@ class ConversationManagerService:
assistant_entry = ConversationEntry( assistant_entry = ConversationEntry(
entity="assistant", entity="assistant",
type=entry_type, type=entry_type,
timestamp=datetime.now(UTC), timestamp=now + timedelta(microseconds=1),
text=assistant_text, text=assistant_text,
parameters=None, parameters=None,
canal=canal, canal=canal,

View File

@@ -21,6 +21,11 @@ PREFIX_PO_PARAM = "notification_po_"
_background_tasks: set[asyncio.Task] = set() _background_tasks: set[asyncio.Task] = set()
def get_background_tasks() -> set[asyncio.Task]:
"""Return the set of pending background tasks (for graceful shutdown)."""
return _background_tasks
class NotificationManagerService: class NotificationManagerService:
"""Manages notification processing and integration with conversations. """Manages notification processing and integration with conversations.

View File

@@ -149,13 +149,8 @@ class QuickReplyContentService:
if quick_reply is None: if quick_reply is None:
logger.warning("Quick reply not found in cache for screen: %s", screen_id) logger.warning("Quick reply not found in cache for screen: %s", screen_id)
return QuickReplyScreen( msg = f"Quick reply not found for screen_id: {screen_id}"
header=None, raise ValueError(msg)
body=None,
button=None,
header_section=None,
preguntas=[],
)
logger.info( logger.info(
"Retrieved %s quick replies for screen: %s from cache", "Retrieved %s quick replies for screen: %s from cache",

View File

@@ -1,6 +1,7 @@
"""Quick reply session service for managing FAQ sessions.""" """Quick reply session service for managing FAQ sessions."""
import logging import logging
from datetime import UTC, datetime
from uuid import uuid4 from uuid import uuid4
from capa_de_integracion.models.quick_replies import QuickReplyScreen from capa_de_integracion.models.quick_replies import QuickReplyScreen
@@ -99,6 +100,7 @@ class QuickReplySessionService:
pantalla_contexto, pantalla_contexto,
) )
session.pantalla_contexto = pantalla_contexto session.pantalla_contexto = pantalla_contexto
session.last_modified = datetime.now(UTC)
else: else:
session_id = str(uuid4()) session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"

View File

@@ -95,14 +95,13 @@ class FirestoreService:
return session return session
logger.debug("No session found in Firestore for phone: %s", telefono) logger.debug("No session found in Firestore for phone: %s", telefono)
return None
except Exception: except Exception:
logger.exception( logger.exception(
"Error querying session by phone %s from Firestore:", "Error querying session by phone %s from Firestore:",
telefono, telefono,
) )
return None return None
else:
return None
async def save_session(self, session: ConversationSession) -> bool: async def save_session(self, session: ConversationSession) -> bool:
"""Save conversation session to Firestore.""" """Save conversation session to Firestore."""

View File

@@ -120,10 +120,8 @@ class TestSessionManagement:
mock_collection = MagicMock() mock_collection = MagicMock()
mock_where = MagicMock() mock_where = MagicMock()
mock_order = MagicMock()
mock_collection.where.return_value = mock_where mock_collection.where.return_value = mock_where
mock_where.order_by.return_value = mock_order mock_where.limit.return_value = mock_query
mock_order.limit.return_value = mock_query
original_collection = clean_firestore.db.collection original_collection = clean_firestore.db.collection
clean_firestore.db.collection = MagicMock(return_value=mock_collection) clean_firestore.db.collection = MagicMock(return_value=mock_collection)

View File

@@ -1,5 +1,6 @@
"""Tests for QuickReplySessionService.""" """Tests for QuickReplySessionService."""
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, Mock from unittest.mock import AsyncMock, Mock
from uuid import uuid4 from uuid import uuid4
@@ -160,6 +161,39 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis,
mock_content.get_quick_replies.assert_called_once_with("pagos") mock_content.get_quick_replies.assert_called_once_with("pagos")
@pytest.mark.asyncio
async def test_start_session_updates_last_modified_on_existing(
service, mock_firestore, mock_redis, mock_content
):
"""Test that last_modified is refreshed when updating pantalla_contexto.
Ensures quick reply context won't be incorrectly marked as stale
when the session was idle before the user opened a quick reply screen.
"""
stale_time = datetime.now(UTC) - timedelta(minutes=20)
test_session = ConversationSession.create(
session_id="session-123",
user_id="user_by_phone_5551234",
telefono="555-1234",
pantalla_contexto=None,
)
test_session.last_modified = stale_time
mock_redis.get_session.return_value = test_session
mock_content.get_quick_replies.return_value = QuickReplyScreen(
header="H", body=None, button=None, header_section=None, preguntas=[]
)
await service.start_quick_reply_session(
telefono="555-1234",
_nombre="John",
pantalla_contexto="pagos",
)
saved_session = mock_redis.save_session.call_args[0][0]
assert saved_session.last_modified > stale_time
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_session_invalid_phone(service): async def test_start_session_invalid_phone(service):
"""Test starting session with invalid phone number.""" """Test starting session with invalid phone number."""

View File

@@ -47,14 +47,15 @@ def test_app_has_routers():
def test_main_entry_point(): def test_main_entry_point():
"""Test main entry point calls uvicorn.run.""" """Test main entry point calls uvicorn.run."""
with patch("capa_de_integracion.main.uvicorn.run") as mock_run: with patch("capa_de_integracion.main.uvicorn.run") as mock_run, \
patch("sys.argv", ["capa-de-integracion"]):
main() main()
mock_run.assert_called_once() mock_run.assert_called_once()
call_kwargs = mock_run.call_args.kwargs call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["host"] == "0.0.0.0" assert call_kwargs["host"] == "0.0.0.0"
assert call_kwargs["port"] == 8080 assert call_kwargs["port"] == 8080
assert call_kwargs["reload"] is True assert call_kwargs["workers"] == 1
@pytest.mark.asyncio @pytest.mark.asyncio