This commit is contained in:
2026-02-20 04:38:32 +00:00
committed by Anibal Angulo
parent 5fecad8e2d
commit 52a674959c
20 changed files with 309 additions and 283 deletions

View File

@@ -1,12 +1,12 @@
import json
import logging
from datetime import datetime
from redis.asyncio import Redis
from ..config import Settings
from ..models import ConversationSession
from ..models.notification import NotificationSession, Notification
from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationSession
from capa_de_integracion.models.notification import Notification, NotificationSession
logger = logging.getLogger(__name__)
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class RedisService:
"""Service for Redis operations on conversation sessions."""
def __init__(self, settings: Settings):
def __init__(self, settings: Settings) -> None:
"""Initialize Redis client."""
self.settings = settings
self.redis: Redis | None = None
@@ -22,7 +22,7 @@ class RedisService:
self.notification_ttl = 2592000 # 30 days in seconds
self.qr_session_ttl = 86400 # 24 hours in seconds
async def connect(self):
async def connect(self) -> None:
"""Connect to Redis."""
self.redis = Redis(
host=self.settings.redis_host,
@@ -31,10 +31,10 @@ class RedisService:
decode_responses=True,
)
logger.info(
f"Connected to Redis at {self.settings.redis_host}:{self.settings.redis_port}"
f"Connected to Redis at {self.settings.redis_host}:{self.settings.redis_port}",
)
async def close(self):
async def close(self) -> None:
"""Close Redis connection."""
if self.redis:
await self.redis.close()
@@ -49,17 +49,18 @@ class RedisService:
return f"conversation:phone:{phone}"
async def get_session(self, session_id_or_phone: str) -> ConversationSession | None:
"""
Retrieve conversation session from Redis by session ID or phone number.
"""Retrieve conversation session from Redis by session ID or phone number.
Args:
session_id_or_phone: Either a session ID or phone number
Returns:
Conversation session or None if not found
"""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
# First try as phone number (lookup session ID)
phone_key = self._phone_to_session_key(session_id_or_phone)
@@ -86,17 +87,17 @@ class RedisService:
logger.debug(f"Retrieved session from Redis: {session_id}")
return session
except Exception as e:
logger.error(f"Error deserializing session {session_id}: {str(e)}")
logger.exception(f"Error deserializing session {session_id}: {e!s}")
return None
async def save_session(self, session: ConversationSession) -> bool:
"""
Save conversation session to Redis with TTL.
"""Save conversation session to Redis with TTL.
Also stores phone-to-session mapping for lookup by phone number.
"""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session.sessionId)
phone_key = self._phone_to_session_key(session.telefono)
@@ -110,17 +111,18 @@ class RedisService:
await self.redis.setex(phone_key, self.session_ttl, session.sessionId)
logger.debug(
f"Saved session to Redis: {session.sessionId} for phone: {session.telefono}"
f"Saved session to Redis: {session.sessionId} for phone: {session.telefono}",
)
return True
except Exception as e:
logger.error(f"Error saving session {session.sessionId} to Redis: {str(e)}")
logger.exception(f"Error saving session {session.sessionId} to Redis: {e!s}")
return False
async def delete_session(self, session_id: str) -> bool:
"""Delete conversation session from Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session_id)
@@ -129,13 +131,14 @@ class RedisService:
logger.debug(f"Deleted session from Redis: {session_id}")
return result > 0
except Exception as e:
logger.error(f"Error deleting session {session_id} from Redis: {str(e)}")
logger.exception(f"Error deleting session {session_id} from Redis: {e!s}")
return False
async def exists(self, session_id: str) -> bool:
"""Check if session exists in Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session_id)
return await self.redis.exists(key) > 0
@@ -147,8 +150,7 @@ class RedisService:
return f"conversation:messages:{session_id}"
async def save_message(self, session_id: str, message) -> bool:
"""
Save a conversation message to Redis sorted set.
"""Save a conversation message to Redis sorted set.
Messages are stored in a sorted set with timestamp as score.
@@ -158,9 +160,11 @@ class RedisService:
Returns:
True if successful, False otherwise
"""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._messages_key(session_id)
@@ -178,14 +182,13 @@ class RedisService:
logger.debug(f"Saved message to Redis: {session_id}")
return True
except Exception as e:
logger.error(
f"Error saving message to Redis for session {session_id}: {str(e)}"
logger.exception(
f"Error saving message to Redis for session {session_id}: {e!s}",
)
return False
async def get_messages(self, session_id: str) -> list:
"""
Retrieve all conversation messages for a session from Redis.
"""Retrieve all conversation messages for a session from Redis.
Returns messages ordered by timestamp (oldest first).
@@ -194,9 +197,11 @@ class RedisService:
Returns:
List of message dictionaries (parsed from JSON)
"""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._messages_key(session_id)
@@ -214,16 +219,16 @@ class RedisService:
try:
messages.append(json.loads(msg_str))
except json.JSONDecodeError as e:
logger.error(f"Error parsing message JSON: {str(e)}")
logger.exception(f"Error parsing message JSON: {e!s}")
continue
logger.debug(
f"Retrieved {len(messages)} messages from Redis for session: {session_id}"
f"Retrieved {len(messages)} messages from Redis for session: {session_id}",
)
return messages
except Exception as e:
logger.error(
f"Error retrieving messages from Redis for session {session_id}: {str(e)}"
logger.exception(
f"Error retrieving messages from Redis for session {session_id}: {e!s}",
)
return []
@@ -238,21 +243,23 @@ class RedisService:
return f"notification:phone_to_notification:{phone}"
async def save_or_append_notification(self, new_entry: Notification) -> None:
"""
Save or append notification entry to session.
"""Save or append notification entry to session.
Args:
new_entry: Notification entry to save
Raises:
ValueError: If phone number is missing
"""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
phone_number = new_entry.telefono
if not phone_number or not phone_number.strip():
raise ValueError("Phone number is required to manage notification entries")
msg = "Phone number is required to manage notification entries"
raise ValueError(msg)
# Use phone number as session ID for notifications
notification_session_id = phone_number
@@ -262,7 +269,7 @@ class RedisService:
if existing_session:
# Append to existing session
updated_notifications = existing_session.notificaciones + [new_entry]
updated_notifications = [*existing_session.notificaciones, new_entry]
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
@@ -286,7 +293,8 @@ class RedisService:
async def _cache_notification_session(self, session: NotificationSession) -> bool:
"""Cache notification session in Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._notification_key(session.sessionId)
phone_key = self._phone_to_notification_key(session.telefono)
@@ -302,17 +310,18 @@ class RedisService:
logger.debug(f"Cached notification session: {session.sessionId}")
return True
except Exception as e:
logger.error(
f"Error caching notification session {session.sessionId}: {str(e)}"
logger.exception(
f"Error caching notification session {session.sessionId}: {e!s}",
)
return False
async def get_notification_session(
self, session_id: str
self, session_id: str,
) -> NotificationSession | None:
"""Retrieve notification session from Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._notification_key(session_id)
data = await self.redis.get(key)
@@ -327,15 +336,16 @@ class RedisService:
logger.info(f"Notification session {session_id} retrieved from Redis")
return session
except Exception as e:
logger.error(
f"Error deserializing notification session {session_id}: {str(e)}"
logger.exception(
f"Error deserializing notification session {session_id}: {e!s}",
)
return None
async def get_notification_id_for_phone(self, phone: str) -> str | None:
"""Get notification session ID for a phone number."""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._phone_to_notification_key(phone)
session_id = await self.redis.get(key)
@@ -350,7 +360,8 @@ class RedisService:
async def delete_notification_session(self, phone_number: str) -> bool:
"""Delete notification session from Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")
msg = "Redis client not connected"
raise RuntimeError(msg)
notification_key = self._notification_key(phone_number)
phone_key = self._phone_to_notification_key(phone_number)
@@ -361,7 +372,7 @@ class RedisService:
await self.redis.delete(phone_key)
return True
except Exception as e:
logger.error(
f"Error deleting notification session for phone {phone_number}: {str(e)}"
logger.exception(
f"Error deleting notification session for phone {phone_number}: {e!s}",
)
return False