396 lines
13 KiB
Python
396 lines
13 KiB
Python
"""Redis service for caching conversation sessions and notifications."""
|
|
|
|
import json
|
|
import logging
|
|
from datetime import UTC, datetime
|
|
|
|
from redis.asyncio import Redis
|
|
|
|
from capa_de_integracion.config import Settings
|
|
from capa_de_integracion.models import ConversationEntry, ConversationSession
|
|
from capa_de_integracion.models.notification import Notification, NotificationSession
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RedisService:
|
|
"""Service for Redis operations on conversation sessions."""
|
|
|
|
def __init__(self, settings: Settings) -> None:
|
|
"""Initialize Redis client."""
|
|
self.settings = settings
|
|
self.redis: Redis | None = None
|
|
self.session_ttl = 2592000 # 30 days in seconds
|
|
self.notification_ttl = 2592000 # 30 days in seconds
|
|
self.qr_session_ttl = 86400 # 24 hours in seconds
|
|
|
|
async def connect(self) -> None:
|
|
"""Connect to Redis."""
|
|
self.redis = Redis(
|
|
host=self.settings.redis_host,
|
|
port=self.settings.redis_port,
|
|
password=self.settings.redis_pwd,
|
|
decode_responses=True,
|
|
)
|
|
logger.info(
|
|
"Connected to Redis at %s:%s",
|
|
self.settings.redis_host,
|
|
self.settings.redis_port,
|
|
)
|
|
|
|
async def close(self) -> None:
|
|
"""Close Redis connection."""
|
|
if self.redis:
|
|
await self.redis.close()
|
|
logger.info("Redis connection closed")
|
|
|
|
def _session_key(self, session_id: str) -> str:
|
|
"""Generate Redis key for conversation session."""
|
|
return f"conversation:session:{session_id}"
|
|
|
|
def _phone_to_session_key(self, phone: str) -> str:
|
|
"""Generate Redis key for phone-to-session mapping."""
|
|
return f"conversation:phone:{phone}"
|
|
|
|
async def get_session(self, session_id_or_phone: str) -> ConversationSession | None:
|
|
"""Retrieve conversation session from Redis by session ID or phone number.
|
|
|
|
Args:
|
|
session_id_or_phone: Either a session ID or phone number
|
|
|
|
Returns:
|
|
Conversation session or None if not found
|
|
|
|
"""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
# First try as phone number (lookup session ID)
|
|
phone_key = self._phone_to_session_key(session_id_or_phone)
|
|
mapped_session_id = await self.redis.get(phone_key)
|
|
|
|
# Use mapped session ID if found, otherwise use input directly
|
|
session_id = mapped_session_id or session_id_or_phone
|
|
|
|
# Get session by ID
|
|
key = self._session_key(session_id)
|
|
data = await self.redis.get(key)
|
|
|
|
if not data:
|
|
logger.debug("Session not found in Redis: %s", session_id_or_phone)
|
|
return None
|
|
|
|
try:
|
|
session_dict = json.loads(data)
|
|
session = ConversationSession.model_validate(session_dict)
|
|
logger.debug("Retrieved session from Redis: %s", session_id)
|
|
except Exception:
|
|
logger.exception("Error deserializing session %s:", session_id)
|
|
return None
|
|
else:
|
|
return session
|
|
|
|
async def save_session(self, session: ConversationSession) -> bool:
|
|
"""Save conversation session to Redis with TTL.
|
|
|
|
Also stores phone-to-session mapping for lookup by phone number.
|
|
"""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._session_key(session.session_id)
|
|
phone_key = self._phone_to_session_key(session.telefono)
|
|
|
|
try:
|
|
# Save session data
|
|
data = session.model_dump_json(by_alias=False)
|
|
await self.redis.setex(key, self.session_ttl, data)
|
|
|
|
# Save phone-to-session mapping
|
|
await self.redis.setex(phone_key, self.session_ttl, session.session_id)
|
|
|
|
logger.debug(
|
|
"Saved session to Redis: %s for phone: %s",
|
|
session.session_id,
|
|
session.telefono,
|
|
)
|
|
except Exception:
|
|
logger.exception("Error saving session %s to Redis:", session.session_id)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
async def delete_session(self, session_id: str) -> bool:
|
|
"""Delete conversation session from Redis."""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._session_key(session_id)
|
|
|
|
try:
|
|
result = await self.redis.delete(key)
|
|
logger.debug("Deleted session from Redis: %s", session_id)
|
|
except Exception:
|
|
logger.exception("Error deleting session %s from Redis:", session_id)
|
|
return False
|
|
else:
|
|
return result > 0
|
|
|
|
async def exists(self, session_id: str) -> bool:
|
|
"""Check if session exists in Redis."""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._session_key(session_id)
|
|
return await self.redis.exists(key) > 0
|
|
|
|
# ====== Message Methods ======
|
|
|
|
def _messages_key(self, session_id: str) -> str:
|
|
"""Generate Redis key for conversation messages."""
|
|
return f"conversation:messages:{session_id}"
|
|
|
|
async def save_message(self, session_id: str, message: ConversationEntry) -> bool:
|
|
"""Save a conversation message to Redis sorted set.
|
|
|
|
Messages are stored in a sorted set with timestamp as score.
|
|
|
|
Args:
|
|
session_id: The session ID
|
|
message: ConversationEntry
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
|
|
"""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._messages_key(session_id)
|
|
|
|
try:
|
|
# Convert message to JSON
|
|
message_data = message.model_dump_json(by_alias=False)
|
|
# Use timestamp as score (in milliseconds)
|
|
score = message.timestamp.timestamp() * 1000
|
|
|
|
# Add to sorted set
|
|
await self.redis.zadd(key, {message_data: score})
|
|
# Set TTL on the messages key to match session TTL
|
|
await self.redis.expire(key, self.session_ttl)
|
|
|
|
logger.debug("Saved message to Redis: %s", session_id)
|
|
except Exception:
|
|
logger.exception(
|
|
"Error saving message to Redis for session %s:",
|
|
session_id,
|
|
)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
async def get_messages(self, session_id: str) -> list:
|
|
"""Retrieve all conversation messages for a session from Redis.
|
|
|
|
Returns messages ordered by timestamp (oldest first).
|
|
|
|
Args:
|
|
session_id: The session ID
|
|
|
|
Returns:
|
|
List of message dictionaries (parsed from JSON)
|
|
|
|
"""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._messages_key(session_id)
|
|
|
|
try:
|
|
# Get all messages from sorted set (ordered by score/timestamp)
|
|
message_strings = await self.redis.zrange(key, 0, -1)
|
|
|
|
if not message_strings:
|
|
logger.debug("No messages found in Redis for session: %s", session_id)
|
|
return []
|
|
|
|
# Parse JSON strings to dictionaries
|
|
messages = []
|
|
for msg_str in message_strings:
|
|
try:
|
|
messages.append(json.loads(msg_str))
|
|
except json.JSONDecodeError:
|
|
logger.exception("Error parsing message JSON:")
|
|
continue
|
|
|
|
logger.debug(
|
|
"Retrieved %s messages from Redis for session: %s",
|
|
len(messages),
|
|
session_id,
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"Error retrieving messages from Redis for session %s:",
|
|
session_id,
|
|
)
|
|
return []
|
|
else:
|
|
return messages
|
|
|
|
# ====== Notification Methods ======
|
|
|
|
def _notification_key(self, session_id: str) -> str:
|
|
"""Generate Redis key for notification session."""
|
|
return f"notification:{session_id}"
|
|
|
|
def _phone_to_notification_key(self, phone: str) -> str:
|
|
"""Generate Redis key for phone-to-notification mapping."""
|
|
return f"notification:phone_to_notification:{phone}"
|
|
|
|
async def save_or_append_notification(self, new_entry: Notification) -> None:
|
|
"""Save or append notification entry to session.
|
|
|
|
Args:
|
|
new_entry: Notification entry to save
|
|
|
|
Raises:
|
|
ValueError: If phone number is missing
|
|
|
|
"""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
phone_number = new_entry.telefono
|
|
if not phone_number or not phone_number.strip():
|
|
msg = "Phone number is required to manage notification entries"
|
|
raise ValueError(msg)
|
|
|
|
# Use phone number as session ID for notifications
|
|
notification_session_id = phone_number
|
|
|
|
# Get existing session or create new one
|
|
existing_session = await self.get_notification_session(notification_session_id)
|
|
|
|
if existing_session:
|
|
# Append to existing session
|
|
updated_notifications = [*existing_session.notificaciones, new_entry]
|
|
updated_session = NotificationSession(
|
|
sessionId=notification_session_id,
|
|
telefono=phone_number,
|
|
fechaCreacion=existing_session.fecha_creacion,
|
|
ultimaActualizacion=datetime.now(UTC),
|
|
notificaciones=updated_notifications,
|
|
)
|
|
else:
|
|
# Create new session
|
|
updated_session = NotificationSession(
|
|
sessionId=notification_session_id,
|
|
telefono=phone_number,
|
|
fechaCreacion=datetime.now(UTC),
|
|
ultimaActualizacion=datetime.now(UTC),
|
|
notificaciones=[new_entry],
|
|
)
|
|
|
|
# Save to Redis
|
|
await self._cache_notification_session(updated_session)
|
|
|
|
async def _cache_notification_session(self, session: NotificationSession) -> bool:
|
|
"""Cache notification session in Redis."""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._notification_key(session.session_id)
|
|
phone_key = self._phone_to_notification_key(session.telefono)
|
|
|
|
try:
|
|
# Save notification session
|
|
data = session.model_dump_json(by_alias=False)
|
|
await self.redis.setex(key, self.notification_ttl, data)
|
|
|
|
# Save phone-to-session mapping
|
|
await self.redis.setex(phone_key, self.notification_ttl, session.session_id)
|
|
|
|
logger.debug("Cached notification session: %s", session.session_id)
|
|
except Exception:
|
|
logger.exception(
|
|
"Error caching notification session %s:",
|
|
session.session_id,
|
|
)
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
async def get_notification_session(
|
|
self, session_id: str,
|
|
) -> NotificationSession | None:
|
|
"""Retrieve notification session from Redis."""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._notification_key(session_id)
|
|
data = await self.redis.get(key)
|
|
|
|
if not data:
|
|
logger.debug("Notification session not found in Redis: %s", session_id)
|
|
return None
|
|
|
|
try:
|
|
session_dict = json.loads(data)
|
|
session = NotificationSession.model_validate(session_dict)
|
|
logger.info("Notification session %s retrieved from Redis", session_id)
|
|
except Exception:
|
|
logger.exception(
|
|
"Error deserializing notification session %s:",
|
|
session_id,
|
|
)
|
|
return None
|
|
else:
|
|
return session
|
|
|
|
async def get_notification_id_for_phone(self, phone: str) -> str | None:
|
|
"""Get notification session ID for a phone number."""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
key = self._phone_to_notification_key(phone)
|
|
session_id = await self.redis.get(key)
|
|
|
|
if session_id:
|
|
logger.info("Session ID %s found for phone", session_id)
|
|
else:
|
|
logger.debug("Session ID not found for phone")
|
|
|
|
return session_id
|
|
|
|
async def delete_notification_session(self, phone_number: str) -> bool:
|
|
"""Delete notification session from Redis."""
|
|
if not self.redis:
|
|
msg = "Redis client not connected"
|
|
raise RuntimeError(msg)
|
|
|
|
notification_key = self._notification_key(phone_number)
|
|
phone_key = self._phone_to_notification_key(phone_number)
|
|
|
|
try:
|
|
logger.info("Deleting notification session for phone %s", phone_number)
|
|
await self.redis.delete(notification_key)
|
|
await self.redis.delete(phone_key)
|
|
except Exception:
|
|
logger.exception(
|
|
"Error deleting notification session for phone %s:",
|
|
phone_number,
|
|
)
|
|
return False
|
|
else:
|
|
return True
|