929 lines
34 KiB
Python
929 lines
34 KiB
Python
"""Tests for RedisService."""
|
|
|
|
from datetime import UTC, datetime
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
from inline_snapshot import snapshot
|
|
|
|
from capa_de_integracion.config import Settings
|
|
from capa_de_integracion.models import ConversationEntry, ConversationSession
|
|
from capa_de_integracion.models.notification import Notification, NotificationSession
|
|
from capa_de_integracion.services.redis_service import RedisService
|
|
|
|
|
|
class TestConnectionManagement:
|
|
"""Tests for Redis connection management."""
|
|
|
|
async def test_connect_and_close(self, emulator_settings: Settings) -> None:
|
|
"""Test connecting to and closing Redis."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
# Initially not connected
|
|
assert service.redis is None
|
|
|
|
# Connect
|
|
await service.connect()
|
|
assert service.redis is not None
|
|
|
|
# Close
|
|
await service.close()
|
|
|
|
async def test_close_when_not_connected(self, emulator_settings: Settings) -> None:
|
|
"""Test closing Redis when not connected does not raise error."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
# Initially not connected
|
|
assert service.redis is None
|
|
|
|
# Close should not raise error
|
|
await service.close()
|
|
|
|
async def test_settings_initialization(self, emulator_settings: Settings) -> None:
|
|
"""Test RedisService initializes with correct settings."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
assert service.settings == emulator_settings
|
|
assert service.session_ttl == 2592000 # 30 days
|
|
assert service.notification_ttl == 2592000 # 30 days
|
|
assert service.qr_session_ttl == 86400 # 24 hours
|
|
|
|
|
|
class TestSessionManagement:
|
|
"""Tests for conversation session management in Redis."""
|
|
|
|
async def test_save_and_get_session_by_id(self, clean_redis: RedisService) -> None:
|
|
"""Test saving and retrieving a session by session ID."""
|
|
session = ConversationSession.create(
|
|
session_id="test-session-1",
|
|
user_id="user-123",
|
|
telefono="+1234567890",
|
|
pantalla_contexto="home_screen",
|
|
last_message="Hello",
|
|
)
|
|
|
|
# Save session
|
|
success = await clean_redis.save_session(session)
|
|
assert success is True
|
|
|
|
# Retrieve by session ID
|
|
retrieved = await clean_redis.get_session("test-session-1")
|
|
assert retrieved is not None
|
|
assert retrieved.session_id == "test-session-1"
|
|
assert retrieved.user_id == "user-123"
|
|
assert retrieved.telefono == "+1234567890"
|
|
assert retrieved.pantalla_contexto == "home_screen"
|
|
assert retrieved.last_message == "Hello"
|
|
|
|
async def test_save_and_get_session_by_phone(self, clean_redis: RedisService) -> None:
|
|
"""Test saving and retrieving a session by phone number."""
|
|
session = ConversationSession.create(
|
|
session_id="test-session-2",
|
|
user_id="user-456",
|
|
telefono="+9876543210",
|
|
pantalla_contexto="settings",
|
|
)
|
|
|
|
# Save session
|
|
await clean_redis.save_session(session)
|
|
|
|
# Retrieve by phone number (should use phone-to-session mapping)
|
|
retrieved = await clean_redis.get_session("+9876543210")
|
|
assert retrieved is not None
|
|
assert retrieved.session_id == "test-session-2"
|
|
assert retrieved.telefono == "+9876543210"
|
|
|
|
async def test_get_session_not_found(self, clean_redis: RedisService) -> None:
|
|
"""Test retrieving a non-existent session returns None."""
|
|
session = await clean_redis.get_session("nonexistent-session")
|
|
assert session is None
|
|
|
|
async def test_save_session_updates_existing(self, clean_redis: RedisService) -> None:
|
|
"""Test saving a session updates existing session."""
|
|
session = ConversationSession.create(
|
|
session_id="test-session-3",
|
|
user_id="user-789",
|
|
telefono="+5555555555",
|
|
last_message="Original message",
|
|
)
|
|
|
|
# Save initial session
|
|
await clean_redis.save_session(session)
|
|
|
|
# Update and save again
|
|
session.last_message = "Updated message"
|
|
session.pantalla_contexto = "new_screen"
|
|
await clean_redis.save_session(session)
|
|
|
|
# Retrieve and verify
|
|
retrieved = await clean_redis.get_session("test-session-3")
|
|
assert retrieved is not None
|
|
assert retrieved.last_message == "Updated message"
|
|
assert retrieved.pantalla_contexto == "new_screen"
|
|
|
|
async def test_delete_session(self, clean_redis: RedisService) -> None:
|
|
"""Test deleting a session."""
|
|
session = ConversationSession.create(
|
|
session_id="test-session-4",
|
|
user_id="user-101",
|
|
telefono="+2222222222",
|
|
)
|
|
|
|
# Save and verify
|
|
await clean_redis.save_session(session)
|
|
assert await clean_redis.exists("test-session-4") is True
|
|
|
|
# Delete
|
|
success = await clean_redis.delete_session("test-session-4")
|
|
assert success is True
|
|
|
|
# Verify deletion
|
|
assert await clean_redis.exists("test-session-4") is False
|
|
retrieved = await clean_redis.get_session("test-session-4")
|
|
assert retrieved is None
|
|
|
|
async def test_delete_nonexistent_session(self, clean_redis: RedisService) -> None:
|
|
"""Test deleting a non-existent session returns False."""
|
|
success = await clean_redis.delete_session("nonexistent-session")
|
|
assert success is False
|
|
|
|
async def test_exists_session(self, clean_redis: RedisService) -> None:
|
|
"""Test checking if session exists."""
|
|
session = ConversationSession.create(
|
|
session_id="test-session-5",
|
|
user_id="user-202",
|
|
telefono="+3333333333",
|
|
)
|
|
|
|
# Should not exist initially
|
|
assert await clean_redis.exists("test-session-5") is False
|
|
|
|
# Save and check again
|
|
await clean_redis.save_session(session)
|
|
assert await clean_redis.exists("test-session-5") is True
|
|
|
|
async def test_phone_to_session_mapping(self, clean_redis: RedisService) -> None:
|
|
"""Test that phone-to-session mapping is created and used."""
|
|
session = ConversationSession.create(
|
|
session_id="test-session-6",
|
|
user_id="user-303",
|
|
telefono="+4444444444",
|
|
)
|
|
|
|
# Save session
|
|
await clean_redis.save_session(session)
|
|
|
|
# Verify phone mapping key exists in Redis
|
|
assert clean_redis.redis is not None
|
|
phone_key = clean_redis._phone_to_session_key("+4444444444")
|
|
mapped_session_id = await clean_redis.redis.get(phone_key)
|
|
assert mapped_session_id == "test-session-6"
|
|
|
|
async def test_get_session_deserialization_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test get_session handles deserialization errors gracefully."""
|
|
# Manually insert invalid JSON
|
|
assert clean_redis.redis is not None
|
|
key = clean_redis._session_key("invalid-session")
|
|
await clean_redis.redis.set(key, "invalid json data")
|
|
|
|
# Should return None on deserialization error
|
|
session = await clean_redis.get_session("invalid-session")
|
|
assert session is None
|
|
|
|
|
|
class TestMessageManagement:
|
|
"""Tests for conversation message management in Redis."""
|
|
|
|
async def test_save_and_get_messages(self, clean_redis: RedisService) -> None:
|
|
"""Test saving and retrieving conversation messages."""
|
|
session_id = "test-session-7"
|
|
|
|
# Create messages
|
|
message1 = ConversationEntry(
|
|
timestamp=datetime(2024, 1, 1, 10, 0, 0, tzinfo=UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text="First message",
|
|
)
|
|
message2 = ConversationEntry(
|
|
timestamp=datetime(2024, 1, 1, 10, 1, 0, tzinfo=UTC),
|
|
entity="assistant",
|
|
type="CONVERSACION",
|
|
text="First response",
|
|
)
|
|
|
|
# Save messages
|
|
success1 = await clean_redis.save_message(session_id, message1)
|
|
success2 = await clean_redis.save_message(session_id, message2)
|
|
assert success1 is True
|
|
assert success2 is True
|
|
|
|
# Retrieve messages
|
|
messages = await clean_redis.get_messages(session_id)
|
|
assert len(messages) == 2
|
|
|
|
# Use inline-snapshot to verify structure
|
|
assert messages[0]["entity"] == snapshot("user")
|
|
assert messages[0]["type"] == snapshot("CONVERSACION")
|
|
assert messages[0]["text"] == snapshot("First message")
|
|
|
|
assert messages[1]["entity"] == snapshot("assistant")
|
|
assert messages[1]["text"] == snapshot("First response")
|
|
|
|
async def test_get_messages_empty_session(self, clean_redis: RedisService) -> None:
|
|
"""Test retrieving messages from session with no messages."""
|
|
messages = await clean_redis.get_messages("nonexistent-session")
|
|
assert messages == []
|
|
|
|
async def test_messages_ordered_by_timestamp(self, clean_redis: RedisService) -> None:
|
|
"""Test that messages are returned in chronological order."""
|
|
session_id = "test-session-8"
|
|
|
|
# Create messages with different timestamps
|
|
messages_to_save = [
|
|
ConversationEntry(
|
|
timestamp=datetime(2024, 1, 1, 10, 2, 0, tzinfo=UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text="Third message",
|
|
),
|
|
ConversationEntry(
|
|
timestamp=datetime(2024, 1, 1, 10, 0, 0, tzinfo=UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text="First message",
|
|
),
|
|
ConversationEntry(
|
|
timestamp=datetime(2024, 1, 1, 10, 1, 0, tzinfo=UTC),
|
|
entity="assistant",
|
|
type="CONVERSACION",
|
|
text="Second message",
|
|
),
|
|
]
|
|
|
|
# Save messages in random order
|
|
for msg in messages_to_save:
|
|
await clean_redis.save_message(session_id, msg)
|
|
|
|
# Retrieve and verify order
|
|
retrieved_messages = await clean_redis.get_messages(session_id)
|
|
assert len(retrieved_messages) == 3
|
|
assert retrieved_messages[0]["text"] == "First message"
|
|
assert retrieved_messages[1]["text"] == "Second message"
|
|
assert retrieved_messages[2]["text"] == "Third message"
|
|
|
|
async def test_get_messages_json_decode_error(self, clean_redis: RedisService) -> None:
|
|
"""Test get_messages handles JSON decode errors gracefully."""
|
|
assert clean_redis.redis is not None
|
|
session_id = "test-session-9"
|
|
key = clean_redis._messages_key(session_id)
|
|
|
|
# Insert invalid JSON into sorted set
|
|
await clean_redis.redis.zadd(key, {"invalid json": 1000})
|
|
await clean_redis.redis.zadd(
|
|
key, {'{"entity": "user", "text": "valid"}': 2000},
|
|
)
|
|
|
|
# Should skip invalid JSON and return valid messages
|
|
messages = await clean_redis.get_messages(session_id)
|
|
# Only the valid message should be returned
|
|
assert len(messages) == 1
|
|
assert messages[0]["entity"] == "user"
|
|
|
|
|
|
class TestNotificationManagement:
|
|
"""Tests for notification management in Redis."""
|
|
|
|
async def test_save_new_notification(self, clean_redis: RedisService) -> None:
|
|
"""Test saving a new notification creates new session."""
|
|
notification = Notification.create(
|
|
id_notificacion="notif-1",
|
|
telefono="+8888888888",
|
|
texto="Test notification",
|
|
)
|
|
|
|
# Save notification
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
# Retrieve notification session
|
|
session = await clean_redis.get_notification_session("+8888888888")
|
|
assert session is not None
|
|
|
|
# Use inline-snapshot to verify structure
|
|
assert session.session_id == snapshot("+8888888888")
|
|
assert session.telefono == snapshot("+8888888888")
|
|
assert len(session.notificaciones) == snapshot(1)
|
|
assert session.notificaciones[0].texto == snapshot("Test notification")
|
|
assert session.notificaciones[0].id_notificacion == snapshot("notif-1")
|
|
|
|
async def test_append_to_existing_notification_session(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test appending notification to existing session."""
|
|
phone = "+9999999999"
|
|
|
|
# Create first notification
|
|
notification1 = Notification.create(
|
|
id_notificacion="notif-2",
|
|
telefono=phone,
|
|
texto="First notification",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification1)
|
|
|
|
# Append second notification
|
|
notification2 = Notification.create(
|
|
id_notificacion="notif-3",
|
|
telefono=phone,
|
|
texto="Second notification",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification2)
|
|
|
|
# Verify both notifications exist
|
|
session = await clean_redis.get_notification_session(phone)
|
|
assert session is not None
|
|
assert len(session.notificaciones) == 2
|
|
assert session.notificaciones[0].texto == "First notification"
|
|
assert session.notificaciones[1].texto == "Second notification"
|
|
|
|
async def test_save_notification_without_phone_raises_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test saving notification without phone number raises ValueError."""
|
|
notification = Notification.create(
|
|
id_notificacion="notif-4",
|
|
telefono="",
|
|
texto="Test",
|
|
)
|
|
|
|
with pytest.raises(ValueError, match="Phone number is required"):
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
async def test_save_notification_with_whitespace_phone_raises_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test saving notification with whitespace-only phone raises ValueError."""
|
|
notification = Notification.create(
|
|
id_notificacion="notif-5",
|
|
telefono=" ",
|
|
texto="Test",
|
|
)
|
|
|
|
with pytest.raises(ValueError, match="Phone number is required"):
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
async def test_get_notification_session_not_found(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test retrieving non-existent notification session returns None."""
|
|
session = await clean_redis.get_notification_session("+0000000000")
|
|
assert session is None
|
|
|
|
async def test_get_notification_id_for_phone(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test getting notification session ID for phone number."""
|
|
phone = "+1010101010"
|
|
|
|
# Create notification
|
|
notification = Notification.create(
|
|
id_notificacion="notif-6",
|
|
telefono=phone,
|
|
texto="Test",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
# Get session ID for phone
|
|
session_id = await clean_redis.get_notification_id_for_phone(phone)
|
|
assert session_id == phone # Phone number is used as session ID
|
|
|
|
async def test_get_notification_id_for_phone_not_found(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test getting notification ID for non-existent phone returns None."""
|
|
session_id = await clean_redis.get_notification_id_for_phone("+0000000000")
|
|
assert session_id is None
|
|
|
|
async def test_delete_notification_session(self, clean_redis: RedisService) -> None:
|
|
"""Test deleting notification session."""
|
|
phone = "+1212121212"
|
|
|
|
# Create notification
|
|
notification = Notification.create(
|
|
id_notificacion="notif-7",
|
|
telefono=phone,
|
|
texto="Test",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
# Verify it exists
|
|
session = await clean_redis.get_notification_session(phone)
|
|
assert session is not None
|
|
|
|
# Delete notification session
|
|
success = await clean_redis.delete_notification_session(phone)
|
|
assert success is True
|
|
|
|
# Verify deletion
|
|
session = await clean_redis.get_notification_session(phone)
|
|
assert session is None
|
|
|
|
async def test_delete_nonexistent_notification_session(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test deleting non-existent notification session succeeds."""
|
|
# Should not raise error
|
|
success = await clean_redis.delete_notification_session("+0000000000")
|
|
assert success is True
|
|
|
|
async def test_phone_to_notification_mapping(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test that phone-to-notification mapping is created."""
|
|
phone = "+1313131313"
|
|
notification = Notification.create(
|
|
id_notificacion="notif-8",
|
|
telefono=phone,
|
|
texto="Test",
|
|
)
|
|
|
|
# Save notification
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
# Verify phone mapping key exists in Redis
|
|
assert clean_redis.redis is not None
|
|
phone_key = clean_redis._phone_to_notification_key(phone)
|
|
mapped_session_id = await clean_redis.redis.get(phone_key)
|
|
assert mapped_session_id == phone
|
|
|
|
async def test_notification_timestamps_updated(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test that notification session timestamps are updated correctly."""
|
|
phone = "+1414141414"
|
|
|
|
# Create first notification
|
|
notification1 = Notification.create(
|
|
id_notificacion="notif-9",
|
|
telefono=phone,
|
|
texto="First",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification1)
|
|
|
|
# Get initial session
|
|
session1 = await clean_redis.get_notification_session(phone)
|
|
assert session1 is not None
|
|
initial_update_time = session1.ultima_actualizacion
|
|
|
|
# Wait a moment and add another notification
|
|
import asyncio
|
|
await asyncio.sleep(0.01)
|
|
|
|
notification2 = Notification.create(
|
|
id_notificacion="notif-10",
|
|
telefono=phone,
|
|
texto="Second",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification2)
|
|
|
|
# Get updated session
|
|
session2 = await clean_redis.get_notification_session(phone)
|
|
assert session2 is not None
|
|
|
|
# Creation time should stay the same
|
|
assert session2.fecha_creacion == session1.fecha_creacion
|
|
|
|
# Update time should be newer
|
|
assert session2.ultima_actualizacion > initial_update_time
|
|
|
|
async def test_get_notification_session_deserialization_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test get_notification_session handles deserialization errors gracefully."""
|
|
# Manually insert invalid JSON
|
|
assert clean_redis.redis is not None
|
|
key = clean_redis._notification_key("invalid-notif-session")
|
|
await clean_redis.redis.set(key, "invalid json data")
|
|
|
|
# Should return None on deserialization error
|
|
session = await clean_redis.get_notification_session("invalid-notif-session")
|
|
assert session is None
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Tests for error handling in Redis operations."""
|
|
|
|
async def test_get_session_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test get_session raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.get_session("test-session")
|
|
|
|
async def test_save_session_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test save_session raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
session = ConversationSession.create(
|
|
session_id="test",
|
|
user_id="user",
|
|
telefono="+1234567890",
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.save_session(session)
|
|
|
|
async def test_delete_session_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test delete_session raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.delete_session("test-session")
|
|
|
|
async def test_exists_when_not_connected(self, emulator_settings: Settings) -> None:
|
|
"""Test exists raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.exists("test-session")
|
|
|
|
async def test_save_message_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test save_message raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
message = ConversationEntry(
|
|
timestamp=datetime.now(UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text="Test",
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.save_message("test-session", message)
|
|
|
|
async def test_get_messages_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test get_messages raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.get_messages("test-session")
|
|
|
|
async def test_save_notification_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test save_or_append_notification raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
notification = Notification.create(
|
|
id_notificacion="notif-1",
|
|
telefono="+1234567890",
|
|
texto="Test",
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.save_or_append_notification(notification)
|
|
|
|
async def test_get_notification_session_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test get_notification_session raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.get_notification_session("test-session")
|
|
|
|
async def test_get_notification_id_for_phone_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test get_notification_id_for_phone raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.get_notification_id_for_phone("+1234567890")
|
|
|
|
async def test_delete_notification_session_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test delete_notification_session raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service.delete_notification_session("+1234567890")
|
|
|
|
async def test_save_session_with_redis_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test save_session handles Redis errors gracefully."""
|
|
session = ConversationSession.create(
|
|
session_id="test",
|
|
user_id="user",
|
|
telefono="+1234567890",
|
|
)
|
|
|
|
# Mock redis to raise exception
|
|
original_redis = clean_redis.redis
|
|
mock_redis = AsyncMock()
|
|
mock_redis.setex.side_effect = Exception("Redis error")
|
|
clean_redis.redis = mock_redis
|
|
|
|
try:
|
|
result = await clean_redis.save_session(session)
|
|
assert result is False
|
|
finally:
|
|
clean_redis.redis = original_redis
|
|
|
|
async def test_delete_session_with_redis_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test delete_session handles Redis errors gracefully."""
|
|
# Mock redis to raise exception
|
|
original_redis = clean_redis.redis
|
|
mock_redis = AsyncMock()
|
|
mock_redis.delete.side_effect = Exception("Redis error")
|
|
clean_redis.redis = mock_redis
|
|
|
|
try:
|
|
result = await clean_redis.delete_session("test-session")
|
|
assert result is False
|
|
finally:
|
|
clean_redis.redis = original_redis
|
|
|
|
async def test_save_message_with_redis_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test save_message handles Redis errors gracefully."""
|
|
message = ConversationEntry(
|
|
timestamp=datetime.now(UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text="Test",
|
|
)
|
|
|
|
# Mock redis to raise exception
|
|
original_redis = clean_redis.redis
|
|
mock_redis = AsyncMock()
|
|
mock_redis.zadd.side_effect = Exception("Redis error")
|
|
clean_redis.redis = mock_redis
|
|
|
|
try:
|
|
result = await clean_redis.save_message("test-session", message)
|
|
assert result is False
|
|
finally:
|
|
clean_redis.redis = original_redis
|
|
|
|
async def test_get_messages_with_redis_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test get_messages handles Redis errors gracefully."""
|
|
# Mock redis to raise exception
|
|
original_redis = clean_redis.redis
|
|
mock_redis = AsyncMock()
|
|
mock_redis.zrange.side_effect = Exception("Redis error")
|
|
clean_redis.redis = mock_redis
|
|
|
|
try:
|
|
result = await clean_redis.get_messages("test-session")
|
|
assert result == []
|
|
finally:
|
|
clean_redis.redis = original_redis
|
|
|
|
async def test_delete_notification_session_with_redis_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test delete_notification_session handles Redis errors gracefully."""
|
|
# Mock redis to raise exception
|
|
original_redis = clean_redis.redis
|
|
mock_redis = AsyncMock()
|
|
mock_redis.delete.side_effect = Exception("Redis error")
|
|
clean_redis.redis = mock_redis
|
|
|
|
try:
|
|
result = await clean_redis.delete_notification_session("+1234567890")
|
|
assert result is False
|
|
finally:
|
|
clean_redis.redis = original_redis
|
|
|
|
async def test_cache_notification_session_when_not_connected(
|
|
self, emulator_settings: Settings,
|
|
) -> None:
|
|
"""Test _cache_notification_session raises error when Redis not connected."""
|
|
service = RedisService(emulator_settings)
|
|
|
|
notification_session = NotificationSession(
|
|
sessionId="test",
|
|
telefono="+1234567890",
|
|
notificaciones=[],
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="Redis client not connected"):
|
|
await service._cache_notification_session(notification_session)
|
|
|
|
async def test_cache_notification_session_with_redis_error(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test _cache_notification_session handles Redis errors gracefully."""
|
|
notification_session = NotificationSession(
|
|
sessionId="test",
|
|
telefono="+1234567890",
|
|
notificaciones=[],
|
|
)
|
|
|
|
# Mock redis to raise exception on setex
|
|
original_redis = clean_redis.redis
|
|
mock_redis = AsyncMock()
|
|
mock_redis.setex.side_effect = Exception("Redis error")
|
|
clean_redis.redis = mock_redis
|
|
|
|
try:
|
|
result = await clean_redis._cache_notification_session(notification_session)
|
|
assert result is False
|
|
finally:
|
|
clean_redis.redis = original_redis
|
|
|
|
|
|
class TestEdgeCases:
|
|
"""Tests for edge cases and boundary conditions."""
|
|
|
|
async def test_concurrent_session_operations(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test concurrent operations on same session."""
|
|
import asyncio
|
|
|
|
session = ConversationSession.create(
|
|
session_id="concurrent-test",
|
|
user_id="user-999",
|
|
telefono="+1515151515",
|
|
)
|
|
|
|
# Save session concurrently
|
|
tasks = [clean_redis.save_session(session) for _ in range(5)]
|
|
results = await asyncio.gather(*tasks)
|
|
assert all(results)
|
|
|
|
# Verify session exists
|
|
retrieved = await clean_redis.get_session("concurrent-test")
|
|
assert retrieved is not None
|
|
|
|
async def test_special_characters_in_session_data(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test handling special characters in session data."""
|
|
session = ConversationSession.create(
|
|
session_id="special-chars-test",
|
|
user_id="user-special",
|
|
telefono="+1616161616",
|
|
pantalla_contexto="screen/with/slashes",
|
|
last_message='Message with emoji 🎉 and special chars: <>&"\'',
|
|
)
|
|
|
|
# Save and retrieve
|
|
await clean_redis.save_session(session)
|
|
retrieved = await clean_redis.get_session("special-chars-test")
|
|
|
|
assert retrieved is not None
|
|
assert retrieved.pantalla_contexto == "screen/with/slashes"
|
|
assert retrieved.last_message is not None
|
|
assert "🎉" in retrieved.last_message
|
|
assert '<>&"' in retrieved.last_message
|
|
|
|
async def test_unicode_in_notification_text(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test handling unicode characters in notification text."""
|
|
notification = Notification.create(
|
|
id_notificacion="unicode-test",
|
|
telefono="+1717171717",
|
|
texto="Notification with unicode: 你好世界 مرحبا العالم 🌍",
|
|
)
|
|
|
|
# Save and retrieve
|
|
await clean_redis.save_or_append_notification(notification)
|
|
session = await clean_redis.get_notification_session("+1717171717")
|
|
|
|
assert session is not None
|
|
assert "你好世界" in session.notificaciones[0].texto
|
|
assert "مرحبا العالم" in session.notificaciones[0].texto
|
|
assert "🌍" in session.notificaciones[0].texto
|
|
|
|
async def test_large_message_text(self, clean_redis: RedisService) -> None:
|
|
"""Test handling large message text."""
|
|
large_text = "A" * 10000 # 10KB of text
|
|
message = ConversationEntry(
|
|
timestamp=datetime.now(UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text=large_text,
|
|
)
|
|
|
|
session_id = "large-message-test"
|
|
success = await clean_redis.save_message(session_id, message)
|
|
assert success is True
|
|
|
|
# Retrieve and verify
|
|
messages = await clean_redis.get_messages(session_id)
|
|
assert len(messages) == 1
|
|
assert len(messages[0]["text"]) == 10000
|
|
|
|
async def test_many_messages_in_session(self, clean_redis: RedisService) -> None:
|
|
"""Test handling many messages in a single session."""
|
|
session_id = "many-messages-test"
|
|
|
|
# Save 100 messages
|
|
for i in range(100):
|
|
message = ConversationEntry(
|
|
timestamp=datetime.now(UTC),
|
|
entity="user" if i % 2 == 0 else "assistant",
|
|
type="CONVERSACION",
|
|
text=f"Message {i}",
|
|
)
|
|
await clean_redis.save_message(session_id, message)
|
|
|
|
# Retrieve all messages
|
|
messages = await clean_redis.get_messages(session_id)
|
|
assert len(messages) == 100
|
|
|
|
async def test_many_notifications_in_session(
|
|
self, clean_redis: RedisService,
|
|
) -> None:
|
|
"""Test handling many notifications in a single session."""
|
|
phone = "+1818181818"
|
|
|
|
# Add 50 notifications
|
|
for i in range(50):
|
|
notification = Notification.create(
|
|
id_notificacion=f"notif-{i}",
|
|
telefono=phone,
|
|
texto=f"Notification {i}",
|
|
)
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
# Retrieve session
|
|
session = await clean_redis.get_notification_session(phone)
|
|
assert session is not None
|
|
assert len(session.notificaciones) == 50
|
|
|
|
async def test_session_ttl_is_set(self, clean_redis: RedisService) -> None:
|
|
"""Test that session TTL is set in Redis."""
|
|
session = ConversationSession.create(
|
|
session_id="ttl-test",
|
|
user_id="user-ttl",
|
|
telefono="+1919191919",
|
|
)
|
|
|
|
# Save session
|
|
await clean_redis.save_session(session)
|
|
|
|
# Check TTL is set
|
|
assert clean_redis.redis is not None
|
|
key = clean_redis._session_key("ttl-test")
|
|
ttl = await clean_redis.redis.ttl(key)
|
|
assert ttl > 0
|
|
assert ttl <= clean_redis.session_ttl
|
|
|
|
async def test_notification_ttl_is_set(self, clean_redis: RedisService) -> None:
|
|
"""Test that notification TTL is set in Redis."""
|
|
notification = Notification.create(
|
|
id_notificacion="ttl-notif",
|
|
telefono="+2020202020",
|
|
texto="Test",
|
|
)
|
|
|
|
# Save notification
|
|
await clean_redis.save_or_append_notification(notification)
|
|
|
|
# Check TTL is set
|
|
assert clean_redis.redis is not None
|
|
key = clean_redis._notification_key("+2020202020")
|
|
ttl = await clean_redis.redis.ttl(key)
|
|
assert ttl > 0
|
|
assert ttl <= clean_redis.notification_ttl
|
|
|
|
async def test_message_ttl_is_set(self, clean_redis: RedisService) -> None:
|
|
"""Test that message TTL is set in Redis."""
|
|
session_id = "message-ttl-test"
|
|
message = ConversationEntry(
|
|
timestamp=datetime.now(UTC),
|
|
entity="user",
|
|
type="CONVERSACION",
|
|
text="Test",
|
|
)
|
|
|
|
# Save message
|
|
await clean_redis.save_message(session_id, message)
|
|
|
|
# Check TTL is set
|
|
assert clean_redis.redis is not None
|
|
key = clean_redis._messages_key(session_id)
|
|
ttl = await clean_redis.redis.ttl(key)
|
|
assert ttl > 0
|
|
assert ttl <= clean_redis.session_ttl
|