Misc improvements
This commit is contained in:
@@ -1,10 +1,10 @@
|
||||
"""Services module."""
|
||||
|
||||
from .conversation_manager import ConversationManagerService
|
||||
from .dlp_service import DLPService
|
||||
from .notification_manager import NotificationManagerService
|
||||
from .quick_reply_content import QuickReplyContentService
|
||||
from .quick_reply_session_service import QuickReplySessionService
|
||||
from capa_de_integracion.services.conversation import ConversationManagerService
|
||||
from capa_de_integracion.services.dlp import DLPService
|
||||
from capa_de_integracion.services.notifications import NotificationManagerService
|
||||
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
|
||||
from capa_de_integracion.services.quick_reply.session import QuickReplySessionService
|
||||
|
||||
__all__ = [
|
||||
"ConversationManagerService",
|
||||
|
||||
@@ -14,12 +14,11 @@ from capa_de_integracion.models import (
|
||||
QueryResult,
|
||||
)
|
||||
from capa_de_integracion.models.notification import NotificationSession
|
||||
from capa_de_integracion.services.dlp import DLPService
|
||||
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
|
||||
from capa_de_integracion.services.rag import RAGServiceBase
|
||||
|
||||
from .dlp_service import DLPService
|
||||
from .firestore_service import FirestoreService
|
||||
from .quick_reply_content import QuickReplyContentService
|
||||
from .redis_service import RedisService
|
||||
from capa_de_integracion.services.storage.firestore import FirestoreService
|
||||
from capa_de_integracion.services.storage.redis import RedisService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -50,11 +49,18 @@ class ConversationManagerService:
|
||||
|
||||
logger.info("ConversationManagerService initialized successfully")
|
||||
|
||||
async def manage_conversation( # noqa: PLR0915
|
||||
self, request: ConversationRequest,
|
||||
async def manage_conversation(
|
||||
self,
|
||||
request: ConversationRequest,
|
||||
) -> DetectIntentResponse:
|
||||
"""Manage conversation flow and return response.
|
||||
|
||||
Orchestrates:
|
||||
1. Security (DLP obfuscation)
|
||||
2. Session management
|
||||
3. Quick reply path (if applicable)
|
||||
4. Standard RAG path (fallback)
|
||||
|
||||
Args:
|
||||
request: External conversation request from client
|
||||
|
||||
@@ -63,7 +69,7 @@ class ConversationManagerService:
|
||||
|
||||
"""
|
||||
try:
|
||||
# Step 1: DLP obfuscation
|
||||
# Step 1: Apply DLP security
|
||||
obfuscated_message = await self.dlp_service.get_obfuscated_string(
|
||||
request.mensaje,
|
||||
self.settings.dlp_template_complete_flow,
|
||||
@@ -71,162 +77,270 @@ class ConversationManagerService:
|
||||
request.mensaje = obfuscated_message
|
||||
telefono = request.usuario.telefono
|
||||
|
||||
# Step 2. Fetch session in Redis -> Firestore -> Create new session
|
||||
session = await self.redis_service.get_session(telefono)
|
||||
if not session:
|
||||
session = await self.firestore_service.get_session_by_phone(telefono)
|
||||
if not session:
|
||||
session_id = str(uuid4())
|
||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||
session = await self.firestore_service.create_session(
|
||||
session_id, user_id, telefono,
|
||||
)
|
||||
await self.redis_service.save_session(session)
|
||||
# Step 2: Obtain or create session
|
||||
session = await self._obtain_or_create_session(telefono)
|
||||
|
||||
# Step 2: Check for pantallaContexto in existing session
|
||||
if session.pantalla_contexto:
|
||||
# Check if pantallaContexto is stale (10 minutes)
|
||||
if self._is_pantalla_context_valid(session.last_modified):
|
||||
logger.info(
|
||||
"Detected 'pantallaContexto' in session: %s. "
|
||||
"Delegating to QuickReplies flow.",
|
||||
session.pantalla_contexto,
|
||||
)
|
||||
response = await self._manage_quick_reply_conversation(
|
||||
request, session.pantalla_contexto,
|
||||
)
|
||||
if response:
|
||||
# Save user message to Firestore
|
||||
user_entry = ConversationEntry(
|
||||
entity="user",
|
||||
type="CONVERSACION",
|
||||
timestamp=datetime.now(UTC),
|
||||
text=request.mensaje,
|
||||
parameters=None,
|
||||
canal=getattr(request, "canal", None),
|
||||
)
|
||||
await self.firestore_service.save_entry(
|
||||
session.session_id, user_entry,
|
||||
)
|
||||
|
||||
# Save quick reply response to Firestore
|
||||
response_text = (
|
||||
response.query_result.response_text
|
||||
if response.query_result
|
||||
else ""
|
||||
) or ""
|
||||
assistant_entry = ConversationEntry(
|
||||
entity="assistant",
|
||||
type="CONVERSACION",
|
||||
timestamp=datetime.now(UTC),
|
||||
text=response_text,
|
||||
parameters=None,
|
||||
canal=getattr(request, "canal", None),
|
||||
)
|
||||
await self.firestore_service.save_entry(
|
||||
session.session_id, assistant_entry,
|
||||
)
|
||||
|
||||
# Update session with last message and timestamp
|
||||
session.last_message = response_text
|
||||
session.last_modified = datetime.now(UTC)
|
||||
await self.firestore_service.save_session(session)
|
||||
await self.redis_service.save_session(session)
|
||||
|
||||
return response
|
||||
else:
|
||||
logger.info(
|
||||
"Detected STALE 'pantallaContexto'. "
|
||||
"Ignoring and proceeding with normal flow.",
|
||||
)
|
||||
|
||||
# Step 3: Continue with standard conversation flow
|
||||
nickname = request.usuario.nickname
|
||||
|
||||
logger.info(
|
||||
"Primary Check (Redis): Looking up session for phone: %s",
|
||||
telefono,
|
||||
)
|
||||
|
||||
# Step 3a: Load conversation history from Firestore
|
||||
entries = await self.firestore_service.get_entries(
|
||||
session.session_id,
|
||||
limit=self.settings.conversation_context_message_limit,
|
||||
)
|
||||
logger.info("Loaded %s conversation entries from Firestore", len(entries))
|
||||
|
||||
# Step 3b: Retrieve active notifications for this user
|
||||
notifications = await self._get_active_notifications(telefono)
|
||||
logger.info("Retrieved %s active notifications", len(notifications))
|
||||
|
||||
# Step 3c: Prepare context for RAG service
|
||||
messages = await self._prepare_rag_messages(
|
||||
session=session,
|
||||
entries=entries,
|
||||
notifications=notifications,
|
||||
user_message=request.mensaje,
|
||||
nickname=nickname or "Usuario",
|
||||
)
|
||||
|
||||
# Step 3d: Query RAG service
|
||||
logger.info("Sending query to RAG service")
|
||||
assistant_response = await self.rag_service.query(messages)
|
||||
logger.info(
|
||||
"Received response from RAG service: %s...",
|
||||
assistant_response[:100],
|
||||
)
|
||||
|
||||
# Step 3e: Save user message to Firestore
|
||||
user_entry = ConversationEntry(
|
||||
entity="user",
|
||||
type="CONVERSACION",
|
||||
timestamp=datetime.now(UTC),
|
||||
text=request.mensaje,
|
||||
parameters=None,
|
||||
canal=getattr(request, "canal", None),
|
||||
)
|
||||
await self.firestore_service.save_entry(session.session_id, user_entry)
|
||||
logger.info("Saved user message to Firestore")
|
||||
|
||||
# Step 3f: Save assistant response to Firestore
|
||||
assistant_entry = ConversationEntry(
|
||||
entity="assistant",
|
||||
type="LLM",
|
||||
timestamp=datetime.now(UTC),
|
||||
text=assistant_response,
|
||||
parameters=None,
|
||||
canal=getattr(request, "canal", None),
|
||||
)
|
||||
await self.firestore_service.save_entry(session.session_id, assistant_entry)
|
||||
logger.info("Saved assistant response to Firestore")
|
||||
|
||||
# Step 3g: Update session with last message and timestamp
|
||||
session.last_message = assistant_response
|
||||
session.last_modified = datetime.now(UTC)
|
||||
await self.firestore_service.save_session(session)
|
||||
await self.redis_service.save_session(session)
|
||||
logger.info("Updated session in Firestore and Redis")
|
||||
|
||||
# Step 3h: Mark notifications as processed if any were included
|
||||
if notifications:
|
||||
await self._mark_notifications_as_processed(telefono)
|
||||
logger.info("Marked %s notifications as processed", len(notifications))
|
||||
|
||||
# Step 3i: Return response object
|
||||
return DetectIntentResponse(
|
||||
responseId=str(uuid4()),
|
||||
queryResult=QueryResult(
|
||||
responseText=assistant_response,
|
||||
parameters=None,
|
||||
),
|
||||
quick_replies=None,
|
||||
)
|
||||
# Step 3: Try quick reply path first
|
||||
response = await self._handle_quick_reply_path(request, session)
|
||||
if response:
|
||||
return response
|
||||
|
||||
# Step 4: Fall through to standard conversation path
|
||||
return await self._handle_standard_conversation(request, session)
|
||||
|
||||
except Exception:
|
||||
logger.exception("Error managing conversation")
|
||||
raise
|
||||
|
||||
async def _obtain_or_create_session(self, telefono: str) -> ConversationSession:
|
||||
"""Get existing session or create new one.
|
||||
|
||||
Checks Redis → Firestore → Creates new session with auto-caching.
|
||||
|
||||
Args:
|
||||
telefono: User phone number
|
||||
|
||||
Returns:
|
||||
ConversationSession instance
|
||||
|
||||
"""
|
||||
# Try Redis first
|
||||
session = await self.redis_service.get_session(telefono)
|
||||
if session:
|
||||
return session
|
||||
|
||||
# Try Firestore if Redis miss
|
||||
session = await self.firestore_service.get_session_by_phone(telefono)
|
||||
if session:
|
||||
return session
|
||||
|
||||
# Create new session if both miss
|
||||
session_id = str(uuid4())
|
||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||
session = await self.firestore_service.create_session(
|
||||
session_id,
|
||||
user_id,
|
||||
telefono,
|
||||
)
|
||||
|
||||
# Auto-cache to Redis
|
||||
await self.redis_service.save_session(session)
|
||||
|
||||
return session
|
||||
|
||||
async def _save_conversation_turn(
|
||||
self,
|
||||
session_id: str,
|
||||
user_text: str,
|
||||
assistant_text: str,
|
||||
entry_type: str,
|
||||
canal: str | None = None,
|
||||
) -> None:
|
||||
"""Save user and assistant messages to Firestore.
|
||||
|
||||
Args:
|
||||
session_id: Session identifier
|
||||
user_text: User message text
|
||||
assistant_text: Assistant response text
|
||||
entry_type: Type of conversation entry ("CONVERSACION" or "LLM")
|
||||
canal: Communication channel
|
||||
|
||||
"""
|
||||
# Save user entry
|
||||
user_entry = ConversationEntry(
|
||||
entity="user",
|
||||
type=entry_type,
|
||||
timestamp=datetime.now(UTC),
|
||||
text=user_text,
|
||||
parameters=None,
|
||||
canal=canal,
|
||||
)
|
||||
await self.firestore_service.save_entry(session_id, user_entry)
|
||||
|
||||
# Save assistant entry
|
||||
assistant_entry = ConversationEntry(
|
||||
entity="assistant",
|
||||
type=entry_type,
|
||||
timestamp=datetime.now(UTC),
|
||||
text=assistant_text,
|
||||
parameters=None,
|
||||
canal=canal,
|
||||
)
|
||||
await self.firestore_service.save_entry(session_id, assistant_entry)
|
||||
|
||||
async def _update_session_after_turn(
|
||||
self,
|
||||
session: ConversationSession,
|
||||
last_message: str,
|
||||
) -> None:
|
||||
"""Update session metadata and sync to storage.
|
||||
|
||||
Updates last_message, last_modified timestamp, and saves to
|
||||
both Firestore and Redis for dual-storage consistency.
|
||||
|
||||
Args:
|
||||
session: Session to update (modified in place)
|
||||
last_message: Latest message text
|
||||
|
||||
"""
|
||||
session.last_message = last_message
|
||||
session.last_modified = datetime.now(UTC)
|
||||
await self.firestore_service.save_session(session)
|
||||
await self.redis_service.save_session(session)
|
||||
|
||||
async def _handle_quick_reply_path(
|
||||
self,
|
||||
request: ConversationRequest,
|
||||
session: ConversationSession,
|
||||
) -> DetectIntentResponse | None:
|
||||
"""Handle conversation when pantalla_contexto is active and valid.
|
||||
|
||||
Args:
|
||||
request: User conversation request
|
||||
session: Current conversation session
|
||||
|
||||
Returns:
|
||||
DetectIntentResponse if handled, None if fall through to standard path
|
||||
|
||||
"""
|
||||
# Check if pantalla_contexto exists
|
||||
if not session.pantalla_contexto:
|
||||
return None
|
||||
|
||||
# Check if pantalla_contexto is stale
|
||||
if not self._is_pantalla_context_valid(session.last_modified):
|
||||
logger.info(
|
||||
"Detected STALE 'pantallaContexto'. "
|
||||
"Ignoring and proceeding with normal flow.",
|
||||
)
|
||||
return None
|
||||
|
||||
logger.info(
|
||||
"Detected 'pantallaContexto' in session: %s. "
|
||||
"Delegating to QuickReplies flow.",
|
||||
session.pantalla_contexto,
|
||||
)
|
||||
|
||||
response = await self._manage_quick_reply_conversation(
|
||||
request,
|
||||
session.pantalla_contexto,
|
||||
)
|
||||
|
||||
if not response:
|
||||
return None
|
||||
|
||||
# Extract response text
|
||||
response_text = (
|
||||
response.query_result.response_text if response.query_result else ""
|
||||
) or ""
|
||||
|
||||
# Save conversation turn
|
||||
await self._save_conversation_turn(
|
||||
session_id=session.session_id,
|
||||
user_text=request.mensaje,
|
||||
assistant_text=response_text,
|
||||
entry_type="CONVERSACION",
|
||||
canal=getattr(request, "canal", None),
|
||||
)
|
||||
|
||||
# Update session
|
||||
await self._update_session_after_turn(session, response_text)
|
||||
|
||||
return response
|
||||
|
||||
async def _handle_standard_conversation(
|
||||
self,
|
||||
request: ConversationRequest,
|
||||
session: ConversationSession,
|
||||
) -> DetectIntentResponse:
|
||||
"""Handle standard RAG-based conversation flow.
|
||||
|
||||
Loads history, notifications, queries RAG service, and persists results.
|
||||
|
||||
Args:
|
||||
request: User conversation request
|
||||
session: Current conversation session
|
||||
|
||||
Returns:
|
||||
DetectIntentResponse with RAG response
|
||||
|
||||
"""
|
||||
telefono = request.usuario.telefono
|
||||
nickname = request.usuario.nickname
|
||||
|
||||
logger.info(
|
||||
"Primary Check (Redis): Looking up session for phone: %s",
|
||||
telefono,
|
||||
)
|
||||
|
||||
# Load conversation history from Firestore
|
||||
entries = await self.firestore_service.get_entries(
|
||||
session.session_id,
|
||||
limit=self.settings.conversation_context_message_limit,
|
||||
)
|
||||
logger.info("Loaded %s conversation entries from Firestore", len(entries))
|
||||
|
||||
# Retrieve active notifications for this user
|
||||
notifications = await self._get_active_notifications(telefono)
|
||||
logger.info("Retrieved %s active notifications", len(notifications))
|
||||
|
||||
# Prepare current user message
|
||||
messages = await self._prepare_rag_messages(request.mensaje)
|
||||
|
||||
# Extract notification texts for RAG
|
||||
notification_texts = (
|
||||
[n.texto for n in notifications if n.texto and n.texto.strip()]
|
||||
if notifications
|
||||
else None
|
||||
)
|
||||
|
||||
# Format conversation history for RAG
|
||||
conversation_history = (
|
||||
self._format_conversation_history(session, entries) if entries else None
|
||||
)
|
||||
|
||||
# Query RAG service with separated fields
|
||||
logger.info("Sending query to RAG service")
|
||||
assistant_response = await self.rag_service.query(
|
||||
messages=messages,
|
||||
notifications=notification_texts,
|
||||
conversation_history=conversation_history,
|
||||
user_nickname=nickname or None,
|
||||
)
|
||||
logger.info(
|
||||
"Received response from RAG service: %s...",
|
||||
assistant_response[:100],
|
||||
)
|
||||
|
||||
# Save conversation turn
|
||||
await self._save_conversation_turn(
|
||||
session_id=session.session_id,
|
||||
user_text=request.mensaje,
|
||||
assistant_text=assistant_response,
|
||||
entry_type="LLM",
|
||||
canal=getattr(request, "canal", None),
|
||||
)
|
||||
logger.info("Saved user message and assistant response to Firestore")
|
||||
|
||||
# Update session
|
||||
await self._update_session_after_turn(session, assistant_response)
|
||||
logger.info("Updated session in Firestore and Redis")
|
||||
|
||||
# Mark notifications as processed if any were included
|
||||
if notifications:
|
||||
await self._mark_notifications_as_processed(telefono)
|
||||
logger.info("Marked %s notifications as processed", len(notifications))
|
||||
|
||||
# Return response object
|
||||
return DetectIntentResponse(
|
||||
responseId=str(uuid4()),
|
||||
queryResult=QueryResult(
|
||||
responseText=assistant_response,
|
||||
parameters=None,
|
||||
),
|
||||
quick_replies=None,
|
||||
)
|
||||
|
||||
def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
|
||||
"""Check if pantallaContexto is still valid (not stale)."""
|
||||
time_diff = datetime.now(UTC) - last_modified
|
||||
@@ -270,7 +384,6 @@ class ConversationManagerService:
|
||||
quick_replies=quick_reply_screen,
|
||||
)
|
||||
|
||||
|
||||
async def _get_active_notifications(self, telefono: str) -> list:
|
||||
"""Retrieve active notifications for a user from Redis or Firestore.
|
||||
|
||||
@@ -317,66 +430,19 @@ class ConversationManagerService:
|
||||
|
||||
async def _prepare_rag_messages(
|
||||
self,
|
||||
session: ConversationSession,
|
||||
entries: list[ConversationEntry],
|
||||
notifications: list,
|
||||
user_message: str,
|
||||
nickname: str,
|
||||
) -> list[dict[str, str]]:
|
||||
"""Prepare messages in OpenAI format for RAG service.
|
||||
"""Prepare current user message for RAG service.
|
||||
|
||||
Args:
|
||||
session: Current conversation session
|
||||
entries: Conversation history entries
|
||||
notifications: Active notifications
|
||||
user_message: Current user message
|
||||
nickname: User's nickname
|
||||
|
||||
Returns:
|
||||
List of messages in OpenAI format [{"role": "...", "content": "..."}]
|
||||
List with single user message
|
||||
|
||||
"""
|
||||
messages = []
|
||||
|
||||
# Add system message with conversation history if available
|
||||
if entries:
|
||||
conversation_context = self._format_conversation_history(session, entries)
|
||||
if conversation_context:
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
f"Historial de conversación:\n"
|
||||
f"{conversation_context}"
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
# Add system message with notifications if available
|
||||
if notifications:
|
||||
# Simple Pythonic join - no mapper needed!
|
||||
notifications_text = "\n".join(
|
||||
n.texto for n in notifications if n.texto and n.texto.strip()
|
||||
)
|
||||
if notifications_text:
|
||||
messages.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
f"Notificaciones pendientes para el usuario:\n"
|
||||
f"{notifications_text}"
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
# Add system message with user context
|
||||
user_context = f"Usuario: {nickname}" if nickname else "Usuario anónimo"
|
||||
messages.append({"role": "system", "content": user_context})
|
||||
|
||||
# Add current user message
|
||||
messages.append({"role": "user", "content": user_message})
|
||||
|
||||
return messages
|
||||
# Only include the current user message - no system messages
|
||||
return [{"role": "user", "content": user_message}]
|
||||
|
||||
async def _mark_notifications_as_processed(self, telefono: str) -> None:
|
||||
"""Mark all notifications for a user as processed.
|
||||
@@ -388,7 +454,8 @@ class ConversationManagerService:
|
||||
try:
|
||||
# Update status in Firestore
|
||||
await self.firestore_service.update_notification_status(
|
||||
telefono, "processed",
|
||||
telefono,
|
||||
"processed",
|
||||
)
|
||||
|
||||
# Update or delete from Redis
|
||||
@@ -140,7 +140,6 @@ class DLPService:
|
||||
# Clean up consecutive DIRECCION tags
|
||||
return self._clean_direccion(text)
|
||||
|
||||
|
||||
def _get_replacement(self, info_type: str, quote: str) -> str | None:
|
||||
"""Get replacement text for a given info type.
|
||||
|
||||
@@ -9,10 +9,9 @@ from capa_de_integracion.models.notification import (
|
||||
ExternalNotificationRequest,
|
||||
Notification,
|
||||
)
|
||||
|
||||
from .dlp_service import DLPService
|
||||
from .firestore_service import FirestoreService
|
||||
from .redis_service import RedisService
|
||||
from capa_de_integracion.services.dlp import DLPService
|
||||
from capa_de_integracion.services.storage.firestore import FirestoreService
|
||||
from capa_de_integracion.services.storage.redis import RedisService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -53,7 +52,8 @@ class NotificationManagerService:
|
||||
logger.info("NotificationManagerService initialized")
|
||||
|
||||
async def process_notification(
|
||||
self, external_request: ExternalNotificationRequest,
|
||||
self,
|
||||
external_request: ExternalNotificationRequest,
|
||||
) -> None:
|
||||
"""Process a push notification from external system.
|
||||
|
||||
9
src/capa_de_integracion/services/quick_reply/__init__.py
Normal file
9
src/capa_de_integracion/services/quick_reply/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""Quick reply services."""
|
||||
|
||||
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
|
||||
from capa_de_integracion.services.quick_reply.session import QuickReplySessionService
|
||||
|
||||
__all__ = [
|
||||
"QuickReplyContentService",
|
||||
"QuickReplySessionService",
|
||||
]
|
||||
161
src/capa_de_integracion/services/quick_reply/content.py
Normal file
161
src/capa_de_integracion/services/quick_reply/content.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Quick reply content service for loading FAQ screens."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from capa_de_integracion.config import Settings
|
||||
from capa_de_integracion.models.quick_replies import (
|
||||
QuickReplyQuestions,
|
||||
QuickReplyScreen,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class QuickReplyContentService:
|
||||
"""Service for loading quick reply screen content from JSON files."""
|
||||
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
"""Initialize quick reply content service.
|
||||
|
||||
Args:
|
||||
settings: Application settings
|
||||
|
||||
"""
|
||||
self.settings = settings
|
||||
self.quick_replies_path = settings.base_path / "quick_replies"
|
||||
self._cache: dict[str, QuickReplyScreen] = {}
|
||||
|
||||
logger.info(
|
||||
"QuickReplyContentService initialized with path: %s",
|
||||
self.quick_replies_path,
|
||||
)
|
||||
|
||||
# Preload all quick reply files into memory
|
||||
self._preload_cache()
|
||||
|
||||
def _validate_file(self, file_path: Path, screen_id: str) -> None:
|
||||
"""Validate that the quick reply file exists."""
|
||||
if not file_path.exists():
|
||||
logger.warning("Quick reply file not found: %s", file_path)
|
||||
msg = f"Quick reply file not found for screen_id: {screen_id}"
|
||||
raise ValueError(msg)
|
||||
|
||||
def _parse_quick_reply_data(self, data: dict) -> QuickReplyScreen:
|
||||
"""Parse JSON data into QuickReplyScreen model.
|
||||
|
||||
Args:
|
||||
data: JSON data dictionary
|
||||
|
||||
Returns:
|
||||
Parsed QuickReplyScreen object
|
||||
|
||||
"""
|
||||
preguntas_data = data.get("preguntas", [])
|
||||
preguntas = [
|
||||
QuickReplyQuestions(
|
||||
titulo=q.get("titulo", ""),
|
||||
descripcion=q.get("descripcion"),
|
||||
respuesta=q.get("respuesta", ""),
|
||||
)
|
||||
for q in preguntas_data
|
||||
]
|
||||
|
||||
return QuickReplyScreen(
|
||||
header=data.get("header"),
|
||||
body=data.get("body"),
|
||||
button=data.get("button"),
|
||||
header_section=data.get("header_section"),
|
||||
preguntas=preguntas,
|
||||
)
|
||||
|
||||
def _preload_cache(self) -> None:
|
||||
"""Preload all quick reply files into memory cache at startup.
|
||||
|
||||
This method runs synchronously at initialization to load all
|
||||
quick reply JSON files. Blocking here is acceptable since it
|
||||
only happens once at startup.
|
||||
|
||||
"""
|
||||
if not self.quick_replies_path.exists():
|
||||
logger.warning(
|
||||
"Quick replies directory not found: %s",
|
||||
self.quick_replies_path,
|
||||
)
|
||||
return
|
||||
|
||||
loaded_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for file_path in self.quick_replies_path.glob("*.json"):
|
||||
screen_id = file_path.stem
|
||||
try:
|
||||
# Blocking I/O is OK at startup
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
data = json.loads(content)
|
||||
quick_reply = self._parse_quick_reply_data(data)
|
||||
|
||||
self._cache[screen_id] = quick_reply
|
||||
loaded_count += 1
|
||||
|
||||
logger.debug(
|
||||
"Cached %s quick replies for screen: %s",
|
||||
len(quick_reply.preguntas),
|
||||
screen_id,
|
||||
)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.exception("Invalid JSON in file: %s", file_path)
|
||||
failed_count += 1
|
||||
except Exception:
|
||||
logger.exception("Failed to load quick reply file: %s", file_path)
|
||||
failed_count += 1
|
||||
|
||||
logger.info(
|
||||
"Quick reply cache initialized: %s screens loaded, %s failed",
|
||||
loaded_count,
|
||||
failed_count,
|
||||
)
|
||||
|
||||
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
|
||||
"""Get quick reply screen content by ID from in-memory cache.
|
||||
|
||||
This method is non-blocking as it retrieves data from the
|
||||
in-memory cache populated at startup.
|
||||
|
||||
Args:
|
||||
screen_id: Screen identifier (e.g., "pagos", "home")
|
||||
|
||||
Returns:
|
||||
Quick reply screen data
|
||||
|
||||
Raises:
|
||||
ValueError: If the quick reply is not found in cache
|
||||
|
||||
"""
|
||||
if not screen_id or not screen_id.strip():
|
||||
logger.warning("screen_id is null or empty. Returning empty quick replies")
|
||||
return QuickReplyScreen(
|
||||
header="empty",
|
||||
body=None,
|
||||
button=None,
|
||||
header_section=None,
|
||||
preguntas=[],
|
||||
)
|
||||
|
||||
# Non-blocking: just a dictionary lookup
|
||||
quick_reply = self._cache.get(screen_id)
|
||||
|
||||
if quick_reply is None:
|
||||
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
|
||||
msg = f"Quick reply not found for screen_id: {screen_id}"
|
||||
raise ValueError(msg)
|
||||
|
||||
logger.info(
|
||||
"Retrieved %s quick replies for screen: %s from cache",
|
||||
len(quick_reply.preguntas),
|
||||
screen_id,
|
||||
)
|
||||
|
||||
return quick_reply
|
||||
@@ -4,9 +4,9 @@ import logging
|
||||
from uuid import uuid4
|
||||
|
||||
from capa_de_integracion.models.quick_replies import QuickReplyScreen
|
||||
from capa_de_integracion.services.firestore_service import FirestoreService
|
||||
from capa_de_integracion.services.quick_reply_content import QuickReplyContentService
|
||||
from capa_de_integracion.services.redis_service import RedisService
|
||||
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
|
||||
from capa_de_integracion.services.storage.firestore import FirestoreService
|
||||
from capa_de_integracion.services.storage.redis import RedisService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -92,14 +92,18 @@ class QuickReplySessionService:
|
||||
if session:
|
||||
session_id = session.session_id
|
||||
await self.firestore_service.update_pantalla_contexto(
|
||||
session_id, pantalla_contexto,
|
||||
session_id,
|
||||
pantalla_contexto,
|
||||
)
|
||||
session.pantalla_contexto = pantalla_contexto
|
||||
else:
|
||||
session_id = str(uuid4())
|
||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||
session = await self.firestore_service.create_session(
|
||||
session_id, user_id, telefono, pantalla_contexto,
|
||||
session_id,
|
||||
user_id,
|
||||
telefono,
|
||||
pantalla_contexto,
|
||||
)
|
||||
|
||||
# Cache session in Redis
|
||||
@@ -1,106 +0,0 @@
|
||||
"""Quick reply content service for loading FAQ screens."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from capa_de_integracion.config import Settings
|
||||
from capa_de_integracion.models.quick_replies import (
|
||||
QuickReplyQuestions,
|
||||
QuickReplyScreen,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class QuickReplyContentService:
|
||||
"""Service for loading quick reply screen content from JSON files."""
|
||||
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
"""Initialize quick reply content service.
|
||||
|
||||
Args:
|
||||
settings: Application settings
|
||||
|
||||
"""
|
||||
self.settings = settings
|
||||
self.quick_replies_path = settings.base_path / "quick_replies"
|
||||
|
||||
logger.info(
|
||||
"QuickReplyContentService initialized with path: %s",
|
||||
self.quick_replies_path,
|
||||
)
|
||||
|
||||
def _validate_file(self, file_path: Path, screen_id: str) -> None:
|
||||
"""Validate that the quick reply file exists."""
|
||||
if not file_path.exists():
|
||||
logger.warning("Quick reply file not found: %s", file_path)
|
||||
msg = f"Quick reply file not found for screen_id: {screen_id}"
|
||||
raise ValueError(msg)
|
||||
|
||||
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
|
||||
"""Load quick reply screen content by ID.
|
||||
|
||||
Args:
|
||||
screen_id: Screen identifier (e.g., "pagos", "home")
|
||||
|
||||
Returns:
|
||||
Quick reply DTO
|
||||
|
||||
Raises:
|
||||
ValueError: If the quick reply file is not found
|
||||
|
||||
"""
|
||||
if not screen_id or not screen_id.strip():
|
||||
logger.warning("screen_id is null or empty. Returning empty quick replies")
|
||||
return QuickReplyScreen(
|
||||
header="empty",
|
||||
body=None,
|
||||
button=None,
|
||||
header_section=None,
|
||||
preguntas=[],
|
||||
)
|
||||
|
||||
file_path = self.quick_replies_path / f"{screen_id}.json"
|
||||
|
||||
try:
|
||||
self._validate_file(file_path, screen_id)
|
||||
|
||||
# Use Path.read_text() for async-friendly file reading
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
data = json.loads(content)
|
||||
|
||||
# Parse questions
|
||||
preguntas_data = data.get("preguntas", [])
|
||||
preguntas = [
|
||||
QuickReplyQuestions(
|
||||
titulo=q.get("titulo", ""),
|
||||
descripcion=q.get("descripcion"),
|
||||
respuesta=q.get("respuesta", ""),
|
||||
)
|
||||
for q in preguntas_data
|
||||
]
|
||||
|
||||
quick_reply = QuickReplyScreen(
|
||||
header=data.get("header"),
|
||||
body=data.get("body"),
|
||||
button=data.get("button"),
|
||||
header_section=data.get("header_section"),
|
||||
preguntas=preguntas,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Successfully loaded %s quick replies for screen: %s",
|
||||
len(preguntas),
|
||||
screen_id,
|
||||
)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.exception("Error parsing JSON file %s", file_path)
|
||||
msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
|
||||
raise ValueError(msg) from e
|
||||
except Exception as e:
|
||||
logger.exception("Error loading quick replies for screen %s", screen_id)
|
||||
msg = f"Error loading quick replies for screen_id: {screen_id}"
|
||||
raise ValueError(msg) from e
|
||||
else:
|
||||
return quick_reply
|
||||
@@ -17,7 +17,22 @@ class Message(BaseModel):
|
||||
class RAGRequest(BaseModel):
|
||||
"""Request model for RAG endpoint."""
|
||||
|
||||
messages: list[Message] = Field(..., description="Conversation history")
|
||||
messages: list[Message] = Field(
|
||||
...,
|
||||
description="Current conversation messages (user and assistant only)",
|
||||
)
|
||||
notifications: list[str] | None = Field(
|
||||
default=None,
|
||||
description="Active notifications for the user",
|
||||
)
|
||||
conversation_history: str | None = Field(
|
||||
default=None,
|
||||
description="Formatted conversation history",
|
||||
)
|
||||
user_nickname: str | None = Field(
|
||||
default=None,
|
||||
description="User's nickname or display name",
|
||||
)
|
||||
|
||||
|
||||
class RAGResponse(BaseModel):
|
||||
@@ -34,12 +49,21 @@ class RAGServiceBase(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
async def query(self, messages: list[dict[str, str]]) -> str:
|
||||
"""Send conversation history to RAG endpoint and get response.
|
||||
async def query(
|
||||
self,
|
||||
messages: list[dict[str, str]],
|
||||
notifications: list[str] | None = None,
|
||||
conversation_history: str | None = None,
|
||||
user_nickname: str | None = None,
|
||||
) -> str:
|
||||
"""Send conversation to RAG endpoint and get response.
|
||||
|
||||
Args:
|
||||
messages: OpenAI-style conversation history
|
||||
messages: Current conversation messages (user/assistant only)
|
||||
e.g., [{"role": "user", "content": "Hello"}, ...]
|
||||
notifications: Active notifications for the user (optional)
|
||||
conversation_history: Formatted conversation history (optional)
|
||||
user_nickname: User's nickname or display name (optional)
|
||||
|
||||
Returns:
|
||||
Response string from RAG endpoint
|
||||
|
||||
@@ -28,12 +28,21 @@ class EchoRAGService(RAGServiceBase):
|
||||
self.prefix = prefix
|
||||
logger.info("EchoRAGService initialized with prefix: %r", prefix)
|
||||
|
||||
async def query(self, messages: list[dict[str, str]]) -> str:
|
||||
async def query(
|
||||
self,
|
||||
messages: list[dict[str, str]],
|
||||
notifications: list[str] | None = None, # noqa: ARG002
|
||||
conversation_history: str | None = None, # noqa: ARG002
|
||||
user_nickname: str | None = None, # noqa: ARG002
|
||||
) -> str:
|
||||
"""Echo back the last user message with a prefix.
|
||||
|
||||
Args:
|
||||
messages: OpenAI-style conversation history
|
||||
messages: Current conversation messages (user/assistant only)
|
||||
e.g., [{"role": "user", "content": "Hello"}, ...]
|
||||
notifications: Active notifications for the user (optional, ignored)
|
||||
conversation_history: Formatted conversation history (optional, ignored)
|
||||
user_nickname: User's nickname or display name (optional, ignored)
|
||||
|
||||
Returns:
|
||||
The last user message content with prefix
|
||||
|
||||
@@ -61,12 +61,21 @@ class HTTPRAGService(RAGServiceBase):
|
||||
timeout,
|
||||
)
|
||||
|
||||
async def query(self, messages: list[dict[str, str]]) -> str:
|
||||
"""Send conversation history to RAG endpoint and get response.
|
||||
async def query(
|
||||
self,
|
||||
messages: list[dict[str, str]],
|
||||
notifications: list[str] | None = None,
|
||||
conversation_history: str | None = None,
|
||||
user_nickname: str | None = None,
|
||||
) -> str:
|
||||
"""Send conversation to RAG endpoint and get response.
|
||||
|
||||
Args:
|
||||
messages: OpenAI-style conversation history
|
||||
messages: Current conversation messages (user/assistant only)
|
||||
e.g., [{"role": "user", "content": "Hello"}, ...]
|
||||
notifications: Active notifications for the user (optional)
|
||||
conversation_history: Formatted conversation history (optional)
|
||||
user_nickname: User's nickname or display name (optional)
|
||||
|
||||
Returns:
|
||||
Response string from RAG endpoint
|
||||
@@ -79,10 +88,22 @@ class HTTPRAGService(RAGServiceBase):
|
||||
try:
|
||||
# Validate and construct request
|
||||
message_objects = [Message(**msg) for msg in messages]
|
||||
request = RAGRequest(messages=message_objects)
|
||||
request = RAGRequest(
|
||||
messages=message_objects,
|
||||
notifications=notifications,
|
||||
conversation_history=conversation_history,
|
||||
user_nickname=user_nickname,
|
||||
)
|
||||
|
||||
# Make async HTTP POST request
|
||||
logger.debug("Sending RAG request with %s messages", len(messages))
|
||||
logger.debug(
|
||||
"Sending RAG request with %s messages, %s notifications, "
|
||||
"history: %s, user: %s",
|
||||
len(messages),
|
||||
len(notifications) if notifications else 0,
|
||||
"yes" if conversation_history else "no",
|
||||
user_nickname or "anonymous",
|
||||
)
|
||||
|
||||
response = await self._client.post(
|
||||
self.endpoint_url,
|
||||
|
||||
9
src/capa_de_integracion/services/storage/__init__.py
Normal file
9
src/capa_de_integracion/services/storage/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""Storage services."""
|
||||
|
||||
from capa_de_integracion.services.storage.firestore import FirestoreService
|
||||
from capa_de_integracion.services.storage.redis import RedisService
|
||||
|
||||
__all__ = [
|
||||
"FirestoreService",
|
||||
"RedisService",
|
||||
]
|
||||
@@ -183,7 +183,9 @@ class FirestoreService:
|
||||
return True
|
||||
|
||||
async def get_entries(
|
||||
self, session_id: str, limit: int = 10,
|
||||
self,
|
||||
session_id: str,
|
||||
limit: int = 10,
|
||||
) -> list[ConversationEntry]:
|
||||
"""Retrieve recent conversation entries from Firestore."""
|
||||
try:
|
||||
@@ -192,7 +194,8 @@ class FirestoreService:
|
||||
|
||||
# Get entries ordered by timestamp descending
|
||||
query = entries_ref.order_by(
|
||||
"timestamp", direction=firestore.Query.DESCENDING,
|
||||
"timestamp",
|
||||
direction=firestore.Query.DESCENDING,
|
||||
).limit(limit)
|
||||
|
||||
docs = query.stream()
|
||||
@@ -206,7 +209,9 @@ class FirestoreService:
|
||||
# Reverse to get chronological order
|
||||
entries.reverse()
|
||||
logger.debug(
|
||||
"Retrieved %s entries for session: %s", len(entries), session_id,
|
||||
"Retrieved %s entries for session: %s",
|
||||
len(entries),
|
||||
session_id,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
@@ -240,7 +245,9 @@ class FirestoreService:
|
||||
return True
|
||||
|
||||
async def update_pantalla_contexto(
|
||||
self, session_id: str, pantalla_contexto: str | None,
|
||||
self,
|
||||
session_id: str,
|
||||
pantalla_contexto: str | None,
|
||||
) -> bool:
|
||||
"""Update the pantallaContexto field for a conversation session.
|
||||
|
||||
@@ -286,7 +293,8 @@ class FirestoreService:
|
||||
# ====== Notification Methods ======
|
||||
|
||||
def _notification_ref(
|
||||
self, notification_id: str,
|
||||
self,
|
||||
notification_id: str,
|
||||
) -> firestore.AsyncDocumentReference:
|
||||
"""Get Firestore document reference for notification."""
|
||||
return self.db.collection(self.notifications_collection).document(
|
||||
@@ -329,7 +329,8 @@ class RedisService:
|
||||
return True
|
||||
|
||||
async def get_notification_session(
|
||||
self, session_id: str,
|
||||
self,
|
||||
session_id: str,
|
||||
) -> NotificationSession | None:
|
||||
"""Retrieve notification session from Redis."""
|
||||
if not self.redis:
|
||||
Reference in New Issue
Block a user