From 2d547089dd6da7371c47813a91aed6ff2306ceb1 Mon Sep 17 00:00:00 2001 From: Anibal Angulo Date: Fri, 20 Feb 2026 15:59:19 +0000 Subject: [PATCH] Optimization --- pyproject.toml | 2 +- .../services/conversation.py | 135 +++++++++++------- .../services/storage/redis.py | 16 ++- tests/services/test_conversation_service.py | 6 + 4 files changed, 97 insertions(+), 62 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9a41602..9c515c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,7 +74,7 @@ filterwarnings = [ ] env = [ - "FIRESTORE_EMULATOR_HOST=[::1]:8469", + "FIRESTORE_EMULATOR_HOST=[::1]:8488", "GCP_PROJECT_ID=test-project", "GCP_LOCATION=us-central1", "GCP_FIRESTORE_DATABASE_ID=(default)", diff --git a/src/capa_de_integracion/services/conversation.py b/src/capa_de_integracion/services/conversation.py index 009cdf8..ddf515a 100644 --- a/src/capa_de_integracion/services/conversation.py +++ b/src/capa_de_integracion/services/conversation.py @@ -1,5 +1,6 @@ """Conversation manager service for orchestrating user conversations.""" +import asyncio import logging import re from datetime import UTC, datetime, timedelta @@ -22,6 +23,9 @@ from capa_de_integracion.services.storage.redis import RedisService logger = logging.getLogger(__name__) +# Keep references to background tasks to prevent garbage collection +_background_tasks: set[asyncio.Task[None]] = set() + MSG_EMPTY_MESSAGE = "Message cannot be empty" @@ -88,16 +92,16 @@ class ConversationManagerService: # Step 1: Validate message is not empty self._validate_message(request.mensaje) - # Step 2: Apply DLP security - obfuscated_message = await self.dlp_service.get_obfuscated_string( - request.mensaje, - self.settings.dlp_template_complete_flow, + # Step 2+3: Apply DLP security and obtain session in parallel + telefono = request.usuario.telefono + obfuscated_message, session = await asyncio.gather( + self.dlp_service.get_obfuscated_string( + request.mensaje, + self.settings.dlp_template_complete_flow, + ), + self._obtain_or_create_session(telefono), ) request.mensaje = obfuscated_message - telefono = request.usuario.telefono - - # Step 3: Obtain or create session - session = await self._obtain_or_create_session(telefono) # Step 4: Try quick reply path first response = await self._handle_quick_reply_path(request, session) @@ -131,6 +135,8 @@ class ConversationManagerService: # Try Firestore if Redis miss session = await self.firestore_service.get_session_by_phone(telefono) if session: + # Cache to Redis for subsequent requests + await self.redis_service.save_session(session) return session # Create new session if both miss @@ -165,7 +171,7 @@ class ConversationManagerService: canal: Communication channel """ - # Save user entry + # Save user and assistant entries in parallel user_entry = ConversationEntry( entity="user", type=entry_type, @@ -174,9 +180,6 @@ class ConversationManagerService: parameters=None, canal=canal, ) - await self.firestore_service.save_entry(session_id, user_entry) - - # Save assistant entry assistant_entry = ConversationEntry( entity="assistant", type=entry_type, @@ -185,7 +188,10 @@ class ConversationManagerService: parameters=None, canal=canal, ) - await self.firestore_service.save_entry(session_id, assistant_entry) + await asyncio.gather( + self.firestore_service.save_entry(session_id, user_entry), + self.firestore_service.save_entry(session_id, assistant_entry), + ) async def _update_session_after_turn( self, @@ -204,8 +210,10 @@ class ConversationManagerService: """ session.last_message = last_message session.last_modified = datetime.now(UTC) - await self.firestore_service.save_session(session) - await self.redis_service.save_session(session) + await asyncio.gather( + self.firestore_service.save_session(session), + self.redis_service.save_session(session), + ) async def _handle_quick_reply_path( self, @@ -253,17 +261,25 @@ class ConversationManagerService: response.query_result.response_text if response.query_result else "" ) or "" - # Save conversation turn - await self._save_conversation_turn( - session_id=session.session_id, - user_text=request.mensaje, - assistant_text=response_text, - entry_type="CONVERSACION", - canal=getattr(request, "canal", None), - ) + # Fire-and-forget: persist conversation turn and update session + async def _post_response() -> None: + try: + await asyncio.gather( + self._save_conversation_turn( + session_id=session.session_id, + user_text=request.mensaje, + assistant_text=response_text, + entry_type="CONVERSACION", + canal=getattr(request, "canal", None), + ), + self._update_session_after_turn(session, response_text), + ) + except Exception: + logger.exception("Error in quick-reply post-response work") - # Update session - await self._update_session_after_turn(session, response_text) + task = asyncio.create_task(_post_response()) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) return response @@ -292,13 +308,19 @@ class ConversationManagerService: telefono, ) - # Load conversation history only if session is older than threshold - # (optimization: new/recent sessions don't need history context) + # Load conversation history and notifications in parallel session_age = datetime.now(UTC) - session.created_at - if session_age > timedelta(minutes=self.SESSION_RESET_THRESHOLD_MINUTES): - entries = await self.firestore_service.get_entries( - session.session_id, - limit=self.settings.conversation_context_message_limit, + load_history = session_age > timedelta( + minutes=self.SESSION_RESET_THRESHOLD_MINUTES, + ) + + if load_history: + entries, notifications = await asyncio.gather( + self.firestore_service.get_entries( + session.session_id, + limit=self.settings.conversation_context_message_limit, + ), + self._get_active_notifications(telefono), ) logger.info( "Session is %s minutes old. Loaded %s conversation entries.", @@ -307,13 +329,12 @@ class ConversationManagerService: ) else: entries = [] + notifications = await self._get_active_notifications(telefono) logger.info( "Session is only %s minutes old. Skipping history load.", session_age.total_seconds() / 60, ) - # Retrieve active notifications for this user - notifications = await self._get_active_notifications(telefono) logger.info("Retrieved %s active notifications", len(notifications)) # Prepare current user message @@ -344,27 +365,8 @@ class ConversationManagerService: assistant_response[:100], ) - # Save conversation turn - await self._save_conversation_turn( - session_id=session.session_id, - user_text=request.mensaje, - assistant_text=assistant_response, - entry_type="LLM", - canal=getattr(request, "canal", None), - ) - logger.info("Saved user message and assistant response to Firestore") - - # Update session - await self._update_session_after_turn(session, assistant_response) - logger.info("Updated session in Firestore and Redis") - - # Mark notifications as processed if any were included - if notifications: - await self._mark_notifications_as_processed(telefono) - logger.info("Marked %s notifications as processed", len(notifications)) - - # Return response object - return DetectIntentResponse( + # Build response object first, then fire-and-forget persistence + response = DetectIntentResponse( responseId=str(uuid4()), queryResult=QueryResult( responseText=assistant_response, @@ -373,6 +375,31 @@ class ConversationManagerService: quick_replies=None, ) + # Fire-and-forget: persist conversation and update session + async def _post_response() -> None: + try: + coros = [ + self._save_conversation_turn( + session_id=session.session_id, + user_text=request.mensaje, + assistant_text=assistant_response, + entry_type="LLM", + canal=getattr(request, "canal", None), + ), + self._update_session_after_turn(session, assistant_response), + ] + if notifications: + coros.append(self._mark_notifications_as_processed(telefono)) + await asyncio.gather(*coros) + except Exception: + logger.exception("Error in post-response background work") + + task = asyncio.create_task(_post_response()) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + + return response + def _is_pantalla_context_valid(self, last_modified: datetime) -> bool: """Check if pantallaContexto is still valid (not stale).""" time_diff = datetime.now(UTC) - last_modified diff --git a/src/capa_de_integracion/services/storage/redis.py b/src/capa_de_integracion/services/storage/redis.py index 3868cf9..ac90b0f 100644 --- a/src/capa_de_integracion/services/storage/redis.py +++ b/src/capa_de_integracion/services/storage/redis.py @@ -104,12 +104,12 @@ class RedisService: phone_key = self._phone_to_session_key(session.telefono) try: - # Save session data + # Save session data and phone mapping in a single pipeline data = session.model_dump_json(by_alias=False) - await self.redis.setex(key, self.session_ttl, data) - - # Save phone-to-session mapping - await self.redis.setex(phone_key, self.session_ttl, session.session_id) + async with self.redis.pipeline(transaction=False) as pipe: + pipe.setex(key, self.session_ttl, data) + pipe.setex(phone_key, self.session_ttl, session.session_id) + await pipe.execute() logger.debug( "Saved session to Redis: %s for phone: %s", @@ -384,8 +384,10 @@ class RedisService: try: logger.info("Deleting notification session for phone %s", phone_number) - await self.redis.delete(notification_key) - await self.redis.delete(phone_key) + async with self.redis.pipeline(transaction=False) as pipe: + pipe.delete(notification_key) + pipe.delete(phone_key) + await pipe.execute() except Exception: logger.exception( "Error deleting notification session for phone %s:", diff --git a/tests/services/test_conversation_service.py b/tests/services/test_conversation_service.py index b20a5c4..6f573a8 100644 --- a/tests/services/test_conversation_service.py +++ b/tests/services/test_conversation_service.py @@ -1,5 +1,6 @@ """Unit tests for ConversationManagerService.""" +import asyncio from datetime import UTC, datetime, timedelta from typing import Literal from unittest.mock import AsyncMock, Mock, patch @@ -471,6 +472,7 @@ class TestQuickReplyPath: session=sample_session, ) + await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete assert mock_firestore.save_entry.await_count == 2 @pytest.mark.asyncio @@ -499,6 +501,7 @@ class TestQuickReplyPath: session=sample_session, ) + await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete mock_firestore.save_session.assert_awaited_once() mock_redis.save_session.assert_awaited_once() @@ -571,6 +574,7 @@ class TestStandardConversation: session=sample_session, ) + await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete assert mock_firestore.save_entry.await_count == 2 @pytest.mark.asyncio @@ -588,6 +592,7 @@ class TestStandardConversation: session=sample_session, ) + await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete # save_session is called in _update_session_after_turn assert mock_firestore.save_session.await_count >= 1 assert mock_redis.save_session.await_count >= 1 @@ -611,6 +616,7 @@ class TestStandardConversation: session=sample_session, ) + await asyncio.sleep(0.01) # Let fire-and-forget background tasks complete mock_firestore.update_notification_status.assert_awaited_once() @pytest.mark.asyncio