Compare commits
14 Commits
f-a-f
...
2d547089dd
| Author | SHA1 | Date | |
|---|---|---|---|
| 2d547089dd | |||
|
|
a42bd380a8 | ||
| 9118913c6b | |||
| dba4122653 | |||
| 734cade8d9 | |||
| 5ceaadb20c | |||
| 59a76fc226 | |||
| c01f4d1ab3 | |||
| 2c722c1166 | |||
| 58393a538e | |||
| bcdc41ecd5 | |||
| 14ed21a1f9 | |||
| 03292a635c | |||
| 6f629c53a6 |
@@ -74,7 +74,7 @@ filterwarnings = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
env = [
|
env = [
|
||||||
"FIRESTORE_EMULATOR_HOST=[::1]:8462",
|
"FIRESTORE_EMULATOR_HOST=[::1]:8488",
|
||||||
"GCP_PROJECT_ID=test-project",
|
"GCP_PROJECT_ID=test-project",
|
||||||
"GCP_LOCATION=us-central1",
|
"GCP_LOCATION=us-central1",
|
||||||
"GCP_FIRESTORE_DATABASE_ID=(default)",
|
"GCP_FIRESTORE_DATABASE_ID=(default)",
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
"""Dependency injection and service lifecycle management."""
|
"""Dependency injection and service lifecycle management."""
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
from capa_de_integracion.services.rag import (
|
from capa_de_integracion.services.rag import (
|
||||||
@@ -18,12 +16,8 @@ from .services import (
|
|||||||
QuickReplyContentService,
|
QuickReplyContentService,
|
||||||
QuickReplySessionService,
|
QuickReplySessionService,
|
||||||
)
|
)
|
||||||
from .services.conversation import get_background_tasks as conv_bg_tasks
|
|
||||||
from .services.notifications import get_background_tasks as notif_bg_tasks
|
|
||||||
from .services.storage import FirestoreService, RedisService
|
from .services.storage import FirestoreService, RedisService
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
@lru_cache(maxsize=1)
|
@lru_cache(maxsize=1)
|
||||||
def get_redis_service() -> RedisService:
|
def get_redis_service() -> RedisService:
|
||||||
@@ -112,12 +106,6 @@ async def startup_services() -> None:
|
|||||||
|
|
||||||
async def shutdown_services() -> None:
|
async def shutdown_services() -> None:
|
||||||
"""Close all service connections on shutdown."""
|
"""Close all service connections on shutdown."""
|
||||||
# Drain in-flight background tasks before closing connections
|
|
||||||
all_tasks = conv_bg_tasks() | notif_bg_tasks()
|
|
||||||
if all_tasks:
|
|
||||||
logger.info("Draining %d background tasks before shutdown…", len(all_tasks))
|
|
||||||
await asyncio.gather(*all_tasks, return_exceptions=True)
|
|
||||||
|
|
||||||
# Close Redis
|
# Close Redis
|
||||||
redis = get_redis_service()
|
redis = get_redis_service()
|
||||||
await redis.close()
|
await redis.close()
|
||||||
|
|||||||
@@ -26,11 +26,6 @@ logger = logging.getLogger(__name__)
|
|||||||
# Keep references to background tasks to prevent garbage collection
|
# Keep references to background tasks to prevent garbage collection
|
||||||
_background_tasks: set[asyncio.Task[None]] = set()
|
_background_tasks: set[asyncio.Task[None]] = set()
|
||||||
|
|
||||||
|
|
||||||
def get_background_tasks() -> set[asyncio.Task[None]]:
|
|
||||||
"""Return the set of pending background tasks (for graceful shutdown)."""
|
|
||||||
return _background_tasks
|
|
||||||
|
|
||||||
MSG_EMPTY_MESSAGE = "Message cannot be empty"
|
MSG_EMPTY_MESSAGE = "Message cannot be empty"
|
||||||
|
|
||||||
|
|
||||||
@@ -176,15 +171,11 @@ class ConversationManagerService:
|
|||||||
canal: Communication channel
|
canal: Communication channel
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# Save user and assistant entries in parallel.
|
# Save user and assistant entries in parallel
|
||||||
# Use a single timestamp for both, but offset the assistant entry by 1µs
|
|
||||||
# to avoid Firestore document ID collision (save_entry uses isoformat()
|
|
||||||
# as the document ID).
|
|
||||||
now = datetime.now(UTC)
|
|
||||||
user_entry = ConversationEntry(
|
user_entry = ConversationEntry(
|
||||||
entity="user",
|
entity="user",
|
||||||
type=entry_type,
|
type=entry_type,
|
||||||
timestamp=now,
|
timestamp=datetime.now(UTC),
|
||||||
text=user_text,
|
text=user_text,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=canal,
|
canal=canal,
|
||||||
@@ -192,7 +183,7 @@ class ConversationManagerService:
|
|||||||
assistant_entry = ConversationEntry(
|
assistant_entry = ConversationEntry(
|
||||||
entity="assistant",
|
entity="assistant",
|
||||||
type=entry_type,
|
type=entry_type,
|
||||||
timestamp=now + timedelta(microseconds=1),
|
timestamp=datetime.now(UTC),
|
||||||
text=assistant_text,
|
text=assistant_text,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=canal,
|
canal=canal,
|
||||||
|
|||||||
@@ -21,11 +21,6 @@ PREFIX_PO_PARAM = "notification_po_"
|
|||||||
_background_tasks: set[asyncio.Task] = set()
|
_background_tasks: set[asyncio.Task] = set()
|
||||||
|
|
||||||
|
|
||||||
def get_background_tasks() -> set[asyncio.Task]:
|
|
||||||
"""Return the set of pending background tasks (for graceful shutdown)."""
|
|
||||||
return _background_tasks
|
|
||||||
|
|
||||||
|
|
||||||
class NotificationManagerService:
|
class NotificationManagerService:
|
||||||
"""Manages notification processing and integration with conversations.
|
"""Manages notification processing and integration with conversations.
|
||||||
|
|
||||||
|
|||||||
@@ -149,8 +149,13 @@ class QuickReplyContentService:
|
|||||||
|
|
||||||
if quick_reply is None:
|
if quick_reply is None:
|
||||||
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
|
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
|
||||||
msg = f"Quick reply not found for screen_id: {screen_id}"
|
return QuickReplyScreen(
|
||||||
raise ValueError(msg)
|
header=None,
|
||||||
|
body=None,
|
||||||
|
button=None,
|
||||||
|
header_section=None,
|
||||||
|
preguntas=[],
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Retrieved %s quick replies for screen: %s from cache",
|
"Retrieved %s quick replies for screen: %s from cache",
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
"""Quick reply session service for managing FAQ sessions."""
|
"""Quick reply session service for managing FAQ sessions."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import UTC, datetime
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from capa_de_integracion.models.quick_replies import QuickReplyScreen
|
from capa_de_integracion.models.quick_replies import QuickReplyScreen
|
||||||
@@ -100,7 +99,6 @@ class QuickReplySessionService:
|
|||||||
pantalla_contexto,
|
pantalla_contexto,
|
||||||
)
|
)
|
||||||
session.pantalla_contexto = pantalla_contexto
|
session.pantalla_contexto = pantalla_contexto
|
||||||
session.last_modified = datetime.now(UTC)
|
|
||||||
else:
|
else:
|
||||||
session_id = str(uuid4())
|
session_id = str(uuid4())
|
||||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||||
|
|||||||
@@ -95,13 +95,14 @@ class FirestoreService:
|
|||||||
return session
|
return session
|
||||||
|
|
||||||
logger.debug("No session found in Firestore for phone: %s", telefono)
|
logger.debug("No session found in Firestore for phone: %s", telefono)
|
||||||
return None
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"Error querying session by phone %s from Firestore:",
|
"Error querying session by phone %s from Firestore:",
|
||||||
telefono,
|
telefono,
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
async def save_session(self, session: ConversationSession) -> bool:
|
async def save_session(self, session: ConversationSession) -> bool:
|
||||||
"""Save conversation session to Firestore."""
|
"""Save conversation session to Firestore."""
|
||||||
|
|||||||
@@ -120,8 +120,10 @@ class TestSessionManagement:
|
|||||||
|
|
||||||
mock_collection = MagicMock()
|
mock_collection = MagicMock()
|
||||||
mock_where = MagicMock()
|
mock_where = MagicMock()
|
||||||
|
mock_order = MagicMock()
|
||||||
mock_collection.where.return_value = mock_where
|
mock_collection.where.return_value = mock_where
|
||||||
mock_where.limit.return_value = mock_query
|
mock_where.order_by.return_value = mock_order
|
||||||
|
mock_order.limit.return_value = mock_query
|
||||||
|
|
||||||
original_collection = clean_firestore.db.collection
|
original_collection = clean_firestore.db.collection
|
||||||
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
|
clean_firestore.db.collection = MagicMock(return_value=mock_collection)
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
"""Tests for QuickReplySessionService."""
|
"""Tests for QuickReplySessionService."""
|
||||||
|
|
||||||
from datetime import UTC, datetime, timedelta
|
|
||||||
from unittest.mock import AsyncMock, Mock
|
from unittest.mock import AsyncMock, Mock
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
@@ -161,39 +160,6 @@ async def test_start_session_existing_user(service, mock_firestore, mock_redis,
|
|||||||
mock_content.get_quick_replies.assert_called_once_with("pagos")
|
mock_content.get_quick_replies.assert_called_once_with("pagos")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_start_session_updates_last_modified_on_existing(
|
|
||||||
service, mock_firestore, mock_redis, mock_content
|
|
||||||
):
|
|
||||||
"""Test that last_modified is refreshed when updating pantalla_contexto.
|
|
||||||
|
|
||||||
Ensures quick reply context won't be incorrectly marked as stale
|
|
||||||
when the session was idle before the user opened a quick reply screen.
|
|
||||||
"""
|
|
||||||
stale_time = datetime.now(UTC) - timedelta(minutes=20)
|
|
||||||
test_session = ConversationSession.create(
|
|
||||||
session_id="session-123",
|
|
||||||
user_id="user_by_phone_5551234",
|
|
||||||
telefono="555-1234",
|
|
||||||
pantalla_contexto=None,
|
|
||||||
)
|
|
||||||
test_session.last_modified = stale_time
|
|
||||||
|
|
||||||
mock_redis.get_session.return_value = test_session
|
|
||||||
mock_content.get_quick_replies.return_value = QuickReplyScreen(
|
|
||||||
header="H", body=None, button=None, header_section=None, preguntas=[]
|
|
||||||
)
|
|
||||||
|
|
||||||
await service.start_quick_reply_session(
|
|
||||||
telefono="555-1234",
|
|
||||||
_nombre="John",
|
|
||||||
pantalla_contexto="pagos",
|
|
||||||
)
|
|
||||||
|
|
||||||
saved_session = mock_redis.save_session.call_args[0][0]
|
|
||||||
assert saved_session.last_modified > stale_time
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_start_session_invalid_phone(service):
|
async def test_start_session_invalid_phone(service):
|
||||||
"""Test starting session with invalid phone number."""
|
"""Test starting session with invalid phone number."""
|
||||||
|
|||||||
@@ -47,15 +47,14 @@ def test_app_has_routers():
|
|||||||
|
|
||||||
def test_main_entry_point():
|
def test_main_entry_point():
|
||||||
"""Test main entry point calls uvicorn.run."""
|
"""Test main entry point calls uvicorn.run."""
|
||||||
with patch("capa_de_integracion.main.uvicorn.run") as mock_run, \
|
with patch("capa_de_integracion.main.uvicorn.run") as mock_run:
|
||||||
patch("sys.argv", ["capa-de-integracion"]):
|
|
||||||
main()
|
main()
|
||||||
|
|
||||||
mock_run.assert_called_once()
|
mock_run.assert_called_once()
|
||||||
call_kwargs = mock_run.call_args.kwargs
|
call_kwargs = mock_run.call_args.kwargs
|
||||||
assert call_kwargs["host"] == "0.0.0.0"
|
assert call_kwargs["host"] == "0.0.0.0"
|
||||||
assert call_kwargs["port"] == 8080
|
assert call_kwargs["port"] == 8080
|
||||||
assert call_kwargs["workers"] == 1
|
assert call_kwargs["reload"] is True
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
Reference in New Issue
Block a user