Files
int-layer/src/capa_de_integracion/services/quick_reply_content.py
2026-02-20 08:43:08 +00:00

107 lines
3.4 KiB
Python

"""Quick reply content service for loading FAQ screens."""
import json
import logging
from pathlib import Path
from capa_de_integracion.config import Settings
from capa_de_integracion.models.quick_replies import (
QuickReplyQuestions,
QuickReplyScreen,
)
logger = logging.getLogger(__name__)
class QuickReplyContentService:
"""Service for loading quick reply screen content from JSON files."""
def __init__(self, settings: Settings) -> None:
"""Initialize quick reply content service.
Args:
settings: Application settings
"""
self.settings = settings
self.quick_replies_path = settings.base_path / "quick_replies"
logger.info(
"QuickReplyContentService initialized with path: %s",
self.quick_replies_path,
)
def _validate_file(self, file_path: Path, screen_id: str) -> None:
"""Validate that the quick reply file exists."""
if not file_path.exists():
logger.warning("Quick reply file not found: %s", file_path)
msg = f"Quick reply file not found for screen_id: {screen_id}"
raise ValueError(msg)
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
"""Load quick reply screen content by ID.
Args:
screen_id: Screen identifier (e.g., "pagos", "home")
Returns:
Quick reply DTO
Raises:
ValueError: If the quick reply file is not found
"""
if not screen_id or not screen_id.strip():
logger.warning("screen_id is null or empty. Returning empty quick replies")
return QuickReplyScreen(
header="empty",
body=None,
button=None,
header_section=None,
preguntas=[],
)
file_path = self.quick_replies_path / f"{screen_id}.json"
try:
self._validate_file(file_path, screen_id)
# Use Path.read_text() for async-friendly file reading
content = file_path.read_text(encoding="utf-8")
data = json.loads(content)
# Parse questions
preguntas_data = data.get("preguntas", [])
preguntas = [
QuickReplyQuestions(
titulo=q.get("titulo", ""),
descripcion=q.get("descripcion"),
respuesta=q.get("respuesta", ""),
)
for q in preguntas_data
]
quick_reply = QuickReplyScreen(
header=data.get("header"),
body=data.get("body"),
button=data.get("button"),
header_section=data.get("header_section"),
preguntas=preguntas,
)
logger.info(
"Successfully loaded %s quick replies for screen: %s",
len(preguntas),
screen_id,
)
except json.JSONDecodeError as e:
logger.exception("Error parsing JSON file %s", file_path)
msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
raise ValueError(msg) from e
except Exception as e:
logger.exception("Error loading quick replies for screen %s", screen_id)
msg = f"Error loading quick replies for screen_id: {screen_id}"
raise ValueError(msg) from e
else:
return quick_reply