Compare commits
11 Commits
2d547089dd
...
f-a-f
| Author | SHA1 | Date | |
|---|---|---|---|
| 383efed319 | |||
|
|
ade4689ab7 | ||
| 3a796dd966 | |||
| e5ff673a54 | |||
| fd6b698077 | |||
| f848bbf0f2 | |||
| e03747f526 | |||
| b86dfe7373 | |||
| d663394106 | |||
| 595abd6cd3 | |||
| faa04a0d01 |
@@ -74,7 +74,7 @@ filterwarnings = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
env = [
|
env = [
|
||||||
"FIRESTORE_EMULATOR_HOST=[::1]:8488",
|
"FIRESTORE_EMULATOR_HOST=[::1]:8462",
|
||||||
"GCP_PROJECT_ID=test-project",
|
"GCP_PROJECT_ID=test-project",
|
||||||
"GCP_LOCATION=us-central1",
|
"GCP_LOCATION=us-central1",
|
||||||
"GCP_FIRESTORE_DATABASE_ID=(default)",
|
"GCP_FIRESTORE_DATABASE_ID=(default)",
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
"""Dependency injection and service lifecycle management."""
|
"""Dependency injection and service lifecycle management."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
from capa_de_integracion.services.rag import (
|
from capa_de_integracion.services.rag import (
|
||||||
@@ -16,8 +18,12 @@ from .services import (
|
|||||||
QuickReplyContentService,
|
QuickReplyContentService,
|
||||||
QuickReplySessionService,
|
QuickReplySessionService,
|
||||||
)
|
)
|
||||||
|
from .services.conversation import get_background_tasks as conv_bg_tasks
|
||||||
|
from .services.notifications import get_background_tasks as notif_bg_tasks
|
||||||
from .services.storage import FirestoreService, RedisService
|
from .services.storage import FirestoreService, RedisService
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@lru_cache(maxsize=1)
|
@lru_cache(maxsize=1)
|
||||||
def get_redis_service() -> RedisService:
|
def get_redis_service() -> RedisService:
|
||||||
@@ -106,6 +112,12 @@ async def startup_services() -> None:
|
|||||||
|
|
||||||
async def shutdown_services() -> None:
|
async def shutdown_services() -> None:
|
||||||
"""Close all service connections on shutdown."""
|
"""Close all service connections on shutdown."""
|
||||||
|
# Drain in-flight background tasks before closing connections
|
||||||
|
all_tasks = conv_bg_tasks() | notif_bg_tasks()
|
||||||
|
if all_tasks:
|
||||||
|
logger.info("Draining %d background tasks before shutdown…", len(all_tasks))
|
||||||
|
await asyncio.gather(*all_tasks, return_exceptions=True)
|
||||||
|
|
||||||
# Close Redis
|
# Close Redis
|
||||||
redis = get_redis_service()
|
redis = get_redis_service()
|
||||||
await redis.close()
|
await redis.close()
|
||||||
|
|||||||
@@ -26,6 +26,11 @@ logger = logging.getLogger(__name__)
|
|||||||
# Keep references to background tasks to prevent garbage collection
|
# Keep references to background tasks to prevent garbage collection
|
||||||
_background_tasks: set[asyncio.Task[None]] = set()
|
_background_tasks: set[asyncio.Task[None]] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def get_background_tasks() -> set[asyncio.Task[None]]:
|
||||||
|
"""Return the set of pending background tasks (for graceful shutdown)."""
|
||||||
|
return _background_tasks
|
||||||
|
|
||||||
MSG_EMPTY_MESSAGE = "Message cannot be empty"
|
MSG_EMPTY_MESSAGE = "Message cannot be empty"
|
||||||
|
|
||||||
|
|
||||||
@@ -171,11 +176,15 @@ class ConversationManagerService:
|
|||||||
canal: Communication channel
|
canal: Communication channel
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# Save user and assistant entries in parallel
|
# Save user and assistant entries in parallel.
|
||||||
|
# Use a single timestamp for both, but offset the assistant entry by 1µs
|
||||||
|
# to avoid Firestore document ID collision (save_entry uses isoformat()
|
||||||
|
# as the document ID).
|
||||||
|
now = datetime.now(UTC)
|
||||||
user_entry = ConversationEntry(
|
user_entry = ConversationEntry(
|
||||||
entity="user",
|
entity="user",
|
||||||
type=entry_type,
|
type=entry_type,
|
||||||
timestamp=datetime.now(UTC),
|
timestamp=now,
|
||||||
text=user_text,
|
text=user_text,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=canal,
|
canal=canal,
|
||||||
@@ -183,7 +192,7 @@ class ConversationManagerService:
|
|||||||
assistant_entry = ConversationEntry(
|
assistant_entry = ConversationEntry(
|
||||||
entity="assistant",
|
entity="assistant",
|
||||||
type=entry_type,
|
type=entry_type,
|
||||||
timestamp=datetime.now(UTC),
|
timestamp=now + timedelta(microseconds=1),
|
||||||
text=assistant_text,
|
text=assistant_text,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=canal,
|
canal=canal,
|
||||||
|
|||||||
@@ -21,6 +21,11 @@ PREFIX_PO_PARAM = "notification_po_"
|
|||||||
_background_tasks: set[asyncio.Task] = set()
|
_background_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def get_background_tasks() -> set[asyncio.Task]:
|
||||||
|
"""Return the set of pending background tasks (for graceful shutdown)."""
|
||||||
|
return _background_tasks
|
||||||
|
|
||||||
|
|
||||||
class NotificationManagerService:
|
class NotificationManagerService:
|
||||||
"""Manages notification processing and integration with conversations.
|
"""Manages notification processing and integration with conversations.
|
||||||
|
|
||||||
|
|||||||
@@ -149,13 +149,8 @@ class QuickReplyContentService:
|
|||||||
|
|
||||||
if quick_reply is None:
|
if quick_reply is None:
|
||||||
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
|
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
|
||||||
return QuickReplyScreen(
|
msg = f"Quick reply not found for screen_id: {screen_id}"
|
||||||
header=None,
|
raise ValueError(msg)
|
||||||
body=None,
|
|
||||||
button=None,
|
|
||||||
header_section=None,
|
|
||||||
preguntas=[],
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Retrieved %s quick replies for screen: %s from cache",
|
"Retrieved %s quick replies for screen: %s from cache",
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
"""Quick reply session service for managing FAQ sessions."""
|
"""Quick reply session service for managing FAQ sessions."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from datetime import UTC, datetime
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from capa_de_integracion.models.quick_replies import QuickReplyScreen
|
from capa_de_integracion.models.quick_replies import QuickReplyScreen
|
||||||
@@ -99,6 +100,7 @@ class QuickReplySessionService:
|
|||||||
pantalla_contexto,
|
pantalla_contexto,
|
||||||
)
|
)
|
||||||
session.pantalla_contexto = pantalla_contexto
|
session.pantalla_contexto = pantalla_contexto
|
||||||
|
session.last_modified = datetime.now(UTC)
|
||||||
else:
|
else:
|
||||||
session_id = str(uuid4())
|
session_id = str(uuid4())
|
||||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||||
|
|||||||
@@ -95,14 +95,13 @@ class FirestoreService:
|
|||||||
return session
|
return session
|
||||||
|
|
||||||
logger.debug("No session found in Firestore for phone: %s", telefono)
|
logger.debug("No session found in Firestore for phone: %s", telefono)
|
||||||
|
return None
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"Error querying session by phone %s from Firestore:",
|
"Error querying session by phone %s from Firestore:",
|
||||||
telefono,
|
telefono,
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def save_session(self, session: ConversationSession) -> bool:
|
async def save_session(self, session: ConversationSession) -> bool:
|
||||||
"""Save conversation session to Firestore."""
|
"""Save conversation session to Firestore."""
|
||||||
|
|||||||
@@ -120,10 +120,8 @@ class TestSessionManagement:
|
|||||||
|
|
||||||
mock_collection = MagicMock()
|
mock_collection = MagicMock()
|
||||||
mock_where = MagicMock()
|
mock_where = MagicMock()
|
||||||
mock_order = MagicMock()
|
|
||||||
mock_collection.where.return_value = mock_where
|
mock_collection.where.return_value = mock_where
|
||||||
mock_where.order_by.return_value = mock_order
|
mock_where.limit.return_value = mock_query
|
||||||
mock_order.limit.return_value = mock_query
|
|
||||||
|
|
||||||
original_collection = clean_firestore.db.collection
|
original_collection = clean_firestore.db.collection
|
||||||
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
|
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
"""Tests for QuickReplySessionService."""
|
"""Tests for QuickReplySessionService."""
|
||||||
|
|
||||||
|
from datetime import UTC, datetime, timedelta
|
||||||
from unittest.mock import AsyncMock, Mock
|
from unittest.mock import AsyncMock, Mock
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
@@ -160,6 +161,39 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis,
|
|||||||
mock_content.get_quick_replies.assert_called_once_with("pagos")
|
mock_content.get_quick_replies.assert_called_once_with("pagos")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_session_updates_last_modified_on_existing(
|
||||||
|
service, mock_firestore, mock_redis, mock_content
|
||||||
|
):
|
||||||
|
"""Test that last_modified is refreshed when updating pantalla_contexto.
|
||||||
|
|
||||||
|
Ensures quick reply context won't be incorrectly marked as stale
|
||||||
|
when the session was idle before the user opened a quick reply screen.
|
||||||
|
"""
|
||||||
|
stale_time = datetime.now(UTC) - timedelta(minutes=20)
|
||||||
|
test_session = ConversationSession.create(
|
||||||
|
session_id="session-123",
|
||||||
|
user_id="user_by_phone_5551234",
|
||||||
|
telefono="555-1234",
|
||||||
|
pantalla_contexto=None,
|
||||||
|
)
|
||||||
|
test_session.last_modified = stale_time
|
||||||
|
|
||||||
|
mock_redis.get_session.return_value = test_session
|
||||||
|
mock_content.get_quick_replies.return_value = QuickReplyScreen(
|
||||||
|
header="H", body=None, button=None, header_section=None, preguntas=[]
|
||||||
|
)
|
||||||
|
|
||||||
|
await service.start_quick_reply_session(
|
||||||
|
telefono="555-1234",
|
||||||
|
_nombre="John",
|
||||||
|
pantalla_contexto="pagos",
|
||||||
|
)
|
||||||
|
|
||||||
|
saved_session = mock_redis.save_session.call_args[0][0]
|
||||||
|
assert saved_session.last_modified > stale_time
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_start_session_invalid_phone(service):
|
async def test_start_session_invalid_phone(service):
|
||||||
"""Test starting session with invalid phone number."""
|
"""Test starting session with invalid phone number."""
|
||||||
|
|||||||
@@ -47,14 +47,15 @@ def test_app_has_routers():
|
|||||||
|
|
||||||
def test_main_entry_point():
|
def test_main_entry_point():
|
||||||
"""Test main entry point calls uvicorn.run."""
|
"""Test main entry point calls uvicorn.run."""
|
||||||
with patch("capa_de_integracion.main.uvicorn.run") as mock_run:
|
with patch("capa_de_integracion.main.uvicorn.run") as mock_run, \
|
||||||
|
patch("sys.argv", ["capa-de-integracion"]):
|
||||||
main()
|
main()
|
||||||
|
|
||||||
mock_run.assert_called_once()
|
mock_run.assert_called_once()
|
||||||
call_kwargs = mock_run.call_args.kwargs
|
call_kwargs = mock_run.call_args.kwargs
|
||||||
assert call_kwargs["host"] == "0.0.0.0"
|
assert call_kwargs["host"] == "0.0.0.0"
|
||||||
assert call_kwargs["port"] == 8080
|
assert call_kwargs["port"] == 8080
|
||||||
assert call_kwargs["reload"] is True
|
assert call_kwargs["workers"] == 1
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
Reference in New Issue
Block a user