Misc improvements

This commit is contained in:
2026-02-20 06:59:31 +00:00
parent 734cade8d9
commit dba4122653
33 changed files with 1844 additions and 420 deletions

View File

@@ -18,17 +18,20 @@ class Settings(BaseSettings):
# RAG
rag_endpoint_url: str
rag_echo_enabled: bool = Field(
default=False, alias="RAG_ECHO_ENABLED",
default=False,
alias="RAG_ECHO_ENABLED",
)
# Firestore
firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID")
firestore_host: str = Field(
default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST",
default="firestore.googleapis.com",
alias="GCP_FIRESTORE_HOST",
)
firestore_port: int = Field(default=443, alias="GCP_FIRESTORE_PORT")
firestore_importer_enabled: bool = Field(
default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE",
default=False,
alias="GCP_FIRESTORE_IMPORTER_ENABLE",
)
# Redis
@@ -41,10 +44,12 @@ class Settings(BaseSettings):
# Conversation Context
conversation_context_message_limit: int = Field(
default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT",
default=60,
alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT",
)
conversation_context_days_limit: int = Field(
default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT",
default=30,
alias="CONVERSATION_CONTEXT_DAYS_LIMIT",
)
# Logging

View File

@@ -14,10 +14,9 @@ from .services import (
DLPService,
NotificationManagerService,
QuickReplyContentService,
QuickReplySessionService,
)
from .services.firestore_service import FirestoreService
from .services.quick_reply_session_service import QuickReplySessionService
from .services.redis_service import RedisService
from .services.storage import FirestoreService, RedisService
@lru_cache(maxsize=1)

View File

@@ -51,7 +51,8 @@ class ConversationEntry(BaseModel):
entity: Literal["user", "assistant"]
type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM"
timestamp: datetime = Field(
default_factory=lambda: datetime.now(UTC), alias="timestamp",
default_factory=lambda: datetime.now(UTC),
alias="timestamp",
)
text: str = Field(..., alias="text")
parameters: dict[str, Any] | None = Field(None, alias="parameters")
@@ -67,10 +68,12 @@ class ConversationSession(BaseModel):
user_id: str = Field(..., alias="userId")
telefono: str = Field(..., alias="telefono")
created_at: datetime = Field(
default_factory=lambda: datetime.now(UTC), alias="createdAt",
default_factory=lambda: datetime.now(UTC),
alias="createdAt",
)
last_modified: datetime = Field(
default_factory=lambda: datetime.now(UTC), alias="lastModified",
default_factory=lambda: datetime.now(UTC),
alias="lastModified",
)
last_message: str | None = Field(None, alias="lastMessage")
pantalla_contexto: str | None = Field(None, alias="pantallaContexto")

View File

@@ -13,7 +13,9 @@ class Notification(BaseModel):
"""
id_notificacion: str = Field(
..., alias="idNotificacion", description="Unique notification ID",
...,
alias="idNotificacion",
description="Unique notification ID",
)
telefono: str = Field(..., alias="telefono", description="User phone number")
timestamp_creacion: datetime = Field(
@@ -38,7 +40,9 @@ class Notification(BaseModel):
description="Session parameters for Dialogflow",
)
status: str = Field(
default="active", alias="status", description="Notification status",
default="active",
alias="status",
description="Notification status",
)
model_config = {"populate_by_name": True}
@@ -69,16 +73,18 @@ class Notification(BaseModel):
New Notification instance with current timestamp
"""
return cls.model_validate({
"idNotificacion": id_notificacion,
"telefono": telefono,
"timestampCreacion": datetime.now(UTC),
"texto": texto,
"nombreEventoDialogflow": nombre_evento_dialogflow,
"codigoIdiomaDialogflow": codigo_idioma_dialogflow,
"parametros": parametros or {},
"status": status,
})
return cls.model_validate(
{
"idNotificacion": id_notificacion,
"telefono": telefono,
"timestampCreacion": datetime.now(UTC),
"texto": texto,
"nombreEventoDialogflow": nombre_evento_dialogflow,
"codigoIdiomaDialogflow": codigo_idioma_dialogflow,
"parametros": parametros or {},
"status": status,
}
)
class NotificationSession(BaseModel):
@@ -111,7 +117,9 @@ class ExternalNotificationRequest(BaseModel):
texto: str = Field(..., min_length=1)
telefono: str = Field(..., alias="telefono", description="User phone number")
parametros_ocultos: dict[str, Any] | None = Field(
None, alias="parametrosOcultos", description="Hidden parameters (metadata)",
None,
alias="parametrosOcultos",
description="Hidden parameters (metadata)",
)
model_config = {"populate_by_name": True}

View File

@@ -17,9 +17,12 @@ router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"])
@router.post("/detect-intent")
async def detect_intent(
request: ConversationRequest,
conversation_manager: Annotated[ConversationManagerService, Depends(
get_conversation_manager,
)],
conversation_manager: Annotated[
ConversationManagerService,
Depends(
get_conversation_manager,
),
],
) -> DetectIntentResponse:
"""Detect user intent and manage conversation.

View File

@@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException
from capa_de_integracion.dependencies import get_notification_manager
from capa_de_integracion.models.notification import ExternalNotificationRequest
from capa_de_integracion.services.notification_manager import NotificationManagerService
from capa_de_integracion.services import NotificationManagerService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"])
@@ -16,9 +16,12 @@ router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"])
@router.post("/notification", status_code=200)
async def process_notification(
request: ExternalNotificationRequest,
notification_manager: Annotated[NotificationManagerService, Depends(
get_notification_manager,
)],
notification_manager: Annotated[
NotificationManagerService,
Depends(
get_notification_manager,
),
],
) -> None:
"""Process push notification from external system.

View File

@@ -10,9 +10,7 @@ from capa_de_integracion.dependencies import (
get_quick_reply_session_service,
)
from capa_de_integracion.models.quick_replies import QuickReplyScreen
from capa_de_integracion.services.quick_reply_session_service import (
QuickReplySessionService,
)
from capa_de_integracion.services import QuickReplySessionService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"])
@@ -45,7 +43,8 @@ class QuickReplyScreenResponse(BaseModel):
async def start_quick_reply_session(
request: QuickReplyScreenRequest,
quick_reply_session_service: Annotated[
QuickReplySessionService, Depends(get_quick_reply_session_service),
QuickReplySessionService,
Depends(get_quick_reply_session_service),
],
) -> QuickReplyScreenResponse:
"""Start a quick reply FAQ session for a specific screen.

View File

@@ -1,10 +1,10 @@
"""Services module."""
from .conversation_manager import ConversationManagerService
from .dlp_service import DLPService
from .notification_manager import NotificationManagerService
from .quick_reply_content import QuickReplyContentService
from .quick_reply_session_service import QuickReplySessionService
from capa_de_integracion.services.conversation import ConversationManagerService
from capa_de_integracion.services.dlp import DLPService
from capa_de_integracion.services.notifications import NotificationManagerService
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
from capa_de_integracion.services.quick_reply.session import QuickReplySessionService
__all__ = [
"ConversationManagerService",

View File

@@ -14,12 +14,11 @@ from capa_de_integracion.models import (
QueryResult,
)
from capa_de_integracion.models.notification import NotificationSession
from capa_de_integracion.services.dlp import DLPService
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
from capa_de_integracion.services.rag import RAGServiceBase
from .dlp_service import DLPService
from .firestore_service import FirestoreService
from .quick_reply_content import QuickReplyContentService
from .redis_service import RedisService
from capa_de_integracion.services.storage.firestore import FirestoreService
from capa_de_integracion.services.storage.redis import RedisService
logger = logging.getLogger(__name__)
@@ -50,11 +49,18 @@ class ConversationManagerService:
logger.info("ConversationManagerService initialized successfully")
async def manage_conversation( # noqa: PLR0915
self, request: ConversationRequest,
async def manage_conversation(
self,
request: ConversationRequest,
) -> DetectIntentResponse:
"""Manage conversation flow and return response.
Orchestrates:
1. Security (DLP obfuscation)
2. Session management
3. Quick reply path (if applicable)
4. Standard RAG path (fallback)
Args:
request: External conversation request from client
@@ -63,7 +69,7 @@ class ConversationManagerService:
"""
try:
# Step 1: DLP obfuscation
# Step 1: Apply DLP security
obfuscated_message = await self.dlp_service.get_obfuscated_string(
request.mensaje,
self.settings.dlp_template_complete_flow,
@@ -71,162 +77,270 @@ class ConversationManagerService:
request.mensaje = obfuscated_message
telefono = request.usuario.telefono
# Step 2. Fetch session in Redis -> Firestore -> Create new session
session = await self.redis_service.get_session(telefono)
if not session:
session = await self.firestore_service.get_session_by_phone(telefono)
if not session:
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
session = await self.firestore_service.create_session(
session_id, user_id, telefono,
)
await self.redis_service.save_session(session)
# Step 2: Obtain or create session
session = await self._obtain_or_create_session(telefono)
# Step 2: Check for pantallaContexto in existing session
if session.pantalla_contexto:
# Check if pantallaContexto is stale (10 minutes)
if self._is_pantalla_context_valid(session.last_modified):
logger.info(
"Detected 'pantallaContexto' in session: %s. "
"Delegating to QuickReplies flow.",
session.pantalla_contexto,
)
response = await self._manage_quick_reply_conversation(
request, session.pantalla_contexto,
)
if response:
# Save user message to Firestore
user_entry = ConversationEntry(
entity="user",
type="CONVERSACION",
timestamp=datetime.now(UTC),
text=request.mensaje,
parameters=None,
canal=getattr(request, "canal", None),
)
await self.firestore_service.save_entry(
session.session_id, user_entry,
)
# Save quick reply response to Firestore
response_text = (
response.query_result.response_text
if response.query_result
else ""
) or ""
assistant_entry = ConversationEntry(
entity="assistant",
type="CONVERSACION",
timestamp=datetime.now(UTC),
text=response_text,
parameters=None,
canal=getattr(request, "canal", None),
)
await self.firestore_service.save_entry(
session.session_id, assistant_entry,
)
# Update session with last message and timestamp
session.last_message = response_text
session.last_modified = datetime.now(UTC)
await self.firestore_service.save_session(session)
await self.redis_service.save_session(session)
return response
else:
logger.info(
"Detected STALE 'pantallaContexto'. "
"Ignoring and proceeding with normal flow.",
)
# Step 3: Continue with standard conversation flow
nickname = request.usuario.nickname
logger.info(
"Primary Check (Redis): Looking up session for phone: %s",
telefono,
)
# Step 3a: Load conversation history from Firestore
entries = await self.firestore_service.get_entries(
session.session_id,
limit=self.settings.conversation_context_message_limit,
)
logger.info("Loaded %s conversation entries from Firestore", len(entries))
# Step 3b: Retrieve active notifications for this user
notifications = await self._get_active_notifications(telefono)
logger.info("Retrieved %s active notifications", len(notifications))
# Step 3c: Prepare context for RAG service
messages = await self._prepare_rag_messages(
session=session,
entries=entries,
notifications=notifications,
user_message=request.mensaje,
nickname=nickname or "Usuario",
)
# Step 3d: Query RAG service
logger.info("Sending query to RAG service")
assistant_response = await self.rag_service.query(messages)
logger.info(
"Received response from RAG service: %s...",
assistant_response[:100],
)
# Step 3e: Save user message to Firestore
user_entry = ConversationEntry(
entity="user",
type="CONVERSACION",
timestamp=datetime.now(UTC),
text=request.mensaje,
parameters=None,
canal=getattr(request, "canal", None),
)
await self.firestore_service.save_entry(session.session_id, user_entry)
logger.info("Saved user message to Firestore")
# Step 3f: Save assistant response to Firestore
assistant_entry = ConversationEntry(
entity="assistant",
type="LLM",
timestamp=datetime.now(UTC),
text=assistant_response,
parameters=None,
canal=getattr(request, "canal", None),
)
await self.firestore_service.save_entry(session.session_id, assistant_entry)
logger.info("Saved assistant response to Firestore")
# Step 3g: Update session with last message and timestamp
session.last_message = assistant_response
session.last_modified = datetime.now(UTC)
await self.firestore_service.save_session(session)
await self.redis_service.save_session(session)
logger.info("Updated session in Firestore and Redis")
# Step 3h: Mark notifications as processed if any were included
if notifications:
await self._mark_notifications_as_processed(telefono)
logger.info("Marked %s notifications as processed", len(notifications))
# Step 3i: Return response object
return DetectIntentResponse(
responseId=str(uuid4()),
queryResult=QueryResult(
responseText=assistant_response,
parameters=None,
),
quick_replies=None,
)
# Step 3: Try quick reply path first
response = await self._handle_quick_reply_path(request, session)
if response:
return response
# Step 4: Fall through to standard conversation path
return await self._handle_standard_conversation(request, session)
except Exception:
logger.exception("Error managing conversation")
raise
async def _obtain_or_create_session(self, telefono: str) -> ConversationSession:
"""Get existing session or create new one.
Checks Redis Firestore Creates new session with auto-caching.
Args:
telefono: User phone number
Returns:
ConversationSession instance
"""
# Try Redis first
session = await self.redis_service.get_session(telefono)
if session:
return session
# Try Firestore if Redis miss
session = await self.firestore_service.get_session_by_phone(telefono)
if session:
return session
# Create new session if both miss
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
session = await self.firestore_service.create_session(
session_id,
user_id,
telefono,
)
# Auto-cache to Redis
await self.redis_service.save_session(session)
return session
async def _save_conversation_turn(
self,
session_id: str,
user_text: str,
assistant_text: str,
entry_type: str,
canal: str | None = None,
) -> None:
"""Save user and assistant messages to Firestore.
Args:
session_id: Session identifier
user_text: User message text
assistant_text: Assistant response text
entry_type: Type of conversation entry ("CONVERSACION" or "LLM")
canal: Communication channel
"""
# Save user entry
user_entry = ConversationEntry(
entity="user",
type=entry_type,
timestamp=datetime.now(UTC),
text=user_text,
parameters=None,
canal=canal,
)
await self.firestore_service.save_entry(session_id, user_entry)
# Save assistant entry
assistant_entry = ConversationEntry(
entity="assistant",
type=entry_type,
timestamp=datetime.now(UTC),
text=assistant_text,
parameters=None,
canal=canal,
)
await self.firestore_service.save_entry(session_id, assistant_entry)
async def _update_session_after_turn(
self,
session: ConversationSession,
last_message: str,
) -> None:
"""Update session metadata and sync to storage.
Updates last_message, last_modified timestamp, and saves to
both Firestore and Redis for dual-storage consistency.
Args:
session: Session to update (modified in place)
last_message: Latest message text
"""
session.last_message = last_message
session.last_modified = datetime.now(UTC)
await self.firestore_service.save_session(session)
await self.redis_service.save_session(session)
async def _handle_quick_reply_path(
self,
request: ConversationRequest,
session: ConversationSession,
) -> DetectIntentResponse | None:
"""Handle conversation when pantalla_contexto is active and valid.
Args:
request: User conversation request
session: Current conversation session
Returns:
DetectIntentResponse if handled, None if fall through to standard path
"""
# Check if pantalla_contexto exists
if not session.pantalla_contexto:
return None
# Check if pantalla_contexto is stale
if not self._is_pantalla_context_valid(session.last_modified):
logger.info(
"Detected STALE 'pantallaContexto'. "
"Ignoring and proceeding with normal flow.",
)
return None
logger.info(
"Detected 'pantallaContexto' in session: %s. "
"Delegating to QuickReplies flow.",
session.pantalla_contexto,
)
response = await self._manage_quick_reply_conversation(
request,
session.pantalla_contexto,
)
if not response:
return None
# Extract response text
response_text = (
response.query_result.response_text if response.query_result else ""
) or ""
# Save conversation turn
await self._save_conversation_turn(
session_id=session.session_id,
user_text=request.mensaje,
assistant_text=response_text,
entry_type="CONVERSACION",
canal=getattr(request, "canal", None),
)
# Update session
await self._update_session_after_turn(session, response_text)
return response
async def _handle_standard_conversation(
self,
request: ConversationRequest,
session: ConversationSession,
) -> DetectIntentResponse:
"""Handle standard RAG-based conversation flow.
Loads history, notifications, queries RAG service, and persists results.
Args:
request: User conversation request
session: Current conversation session
Returns:
DetectIntentResponse with RAG response
"""
telefono = request.usuario.telefono
nickname = request.usuario.nickname
logger.info(
"Primary Check (Redis): Looking up session for phone: %s",
telefono,
)
# Load conversation history from Firestore
entries = await self.firestore_service.get_entries(
session.session_id,
limit=self.settings.conversation_context_message_limit,
)
logger.info("Loaded %s conversation entries from Firestore", len(entries))
# Retrieve active notifications for this user
notifications = await self._get_active_notifications(telefono)
logger.info("Retrieved %s active notifications", len(notifications))
# Prepare current user message
messages = await self._prepare_rag_messages(request.mensaje)
# Extract notification texts for RAG
notification_texts = (
[n.texto for n in notifications if n.texto and n.texto.strip()]
if notifications
else None
)
# Format conversation history for RAG
conversation_history = (
self._format_conversation_history(session, entries) if entries else None
)
# Query RAG service with separated fields
logger.info("Sending query to RAG service")
assistant_response = await self.rag_service.query(
messages=messages,
notifications=notification_texts,
conversation_history=conversation_history,
user_nickname=nickname or None,
)
logger.info(
"Received response from RAG service: %s...",
assistant_response[:100],
)
# Save conversation turn
await self._save_conversation_turn(
session_id=session.session_id,
user_text=request.mensaje,
assistant_text=assistant_response,
entry_type="LLM",
canal=getattr(request, "canal", None),
)
logger.info("Saved user message and assistant response to Firestore")
# Update session
await self._update_session_after_turn(session, assistant_response)
logger.info("Updated session in Firestore and Redis")
# Mark notifications as processed if any were included
if notifications:
await self._mark_notifications_as_processed(telefono)
logger.info("Marked %s notifications as processed", len(notifications))
# Return response object
return DetectIntentResponse(
responseId=str(uuid4()),
queryResult=QueryResult(
responseText=assistant_response,
parameters=None,
),
quick_replies=None,
)
def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
"""Check if pantallaContexto is still valid (not stale)."""
time_diff = datetime.now(UTC) - last_modified
@@ -270,7 +384,6 @@ class ConversationManagerService:
quick_replies=quick_reply_screen,
)
async def _get_active_notifications(self, telefono: str) -> list:
"""Retrieve active notifications for a user from Redis or Firestore.
@@ -317,66 +430,19 @@ class ConversationManagerService:
async def _prepare_rag_messages(
self,
session: ConversationSession,
entries: list[ConversationEntry],
notifications: list,
user_message: str,
nickname: str,
) -> list[dict[str, str]]:
"""Prepare messages in OpenAI format for RAG service.
"""Prepare current user message for RAG service.
Args:
session: Current conversation session
entries: Conversation history entries
notifications: Active notifications
user_message: Current user message
nickname: User's nickname
Returns:
List of messages in OpenAI format [{"role": "...", "content": "..."}]
List with single user message
"""
messages = []
# Add system message with conversation history if available
if entries:
conversation_context = self._format_conversation_history(session, entries)
if conversation_context:
messages.append(
{
"role": "system",
"content": (
f"Historial de conversación:\n"
f"{conversation_context}"
),
},
)
# Add system message with notifications if available
if notifications:
# Simple Pythonic join - no mapper needed!
notifications_text = "\n".join(
n.texto for n in notifications if n.texto and n.texto.strip()
)
if notifications_text:
messages.append(
{
"role": "system",
"content": (
f"Notificaciones pendientes para el usuario:\n"
f"{notifications_text}"
),
},
)
# Add system message with user context
user_context = f"Usuario: {nickname}" if nickname else "Usuario anónimo"
messages.append({"role": "system", "content": user_context})
# Add current user message
messages.append({"role": "user", "content": user_message})
return messages
# Only include the current user message - no system messages
return [{"role": "user", "content": user_message}]
async def _mark_notifications_as_processed(self, telefono: str) -> None:
"""Mark all notifications for a user as processed.
@@ -388,7 +454,8 @@ class ConversationManagerService:
try:
# Update status in Firestore
await self.firestore_service.update_notification_status(
telefono, "processed",
telefono,
"processed",
)
# Update or delete from Redis

View File

@@ -140,7 +140,6 @@ class DLPService:
# Clean up consecutive DIRECCION tags
return self._clean_direccion(text)
def _get_replacement(self, info_type: str, quote: str) -> str | None:
"""Get replacement text for a given info type.

View File

@@ -9,10 +9,9 @@ from capa_de_integracion.models.notification import (
ExternalNotificationRequest,
Notification,
)
from .dlp_service import DLPService
from .firestore_service import FirestoreService
from .redis_service import RedisService
from capa_de_integracion.services.dlp import DLPService
from capa_de_integracion.services.storage.firestore import FirestoreService
from capa_de_integracion.services.storage.redis import RedisService
logger = logging.getLogger(__name__)
@@ -53,7 +52,8 @@ class NotificationManagerService:
logger.info("NotificationManagerService initialized")
async def process_notification(
self, external_request: ExternalNotificationRequest,
self,
external_request: ExternalNotificationRequest,
) -> None:
"""Process a push notification from external system.

View File

@@ -0,0 +1,9 @@
"""Quick reply services."""
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
from capa_de_integracion.services.quick_reply.session import QuickReplySessionService
__all__ = [
"QuickReplyContentService",
"QuickReplySessionService",
]

View File

@@ -0,0 +1,161 @@
"""Quick reply content service for loading FAQ screens."""
import json
import logging
from pathlib import Path
from capa_de_integracion.config import Settings
from capa_de_integracion.models.quick_replies import (
QuickReplyQuestions,
QuickReplyScreen,
)
logger = logging.getLogger(__name__)
class QuickReplyContentService:
"""Service for loading quick reply screen content from JSON files."""
def __init__(self, settings: Settings) -> None:
"""Initialize quick reply content service.
Args:
settings: Application settings
"""
self.settings = settings
self.quick_replies_path = settings.base_path / "quick_replies"
self._cache: dict[str, QuickReplyScreen] = {}
logger.info(
"QuickReplyContentService initialized with path: %s",
self.quick_replies_path,
)
# Preload all quick reply files into memory
self._preload_cache()
def _validate_file(self, file_path: Path, screen_id: str) -> None:
"""Validate that the quick reply file exists."""
if not file_path.exists():
logger.warning("Quick reply file not found: %s", file_path)
msg = f"Quick reply file not found for screen_id: {screen_id}"
raise ValueError(msg)
def _parse_quick_reply_data(self, data: dict) -> QuickReplyScreen:
"""Parse JSON data into QuickReplyScreen model.
Args:
data: JSON data dictionary
Returns:
Parsed QuickReplyScreen object
"""
preguntas_data = data.get("preguntas", [])
preguntas = [
QuickReplyQuestions(
titulo=q.get("titulo", ""),
descripcion=q.get("descripcion"),
respuesta=q.get("respuesta", ""),
)
for q in preguntas_data
]
return QuickReplyScreen(
header=data.get("header"),
body=data.get("body"),
button=data.get("button"),
header_section=data.get("header_section"),
preguntas=preguntas,
)
def _preload_cache(self) -> None:
"""Preload all quick reply files into memory cache at startup.
This method runs synchronously at initialization to load all
quick reply JSON files. Blocking here is acceptable since it
only happens once at startup.
"""
if not self.quick_replies_path.exists():
logger.warning(
"Quick replies directory not found: %s",
self.quick_replies_path,
)
return
loaded_count = 0
failed_count = 0
for file_path in self.quick_replies_path.glob("*.json"):
screen_id = file_path.stem
try:
# Blocking I/O is OK at startup
content = file_path.read_text(encoding="utf-8")
data = json.loads(content)
quick_reply = self._parse_quick_reply_data(data)
self._cache[screen_id] = quick_reply
loaded_count += 1
logger.debug(
"Cached %s quick replies for screen: %s",
len(quick_reply.preguntas),
screen_id,
)
except json.JSONDecodeError:
logger.exception("Invalid JSON in file: %s", file_path)
failed_count += 1
except Exception:
logger.exception("Failed to load quick reply file: %s", file_path)
failed_count += 1
logger.info(
"Quick reply cache initialized: %s screens loaded, %s failed",
loaded_count,
failed_count,
)
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
"""Get quick reply screen content by ID from in-memory cache.
This method is non-blocking as it retrieves data from the
in-memory cache populated at startup.
Args:
screen_id: Screen identifier (e.g., "pagos", "home")
Returns:
Quick reply screen data
Raises:
ValueError: If the quick reply is not found in cache
"""
if not screen_id or not screen_id.strip():
logger.warning("screen_id is null or empty. Returning empty quick replies")
return QuickReplyScreen(
header="empty",
body=None,
button=None,
header_section=None,
preguntas=[],
)
# Non-blocking: just a dictionary lookup
quick_reply = self._cache.get(screen_id)
if quick_reply is None:
logger.warning("Quick reply not found in cache for screen: %s", screen_id)
msg = f"Quick reply not found for screen_id: {screen_id}"
raise ValueError(msg)
logger.info(
"Retrieved %s quick replies for screen: %s from cache",
len(quick_reply.preguntas),
screen_id,
)
return quick_reply

View File

@@ -4,9 +4,9 @@ import logging
from uuid import uuid4
from capa_de_integracion.models.quick_replies import QuickReplyScreen
from capa_de_integracion.services.firestore_service import FirestoreService
from capa_de_integracion.services.quick_reply_content import QuickReplyContentService
from capa_de_integracion.services.redis_service import RedisService
from capa_de_integracion.services.quick_reply.content import QuickReplyContentService
from capa_de_integracion.services.storage.firestore import FirestoreService
from capa_de_integracion.services.storage.redis import RedisService
logger = logging.getLogger(__name__)
@@ -92,14 +92,18 @@ class QuickReplySessionService:
if session:
session_id = session.session_id
await self.firestore_service.update_pantalla_contexto(
session_id, pantalla_contexto,
session_id,
pantalla_contexto,
)
session.pantalla_contexto = pantalla_contexto
else:
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
session = await self.firestore_service.create_session(
session_id, user_id, telefono, pantalla_contexto,
session_id,
user_id,
telefono,
pantalla_contexto,
)
# Cache session in Redis

View File

@@ -1,106 +0,0 @@
"""Quick reply content service for loading FAQ screens."""
import json
import logging
from pathlib import Path
from capa_de_integracion.config import Settings
from capa_de_integracion.models.quick_replies import (
QuickReplyQuestions,
QuickReplyScreen,
)
logger = logging.getLogger(__name__)
class QuickReplyContentService:
"""Service for loading quick reply screen content from JSON files."""
def __init__(self, settings: Settings) -> None:
"""Initialize quick reply content service.
Args:
settings: Application settings
"""
self.settings = settings
self.quick_replies_path = settings.base_path / "quick_replies"
logger.info(
"QuickReplyContentService initialized with path: %s",
self.quick_replies_path,
)
def _validate_file(self, file_path: Path, screen_id: str) -> None:
"""Validate that the quick reply file exists."""
if not file_path.exists():
logger.warning("Quick reply file not found: %s", file_path)
msg = f"Quick reply file not found for screen_id: {screen_id}"
raise ValueError(msg)
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
"""Load quick reply screen content by ID.
Args:
screen_id: Screen identifier (e.g., "pagos", "home")
Returns:
Quick reply DTO
Raises:
ValueError: If the quick reply file is not found
"""
if not screen_id or not screen_id.strip():
logger.warning("screen_id is null or empty. Returning empty quick replies")
return QuickReplyScreen(
header="empty",
body=None,
button=None,
header_section=None,
preguntas=[],
)
file_path = self.quick_replies_path / f"{screen_id}.json"
try:
self._validate_file(file_path, screen_id)
# Use Path.read_text() for async-friendly file reading
content = file_path.read_text(encoding="utf-8")
data = json.loads(content)
# Parse questions
preguntas_data = data.get("preguntas", [])
preguntas = [
QuickReplyQuestions(
titulo=q.get("titulo", ""),
descripcion=q.get("descripcion"),
respuesta=q.get("respuesta", ""),
)
for q in preguntas_data
]
quick_reply = QuickReplyScreen(
header=data.get("header"),
body=data.get("body"),
button=data.get("button"),
header_section=data.get("header_section"),
preguntas=preguntas,
)
logger.info(
"Successfully loaded %s quick replies for screen: %s",
len(preguntas),
screen_id,
)
except json.JSONDecodeError as e:
logger.exception("Error parsing JSON file %s", file_path)
msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
raise ValueError(msg) from e
except Exception as e:
logger.exception("Error loading quick replies for screen %s", screen_id)
msg = f"Error loading quick replies for screen_id: {screen_id}"
raise ValueError(msg) from e
else:
return quick_reply

View File

@@ -17,7 +17,22 @@ class Message(BaseModel):
class RAGRequest(BaseModel):
"""Request model for RAG endpoint."""
messages: list[Message] = Field(..., description="Conversation history")
messages: list[Message] = Field(
...,
description="Current conversation messages (user and assistant only)",
)
notifications: list[str] | None = Field(
default=None,
description="Active notifications for the user",
)
conversation_history: str | None = Field(
default=None,
description="Formatted conversation history",
)
user_nickname: str | None = Field(
default=None,
description="User's nickname or display name",
)
class RAGResponse(BaseModel):
@@ -34,12 +49,21 @@ class RAGServiceBase(ABC):
"""
@abstractmethod
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
async def query(
self,
messages: list[dict[str, str]],
notifications: list[str] | None = None,
conversation_history: str | None = None,
user_nickname: str | None = None,
) -> str:
"""Send conversation to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
messages: Current conversation messages (user/assistant only)
e.g., [{"role": "user", "content": "Hello"}, ...]
notifications: Active notifications for the user (optional)
conversation_history: Formatted conversation history (optional)
user_nickname: User's nickname or display name (optional)
Returns:
Response string from RAG endpoint

View File

@@ -28,12 +28,21 @@ class EchoRAGService(RAGServiceBase):
self.prefix = prefix
logger.info("EchoRAGService initialized with prefix: %r", prefix)
async def query(self, messages: list[dict[str, str]]) -> str:
async def query(
self,
messages: list[dict[str, str]],
notifications: list[str] | None = None, # noqa: ARG002
conversation_history: str | None = None, # noqa: ARG002
user_nickname: str | None = None, # noqa: ARG002
) -> str:
"""Echo back the last user message with a prefix.
Args:
messages: OpenAI-style conversation history
messages: Current conversation messages (user/assistant only)
e.g., [{"role": "user", "content": "Hello"}, ...]
notifications: Active notifications for the user (optional, ignored)
conversation_history: Formatted conversation history (optional, ignored)
user_nickname: User's nickname or display name (optional, ignored)
Returns:
The last user message content with prefix

View File

@@ -61,12 +61,21 @@ class HTTPRAGService(RAGServiceBase):
timeout,
)
async def query(self, messages: list[dict[str, str]]) -> str:
"""Send conversation history to RAG endpoint and get response.
async def query(
self,
messages: list[dict[str, str]],
notifications: list[str] | None = None,
conversation_history: str | None = None,
user_nickname: str | None = None,
) -> str:
"""Send conversation to RAG endpoint and get response.
Args:
messages: OpenAI-style conversation history
messages: Current conversation messages (user/assistant only)
e.g., [{"role": "user", "content": "Hello"}, ...]
notifications: Active notifications for the user (optional)
conversation_history: Formatted conversation history (optional)
user_nickname: User's nickname or display name (optional)
Returns:
Response string from RAG endpoint
@@ -79,10 +88,22 @@ class HTTPRAGService(RAGServiceBase):
try:
# Validate and construct request
message_objects = [Message(**msg) for msg in messages]
request = RAGRequest(messages=message_objects)
request = RAGRequest(
messages=message_objects,
notifications=notifications,
conversation_history=conversation_history,
user_nickname=user_nickname,
)
# Make async HTTP POST request
logger.debug("Sending RAG request with %s messages", len(messages))
logger.debug(
"Sending RAG request with %s messages, %s notifications, "
"history: %s, user: %s",
len(messages),
len(notifications) if notifications else 0,
"yes" if conversation_history else "no",
user_nickname or "anonymous",
)
response = await self._client.post(
self.endpoint_url,

View File

@@ -0,0 +1,9 @@
"""Storage services."""
from capa_de_integracion.services.storage.firestore import FirestoreService
from capa_de_integracion.services.storage.redis import RedisService
__all__ = [
"FirestoreService",
"RedisService",
]

View File

@@ -183,7 +183,9 @@ class FirestoreService:
return True
async def get_entries(
self, session_id: str, limit: int = 10,
self,
session_id: str,
limit: int = 10,
) -> list[ConversationEntry]:
"""Retrieve recent conversation entries from Firestore."""
try:
@@ -192,7 +194,8 @@ class FirestoreService:
# Get entries ordered by timestamp descending
query = entries_ref.order_by(
"timestamp", direction=firestore.Query.DESCENDING,
"timestamp",
direction=firestore.Query.DESCENDING,
).limit(limit)
docs = query.stream()
@@ -206,7 +209,9 @@ class FirestoreService:
# Reverse to get chronological order
entries.reverse()
logger.debug(
"Retrieved %s entries for session: %s", len(entries), session_id,
"Retrieved %s entries for session: %s",
len(entries),
session_id,
)
except Exception:
logger.exception(
@@ -240,7 +245,9 @@ class FirestoreService:
return True
async def update_pantalla_contexto(
self, session_id: str, pantalla_contexto: str | None,
self,
session_id: str,
pantalla_contexto: str | None,
) -> bool:
"""Update the pantallaContexto field for a conversation session.
@@ -286,7 +293,8 @@ class FirestoreService:
# ====== Notification Methods ======
def _notification_ref(
self, notification_id: str,
self,
notification_id: str,
) -> firestore.AsyncDocumentReference:
"""Get Firestore document reference for notification."""
return self.db.collection(self.notifications_collection).document(

View File

@@ -329,7 +329,8 @@ class RedisService:
return True
async def get_notification_session(
self, session_id: str,
self,
session_id: str,
) -> NotificationSession | None:
"""Retrieve notification session from Redis."""
if not self.redis: