Files
int-layer/tests/test_routers_simple.py
2026-02-20 19:32:54 +00:00

220 lines
6.4 KiB
Python

"""Simplified router tests using direct function calls."""
from unittest.mock import AsyncMock, Mock
import pytest
from capa_de_integracion.models import ConversationRequest, DetectIntentResponse, User
from capa_de_integracion.models.notification import ExternalNotificationRequest
from capa_de_integracion.models.quick_replies import QuickReplyScreen
from capa_de_integracion.routers import conversation, notification, quick_replies
from capa_de_integracion.routers.quick_replies import (
QuickReplyScreenRequest,
QuickReplyUser,
)
from capa_de_integracion.services.quick_reply.session import QuickReplySessionResponse
@pytest.mark.asyncio
async def test_detect_intent_success():
"""Test detect intent endpoint with success."""
mock_manager = Mock()
mock_manager.manage_conversation = AsyncMock(
return_value=DetectIntentResponse(
response_id="test-123",
query_result=None,
),
)
request = ConversationRequest(
mensaje="Hello",
usuario=User(telefono="555-1234"),
canal="web",
)
response = await conversation.detect_intent(request, mock_manager)
assert response.response_id == "test-123"
mock_manager.manage_conversation.assert_called_once()
@pytest.mark.asyncio
async def test_detect_intent_value_error():
"""Test detect intent with ValueError."""
mock_manager = Mock()
mock_manager.manage_conversation = AsyncMock(
side_effect=ValueError("Invalid input"),
)
request = ConversationRequest(
mensaje="Test",
usuario=User(telefono="555-1234"),
canal="web",
)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await conversation.detect_intent(request, mock_manager)
assert exc_info.value.status_code == 400
assert "Invalid input" in str(exc_info.value.detail)
@pytest.mark.asyncio
async def test_detect_intent_general_error():
"""Test detect intent with general Exception."""
mock_manager = Mock()
mock_manager.manage_conversation = AsyncMock(
side_effect=RuntimeError("Server error"),
)
request = ConversationRequest(
mensaje="Test",
usuario=User(telefono="555-1234"),
canal="web",
)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await conversation.detect_intent(request, mock_manager)
assert exc_info.value.status_code == 500
assert "Internal server error" in str(exc_info.value.detail)
@pytest.mark.asyncio
async def test_process_notification_success():
"""Test notification processing success."""
mock_manager = Mock()
mock_manager.process_notification = AsyncMock()
request = ExternalNotificationRequest(
telefono="555-1234",
texto="Your card was blocked",
)
result = await notification.process_notification(request, mock_manager)
assert result is None
mock_manager.process_notification.assert_called_once()
@pytest.mark.asyncio
async def test_process_notification_value_error():
"""Test notification with ValueError."""
mock_manager = Mock()
mock_manager.process_notification = AsyncMock(
side_effect=ValueError("Invalid phone"),
)
request = ExternalNotificationRequest(
telefono="",
texto="Test",
)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await notification.process_notification(request, mock_manager)
assert exc_info.value.status_code == 400
@pytest.mark.asyncio
async def test_process_notification_general_error():
"""Test notification with general error."""
mock_manager = Mock()
mock_manager.process_notification = AsyncMock(
side_effect=RuntimeError("Server error"),
)
request = ExternalNotificationRequest(
telefono="555-1234",
texto="Test",
)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await notification.process_notification(request, mock_manager)
assert exc_info.value.status_code == 500
@pytest.mark.asyncio
async def test_start_quick_reply_session_success():
"""Test quick reply session endpoint with success."""
mock_service = Mock()
mock_result = QuickReplySessionResponse(
session_id="test-session-123",
quick_replies=QuickReplyScreen(
header="Test Header",
body=None,
button=None,
header_section=None,
preguntas=[],
),
)
mock_service.start_quick_reply_session = AsyncMock(return_value=mock_result)
request = QuickReplyScreenRequest(
usuario=QuickReplyUser(telefono="555-1234", nombre="John"),
pantallaContexto="home",
)
response = await quick_replies.start_quick_reply_session(request, mock_service)
assert response.response_id == "test-session-123"
assert response.quick_replies.header == "Test Header"
mock_service.start_quick_reply_session.assert_called_once_with(
telefono="555-1234",
_nombre="John",
pantalla_contexto="home",
)
@pytest.mark.asyncio
async def test_start_quick_reply_session_value_error():
"""Test quick reply session with ValueError."""
mock_service = Mock()
mock_service.start_quick_reply_session = AsyncMock(
side_effect=ValueError("Invalid screen"),
)
request = QuickReplyScreenRequest(
usuario=QuickReplyUser(telefono="555-1234", nombre="John"),
pantallaContexto="invalid",
)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await quick_replies.start_quick_reply_session(request, mock_service)
assert exc_info.value.status_code == 400
assert "Invalid screen" in str(exc_info.value.detail)
@pytest.mark.asyncio
async def test_start_quick_reply_session_general_error():
"""Test quick reply session with general Exception."""
mock_service = Mock()
mock_service.start_quick_reply_session = AsyncMock(
side_effect=RuntimeError("Database error"),
)
request = QuickReplyScreenRequest(
usuario=QuickReplyUser(telefono="555-1234", nombre="John"),
pantallaContexto="home",
)
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
await quick_replies.start_quick_reply_session(request, mock_service)
assert exc_info.value.status_code == 500
assert "Internal server error" in str(exc_info.value.detail)