Misc improvements

This commit is contained in:
2026-02-20 06:59:31 +00:00
parent fd6b698077
commit e5ff673a54
33 changed files with 1844 additions and 420 deletions

View File

@@ -0,0 +1,396 @@
"""Redis service for caching conversation sessions and notifications."""
import json
import logging
from datetime import UTC, datetime
from redis.asyncio import Redis
from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationEntry, ConversationSession
from capa_de_integracion.models.notification import Notification, NotificationSession
logger = logging.getLogger(__name__)
class RedisService:
"""Service for Redis operations on conversation sessions."""
def __init__(self, settings: Settings) -> None:
"""Initialize Redis client."""
self.settings = settings
self.redis: Redis | None = None
self.session_ttl = 2592000 # 30 days in seconds
self.notification_ttl = 2592000 # 30 days in seconds
self.qr_session_ttl = 86400 # 24 hours in seconds
async def connect(self) -> None:
"""Connect to Redis."""
self.redis = Redis(
host=self.settings.redis_host,
port=self.settings.redis_port,
password=self.settings.redis_pwd,
decode_responses=True,
)
logger.info(
"Connected to Redis at %s:%s",
self.settings.redis_host,
self.settings.redis_port,
)
async def close(self) -> None:
"""Close Redis connection."""
if self.redis:
await self.redis.aclose()
logger.info("Redis connection closed")
def _session_key(self, session_id: str) -> str:
"""Generate Redis key for conversation session."""
return f"conversation:session:{session_id}"
def _phone_to_session_key(self, phone: str) -> str:
"""Generate Redis key for phone-to-session mapping."""
return f"conversation:phone:{phone}"
async def get_session(self, session_id_or_phone: str) -> ConversationSession | None:
"""Retrieve conversation session from Redis by session ID or phone number.
Args:
session_id_or_phone: Either a session ID or phone number
Returns:
Conversation session or None if not found
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
# First try as phone number (lookup session ID)
phone_key = self._phone_to_session_key(session_id_or_phone)
mapped_session_id = await self.redis.get(phone_key)
# Use mapped session ID if found, otherwise use input directly
session_id = mapped_session_id or session_id_or_phone
# Get session by ID
key = self._session_key(session_id)
data = await self.redis.get(key)
if not data:
logger.debug("Session not found in Redis: %s", session_id_or_phone)
return None
try:
session_dict = json.loads(data)
session = ConversationSession.model_validate(session_dict)
logger.debug("Retrieved session from Redis: %s", session_id)
except Exception:
logger.exception("Error deserializing session %s:", session_id)
return None
else:
return session
async def save_session(self, session: ConversationSession) -> bool:
"""Save conversation session to Redis with TTL.
Also stores phone-to-session mapping for lookup by phone number.
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session.session_id)
phone_key = self._phone_to_session_key(session.telefono)
try:
# Save session data
data = session.model_dump_json(by_alias=False)
await self.redis.setex(key, self.session_ttl, data)
# Save phone-to-session mapping
await self.redis.setex(phone_key, self.session_ttl, session.session_id)
logger.debug(
"Saved session to Redis: %s for phone: %s",
session.session_id,
session.telefono,
)
except Exception:
logger.exception("Error saving session %s to Redis:", session.session_id)
return False
else:
return True
async def delete_session(self, session_id: str) -> bool:
"""Delete conversation session from Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session_id)
try:
result = await self.redis.delete(key)
logger.debug("Deleted session from Redis: %s", session_id)
except Exception:
logger.exception("Error deleting session %s from Redis:", session_id)
return False
else:
return result > 0
async def exists(self, session_id: str) -> bool:
"""Check if session exists in Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session_id)
return await self.redis.exists(key) > 0
# ====== Message Methods ======
def _messages_key(self, session_id: str) -> str:
"""Generate Redis key for conversation messages."""
return f"conversation:messages:{session_id}"
async def save_message(self, session_id: str, message: ConversationEntry) -> bool:
"""Save a conversation message to Redis sorted set.
Messages are stored in a sorted set with timestamp as score.
Args:
session_id: The session ID
message: ConversationEntry
Returns:
True if successful, False otherwise
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._messages_key(session_id)
try:
# Convert message to JSON
message_data = message.model_dump_json(by_alias=False)
# Use timestamp as score (in milliseconds)
score = message.timestamp.timestamp() * 1000
# Add to sorted set
await self.redis.zadd(key, {message_data: score})
# Set TTL on the messages key to match session TTL
await self.redis.expire(key, self.session_ttl)
logger.debug("Saved message to Redis: %s", session_id)
except Exception:
logger.exception(
"Error saving message to Redis for session %s:",
session_id,
)
return False
else:
return True
async def get_messages(self, session_id: str) -> list:
"""Retrieve all conversation messages for a session from Redis.
Returns messages ordered by timestamp (oldest first).
Args:
session_id: The session ID
Returns:
List of message dictionaries (parsed from JSON)
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._messages_key(session_id)
try:
# Get all messages from sorted set (ordered by score/timestamp)
message_strings = await self.redis.zrange(key, 0, -1)
if not message_strings:
logger.debug("No messages found in Redis for session: %s", session_id)
return []
# Parse JSON strings to dictionaries
messages = []
for msg_str in message_strings:
try:
messages.append(json.loads(msg_str))
except json.JSONDecodeError:
logger.exception("Error parsing message JSON:")
continue
logger.debug(
"Retrieved %s messages from Redis for session: %s",
len(messages),
session_id,
)
except Exception:
logger.exception(
"Error retrieving messages from Redis for session %s:",
session_id,
)
return []
else:
return messages
# ====== Notification Methods ======
def _notification_key(self, session_id: str) -> str:
"""Generate Redis key for notification session."""
return f"notification:{session_id}"
def _phone_to_notification_key(self, phone: str) -> str:
"""Generate Redis key for phone-to-notification mapping."""
return f"notification:phone_to_notification:{phone}"
async def save_or_append_notification(self, new_entry: Notification) -> None:
"""Save or append notification entry to session.
Args:
new_entry: Notification entry to save
Raises:
ValueError: If phone number is missing
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
phone_number = new_entry.telefono
if not phone_number or not phone_number.strip():
msg = "Phone number is required to manage notification entries"
raise ValueError(msg)
# Use phone number as session ID for notifications
notification_session_id = phone_number
# Get existing session or create new one
existing_session = await self.get_notification_session(notification_session_id)
if existing_session:
# Append to existing session
updated_notifications = [*existing_session.notificaciones, new_entry]
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
fechaCreacion=existing_session.fecha_creacion,
ultimaActualizacion=datetime.now(UTC),
notificaciones=updated_notifications,
)
else:
# Create new session
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
fechaCreacion=datetime.now(UTC),
ultimaActualizacion=datetime.now(UTC),
notificaciones=[new_entry],
)
# Save to Redis
await self._cache_notification_session(updated_session)
async def _cache_notification_session(self, session: NotificationSession) -> bool:
"""Cache notification session in Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._notification_key(session.session_id)
phone_key = self._phone_to_notification_key(session.telefono)
try:
# Save notification session
data = session.model_dump_json(by_alias=False)
await self.redis.setex(key, self.notification_ttl, data)
# Save phone-to-session mapping
await self.redis.setex(phone_key, self.notification_ttl, session.session_id)
logger.debug("Cached notification session: %s", session.session_id)
except Exception:
logger.exception(
"Error caching notification session %s:",
session.session_id,
)
return False
else:
return True
async def get_notification_session(
self,
session_id: str,
) -> NotificationSession | None:
"""Retrieve notification session from Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._notification_key(session_id)
data = await self.redis.get(key)
if not data:
logger.debug("Notification session not found in Redis: %s", session_id)
return None
try:
session_dict = json.loads(data)
session = NotificationSession.model_validate(session_dict)
logger.info("Notification session %s retrieved from Redis", session_id)
except Exception:
logger.exception(
"Error deserializing notification session %s:",
session_id,
)
return None
else:
return session
async def get_notification_id_for_phone(self, phone: str) -> str | None:
"""Get notification session ID for a phone number."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._phone_to_notification_key(phone)
session_id = await self.redis.get(key)
if session_id:
logger.info("Session ID %s found for phone", session_id)
else:
logger.debug("Session ID not found for phone")
return session_id
async def delete_notification_session(self, phone_number: str) -> bool:
"""Delete notification session from Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
notification_key = self._notification_key(phone_number)
phone_key = self._phone_to_notification_key(phone_number)
try:
logger.info("Deleting notification session for phone %s", phone_number)
await self.redis.delete(notification_key)
await self.redis.delete(phone_key)
except Exception:
logger.exception(
"Error deleting notification session for phone %s:",
phone_number,
)
return False
else:
return True