# File: int-layer/tests/services/test_redis_service.py
# Timestamp: 2026-02-20 20:38:59 +00:00
# Size as listed by the repository viewer: 929 lines, 34 KiB (Python)

"""Tests for RedisService."""
import asyncio
from datetime import UTC, datetime
from unittest.mock import AsyncMock

import pytest
from inline_snapshot import snapshot

from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationEntry, ConversationSession
from capa_de_integracion.models.notification import Notification, NotificationSession
from capa_de_integracion.services.storage import RedisService


class TestConnectionManagement:
    """Tests for the connect/close lifecycle and constructor defaults of RedisService."""

    async def test_connect_and_close(self, emulator_settings: Settings) -> None:
        """Connecting populates ``service.redis``; closing afterwards does not raise."""
        service = RedisService(emulator_settings)

        # A freshly constructed service has no client attached yet.
        assert service.redis is None

        await service.connect()
        try:
            assert service.redis is not None
        finally:
            # Release the connection even when the assertion above fails, so a
            # failing test never leaves an open client behind.
            await service.close()

    async def test_close_when_not_connected(self, emulator_settings: Settings) -> None:
        """Closing a service that never connected must not raise."""
        service = RedisService(emulator_settings)

        # No connect() call has been made, so there is no client.
        assert service.redis is None

        # The test passes as long as this call completes without an exception.
        await service.close()

    async def test_settings_initialization(self, emulator_settings: Settings) -> None:
        """The constructor stores the settings object and exposes the expected TTLs."""
        service = RedisService(emulator_settings)

        assert service.settings == emulator_settings
        assert service.session_ttl == 2592000  # 30 days, in seconds
        assert service.notification_ttl == 2592000  # 30 days, in seconds
        assert service.qr_session_ttl == 86400  # 24 hours, in seconds
class TestSessionManagement:
    """Round-trip tests for conversation sessions stored through RedisService."""

    async def test_save_and_get_session_by_id(self, clean_redis: RedisService) -> None:
        """A saved session can be read back by its session ID with all fields intact."""
        original = ConversationSession.create(
            session_id="test-session-1",
            user_id="user-123",
            telefono="+1234567890",
            pantalla_contexto="home_screen",
            last_message="Hello",
        )

        saved = await clean_redis.save_session(original)
        assert saved is True

        loaded = await clean_redis.get_session("test-session-1")
        assert loaded is not None
        assert loaded.session_id == "test-session-1"
        assert loaded.user_id == "user-123"
        assert loaded.telefono == "+1234567890"
        assert loaded.pantalla_contexto == "home_screen"
        assert loaded.last_message == "Hello"

    async def test_save_and_get_session_by_phone(self, clean_redis: RedisService) -> None:
        """A saved session can also be looked up by passing its phone number."""
        original = ConversationSession.create(
            session_id="test-session-2",
            user_id="user-456",
            telefono="+9876543210",
            pantalla_contexto="settings",
        )
        await clean_redis.save_session(original)

        # get_session is given the phone number instead of the session ID here.
        loaded = await clean_redis.get_session("+9876543210")
        assert loaded is not None
        assert loaded.session_id == "test-session-2"
        assert loaded.telefono == "+9876543210"

    async def test_get_session_not_found(self, clean_redis: RedisService) -> None:
        """Looking up an unknown identifier yields ``None``."""
        missing = await clean_redis.get_session("nonexistent-session")
        assert missing is None

    async def test_save_session_updates_existing(self, clean_redis: RedisService) -> None:
        """Saving the same session twice overwrites the stored copy."""
        session = ConversationSession.create(
            session_id="test-session-3",
            user_id="user-789",
            telefono="+5555555555",
            last_message="Original message",
        )
        await clean_redis.save_session(session)

        # Mutate the in-memory object and persist it a second time.
        session.last_message = "Updated message"
        session.pantalla_contexto = "new_screen"
        await clean_redis.save_session(session)

        loaded = await clean_redis.get_session("test-session-3")
        assert loaded is not None
        assert loaded.last_message == "Updated message"
        assert loaded.pantalla_contexto == "new_screen"

    async def test_delete_session(self, clean_redis: RedisService) -> None:
        """Deleting a stored session reports success and removes it."""
        session = ConversationSession.create(
            session_id="test-session-4",
            user_id="user-101",
            telefono="+2222222222",
        )
        await clean_redis.save_session(session)
        present_before = await clean_redis.exists("test-session-4")
        assert present_before is True

        deleted = await clean_redis.delete_session("test-session-4")
        assert deleted is True

        # Neither exists() nor get_session() should see the session any more.
        present_after = await clean_redis.exists("test-session-4")
        assert present_after is False
        loaded = await clean_redis.get_session("test-session-4")
        assert loaded is None

    async def test_delete_nonexistent_session(self, clean_redis: RedisService) -> None:
        """Deleting an unknown session reports ``False``."""
        deleted = await clean_redis.delete_session("nonexistent-session")
        assert deleted is False

    async def test_exists_session(self, clean_redis: RedisService) -> None:
        """``exists`` flips from ``False`` to ``True`` once the session is saved."""
        session = ConversationSession.create(
            session_id="test-session-5",
            user_id="user-202",
            telefono="+3333333333",
        )

        present_before = await clean_redis.exists("test-session-5")
        assert present_before is False

        await clean_redis.save_session(session)
        present_after = await clean_redis.exists("test-session-5")
        assert present_after is True

    async def test_phone_to_session_mapping(self, clean_redis: RedisService) -> None:
        """Saving a session writes a phone-to-session key holding the session ID."""
        session = ConversationSession.create(
            session_id="test-session-6",
            user_id="user-303",
            telefono="+4444444444",
        )
        await clean_redis.save_session(session)

        # Read the mapping key straight from the underlying client.
        assert clean_redis.redis is not None
        mapping_key = clean_redis._phone_to_session_key("+4444444444")
        stored_id = await clean_redis.redis.get(mapping_key)
        assert stored_id == "test-session-6"

    async def test_get_session_deserialization_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """A stored payload that is not valid JSON makes ``get_session`` return ``None``."""
        assert clean_redis.redis is not None

        # Plant a corrupt value directly under the session key.
        corrupt_key = clean_redis._session_key("invalid-session")
        await clean_redis.redis.set(corrupt_key, "invalid json data")

        loaded = await clean_redis.get_session("invalid-session")
        assert loaded is None
class TestMessageManagement:
    """Tests for storing and reading conversation messages through RedisService."""

    async def test_save_and_get_messages(self, clean_redis: RedisService) -> None:
        """Two saved entries come back as two dicts carrying the saved fields."""
        session_id = "test-session-7"
        user_entry = ConversationEntry(
            timestamp=datetime(2024, 1, 1, 10, 0, 0, tzinfo=UTC),
            entity="user",
            type="CONVERSACION",
            text="First message",
        )
        assistant_entry = ConversationEntry(
            timestamp=datetime(2024, 1, 1, 10, 1, 0, tzinfo=UTC),
            entity="assistant",
            type="CONVERSACION",
            text="First response",
        )

        user_saved = await clean_redis.save_message(session_id, user_entry)
        assistant_saved = await clean_redis.save_message(session_id, assistant_entry)
        assert user_saved is True
        assert assistant_saved is True

        stored = await clean_redis.get_messages(session_id)
        assert len(stored) == 2

        # Field checks go through inline-snapshot.
        first, second = stored[0], stored[1]
        assert first["entity"] == snapshot("user")
        assert first["type"] == snapshot("CONVERSACION")
        assert first["text"] == snapshot("First message")
        assert second["entity"] == snapshot("assistant")
        assert second["text"] == snapshot("First response")

    async def test_get_messages_empty_session(self, clean_redis: RedisService) -> None:
        """A session with no stored messages yields an empty list."""
        stored = await clean_redis.get_messages("nonexistent-session")
        assert stored == []

    async def test_messages_ordered_by_timestamp(self, clean_redis: RedisService) -> None:
        """Messages saved out of order are returned sorted by timestamp."""
        session_id = "test-session-8"

        # (minute, entity, text) triples, deliberately not in chronological order.
        shuffled = [
            (2, "user", "Third message"),
            (0, "user", "First message"),
            (1, "assistant", "Second message"),
        ]
        for minute, entity, text in shuffled:
            entry = ConversationEntry(
                timestamp=datetime(2024, 1, 1, 10, minute, 0, tzinfo=UTC),
                entity=entity,
                type="CONVERSACION",
                text=text,
            )
            await clean_redis.save_message(session_id, entry)

        stored = await clean_redis.get_messages(session_id)
        assert len(stored) == 3
        assert [item["text"] for item in stored] == [
            "First message",
            "Second message",
            "Third message",
        ]

    async def test_get_messages_json_decode_error(self, clean_redis: RedisService) -> None:
        """Sorted-set members that are not valid JSON are skipped, valid ones kept."""
        assert clean_redis.redis is not None
        session_id = "test-session-9"
        messages_key = clean_redis._messages_key(session_id)

        # One corrupt member followed by one well-formed member.
        await clean_redis.redis.zadd(messages_key, {"invalid json": 1000})
        await clean_redis.redis.zadd(
            messages_key,
            {'{"entity": "user", "text": "valid"}': 2000},
        )

        stored = await clean_redis.get_messages(session_id)

        # Only the well-formed member survives.
        assert len(stored) == 1
        assert stored[0]["entity"] == "user"
class TestNotificationManagement:
    """Tests for notification sessions stored through RedisService."""

    async def test_save_new_notification(self, clean_redis: RedisService) -> None:
        """Saving the first notification for a phone creates a session holding it."""
        notification = Notification.create(
            id_notificacion="notif-1",
            telefono="+8888888888",
            texto="Test notification",
        )
        await clean_redis.save_or_append_notification(notification)

        session = await clean_redis.get_notification_session("+8888888888")
        assert session is not None

        # Field checks go through inline-snapshot.
        assert session.session_id == snapshot("+8888888888")
        assert session.telefono == snapshot("+8888888888")
        assert len(session.notificaciones) == snapshot(1)
        assert session.notificaciones[0].texto == snapshot("Test notification")
        assert session.notificaciones[0].id_notificacion == snapshot("notif-1")

    async def test_append_to_existing_notification_session(
        self,
        clean_redis: RedisService,
    ) -> None:
        """A second notification for the same phone is appended after the first."""
        phone = "+9999999999"

        notification1 = Notification.create(
            id_notificacion="notif-2",
            telefono=phone,
            texto="First notification",
        )
        await clean_redis.save_or_append_notification(notification1)

        notification2 = Notification.create(
            id_notificacion="notif-3",
            telefono=phone,
            texto="Second notification",
        )
        await clean_redis.save_or_append_notification(notification2)

        # Both notifications are present, in insertion order.
        session = await clean_redis.get_notification_session(phone)
        assert session is not None
        assert len(session.notificaciones) == 2
        assert session.notificaciones[0].texto == "First notification"
        assert session.notificaciones[1].texto == "Second notification"

    async def test_save_notification_without_phone_raises_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """An empty phone number is rejected with ``ValueError``."""
        notification = Notification.create(
            id_notificacion="notif-4",
            telefono="",
            texto="Test",
        )
        with pytest.raises(ValueError, match="Phone number is required"):
            await clean_redis.save_or_append_notification(notification)

    async def test_save_notification_with_whitespace_phone_raises_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """A whitespace-only phone number is rejected with ``ValueError``."""
        notification = Notification.create(
            id_notificacion="notif-5",
            telefono=" ",
            texto="Test",
        )
        with pytest.raises(ValueError, match="Phone number is required"):
            await clean_redis.save_or_append_notification(notification)

    async def test_get_notification_session_not_found(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Looking up a phone with no stored notifications yields ``None``."""
        session = await clean_redis.get_notification_session("+0000000000")
        assert session is None

    async def test_get_notification_id_for_phone(
        self,
        clean_redis: RedisService,
    ) -> None:
        """The notification session ID returned for a phone is the phone itself."""
        phone = "+1010101010"
        notification = Notification.create(
            id_notificacion="notif-6",
            telefono=phone,
            texto="Test",
        )
        await clean_redis.save_or_append_notification(notification)

        session_id = await clean_redis.get_notification_id_for_phone(phone)
        assert session_id == phone  # Phone number is used as session ID

    async def test_get_notification_id_for_phone_not_found(
        self,
        clean_redis: RedisService,
    ) -> None:
        """An unknown phone has no notification session ID."""
        session_id = await clean_redis.get_notification_id_for_phone("+0000000000")
        assert session_id is None

    async def test_delete_notification_session(self, clean_redis: RedisService) -> None:
        """Deleting a stored notification session reports success and removes it."""
        phone = "+1212121212"
        notification = Notification.create(
            id_notificacion="notif-7",
            telefono=phone,
            texto="Test",
        )
        await clean_redis.save_or_append_notification(notification)

        # Sanity check: the session is there before deletion.
        session = await clean_redis.get_notification_session(phone)
        assert session is not None

        success = await clean_redis.delete_notification_session(phone)
        assert success is True

        session = await clean_redis.get_notification_session(phone)
        assert session is None

    async def test_delete_nonexistent_notification_session(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Deleting an unknown notification session still reports ``True``."""
        success = await clean_redis.delete_notification_session("+0000000000")
        assert success is True

    async def test_phone_to_notification_mapping(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Saving a notification writes a phone-to-notification key holding the phone."""
        phone = "+1313131313"
        notification = Notification.create(
            id_notificacion="notif-8",
            telefono=phone,
            texto="Test",
        )
        await clean_redis.save_or_append_notification(notification)

        # Read the mapping key straight from the underlying client.
        assert clean_redis.redis is not None
        phone_key = clean_redis._phone_to_notification_key(phone)
        mapped_session_id = await clean_redis.redis.get(phone_key)
        assert mapped_session_id == phone

    async def test_notification_timestamps_updated(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Appending keeps ``fecha_creacion`` and advances ``ultima_actualizacion``."""
        phone = "+1414141414"

        notification1 = Notification.create(
            id_notificacion="notif-9",
            telefono=phone,
            texto="First",
        )
        await clean_redis.save_or_append_notification(notification1)

        session1 = await clean_redis.get_notification_session(phone)
        assert session1 is not None
        initial_update_time = session1.ultima_actualizacion

        # Short pause so the second save gets a strictly later timestamp.
        await asyncio.sleep(0.01)

        notification2 = Notification.create(
            id_notificacion="notif-10",
            telefono=phone,
            texto="Second",
        )
        await clean_redis.save_or_append_notification(notification2)

        session2 = await clean_redis.get_notification_session(phone)
        assert session2 is not None

        # Creation time is unchanged; update time moved forward.
        assert session2.fecha_creacion == session1.fecha_creacion
        assert session2.ultima_actualizacion > initial_update_time

    async def test_get_notification_session_deserialization_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """A stored payload that is not valid JSON makes the lookup return ``None``."""
        assert clean_redis.redis is not None

        # Plant a corrupt value directly under the notification key.
        key = clean_redis._notification_key("invalid-notif-session")
        await clean_redis.redis.set(key, "invalid json data")

        session = await clean_redis.get_notification_session("invalid-notif-session")
        assert session is None
class TestErrorHandling:
    """Tests for RedisService behaviour when disconnected or when Redis calls fail."""

    @staticmethod
    def _failing_client(command: str) -> AsyncMock:
        """Return an AsyncMock client whose ``command`` call raises ``Exception``."""
        client = AsyncMock()
        getattr(client, command).side_effect = Exception("Redis error")
        return client

    async def test_get_session_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``get_session`` raises ``RuntimeError`` on a service that never connected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.get_session("test-session")

    async def test_save_session_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``save_session`` raises ``RuntimeError`` on a service that never connected."""
        disconnected = RedisService(emulator_settings)
        payload = ConversationSession.create(
            session_id="test",
            user_id="user",
            telefono="+1234567890",
        )
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.save_session(payload)

    async def test_delete_session_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``delete_session`` raises ``RuntimeError`` on a service that never connected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.delete_session("test-session")

    async def test_exists_when_not_connected(self, emulator_settings: Settings) -> None:
        """``exists`` raises ``RuntimeError`` on a service that never connected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.exists("test-session")

    async def test_save_message_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``save_message`` raises ``RuntimeError`` on a service that never connected."""
        disconnected = RedisService(emulator_settings)
        entry = ConversationEntry(
            timestamp=datetime.now(UTC),
            entity="user",
            type="CONVERSACION",
            text="Test",
        )
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.save_message("test-session", entry)

    async def test_get_messages_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``get_messages`` raises ``RuntimeError`` on a service that never connected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.get_messages("test-session")

    async def test_save_notification_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``save_or_append_notification`` raises ``RuntimeError`` when disconnected."""
        disconnected = RedisService(emulator_settings)
        payload = Notification.create(
            id_notificacion="notif-1",
            telefono="+1234567890",
            texto="Test",
        )
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.save_or_append_notification(payload)

    async def test_get_notification_session_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``get_notification_session`` raises ``RuntimeError`` when disconnected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.get_notification_session("test-session")

    async def test_get_notification_id_for_phone_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``get_notification_id_for_phone`` raises ``RuntimeError`` when disconnected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.get_notification_id_for_phone("+1234567890")

    async def test_delete_notification_session_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``delete_notification_session`` raises ``RuntimeError`` when disconnected."""
        disconnected = RedisService(emulator_settings)
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected.delete_notification_session("+1234567890")

    async def test_save_session_with_redis_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """``save_session`` returns ``False`` when the client's ``setex`` raises."""
        payload = ConversationSession.create(
            session_id="test",
            user_id="user",
            telefono="+1234567890",
        )

        # Swap in a failing client; the real one is restored afterwards.
        real_client = clean_redis.redis
        clean_redis.redis = self._failing_client("setex")
        try:
            outcome = await clean_redis.save_session(payload)
            assert outcome is False
        finally:
            clean_redis.redis = real_client

    async def test_delete_session_with_redis_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """``delete_session`` returns ``False`` when the client's ``delete`` raises."""
        real_client = clean_redis.redis
        clean_redis.redis = self._failing_client("delete")
        try:
            outcome = await clean_redis.delete_session("test-session")
            assert outcome is False
        finally:
            clean_redis.redis = real_client

    async def test_save_message_with_redis_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """``save_message`` returns ``False`` when the client's ``zadd`` raises."""
        entry = ConversationEntry(
            timestamp=datetime.now(UTC),
            entity="user",
            type="CONVERSACION",
            text="Test",
        )

        real_client = clean_redis.redis
        clean_redis.redis = self._failing_client("zadd")
        try:
            outcome = await clean_redis.save_message("test-session", entry)
            assert outcome is False
        finally:
            clean_redis.redis = real_client

    async def test_get_messages_with_redis_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """``get_messages`` returns an empty list when the client's ``zrange`` raises."""
        real_client = clean_redis.redis
        clean_redis.redis = self._failing_client("zrange")
        try:
            outcome = await clean_redis.get_messages("test-session")
            assert outcome == []
        finally:
            clean_redis.redis = real_client

    async def test_delete_notification_session_with_redis_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """``delete_notification_session`` returns ``False`` when ``delete`` raises."""
        real_client = clean_redis.redis
        clean_redis.redis = self._failing_client("delete")
        try:
            outcome = await clean_redis.delete_notification_session("+1234567890")
            assert outcome is False
        finally:
            clean_redis.redis = real_client

    async def test_cache_notification_session_when_not_connected(
        self,
        emulator_settings: Settings,
    ) -> None:
        """``_cache_notification_session`` raises ``RuntimeError`` when disconnected."""
        disconnected = RedisService(emulator_settings)
        empty_session = NotificationSession(
            sessionId="test",
            telefono="+1234567890",
            notificaciones=[],
        )
        with pytest.raises(RuntimeError, match="Redis client not connected"):
            await disconnected._cache_notification_session(empty_session)

    async def test_cache_notification_session_with_redis_error(
        self,
        clean_redis: RedisService,
    ) -> None:
        """``_cache_notification_session`` returns ``False`` when ``setex`` raises."""
        empty_session = NotificationSession(
            sessionId="test",
            telefono="+1234567890",
            notificaciones=[],
        )

        real_client = clean_redis.redis
        clean_redis.redis = self._failing_client("setex")
        try:
            outcome = await clean_redis._cache_notification_session(empty_session)
            assert outcome is False
        finally:
            clean_redis.redis = real_client
class TestEdgeCases:
    """Tests for edge cases and boundary conditions of RedisService."""

    async def test_concurrent_session_operations(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Five concurrent saves of the same session all succeed."""
        session = ConversationSession.create(
            session_id="concurrent-test",
            user_id="user-999",
            telefono="+1515151515",
        )

        # Schedule the saves together and wait for all of them.
        tasks = [clean_redis.save_session(session) for _ in range(5)]
        results = await asyncio.gather(*tasks)
        assert all(results)

        retrieved = await clean_redis.get_session("concurrent-test")
        assert retrieved is not None

    async def test_special_characters_in_session_data(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Slashes, emoji and markup characters survive a save/load round trip."""
        session = ConversationSession.create(
            session_id="special-chars-test",
            user_id="user-special",
            telefono="+1616161616",
            pantalla_contexto="screen/with/slashes",
            last_message='Message with emoji 🎉 and special chars: <>&"\'',
        )

        await clean_redis.save_session(session)
        retrieved = await clean_redis.get_session("special-chars-test")

        assert retrieved is not None
        assert retrieved.pantalla_contexto == "screen/with/slashes"
        assert retrieved.last_message is not None
        assert "🎉" in retrieved.last_message
        assert '<>&"' in retrieved.last_message

    async def test_unicode_in_notification_text(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Chinese, Arabic and emoji text survives a notification round trip."""
        notification = Notification.create(
            id_notificacion="unicode-test",
            telefono="+1717171717",
            texto="Notification with unicode: 你好世界 مرحبا العالم 🌍",
        )

        await clean_redis.save_or_append_notification(notification)
        session = await clean_redis.get_notification_session("+1717171717")

        assert session is not None
        assert "你好世界" in session.notificaciones[0].texto
        assert "مرحبا العالم" in session.notificaciones[0].texto
        assert "🌍" in session.notificaciones[0].texto

    async def test_large_message_text(self, clean_redis: RedisService) -> None:
        """A 10,000-character message body is stored and returned at full length."""
        large_text = "A" * 10000  # 10KB of text
        message = ConversationEntry(
            timestamp=datetime.now(UTC),
            entity="user",
            type="CONVERSACION",
            text=large_text,
        )
        session_id = "large-message-test"

        success = await clean_redis.save_message(session_id, message)
        assert success is True

        messages = await clean_redis.get_messages(session_id)
        assert len(messages) == 1
        assert len(messages[0]["text"]) == 10000

    async def test_many_messages_in_session(self, clean_redis: RedisService) -> None:
        """One hundred messages saved to one session are all returned."""
        session_id = "many-messages-test"

        # Alternate between user and assistant entries.
        for i in range(100):
            message = ConversationEntry(
                timestamp=datetime.now(UTC),
                entity="user" if i % 2 == 0 else "assistant",
                type="CONVERSACION",
                text=f"Message {i}",
            )
            await clean_redis.save_message(session_id, message)

        messages = await clean_redis.get_messages(session_id)
        assert len(messages) == 100

    async def test_many_notifications_in_session(
        self,
        clean_redis: RedisService,
    ) -> None:
        """Fifty notifications for one phone all end up in the same session."""
        phone = "+1818181818"

        for i in range(50):
            notification = Notification.create(
                id_notificacion=f"notif-{i}",
                telefono=phone,
                texto=f"Notification {i}",
            )
            await clean_redis.save_or_append_notification(notification)

        session = await clean_redis.get_notification_session(phone)
        assert session is not None
        assert len(session.notificaciones) == 50

    async def test_session_ttl_is_set(self, clean_redis: RedisService) -> None:
        """The session key has a positive TTL no greater than ``session_ttl``."""
        session = ConversationSession.create(
            session_id="ttl-test",
            user_id="user-ttl",
            telefono="+1919191919",
        )
        await clean_redis.save_session(session)

        # Query the TTL straight from the underlying client.
        assert clean_redis.redis is not None
        key = clean_redis._session_key("ttl-test")
        ttl = await clean_redis.redis.ttl(key)
        assert ttl > 0
        assert ttl <= clean_redis.session_ttl

    async def test_notification_ttl_is_set(self, clean_redis: RedisService) -> None:
        """The notification key has a positive TTL no greater than ``notification_ttl``."""
        notification = Notification.create(
            id_notificacion="ttl-notif",
            telefono="+2020202020",
            texto="Test",
        )
        await clean_redis.save_or_append_notification(notification)

        assert clean_redis.redis is not None
        key = clean_redis._notification_key("+2020202020")
        ttl = await clean_redis.redis.ttl(key)
        assert ttl > 0
        assert ttl <= clean_redis.notification_ttl

    async def test_message_ttl_is_set(self, clean_redis: RedisService) -> None:
        """The messages key has a positive TTL no greater than ``session_ttl``."""
        session_id = "message-ttl-test"
        message = ConversationEntry(
            timestamp=datetime.now(UTC),
            entity="user",
            type="CONVERSACION",
            text="Test",
        )
        await clean_redis.save_message(session_id, message)

        assert clean_redis.redis is not None
        key = clean_redis._messages_key(session_id)
        ttl = await clean_redis.redis.ttl(key)
        assert ttl > 0
        assert ttl <= clean_redis.session_ttl