Compare commits

..

13 Commits

Author SHA1 Message Date
Anibal Angulo
26cc34e0af Add locustfile 2026-02-20 14:57:48 +00:00
1b592df6d4 Fix critical bugs 2026-02-20 14:52:14 +00:00
e9d80def08 Misc improvements 2026-02-20 08:43:08 +00:00
734cade8d9 Improve coverage 2026-02-20 08:43:08 +00:00
5ceaadb20c Add echo client to app 2026-02-20 08:43:08 +00:00
59a76fc226 Add test coverage 2026-02-20 08:43:08 +00:00
c01f4d1ab3 Create rag-client package 2026-02-20 08:43:08 +00:00
2c722c1166 Fix type errors 2026-02-20 08:43:08 +00:00
58393a538e Fix lint errors 2026-02-20 08:43:08 +00:00
bcdc41ecd5 . 2026-02-20 08:43:08 +00:00
14ed21a1f9 . 2026-02-20 08:43:00 +00:00
03292a635c . 2026-02-20 08:42:56 +00:00
6f629c53a6 . 2026-02-20 08:42:45 +00:00
12 changed files with 78 additions and 168 deletions

View File

@@ -74,7 +74,7 @@ filterwarnings = [
] ]
env = [ env = [
"FIRESTORE_EMULATOR_HOST=[::1]:8462", "FIRESTORE_EMULATOR_HOST=[::1]:8469",
"GCP_PROJECT_ID=test-project", "GCP_PROJECT_ID=test-project",
"GCP_LOCATION=us-central1", "GCP_LOCATION=us-central1",
"GCP_FIRESTORE_DATABASE_ID=(default)", "GCP_FIRESTORE_DATABASE_ID=(default)",

View File

@@ -1,7 +1,5 @@
"""Dependency injection and service lifecycle management.""" """Dependency injection and service lifecycle management."""
import asyncio
import logging
from functools import lru_cache from functools import lru_cache
from capa_de_integracion.services.rag import ( from capa_de_integracion.services.rag import (
@@ -18,12 +16,8 @@ from .services import (
QuickReplyContentService, QuickReplyContentService,
QuickReplySessionService, QuickReplySessionService,
) )
from .services.conversation import get_background_tasks as conv_bg_tasks
from .services.notifications import get_background_tasks as notif_bg_tasks
from .services.storage import FirestoreService, RedisService from .services.storage import FirestoreService, RedisService
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_redis_service() -> RedisService: def get_redis_service() -> RedisService:
@@ -112,12 +106,6 @@ async def startup_services() -> None:
async def shutdown_services() -> None: async def shutdown_services() -> None:
"""Close all service connections on shutdown.""" """Close all service connections on shutdown."""
# Drain in-flight background tasks before closing connections
all_tasks = conv_bg_tasks() | notif_bg_tasks()
if all_tasks:
logger.info("Draining %d background tasks before shutdown…", len(all_tasks))
await asyncio.gather(*all_tasks, return_exceptions=True)
# Close Redis # Close Redis
redis = get_redis_service() redis = get_redis_service()
await redis.close() await redis.close()

View File

@@ -1,6 +1,5 @@
"""Conversation manager service for orchestrating user conversations.""" """Conversation manager service for orchestrating user conversations."""
import asyncio
import logging import logging
import re import re
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
@@ -23,14 +22,6 @@ from capa_de_integracion.services.storage.redis import RedisService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Keep references to background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[None]] = set()
def get_background_tasks() -> set[asyncio.Task[None]]:
"""Return the set of pending background tasks (for graceful shutdown)."""
return _background_tasks
MSG_EMPTY_MESSAGE = "Message cannot be empty" MSG_EMPTY_MESSAGE = "Message cannot be empty"
@@ -97,16 +88,16 @@ class ConversationManagerService:
# Step 1: Validate message is not empty # Step 1: Validate message is not empty
self._validate_message(request.mensaje) self._validate_message(request.mensaje)
# Step 2+3: Apply DLP security and obtain session in parallel # Step 2: Apply DLP security
telefono = request.usuario.telefono obfuscated_message = await self.dlp_service.get_obfuscated_string(
obfuscated_message, session = await asyncio.gather( request.mensaje,
self.dlp_service.get_obfuscated_string( self.settings.dlp_template_complete_flow,
request.mensaje,
self.settings.dlp_template_complete_flow,
),
self._obtain_or_create_session(telefono),
) )
request.mensaje = obfuscated_message request.mensaje = obfuscated_message
telefono = request.usuario.telefono
# Step 3: Obtain or create session
session = await self._obtain_or_create_session(telefono)
# Step 4: Try quick reply path first # Step 4: Try quick reply path first
response = await self._handle_quick_reply_path(request, session) response = await self._handle_quick_reply_path(request, session)
@@ -140,8 +131,6 @@ class ConversationManagerService:
# Try Firestore if Redis miss # Try Firestore if Redis miss
session = await self.firestore_service.get_session_by_phone(telefono) session = await self.firestore_service.get_session_by_phone(telefono)
if session: if session:
# Cache to Redis for subsequent requests
await self.redis_service.save_session(session)
return session return session
# Create new session if both miss # Create new session if both miss
@@ -176,31 +165,27 @@ class ConversationManagerService:
canal: Communication channel canal: Communication channel
""" """
# Save user and assistant entries in parallel. # Save user entry
# Use a single timestamp for both, but offset the assistant entry by 1µs
# to avoid Firestore document ID collision (save_entry uses isoformat()
# as the document ID).
now = datetime.now(UTC)
user_entry = ConversationEntry( user_entry = ConversationEntry(
entity="user", entity="user",
type=entry_type, type=entry_type,
timestamp=now, timestamp=datetime.now(UTC),
text=user_text, text=user_text,
parameters=None, parameters=None,
canal=canal, canal=canal,
) )
await self.firestore_service.save_entry(session_id, user_entry)
# Save assistant entry
assistant_entry = ConversationEntry( assistant_entry = ConversationEntry(
entity="assistant", entity="assistant",
type=entry_type, type=entry_type,
timestamp=now + timedelta(microseconds=1), timestamp=datetime.now(UTC),
text=assistant_text, text=assistant_text,
parameters=None, parameters=None,
canal=canal, canal=canal,
) )
await asyncio.gather( await self.firestore_service.save_entry(session_id, assistant_entry)
self.firestore_service.save_entry(session_id, user_entry),
self.firestore_service.save_entry(session_id, assistant_entry),
)
async def _update_session_after_turn( async def _update_session_after_turn(
self, self,
@@ -219,10 +204,8 @@ class ConversationManagerService:
""" """
session.last_message = last_message session.last_message = last_message
session.last_modified = datetime.now(UTC) session.last_modified = datetime.now(UTC)
await asyncio.gather( await self.firestore_service.save_session(session)
self.firestore_service.save_session(session), await self.redis_service.save_session(session)
self.redis_service.save_session(session),
)
async def _handle_quick_reply_path( async def _handle_quick_reply_path(
self, self,
@@ -270,25 +253,17 @@ class ConversationManagerService:
response.query_result.response_text if response.query_result else "" response.query_result.response_text if response.query_result else ""
) or "" ) or ""
# Fire-and-forget: persist conversation turn and update session # Save conversation turn
async def _post_response() -> None: await self._save_conversation_turn(
try: session_id=session.session_id,
await asyncio.gather( user_text=request.mensaje,
self._save_conversation_turn( assistant_text=response_text,
session_id=session.session_id, entry_type="CONVERSACION",
user_text=request.mensaje, canal=getattr(request, "canal", None),
assistant_text=response_text, )
entry_type="CONVERSACION",
canal=getattr(request, "canal", None),
),
self._update_session_after_turn(session, response_text),
)
except Exception:
logger.exception("Error in quick-reply post-response work")
task = asyncio.create_task(_post_response()) # Update session
_background_tasks.add(task) await self._update_session_after_turn(session, response_text)
task.add_done_callback(_background_tasks.discard)
return response return response
@@ -317,19 +292,13 @@ class ConversationManagerService:
telefono, telefono,
) )
# Load conversation history and notifications in parallel # Load conversation history only if session is older than threshold
# (optimization: new/recent sessions don't need history context)
session_age = datetime.now(UTC) - session.created_at session_age = datetime.now(UTC) - session.created_at
load_history = session_age > timedelta( if session_age > timedelta(minutes=self.SESSION_RESET_THRESHOLD_MINUTES):
minutes=self.SESSION_RESET_THRESHOLD_MINUTES, entries = await self.firestore_service.get_entries(
) session.session_id,
limit=self.settings.conversation_context_message_limit,
if load_history:
entries, notifications = await asyncio.gather(
self.firestore_service.get_entries(
session.session_id,
limit=self.settings.conversation_context_message_limit,
),
self._get_active_notifications(telefono),
) )
logger.info( logger.info(
"Session is %s minutes old. Loaded %s conversation entries.", "Session is %s minutes old. Loaded %s conversation entries.",
@@ -338,12 +307,13 @@ class ConversationManagerService:
) )
else: else:
entries = [] entries = []
notifications = await self._get_active_notifications(telefono)
logger.info( logger.info(
"Session is only %s minutes old. Skipping history load.", "Session is only %s minutes old. Skipping history load.",
session_age.total_seconds() / 60, session_age.total_seconds() / 60,
) )
# Retrieve active notifications for this user
notifications = await self._get_active_notifications(telefono)
logger.info("Retrieved %s active notifications", len(notifications)) logger.info("Retrieved %s active notifications", len(notifications))
# Prepare current user message # Prepare current user message
@@ -374,8 +344,27 @@ class ConversationManagerService:
assistant_response[:100], assistant_response[:100],
) )
# Build response object first, then fire-and-forget persistence # Save conversation turn
response = DetectIntentResponse( await self._save_conversation_turn(
session_id=session.session_id,
user_text=request.mensaje,
assistant_text=assistant_response,
entry_type="LLM",
canal=getattr(request, "canal", None),
)
logger.info("Saved user message and assistant response to Firestore")
# Update session
await self._update_session_after_turn(session, assistant_response)
logger.info("Updated session in Firestore and Redis")
# Mark notifications as processed if any were included
if notifications:
await self._mark_notifications_as_processed(telefono)
logger.info("Marked %s notifications as processed", len(notifications))
# Return response object
return DetectIntentResponse(
responseId=str(uuid4()), responseId=str(uuid4()),
queryResult=QueryResult( queryResult=QueryResult(
responseText=assistant_response, responseText=assistant_response,
@@ -384,31 +373,6 @@ class ConversationManagerService:
quick_replies=None, quick_replies=None,
) )
# Fire-and-forget: persist conversation and update session
async def _post_response() -> None:
try:
coros = [
self._save_conversation_turn(
session_id=session.session_id,
user_text=request.mensaje,
assistant_text=assistant_response,
entry_type="LLM",
canal=getattr(request, "canal", None),
),
self._update_session_after_turn(session, assistant_response),
]
if notifications:
coros.append(self._mark_notifications_as_processed(telefono))
await asyncio.gather(*coros)
except Exception:
logger.exception("Error in post-response background work")
task = asyncio.create_task(_post_response())
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
return response
def _is_pantalla_context_valid(self, last_modified: datetime) -> bool: def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
"""Check if pantallaContexto is still valid (not stale).""" """Check if pantallaContexto is still valid (not stale)."""
time_diff = datetime.now(UTC) - last_modified time_diff = datetime.now(UTC) - last_modified

View File

@@ -21,11 +21,6 @@ PREFIX_PO_PARAM = "notification_po_"
_background_tasks: set[asyncio.Task] = set() _background_tasks: set[asyncio.Task] = set()
def get_background_tasks() -> set[asyncio.Task]:
"""Return the set of pending background tasks (for graceful shutdown)."""
return _background_tasks
class NotificationManagerService: class NotificationManagerService:
"""Manages notification processing and integration with conversations. """Manages notification processing and integration with conversations.

View File

@@ -149,8 +149,13 @@ class QuickReplyContentService:
if quick_reply is None: if quick_reply is None:
logger.warning("Quick reply not found in cache for screen: %s", screen_id) logger.warning("Quick reply not found in cache for screen: %s", screen_id)
msg = f"Quick reply not found for screen_id: {screen_id}" return QuickReplyScreen(
raise ValueError(msg) header=None,
body=None,
button=None,
header_section=None,
preguntas=[],
)
logger.info( logger.info(
"Retrieved %s quick replies for screen: %s from cache", "Retrieved %s quick replies for screen: %s from cache",

View File

@@ -1,7 +1,6 @@
"""Quick reply session service for managing FAQ sessions.""" """Quick reply session service for managing FAQ sessions."""
import logging import logging
from datetime import UTC, datetime
from uuid import uuid4 from uuid import uuid4
from capa_de_integracion.models.quick_replies import QuickReplyScreen from capa_de_integracion.models.quick_replies import QuickReplyScreen
@@ -100,7 +99,6 @@ class QuickReplySessionService:
pantalla_contexto, pantalla_contexto,
) )
session.pantalla_contexto = pantalla_contexto session.pantalla_contexto = pantalla_contexto
session.last_modified = datetime.now(UTC)
else: else:
session_id = str(uuid4()) session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}" user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"

View File

@@ -95,13 +95,14 @@ class FirestoreService:
return session return session
logger.debug("No session found in Firestore for phone: %s", telefono) logger.debug("No session found in Firestore for phone: %s", telefono)
return None
except Exception: except Exception:
logger.exception( logger.exception(
"Error querying session by phone %s from Firestore:", "Error querying session by phone %s from Firestore:",
telefono, telefono,
) )
return None return None
else:
return None
async def save_session(self, session: ConversationSession) -> bool: async def save_session(self, session: ConversationSession) -> bool:
"""Save conversation session to Firestore.""" """Save conversation session to Firestore."""

View File

@@ -104,12 +104,12 @@ class RedisService:
phone_key = self._phone_to_session_key(session.telefono) phone_key = self._phone_to_session_key(session.telefono)
try: try:
# Save session data and phone mapping in a single pipeline # Save session data
data = session.model_dump_json(by_alias=False) data = session.model_dump_json(by_alias=False)
async with self.redis.pipeline(transaction=False) as pipe: await self.redis.setex(key, self.session_ttl, data)
pipe.setex(key, self.session_ttl, data)
pipe.setex(phone_key, self.session_ttl, session.session_id) # Save phone-to-session mapping
await pipe.execute() await self.redis.setex(phone_key, self.session_ttl, session.session_id)
logger.debug( logger.debug(
"Saved session to Redis: %s for phone: %s", "Saved session to Redis: %s for phone: %s",
@@ -384,10 +384,8 @@ class RedisService:
try: try:
logger.info("Deleting notification session for phone %s", phone_number) logger.info("Deleting notification session for phone %s", phone_number)
async with self.redis.pipeline(transaction=False) as pipe: await self.redis.delete(notification_key)
pipe.delete(notification_key) await self.redis.delete(phone_key)
pipe.delete(phone_key)
await pipe.execute()
except Exception: except Exception:
logger.exception( logger.exception(
"Error deleting notification session for phone %s:", "Error deleting notification session for phone %s:",

View File

@@ -1,6 +1,5 @@
"""Unit tests for ConversationManagerService.""" """Unit tests for ConversationManagerService."""
import asyncio
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from typing import Literal from typing import Literal
from unittest.mock import AsyncMock, Mock, patch from unittest.mock import AsyncMock, Mock, patch
@@ -472,7 +471,6 @@ class TestQuickReplyPath:
session=sample_session, session=sample_session,
) )
await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete
assert mock_firestore.save_entry.await_count == 2 assert mock_firestore.save_entry.await_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -501,7 +499,6 @@ class TestQuickReplyPath:
session=sample_session, session=sample_session,
) )
await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete
mock_firestore.save_session.assert_awaited_once() mock_firestore.save_session.assert_awaited_once()
mock_redis.save_session.assert_awaited_once() mock_redis.save_session.assert_awaited_once()
@@ -574,7 +571,6 @@ class TestStandardConversation:
session=sample_session, session=sample_session,
) )
await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete
assert mock_firestore.save_entry.await_count == 2 assert mock_firestore.save_entry.await_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -592,7 +588,6 @@ class TestStandardConversation:
session=sample_session, session=sample_session,
) )
await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete
# save_session is called in _update_session_after_turn # save_session is called in _update_session_after_turn
assert mock_firestore.save_session.await_count >= 1 assert mock_firestore.save_session.await_count >= 1
assert mock_redis.save_session.await_count >= 1 assert mock_redis.save_session.await_count >= 1
@@ -616,7 +611,6 @@ class TestStandardConversation:
session=sample_session, session=sample_session,
) )
await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete
mock_firestore.update_notification_status.assert_awaited_once() mock_firestore.update_notification_status.assert_awaited_once()
@pytest.mark.asyncio @pytest.mark.asyncio

View File

@@ -120,8 +120,10 @@ class TestSessionManagement:
mock_collection = MagicMock() mock_collection = MagicMock()
mock_where = MagicMock() mock_where = MagicMock()
mock_order = MagicMock()
mock_collection.where.return_value = mock_where mock_collection.where.return_value = mock_where
mock_where.limit.return_value = mock_query mock_where.order_by.return_value = mock_order
mock_order.limit.return_value = mock_query
original_collection = clean_firestore.db.collection original_collection = clean_firestore.db.collection
clean_firestore.db.collection = MagicMock(return_value=mock_collection) clean_firestore.db.collection = MagicMock(return_value=mock_collection)

View File

@@ -1,6 +1,5 @@
"""Tests for QuickReplySessionService.""" """Tests for QuickReplySessionService."""
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, Mock from unittest.mock import AsyncMock, Mock
from uuid import uuid4 from uuid import uuid4
@@ -161,39 +160,6 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis,
mock_content.get_quick_replies.assert_called_once_with("pagos") mock_content.get_quick_replies.assert_called_once_with("pagos")
@pytest.mark.asyncio
async def test_start_session_updates_last_modified_on_existing(
service, mock_firestore, mock_redis, mock_content
):
"""Test that last_modified is refreshed when updating pantalla_contexto.
Ensures quick reply context won't be incorrectly marked as stale
when the session was idle before the user opened a quick reply screen.
"""
stale_time = datetime.now(UTC) - timedelta(minutes=20)
test_session = ConversationSession.create(
session_id="session-123",
user_id="user_by_phone_5551234",
telefono="555-1234",
pantalla_contexto=None,
)
test_session.last_modified = stale_time
mock_redis.get_session.return_value = test_session
mock_content.get_quick_replies.return_value = QuickReplyScreen(
header="H", body=None, button=None, header_section=None, preguntas=[]
)
await service.start_quick_reply_session(
telefono="555-1234",
_nombre="John",
pantalla_contexto="pagos",
)
saved_session = mock_redis.save_session.call_args[0][0]
assert saved_session.last_modified > stale_time
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_session_invalid_phone(service): async def test_start_session_invalid_phone(service):
"""Test starting session with invalid phone number.""" """Test starting session with invalid phone number."""

View File

@@ -47,15 +47,14 @@ def test_app_has_routers():
def test_main_entry_point(): def test_main_entry_point():
"""Test main entry point calls uvicorn.run.""" """Test main entry point calls uvicorn.run."""
with patch("capa_de_integracion.main.uvicorn.run") as mock_run, \ with patch("capa_de_integracion.main.uvicorn.run") as mock_run:
patch("sys.argv", ["capa-de-integracion"]):
main() main()
mock_run.assert_called_once() mock_run.assert_called_once()
call_kwargs = mock_run.call_args.kwargs call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["host"] == "0.0.0.0" assert call_kwargs["host"] == "0.0.0.0"
assert call_kwargs["port"] == 8080 assert call_kwargs["port"] == 8080
assert call_kwargs["workers"] == 1 assert call_kwargs["reload"] is True
@pytest.mark.asyncio @pytest.mark.asyncio