Compare commits
14 Commits
383efed319
...
2d547089dd
| Author | SHA1 | Date | |
|---|---|---|---|
| 2d547089dd | |||
|
|
a42bd380a8 | ||
| 9118913c6b | |||
| dba4122653 | |||
| 734cade8d9 | |||
| 5ceaadb20c | |||
| 59a76fc226 | |||
| c01f4d1ab3 | |||
| 2c722c1166 | |||
| 58393a538e | |||
| bcdc41ecd5 | |||
| 14ed21a1f9 | |||
| 03292a635c | |||
| 6f629c53a6 |
@@ -74,7 +74,7 @@ filterwarnings = [
|
||||
]
|
||||
|
||||
env = [
|
||||
"FIRESTORE_EMULATOR_HOST=[::1]:8462",
|
||||
"FIRESTORE_EMULATOR_HOST=[::1]:8488",
|
||||
"GCP_PROJECT_ID=test-project",
|
||||
"GCP_LOCATION=us-central1",
|
||||
"GCP_FIRESTORE_DATABASE_ID=(default)",
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
"""Dependency injection and service lifecycle management."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from functools import lru_cache
|
||||
|
||||
from capa_de_integracion.services.rag import (
|
||||
@@ -18,12 +16,8 @@ from .services import (
|
||||
QuickReplyContentService,
|
||||
QuickReplySessionService,
|
||||
)
|
||||
from .services.conversation import get_background_tasks as conv_bg_tasks
|
||||
from .services.notifications import get_background_tasks as notif_bg_tasks
|
||||
from .services.storage import FirestoreService, RedisService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_redis_service() -> RedisService:
|
||||
@@ -112,12 +106,6 @@ async def startup_services() -> None:
|
||||
|
||||
async def shutdown_services() -> None:
|
||||
"""Close all service connections on shutdown."""
|
||||
# Drain in-flight background tasks before closing connections
|
||||
all_tasks = conv_bg_tasks() | notif_bg_tasks()
|
||||
if all_tasks:
|
||||
logger.info("Draining %d background tasks before shutdown…", len(all_tasks))
|
||||
await asyncio.gather(*all_tasks, return_exceptions=True)
|
||||
|
||||
# Close Redis
|
||||
redis = get_redis_service()
|
||||
await redis.close()
|
||||
|
||||
@@ -26,11 +26,6 @@ logger = logging.getLogger(__name__)
|
||||
# Keep references to background tasks to prevent garbage collection
|
||||
_background_tasks: set[asyncio.Task[None]] = set()
|
||||
|
||||
|
||||
def get_background_tasks() -> set[asyncio.Task[None]]:
|
||||
"""Return the set of pending background tasks (for graceful shutdown)."""
|
||||
return _background_tasks
|
||||
|
||||
MSG_EMPTY_MESSAGE = "Message cannot be empty"
|
||||
|
||||
|
||||
@@ -176,15 +171,11 @@ class ConversationManagerService:
|
||||
canal: Communication channel
|
||||
|
||||
"""
|
||||
# Save user and assistant entries in parallel.
|
||||
# Use a single timestamp for both, but offset the assistant entry by 1µs
|
||||
# to avoid Firestore document ID collision (save_entry uses isoformat()
|
||||
# as the document ID).
|
||||
now = datetime.now(UTC)
|
||||
# Save user and assistant entries in parallel
|
||||
user_entry = ConversationEntry(
|
||||
entity="user",
|
||||
type=entry_type,
|
||||
timestamp=now,
|
||||
timestamp=datetime.now(UTC),
|
||||
text=user_text,
|
||||
parameters=None,
|
||||
canal=canal,
|
||||
@@ -192,7 +183,7 @@ class ConversationManagerService:
|
||||
assistant_entry = ConversationEntry(
|
||||
entity="assistant",
|
||||
type=entry_type,
|
||||
timestamp=now + timedelta(microseconds=1),
|
||||
timestamp=datetime.now(UTC),
|
||||
text=assistant_text,
|
||||
parameters=None,
|
||||
canal=canal,
|
||||
|
||||
@@ -21,11 +21,6 @@ PREFIX_PO_PARAM = "notification_po_"
|
||||
_background_tasks: set[asyncio.Task] = set()
|
||||
|
||||
|
||||
def get_background_tasks() -> set[asyncio.Task]:
|
||||
"""Return the set of pending background tasks (for graceful shutdown)."""
|
||||
return _background_tasks
|
||||
|
||||
|
||||
class NotificationManagerService:
|
||||
"""Manages notification processing and integration with conversations.
|
||||
|
||||
|
||||
@@ -149,8 +149,13 @@ class QuickReplyContentService:
|
||||
|
||||
if quick_reply is None:
|
||||
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
|
||||
msg = f"Quick reply not found for screen_id: {screen_id}"
|
||||
raise ValueError(msg)
|
||||
return QuickReplyScreen(
|
||||
header=None,
|
||||
body=None,
|
||||
button=None,
|
||||
header_section=None,
|
||||
preguntas=[],
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Retrieved %s quick replies for screen: %s from cache",
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"""Quick reply session service for managing FAQ sessions."""
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from uuid import uuid4
|
||||
|
||||
from capa_de_integracion.models.quick_replies import QuickReplyScreen
|
||||
@@ -100,7 +99,6 @@ class QuickReplySessionService:
|
||||
pantalla_contexto,
|
||||
)
|
||||
session.pantalla_contexto = pantalla_contexto
|
||||
session.last_modified = datetime.now(UTC)
|
||||
else:
|
||||
session_id = str(uuid4())
|
||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||
|
||||
@@ -95,13 +95,14 @@ class FirestoreService:
|
||||
return session
|
||||
|
||||
logger.debug("No session found in Firestore for phone: %s", telefono)
|
||||
return None
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Error querying session by phone %s from Firestore:",
|
||||
telefono,
|
||||
)
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
async def save_session(self, session: ConversationSession) -> bool:
|
||||
"""Save conversation session to Firestore."""
|
||||
|
||||
@@ -120,8 +120,10 @@ class TestSessionManagement:
|
||||
|
||||
mock_collection = MagicMock()
|
||||
mock_where = MagicMock()
|
||||
mock_order = MagicMock()
|
||||
mock_collection.where.return_value = mock_where
|
||||
mock_where.limit.return_value = mock_query
|
||||
mock_where.order_by.return_value = mock_order
|
||||
mock_order.limit.return_value = mock_query
|
||||
|
||||
original_collection = clean_firestore.db.collection
|
||||
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Tests for QuickReplySessionService."""
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
from uuid import uuid4
|
||||
|
||||
@@ -161,39 +160,6 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis,
|
||||
mock_content.get_quick_replies.assert_called_once_with("pagos")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_session_updates_last_modified_on_existing(
|
||||
service, mock_firestore, mock_redis, mock_content
|
||||
):
|
||||
"""Test that last_modified is refreshed when updating pantalla_contexto.
|
||||
|
||||
Ensures quick reply context won't be incorrectly marked as stale
|
||||
when the session was idle before the user opened a quick reply screen.
|
||||
"""
|
||||
stale_time = datetime.now(UTC) - timedelta(minutes=20)
|
||||
test_session = ConversationSession.create(
|
||||
session_id="session-123",
|
||||
user_id="user_by_phone_5551234",
|
||||
telefono="555-1234",
|
||||
pantalla_contexto=None,
|
||||
)
|
||||
test_session.last_modified = stale_time
|
||||
|
||||
mock_redis.get_session.return_value = test_session
|
||||
mock_content.get_quick_replies.return_value = QuickReplyScreen(
|
||||
header="H", body=None, button=None, header_section=None, preguntas=[]
|
||||
)
|
||||
|
||||
await service.start_quick_reply_session(
|
||||
telefono="555-1234",
|
||||
_nombre="John",
|
||||
pantalla_contexto="pagos",
|
||||
)
|
||||
|
||||
saved_session = mock_redis.save_session.call_args[0][0]
|
||||
assert saved_session.last_modified > stale_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_session_invalid_phone(service):
|
||||
"""Test starting session with invalid phone number."""
|
||||
|
||||
@@ -47,15 +47,14 @@ def test_app_has_routers():
|
||||
|
||||
def test_main_entry_point():
|
||||
"""Test main entry point calls uvicorn.run."""
|
||||
with patch("capa_de_integracion.main.uvicorn.run") as mock_run, \
|
||||
patch("sys.argv", ["capa-de-integracion"]):
|
||||
with patch("capa_de_integracion.main.uvicorn.run") as mock_run:
|
||||
main()
|
||||
|
||||
mock_run.assert_called_once()
|
||||
call_kwargs = mock_run.call_args.kwargs
|
||||
assert call_kwargs["host"] == "0.0.0.0"
|
||||
assert call_kwargs["port"] == 8080
|
||||
assert call_kwargs["workers"] == 1
|
||||
assert call_kwargs["reload"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
Reference in New Issue
Block a user