Initial Python rewrite

This commit is contained in:
2026-02-19 17:50:14 +00:00
parent da95a64fb7
commit faa04a0d01
158 changed files with 5122 additions and 1144 deletions

View File

@@ -0,0 +1,10 @@
"""Copyright 2025 Google. This software is provided as-is,
without warranty or representation for any use or purpose.
Your use of it is subject to your agreement with Google.
Capa de Integración - Conversational AI Orchestrator Service
"""
from .main import app, main
__all__ = ["app", "main"]

View File

@@ -0,0 +1,54 @@
from pathlib import Path
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application configuration from environment variables.

    Fields without an explicit ``alias`` are resolved by pydantic-settings
    from the matching upper-cased environment variable (e.g.
    ``gcp_project_id`` from ``GCP_PROJECT_ID``); values may also be read
    from a local ``.env`` file.
    """

    model_config = SettingsConfigDict(env_file=".env")

    # GCP General
    gcp_project_id: str
    gcp_location: str

    # RAG
    rag_endpoint_url: str

    # Firestore
    firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID")
    firestore_host: str = Field(
        default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST",
    )
    firestore_port: int = Field(default=443, alias="GCP_FIRESTORE_PORT")
    firestore_importer_enabled: bool = Field(
        default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE",
    )

    # Redis
    redis_host: str
    redis_port: int
    redis_pwd: str | None = None  # optional: unauthenticated Redis is allowed

    # DLP
    dlp_template_complete_flow: str

    # Conversation Context
    conversation_context_message_limit: int = Field(
        default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT",
    )
    conversation_context_days_limit: int = Field(
        default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT",
    )

    # Logging
    log_level: str = Field(default="INFO", alias="LOGGING_LEVEL_ROOT")

    @property
    def base_path(self) -> Path:
        """Get base path for resources."""
        # Resolves to <package parent>/resources relative to this file.
        return Path(__file__).parent.parent / "resources"


# Validate configuration eagerly at import time so a misconfigured
# environment fails fast on startup.
settings = Settings.model_validate({})

View File

@@ -0,0 +1,99 @@
from functools import lru_cache
from .config import settings
from .services import (
ConversationManagerService,
DLPService,
NotificationManagerService,
QuickReplyContentService,
)
from .services.firestore_service import FirestoreService
from .services.rag_service import RAGService
from .services.redis_service import RedisService
@lru_cache(maxsize=1)
def get_redis_service() -> RedisService:
    """Get the Redis service instance (process-wide singleton via lru_cache)."""
    return RedisService(settings)
@lru_cache(maxsize=1)
def get_firestore_service() -> FirestoreService:
    """Get the Firestore service instance (process-wide singleton via lru_cache)."""
    return FirestoreService(settings)
@lru_cache(maxsize=1)
def get_dlp_service() -> DLPService:
    """Get the DLP service instance (process-wide singleton via lru_cache)."""
    return DLPService(settings)
@lru_cache(maxsize=1)
def get_quick_reply_content_service() -> QuickReplyContentService:
    """Get the quick reply content service (process-wide singleton via lru_cache)."""
    return QuickReplyContentService(settings)
@lru_cache(maxsize=1)
def get_notification_manager() -> NotificationManagerService:
    """Get notification manager instance.

    Wires the shared Redis/Firestore/DLP singletons into the manager so
    every request reuses the same connections.
    """
    return NotificationManagerService(
        settings,
        redis_service=get_redis_service(),
        firestore_service=get_firestore_service(),
        dlp_service=get_dlp_service(),
    )
@lru_cache(maxsize=1)
def get_rag_service() -> RAGService:
    """Get the RAG service instance (process-wide singleton via lru_cache)."""
    return RAGService(settings)
@lru_cache(maxsize=1)
def get_conversation_manager() -> ConversationManagerService:
    """Get conversation manager instance.

    Wires the shared Redis/Firestore/DLP/RAG singletons into the
    orchestrator so every request reuses the same connections.
    """
    return ConversationManagerService(
        settings,
        redis_service=get_redis_service(),
        firestore_service=get_firestore_service(),
        dlp_service=get_dlp_service(),
        rag_service=get_rag_service(),
    )
# Lifecycle management functions
def init_services(settings) -> None:
    """Initialize services (placeholder kept for API compatibility).

    Args:
        settings: Application settings; currently unused because services
            are constructed lazily by the lru_cache factories above.
    """
    # Services are lazy-loaded via lru_cache, no explicit init needed
async def startup_services() -> None:
    """Connect to external services on startup.

    Only Redis gets an explicit connect in this hook; the other services
    are not connected here.
    """
    # Connect to Redis
    redis = get_redis_service()
    await redis.connect()
async def shutdown_services() -> None:
    """Close all service connections on shutdown.

    Each close is attempted independently so a failure releasing one
    backend (e.g. Redis) does not leak the remaining connections
    (Firestore, DLP, RAG). Failures are logged, never raised.
    """
    # Local import: this module does not import logging at top level.
    import logging

    log = logging.getLogger(__name__)
    closers = (
        ("Redis", get_redis_service()),
        ("Firestore", get_firestore_service()),
        ("DLP", get_dlp_service()),
        ("RAG", get_rag_service()),
    )
    for name, service in closers:
        try:
            await service.close()
        except Exception:
            # Best-effort shutdown: log and keep closing the rest.
            log.exception("Error closing %s connection during shutdown", name)

View File

@@ -0,0 +1,16 @@
class FirestorePersistenceException(Exception):
"""Exception raised when Firestore operations fail.
This is typically caught and logged without failing the request.
"""
def __init__(self, message: str, cause: Exception | None = None) -> None:
"""Initialize Firestore persistence exception.
Args:
message: Error message
cause: Original exception that caused this error
"""
super().__init__(message)
self.cause = cause

View File

@@ -0,0 +1,79 @@
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import settings
from .dependencies import init_services, shutdown_services, startup_services
from .routers import conversation_router, notification_router, quick_replies_router
# Configure logging at the level requested via LOGGING_LEVEL_ROOT.
# Previously hard-coded to INFO, which left settings.log_level unused.
logging.basicConfig(
    level=settings.log_level,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan manager.

    Runs service initialization before the app starts serving requests
    and tears connections down after it stops.
    """
    # Startup
    logger.info("Initializing services...")
    init_services(settings)
    await startup_services()
    logger.info("Application started successfully")
    yield
    # Shutdown
    logger.info("Shutting down services...")
    await shutdown_services()
    logger.info("Application shutdown complete")
# FastAPI application; lifespan hook manages service connections.
app = FastAPI(
    title="Capa de Integración - Orchestrator Service",
    description="Conversational AI orchestrator for Dialogflow CX, Gemini, and Vertex AI",
    version="0.1.0",
    lifespan=lifespan,
)

# CORS middleware
# Note: Type checker reports false positive for CORSMiddleware
# This is the correct FastAPI pattern per official documentation
app.add_middleware(
    CORSMiddleware,  # ty: ignore
    allow_origins=["*"],  # SECURITY: wildcard origin — restrict for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routers
app.include_router(conversation_router)
app.include_router(notification_router)
app.include_router(quick_replies_router)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "capa-de-integracion"}
def main() -> None:
    """Entry point for CLI."""
    # Local import so importing this module does not require uvicorn.
    import uvicorn

    # NOTE(review): reload=True is a development convenience — confirm it
    # is disabled for production deployments.
    uvicorn.run(
        "capa_de_integracion.main:app",
        host="0.0.0.0",
        port=8080,
        reload=True,
    )


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,29 @@
"""Data models module."""
from .conversation import (
ConversationEntry,
ConversationRequest,
ConversationSession,
DetectIntentResponse,
QueryResult,
User,
)
from .notification import (
ExternalNotificationRequest,
Notification,
NotificationSession,
)
__all__ = [
"ConversationEntry",
"ConversationRequest",
"ConversationSession",
"DetectIntentResponse",
# Notification
"ExternalNotificationRequest",
"Notification",
"NotificationSession",
"QueryResult",
# Conversation
"User",
]

View File

@@ -0,0 +1,89 @@
from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, Field
class User(BaseModel):
    """User information attached to a conversation request."""

    # Phone number; used as the session lookup key.
    telefono: str = Field(..., min_length=1)
    # Optional display name.
    nickname: str | None = None
class QueryResult(BaseModel):
    """Query result from Dialogflow."""

    # Text the assistant should show to the user.
    responseText: str | None = Field(None, alias="responseText")
    # Parameters returned with the result.
    parameters: dict[str, Any] | None = Field(None, alias="parameters")

    # Allow construction by field name as well as by alias.
    model_config = {"populate_by_name": True}
class DetectIntentResponse(BaseModel):
    """Dialogflow detect intent response."""

    responseId: str | None = Field(None, alias="responseId")
    queryResult: QueryResult | None = Field(None, alias="queryResult")
    # QuickReplyScreen from quick_replies module; typed Any, presumably to
    # avoid a circular import — confirm before tightening the type.
    quick_replies: Any | None = None

    model_config = {"populate_by_name": True}
class ConversationRequest(BaseModel):
    """External conversation request from client."""

    # Raw user message; replaced in place with obfuscated text by the DLP step.
    mensaje: str = Field(..., alias="mensaje")
    usuario: User = Field(..., alias="usuario")
    # Channel identifier (persisted with each conversation entry).
    canal: str = Field(..., alias="canal")
    pantalla_contexto: str | None = Field(None, alias="pantallaContexto")

    model_config = {"populate_by_name": True}
class ConversationEntry(BaseModel):
    """Single conversation entry (one message from either side)."""

    # Who produced the message.
    entity: Literal["user", "assistant"]
    # Entry kind, e.g. "INICIO", "CONVERSACION", "LLM"
    type: str = Field(..., alias="type")
    # NOTE(review): naive local time — confirm whether UTC-aware timestamps
    # are expected by the stores.
    timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
    text: str = Field(..., alias="text")
    parameters: dict[str, Any] | None = Field(None, alias="parameters")
    canal: str | None = Field(None, alias="canal")

    model_config = {"populate_by_name": True}
class ConversationSession(BaseModel):
    """Conversation session metadata.

    One record per conversation, identified by ``sessionId`` and looked up
    by ``telefono``. Aliases are camelCase to match the stored shape.
    """

    sessionId: str = Field(..., alias="sessionId")
    userId: str = Field(..., alias="userId")
    telefono: str = Field(..., alias="telefono")
    createdAt: datetime = Field(default_factory=datetime.now, alias="createdAt")
    # Refreshed on every exchange; also drives screen-context staleness checks.
    lastModified: datetime = Field(default_factory=datetime.now, alias="lastModified")
    lastMessage: str | None = Field(None, alias="lastMessage")
    # Screen identifier that routes the next message to the quick-reply flow.
    pantallaContexto: str | None = Field(None, alias="pantallaContexto")

    model_config = {"populate_by_name": True}

    @classmethod
    def create(
        cls,
        session_id: str,
        user_id: str,
        telefono: str,
        pantalla_contexto: str | None = None,
        last_message: str | None = None,
    ) -> "ConversationSession":
        """Create a new conversation session.

        Args:
            session_id: Unique session identifier.
            user_id: User identifier.
            telefono: User phone number.
            pantalla_contexto: Optional screen context for quick replies.
            last_message: Optional last message to record.

        Returns:
            New session with creation and modification times set to now.
        """
        now = datetime.now()
        return cls(
            sessionId=session_id,
            userId=user_id,
            telefono=telefono,
            createdAt=now,
            lastModified=now,
            pantallaContexto=pantalla_contexto,
            lastMessage=last_message,
        )

View File

@@ -0,0 +1,115 @@
from datetime import datetime
from typing import Any
from pydantic import BaseModel, Field
class Notification(BaseModel):
    """Individual notification event record.

    Represents a notification to be stored in Firestore and cached in
    Redis. Aliases are camelCase to match the stored document shape.
    """

    idNotificacion: str = Field(
        ..., alias="idNotificacion", description="Unique notification ID",
    )
    telefono: str = Field(..., alias="telefono", description="User phone number")
    timestampCreacion: datetime = Field(
        default_factory=datetime.now,
        alias="timestampCreacion",
        description="Notification creation timestamp",
    )
    texto: str = Field(..., alias="texto", description="Notification text content")
    nombreEventoDialogflow: str = Field(
        default="notificacion",
        alias="nombreEventoDialogflow",
        description="Dialogflow event name",
    )
    codigoIdiomaDialogflow: str = Field(
        default="es",
        alias="codigoIdiomaDialogflow",
        description="Dialogflow language code",
    )
    parametros: dict[str, Any] = Field(
        default_factory=dict,
        alias="parametros",
        description="Session parameters for Dialogflow",
    )
    # "active" notifications are surfaced to the user; consumers flip this
    # to "processed" once delivered.
    status: str = Field(
        default="active", alias="status", description="Notification status",
    )

    model_config = {"populate_by_name": True}

    @classmethod
    def create(
        cls,
        id_notificacion: str,
        telefono: str,
        texto: str,
        nombre_evento_dialogflow: str = "notificacion",
        codigo_idioma_dialogflow: str = "es",
        parametros: dict[str, Any] | None = None,
        status: str = "active",
    ) -> "Notification":
        """Create a new Notification with auto-filled timestamp.

        Args:
            id_notificacion: Unique notification ID
            telefono: User phone number
            texto: Notification text content
            nombre_evento_dialogflow: Dialogflow event name
            codigo_idioma_dialogflow: Dialogflow language code
            parametros: Session parameters for Dialogflow
            status: Notification status

        Returns:
            New Notification instance with current timestamp
        """
        return cls(
            idNotificacion=id_notificacion,
            telefono=telefono,
            timestampCreacion=datetime.now(),
            texto=texto,
            nombreEventoDialogflow=nombre_evento_dialogflow,
            codigoIdiomaDialogflow=codigo_idioma_dialogflow,
            parametros=parametros or {},
            status=status,
        )
class NotificationSession(BaseModel):
    """Notification session containing multiple notifications for a phone number.

    Stored with the phone number as the document key; aliases are camelCase
    to match the stored shape.
    """

    sessionId: str = Field(..., alias="sessionId", description="Session identifier")
    telefono: str = Field(..., alias="telefono", description="User phone number")
    fechaCreacion: datetime = Field(
        default_factory=datetime.now,
        alias="fechaCreacion",
        description="Session creation time",
    )
    ultimaActualizacion: datetime = Field(
        default_factory=datetime.now,
        alias="ultimaActualizacion",
        description="Last update time",
    )
    notificaciones: list[Notification] = Field(
        default_factory=list,
        alias="notificaciones",
        description="List of notification events",
    )

    model_config = {"populate_by_name": True}
class ExternalNotificationRequest(BaseModel):
    """External notification push request from client."""

    # Notification text; must be non-empty.
    texto: str = Field(..., min_length=1)
    telefono: str = Field(..., alias="telefono", description="User phone number")
    parametros_ocultos: dict[str, Any] | None = Field(
        None, alias="parametrosOcultos", description="Hidden parameters (metadata)",
    )

    model_config = {"populate_by_name": True}

View File

@@ -0,0 +1,19 @@
from pydantic import BaseModel, Field
class QuickReplyQuestions(BaseModel):
    """Individual FAQ question."""

    # Question title shown to the user; also used to match user messages.
    titulo: str
    # Optional short description of the question.
    descripcion: str | None = None
    # Canned answer returned when the question is selected.
    respuesta: str
class QuickReplyScreen(BaseModel):
    """Quick reply screen with questions."""

    # Optional presentation texts for the screen.
    header: str | None = None
    body: str | None = None
    button: str | None = None
    header_section: str | None = None
    # FAQ entries for the screen; empty when none are configured.
    preguntas: list[QuickReplyQuestions] = Field(default_factory=list)

View File

@@ -0,0 +1 @@
{"titulo": "Capsulas"}

View File

@@ -0,0 +1 @@
{"titulo": "Descubre"}

View File

@@ -0,0 +1 @@
{"titulo": "Detalle TDC"}

View File

@@ -0,0 +1 @@
{"titulo": "Detalle TDD"}

View File

@@ -0,0 +1 @@
{"titulo": "Finanzas"}

View File

@@ -0,0 +1 @@
{"titulo": "Home"}

View File

@@ -0,0 +1 @@
{"titulo": "Inversiones"}

View File

@@ -0,0 +1 @@
{"titulo": "Lealtad"}

View File

@@ -0,0 +1,18 @@
{
"header": "preguntas frecuentes",
"body": "Aquí tienes las preguntas frecuentes que suelen hacernos algunos de nuestros clientes",
"button": "Ver",
"header_section": "preguntas sobre pagos",
"preguntas": [
{
"titulo": "Donde veo mi historial de pagos?",
"descripcion": "View your recent payments",
"respuesta": "puedes visualizar esto en la opcion X de tu app"
},
{
"titulo": "Pregunta servicio A",
"descripcion": "descripcion servicio A",
"respuesta": "puedes ver info de servicio A en tu app"
}
]
}

View File

@@ -0,0 +1 @@
{"titulo": "Prestamos"}

View File

@@ -0,0 +1 @@
{"titulo": "Retiro sin tarjeta"}

View File

@@ -0,0 +1 @@
{"titulo": "Transferencia"}

View File

@@ -0,0 +1,11 @@
"""Routers module."""
from .conversation import router as conversation_router
from .notification import router as notification_router
from .quick_replies import router as quick_replies_router
__all__ = [
"conversation_router",
"notification_router",
"quick_replies_router",
]

View File

@@ -0,0 +1,43 @@
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from capa_de_integracion.dependencies import get_conversation_manager
from capa_de_integracion.models import ConversationRequest, DetectIntentResponse
from capa_de_integracion.services import ConversationManagerService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"])
@router.post("/detect-intent")
async def detect_intent(
request: ConversationRequest,
conversation_manager: Annotated[ConversationManagerService, Depends(
get_conversation_manager,
)],
) -> DetectIntentResponse:
"""Detect user intent and manage conversation.
Args:
request: External conversation request from client
Returns:
Dialogflow detect intent response
"""
try:
logger.info("Received detect-intent request")
response = await conversation_manager.manage_conversation(request)
logger.info("Successfully processed detect-intent request")
return response
except ValueError as e:
logger.error(f"Validation error: {e!s}", exc_info=True)
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error processing detect-intent: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -0,0 +1,54 @@
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from capa_de_integracion.dependencies import get_notification_manager
from capa_de_integracion.models.notification import ExternalNotificationRequest
from capa_de_integracion.services.notification_manager import NotificationManagerService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"])
@router.post("/notification", status_code=200)
async def process_notification(
request: ExternalNotificationRequest,
notification_manager: Annotated[NotificationManagerService, Depends(
get_notification_manager,
)],
) -> None:
"""Process push notification from external system.
This endpoint receives notifications (e.g., "Your card was blocked") and:
1. Stores them in Redis/Firestore
2. Associates them with the user's conversation session
3. Triggers a Dialogflow event
When the user later sends a message asking about the notification
("Why was it blocked?"), the message filter will classify it as
NOTIFICATION and route to the appropriate handler.
Args:
request: External notification request with text, phone, and parameters
Returns:
None (204 No Content)
Raises:
HTTPException: 400 if validation fails, 500 for internal errors
"""
try:
logger.info("Received notification request")
await notification_manager.process_notification(request)
logger.info("Successfully processed notification request")
# Match Java behavior: process but don't return response body
except ValueError as e:
logger.error(f"Validation error: {e!s}", exc_info=True)
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error processing notification: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -0,0 +1,101 @@
import logging
from typing import Annotated
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from capa_de_integracion.dependencies import (
get_firestore_service,
get_quick_reply_content_service,
get_redis_service,
)
from capa_de_integracion.models.quick_replies import QuickReplyScreen
from capa_de_integracion.services.firestore_service import FirestoreService
from capa_de_integracion.services.quick_reply_content import QuickReplyContentService
from capa_de_integracion.services.redis_service import RedisService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"])
class QuickReplyUser(BaseModel):
    """User identification for the quick-reply screen request."""

    # User phone number; used as the session lookup key.
    telefono: str
    # Display name. NOTE(review): not referenced by this router — confirm use.
    nombre: str
class QuickReplyScreenRequest(BaseModel):
    """Request body for starting a quick-reply FAQ session."""

    usuario: QuickReplyUser
    # Screen identifier whose FAQ catalogue should be loaded.
    pantallaContexto: str

    model_config = {"populate_by_name": True}
class QuickReplyScreenResponse(BaseModel):
    """Response: session id plus the quick replies for the screen."""

    responseId: str
    quick_replies: QuickReplyScreen
@router.post("/screen")
async def start_quick_reply_session(
request: QuickReplyScreenRequest,
redis_service: Annotated[RedisService, Depends(get_redis_service)],
firestore_service: Annotated[FirestoreService, Depends(get_firestore_service)],
quick_reply_content_service: Annotated[QuickReplyContentService, Depends(
get_quick_reply_content_service,
)],
) -> QuickReplyScreenResponse:
"""Start a quick reply FAQ session for a specific screen.
Creates a conversation session with pantalla_contexto set,
loads the quick reply questions for the screen, and returns them.
Args:
request: Quick reply screen request
Returns:
Detect intent response with quick reply questions
"""
try:
telefono = request.usuario.telefono
pantalla_contexto = request.pantallaContexto
if not telefono or not telefono.strip():
msg = "Phone number is required"
raise ValueError(msg)
session = await firestore_service.get_session_by_phone(telefono)
if session:
session_id = session.sessionId
await firestore_service.update_pantalla_contexto(
session_id, pantalla_contexto,
)
session.pantallaContexto = pantalla_contexto
else:
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
session = await firestore_service.create_session(
session_id, user_id, telefono, pantalla_contexto,
)
# Cache session
await redis_service.save_session(session)
logger.info(
f"Created quick reply session {session_id} for screen: {pantalla_contexto}",
)
# Load quick replies
quick_replies = await quick_reply_content_service.get_quick_replies(
pantalla_contexto,
)
return QuickReplyScreenResponse(
responseId=session_id, quick_replies=quick_replies,
)
except ValueError as e:
logger.error(f"Validation error: {e}", exc_info=True)
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error starting quick reply session: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -0,0 +1,13 @@
"""Services module."""
from .conversation_manager import ConversationManagerService
from .dlp_service import DLPService
from .notification_manager import NotificationManagerService
from .quick_reply_content import QuickReplyContentService
__all__ = [
"ConversationManagerService",
"DLPService",
"NotificationManagerService",
"QuickReplyContentService",
]

View File

@@ -0,0 +1,488 @@
import logging
from datetime import datetime, timedelta
from uuid import uuid4
from capa_de_integracion.config import Settings
from capa_de_integracion.models import (
ConversationEntry,
ConversationRequest,
ConversationSession,
DetectIntentResponse,
QueryResult,
)
from .dlp_service import DLPService
from .firestore_service import FirestoreService
from .quick_reply_content import QuickReplyContentService
from .rag_service import RAGService
from .redis_service import RedisService
logger = logging.getLogger(__name__)
class ConversationManagerService:
    """Central orchestrator for managing user conversations."""

    # Minutes of inactivity before a session would be considered reset.
    # NOTE(review): not referenced in this module's visible code — confirm use.
    SESSION_RESET_THRESHOLD_MINUTES = 30
    # Minutes after which a stored pantallaContexto is treated as stale.
    SCREEN_CONTEXT_TIMEOUT_MINUTES = 10
    # Parameter keys for conversation-history payloads.
    CONV_HISTORY_PARAM = "conversation_history"
    HISTORY_PARAM = "historial"

    def __init__(
        self,
        settings: Settings,
        rag_service: RAGService,
        redis_service: RedisService,
        firestore_service: FirestoreService,
        dlp_service: DLPService,
    ) -> None:
        """Initialize conversation manager.

        Args:
            settings: Application configuration.
            rag_service: Client used to answer free-form user questions.
            redis_service: Session/notification cache.
            firestore_service: Durable session and history store.
            dlp_service: Used to obfuscate sensitive user input.
        """
        self.settings = settings
        self.rag_service = rag_service
        self.redis_service = redis_service
        self.firestore_service = firestore_service
        self.dlp_service = dlp_service
        # Constructed here rather than injected, unlike the other services.
        self.quick_reply_service = QuickReplyContentService(settings)
        logger.info("ConversationManagerService initialized successfully")
    async def manage_conversation(
        self, request: ConversationRequest,
    ) -> DetectIntentResponse:
        """Main entry point for managing conversations.

        Flow: obfuscate the message via DLP, resolve the session
        (Redis -> Firestore -> create new), answer from the quick-reply
        catalogue when a fresh screen context is present, otherwise run
        the RAG-backed flow, persisting both sides of the exchange.

        Args:
            request: External conversation request from client. Note that
                ``request.mensaje`` is mutated in place with the
                obfuscated text.

        Returns:
            Detect intent response from Dialogflow

        Raises:
            Exception: any failure is logged and re-raised for the router
                to translate into an HTTP error.
        """
        try:
            # Step 1: DLP obfuscation of the raw user message.
            obfuscated_message = await self.dlp_service.get_obfuscated_string(
                request.mensaje,
                self.settings.dlp_template_complete_flow,
            )
            request.mensaje = obfuscated_message
            telefono = request.usuario.telefono
            # Step 2: resolve session: Redis cache -> Firestore -> create new.
            session = await self.redis_service.get_session(telefono)
            if not session:
                session = await self.firestore_service.get_session_by_phone(telefono)
            if not session:
                session_id = str(uuid4())
                user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
                session = await self.firestore_service.create_session(
                    session_id, user_id, telefono,
                )
                await self.redis_service.save_session(session)
            # Step 3: quick-reply shortcut when a screen context is present.
            if session.pantallaContexto:
                # Only honor the screen context while it is fresh (10 minutes).
                if self._is_pantalla_context_valid(session.lastModified):
                    logger.info(
                        f"Detected 'pantallaContexto' in session: {session.pantallaContexto}. "
                        f"Delegating to QuickReplies flow.",
                    )
                    response = await self._manage_quick_reply_conversation(
                        request, session.pantallaContexto,
                    )
                    # A None response means no quick replies were available;
                    # fall through to the normal flow below.
                    if response:
                        # Persist the user's (already obfuscated) message.
                        user_entry = ConversationEntry(
                            entity="user",
                            type="CONVERSACION",
                            timestamp=datetime.now(),
                            text=request.mensaje,
                            parameters=None,
                            canal=getattr(request, "canal", None),
                        )
                        await self.firestore_service.save_entry(
                            session.sessionId, user_entry,
                        )
                        # Persist the quick-reply answer (empty string when absent).
                        response_text = (
                            response.queryResult.responseText
                            if response.queryResult
                            else ""
                        ) or ""
                        assistant_entry = ConversationEntry(
                            entity="assistant",
                            type="CONVERSACION",
                            timestamp=datetime.now(),
                            text=response_text,
                            parameters=None,
                            canal=getattr(request, "canal", None),
                        )
                        await self.firestore_service.save_entry(
                            session.sessionId, assistant_entry,
                        )
                        # Refresh session bookkeeping in both stores.
                        session.lastMessage = response_text
                        session.lastModified = datetime.now()
                        await self.firestore_service.save_session(session)
                        await self.redis_service.save_session(session)
                        return response
                else:
                    logger.info(
                        "Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow.",
                    )
            # Step 4: standard RAG conversation flow.
            nickname = request.usuario.nickname
            # NOTE(review): this log fires after the lookup above already
            # ran — the message is misleading.
            logger.info(
                f"Primary Check (Redis): Looking up session for phone: {telefono}",
            )
            # Step 4a: load recent conversation history from Firestore.
            entries = await self.firestore_service.get_entries(
                session.sessionId,
                limit=self.settings.conversation_context_message_limit,
            )
            logger.info(f"Loaded {len(entries)} conversation entries from Firestore")
            # Step 4b: retrieve active notifications for this user.
            notifications = await self._get_active_notifications(telefono)
            logger.info(f"Retrieved {len(notifications)} active notifications")
            # Step 4c: build the message list for the RAG service.
            messages = await self._prepare_rag_messages(
                session=session,
                entries=entries,
                notifications=notifications,
                user_message=request.mensaje,
                nickname=nickname or "Usuario",
            )
            # Step 4d: query RAG service.
            logger.info("Sending query to RAG service")
            assistant_response = await self.rag_service.query(messages)
            logger.info(
                f"Received response from RAG service: {assistant_response[:100]}...",
            )
            # Step 4e: persist the user message.
            user_entry = ConversationEntry(
                entity="user",
                type="CONVERSACION",
                timestamp=datetime.now(),
                text=request.mensaje,
                parameters=None,
                canal=getattr(request, "canal", None),
            )
            await self.firestore_service.save_entry(session.sessionId, user_entry)
            logger.info("Saved user message to Firestore")
            # Step 4f: persist the assistant response.
            assistant_entry = ConversationEntry(
                entity="assistant",
                type="LLM",
                timestamp=datetime.now(),
                text=assistant_response,
                parameters=None,
                canal=getattr(request, "canal", None),
            )
            await self.firestore_service.save_entry(session.sessionId, assistant_entry)
            logger.info("Saved assistant response to Firestore")
            # Step 4g: refresh session bookkeeping in both stores.
            session.lastMessage = assistant_response
            session.lastModified = datetime.now()
            await self.firestore_service.save_session(session)
            await self.redis_service.save_session(session)
            logger.info("Updated session in Firestore and Redis")
            # Step 4h: notifications surfaced in the prompt are now consumed.
            if notifications:
                await self._mark_notifications_as_processed(telefono)
                logger.info(f"Marked {len(notifications)} notifications as processed")
            # Step 4i: wrap the answer in a Dialogflow-style response.
            return DetectIntentResponse(
                responseId=str(uuid4()),
                queryResult=QueryResult(
                    responseText=assistant_response,
                    parameters=None,
                ),
                quick_replies=None,
            )
        except Exception as e:
            logger.error(f"Error managing conversation: {e!s}", exc_info=True)
            raise
def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
"""Check if pantallaContexto is still valid (not stale)."""
time_diff = datetime.now() - last_modified
return time_diff < timedelta(minutes=self.SCREEN_CONTEXT_TIMEOUT_MINUTES)
async def _manage_quick_reply_conversation(
self,
request: ConversationRequest,
screen_id: str,
) -> DetectIntentResponse | None:
"""Handle conversation within Quick Replies context."""
quick_reply_screen = await self.quick_reply_service.get_quick_replies(screen_id)
# If no questions available, delegate to normal conversation flow
if not quick_reply_screen.preguntas:
logger.warning(f"No quick replies found for screen: {screen_id}.")
return None
# Match user message to a quick reply question
user_message_lower = request.mensaje.lower().strip()
matched_answer = None
for pregunta in quick_reply_screen.preguntas:
# Simple matching: check if question title matches user message
if pregunta.titulo.lower().strip() == user_message_lower:
matched_answer = pregunta.respuesta
logger.info(f"Matched quick reply: {pregunta.titulo}")
break
# If no match, use first question as default or delegate to normal flow
if not matched_answer:
logger.warning(
f"No matching quick reply found for message: '{request.mensaje}'.",
)
# Create response with the matched quick reply answer
return DetectIntentResponse(
responseId=str(uuid4()),
queryResult=QueryResult(responseText=matched_answer, parameters=None),
quick_replies=quick_reply_screen,
)
    async def _get_active_notifications(self, telefono: str) -> list:
        """Retrieve active notifications for a user from Redis or Firestore.

        Best-effort: any lookup error is logged and an empty list is
        returned so the conversation flow is never interrupted.

        Args:
            telefono: User phone number

        Returns:
            List of active Notification objects
        """
        try:
            # Try Redis first
            notification_session = await self.redis_service.get_notification_session(
                telefono,
            )
            # If not in Redis, fall back to Firestore
            if not notification_session:
                # Firestore uses phone as document ID for notifications.
                # Local import — presumably to avoid a circular module
                # import; confirm before hoisting to the top of the file.
                from capa_de_integracion.models.notification import NotificationSession

                doc_ref = self.firestore_service.db.collection(
                    self.firestore_service.notifications_collection,
                ).document(telefono)
                doc = await doc_ref.get()
                if doc.exists:
                    data = doc.to_dict()
                    notification_session = NotificationSession.model_validate(data)
            # Keep only notifications still marked "active"
            if notification_session and notification_session.notificaciones:
                return [
                    notif
                    for notif in notification_session.notificaciones
                    if notif.status == "active"
                ]
            return []
        except Exception as e:
            logger.error(
                f"Error retrieving notifications for {telefono}: {e!s}",
                exc_info=True,
            )
            return []
async def _prepare_rag_messages(
self,
session: ConversationSession,
entries: list[ConversationEntry],
notifications: list,
user_message: str,
nickname: str,
) -> list[dict[str, str]]:
"""Prepare messages in OpenAI format for RAG service.
Args:
session: Current conversation session
entries: Conversation history entries
notifications: Active notifications
user_message: Current user message
nickname: User's nickname
Returns:
List of messages in OpenAI format [{"role": "...", "content": "..."}]
"""
messages = []
# Add system message with conversation history if available
if entries:
conversation_context = self._format_conversation_history(session, entries)
if conversation_context:
messages.append(
{
"role": "system",
"content": f"Historial de conversación:\n{conversation_context}",
},
)
# Add system message with notifications if available
if notifications:
# Simple Pythonic join - no mapper needed!
notifications_text = "\n".join(
n.texto for n in notifications if n.texto and n.texto.strip()
)
if notifications_text:
messages.append(
{
"role": "system",
"content": f"Notificaciones pendientes para el usuario:\n{notifications_text}",
},
)
# Add system message with user context
user_context = f"Usuario: {nickname}" if nickname else "Usuario anónimo"
messages.append({"role": "system", "content": user_context})
# Add current user message
messages.append({"role": "user", "content": user_message})
return messages
    async def _mark_notifications_as_processed(self, telefono: str) -> None:
        """Mark all notifications for a user as processed.

        Best-effort: errors are logged and swallowed so the conversation
        response is still returned.

        Args:
            telefono: User phone number
        """
        try:
            # Update status in Firestore
            await self.firestore_service.update_notification_status(
                telefono, "processed",
            )
            # Drop the cached notification session from Redis
            await self.redis_service.delete_notification_session(telefono)
            logger.info(f"Marked notifications as processed for {telefono}")
        except Exception as e:
            logger.error(
                f"Error marking notifications as processed for {telefono}: {e!s}",
                exc_info=True,
            )
def _format_conversation_history(
self,
session: ConversationSession,
entries: list[ConversationEntry],
) -> str:
"""Format conversation history with business rule limits.
Applies limits:
- Date: 30 days maximum
- Count: 60 messages maximum
- Size: 50KB maximum
Args:
session: Conversation session
entries: List of conversation entries
Returns:
Formatted conversation text
"""
if not entries:
return ""
# Filter by date (30 days)
cutoff_date = datetime.now() - timedelta(
days=self.settings.conversation_context_days_limit,
)
recent_entries = [
e for e in entries if e.timestamp and e.timestamp >= cutoff_date
]
# Sort by timestamp (oldest first) and limit count
recent_entries.sort(key=lambda e: e.timestamp)
limited_entries = recent_entries[
-self.settings.conversation_context_message_limit :
]
# Format with size truncation (50KB)
return self._format_entries_with_size_limit(limited_entries)
def _format_entries_with_size_limit(self, entries: list[ConversationEntry]) -> str:
"""Format entries with 50KB size limit.
Builds from newest to oldest, stopping at size limit.
Args:
entries: List of conversation entries
Returns:
Formatted text, truncated if necessary
"""
if not entries:
return ""
MAX_BYTES = 50 * 1024 # 50KB
formatted_messages = [self._format_entry(entry) for entry in entries]
# Build from newest to oldest
text_block = []
current_size = 0
for message in reversed(formatted_messages):
message_line = message + "\n"
message_bytes = len(message_line.encode("utf-8"))
if current_size + message_bytes > MAX_BYTES:
break
text_block.insert(0, message_line)
current_size += message_bytes
return "".join(text_block).strip()
def _format_entry(self, entry: ConversationEntry) -> str:
"""Format a single conversation entry.
Args:
entry: Conversation entry
Returns:
Formatted string (e.g., "User: hello", "Assistant: hi there")
"""
# Map entity to prefix (fixed bug from Java port!)
prefix = "User: " if entry.entity == "user" else "Assistant: "
# Clean content if needed
content = entry.text
if entry.entity == "assistant":
# Remove trailing JSON artifacts like {...}
import re
content = re.sub(r"\s*\{.*\}\s*$", "", content).strip()
return prefix + content

View File

@@ -0,0 +1,192 @@
"""Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Data Loss Prevention service for obfuscating sensitive information.
"""
import logging
import re
from google.cloud import dlp_v2
from google.cloud.dlp_v2 import types
from capa_de_integracion.config import Settings
logger = logging.getLogger(__name__)
class DLPService:
    """Service for detecting and obfuscating sensitive data using Google Cloud DLP.
    Integrates with the DLP API to scan text for PII and other sensitive information,
    then obfuscates findings based on their info type.
    """
    def __init__(self, settings: Settings) -> None:
        """Initialize DLP service.
        Args:
            settings: Application settings
        """
        self.settings = settings
        self.project_id = settings.gcp_project_id
        self.location = settings.gcp_location
        # One async client reused for the service lifetime; released in close().
        self.dlp_client = dlp_v2.DlpServiceAsyncClient()
        logger.info("DLP Service initialized")
    async def get_obfuscated_string(self, text: str, template_id: str) -> str:
        """Inspect text for sensitive data and obfuscate findings.
        Args:
            text: Text to inspect and obfuscate
            template_id: DLP inspect template ID
        Returns:
            Obfuscated text with sensitive data replaced
        Raises:
            Exception: If DLP API call fails (returns original text on error)
        """
        try:
            # Build content item
            byte_content_item = types.ByteContentItem(
                type_=types.ByteContentItem.BytesType.TEXT_UTF8,
                data=text.encode("utf-8"),
            )
            content_item = types.ContentItem(byte_item=byte_content_item)
            # Build inspect config
            finding_limits = types.InspectConfig.FindingLimits(
                max_findings_per_item=0,  # No limit
            )
            # VERY_UNLIKELY casts the widest net here; _obfuscate_text applies
            # the stricter (> POSSIBLE) likelihood filter afterwards.
            inspect_config = types.InspectConfig(
                min_likelihood=types.Likelihood.VERY_UNLIKELY,
                limits=finding_limits,
                include_quote=True,
            )
            # Build request
            inspect_template_name = f"projects/{self.project_id}/locations/{self.location}/inspectTemplates/{template_id}"
            parent = f"projects/{self.project_id}/locations/{self.location}"
            request = types.InspectContentRequest(
                parent=parent,
                inspect_template_name=inspect_template_name,
                inspect_config=inspect_config,
                item=content_item,
            )
            # Call DLP API
            response = await self.dlp_client.inspect_content(request=request)
            findings_count = len(response.result.findings)
            logger.info(f"DLP {template_id} Findings: {findings_count}")
            if findings_count > 0:
                return self._obfuscate_text(response, text)
            return text
        except Exception as e:
            # Fail open: on any DLP error the caller receives the original,
            # un-obfuscated text (logged for follow-up).
            logger.error(
                f"Error during DLP inspection: {e}. Returning original text.",
                exc_info=True,
            )
            return text
    def _obfuscate_text(self, response: types.InspectContentResponse, text: str) -> str:
        """Obfuscate sensitive findings in text.
        Args:
            response: DLP inspect content response with findings
            text: Original text
        Returns:
            Text with sensitive data obfuscated
        """
        # Filter findings by likelihood (> POSSIBLE, which is value 3)
        findings = [
            finding
            for finding in response.result.findings
            if finding.likelihood.value > 3
        ]
        # Sort by likelihood (descending)
        findings.sort(key=lambda f: f.likelihood.value, reverse=True)
        for finding in findings:
            quote = finding.quote
            info_type = finding.info_type.name
            logger.info(
                f"InfoType: {info_type} | Likelihood: {finding.likelihood.value}",
            )
            # Obfuscate based on info type
            replacement = self._get_replacement(info_type, quote)
            if replacement:
                # str.replace masks every occurrence of the quoted value.
                text = text.replace(quote, replacement)
        # Clean up consecutive DIRECCION tags
        return self._clean_direccion(text)
    def _get_replacement(self, info_type: str, quote: str) -> str | None:
        """Get replacement text for a given info type.
        Args:
            info_type: DLP info type name
            quote: Original sensitive text
        Returns:
            Replacement text or None to skip
        """
        # Card/account numbers keep their last 4 characters for usability;
        # everything else is replaced by a fixed placeholder tag.
        replacements = {
            "CREDIT_CARD_NUMBER": f"**** **** **** {self._get_last4(quote)}",
            "CREDIT_CARD_EXPIRATION_DATE": "[FECHA_VENCIMIENTO_TARJETA]",
            "FECHA_VENCIMIENTO": "[FECHA_VENCIMIENTO_TARJETA]",
            "CVV_NUMBER": "[CVV]",
            "CVV": "[CVV]",
            "EMAIL_ADDRESS": "[CORREO]",
            "PERSON_NAME": "[NOMBRE]",
            "PHONE_NUMBER": "[TELEFONO]",
            "DIRECCION": "[DIRECCION]",
            "DIR_COLONIA": "[DIRECCION]",
            "DIR_DEL_MUN": "[DIRECCION]",
            "DIR_INTERIOR": "[DIRECCION]",
            "DIR_ESQUINA": "[DIRECCION]",
            "DIR_CIUDAD_EDO": "[DIRECCION]",
            "DIR_CP": "[DIRECCION]",
            "CLABE_INTERBANCARIA": "[CLABE]",
            "CLAVE_RASTREO_SPEI": "[CLAVE_RASTREO]",
            "NIP": "[NIP]",
            "SALDO": "[SALDO]",
            "CUENTA": f"**************{self._get_last4(quote)}",
            "NUM_ACLARACION": "[NUM_ACLARACION]",
        }
        return replacements.get(info_type)
    def _get_last4(self, quote: str) -> str:
        """Extract last 4 characters from quote (removing spaces)."""
        clean_quote = quote.strip().replace(" ", "")
        if len(clean_quote) >= 4:
            return clean_quote[-4:]
        # Shorter than 4 characters: return the whole cleaned value.
        return clean_quote
    def _clean_direccion(self, text: str) -> str:
        """Clean up consecutive [DIRECCION] tags."""
        # Replace multiple [DIRECCION] tags separated by commas or spaces with single tag
        pattern = r"\[DIRECCION\](?:(?:,\s*|\s+)\[DIRECCION\])*"
        return re.sub(pattern, "[DIRECCION]", text).strip()
    async def close(self) -> None:
        """Close DLP client."""
        # Releases the underlying gRPC channel.
        await self.dlp_client.transport.close()
        logger.info("DLP client closed")

View File

@@ -0,0 +1,395 @@
import logging
from datetime import datetime
from google.cloud import firestore
from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationEntry, ConversationSession
from capa_de_integracion.models.notification import Notification
logger = logging.getLogger(__name__)
class FirestoreService:
    """Service for Firestore operations on conversations."""
    def __init__(self, settings: Settings) -> None:
        """Initialize Firestore client."""
        self.settings = settings
        self.db = firestore.AsyncClient(
            project=settings.gcp_project_id,
            database=settings.firestore_database_id,
        )
        # Paths below are collection/document/collection (odd segment count),
        # which Firestore requires for a collection reference.
        self.conversations_collection = (
            f"artifacts/{settings.gcp_project_id}/conversations"
        )
        self.entries_subcollection = "mensajes"
        self.notifications_collection = (
            f"artifacts/{settings.gcp_project_id}/notifications"
        )
        logger.info(
            f"Firestore client initialized for project: {settings.gcp_project_id}",
        )
    async def close(self) -> None:
        """Close Firestore client."""
        # NOTE(review): close() is not awaited here — confirm the installed
        # google-cloud-firestore version exposes AsyncClient.close() as sync.
        self.db.close()
        logger.info("Firestore client closed")
    def _session_ref(self, session_id: str):
        """Get Firestore document reference for session."""
        return self.db.collection(self.conversations_collection).document(session_id)
    async def get_session(self, session_id: str) -> ConversationSession | None:
        """Retrieve conversation session from Firestore by session ID.

        Returns None both when the document is missing and on read errors
        (errors are logged).
        """
        try:
            doc_ref = self._session_ref(session_id)
            doc = await doc_ref.get()
            if not doc.exists:
                logger.debug(f"Session not found in Firestore: {session_id}")
                return None
            data = doc.to_dict()
            session = ConversationSession.model_validate(data)
            logger.debug(f"Retrieved session from Firestore: {session_id}")
            return session
        except Exception as e:
            logger.exception(
                f"Error retrieving session {session_id} from Firestore: {e!s}",
            )
            return None
    async def get_session_by_phone(self, telefono: str) -> ConversationSession | None:
        """Retrieve most recent conversation session from Firestore by phone number.
        Args:
            telefono: User phone number
        Returns:
            Most recent session for this phone, or None if not found
        """
        try:
            # Newest session first; limit(1) keeps the read cheap.
            query = (
                self.db.collection(self.conversations_collection)
                .where("telefono", "==", telefono)
                .order_by("lastModified", direction=firestore.Query.DESCENDING)
                .limit(1)
            )
            docs = query.stream()
            # The async iterator yields at most one document; returning from
            # inside the loop short-circuits after the first hit.
            async for doc in docs:
                data = doc.to_dict()
                session = ConversationSession.model_validate(data)
                logger.debug(
                    f"Retrieved session from Firestore for phone {telefono}: {session.sessionId}",
                )
                return session
            logger.debug(f"No session found in Firestore for phone: {telefono}")
            return None
        except Exception as e:
            logger.exception(
                f"Error querying session by phone {telefono} from Firestore: {e!s}",
            )
            return None
    async def save_session(self, session: ConversationSession) -> bool:
        """Save conversation session to Firestore.

        Returns:
            True on success, False on error (errors are logged).
        """
        try:
            doc_ref = self._session_ref(session.sessionId)
            data = session.model_dump()
            # merge=True preserves fields not present in this model dump.
            await doc_ref.set(data, merge=True)
            logger.debug(f"Saved session to Firestore: {session.sessionId}")
            return True
        except Exception as e:
            logger.exception(
                f"Error saving session {session.sessionId} to Firestore: {e!s}",
            )
            return False
    async def create_session(
        self,
        session_id: str,
        user_id: str,
        telefono: str,
        pantalla_contexto: str | None = None,
        last_message: str | None = None,
    ) -> ConversationSession:
        """Create and save a new conversation session to Firestore.
        Args:
            session_id: Unique session identifier
            user_id: User identifier
            telefono: User phone number
            pantalla_contexto: Optional screen context for the conversation
            last_message: Optional last message in the conversation
        Returns:
            The created session
        Raises:
            Exception: If session creation or save fails
        """
        session = ConversationSession.create(
            session_id=session_id,
            user_id=user_id,
            telefono=telefono,
            pantalla_contexto=pantalla_contexto,
            last_message=last_message,
        )
        doc_ref = self._session_ref(session.sessionId)
        data = session.model_dump()
        # Unlike the read/update helpers, errors here propagate to the caller.
        await doc_ref.set(data, merge=True)
        logger.info(f"Created new session in Firestore: {session_id}")
        return session
    async def save_entry(self, session_id: str, entry: ConversationEntry) -> bool:
        """Save conversation entry to Firestore subcollection."""
        try:
            doc_ref = self._session_ref(session_id)
            entries_ref = doc_ref.collection(self.entries_subcollection)
            # Use timestamp as document ID for chronological ordering
            # NOTE(review): two entries with identical timestamps would share
            # a doc ID and overwrite each other — confirm timestamps are
            # unique per session.
            entry_id = entry.timestamp.isoformat()
            entry_doc = entries_ref.document(entry_id)
            data = entry.model_dump()
            await entry_doc.set(data)
            logger.debug(f"Saved entry to Firestore for session: {session_id}")
            return True
        except Exception as e:
            logger.exception(
                f"Error saving entry for session {session_id} to Firestore: {e!s}",
            )
            return False
    async def get_entries(
        self, session_id: str, limit: int = 10,
    ) -> list[ConversationEntry]:
        """Retrieve recent conversation entries from Firestore.

        Returns the newest `limit` entries, in chronological (oldest-first)
        order; an empty list on error.
        """
        try:
            doc_ref = self._session_ref(session_id)
            entries_ref = doc_ref.collection(self.entries_subcollection)
            # Get entries ordered by timestamp descending
            query = entries_ref.order_by(
                "timestamp", direction=firestore.Query.DESCENDING,
            ).limit(limit)
            docs = query.stream()
            entries = []
            async for doc in docs:
                entry_data = doc.to_dict()
                entry = ConversationEntry.model_validate(entry_data)
                entries.append(entry)
            # Reverse to get chronological order
            entries.reverse()
            logger.debug(f"Retrieved {len(entries)} entries for session: {session_id}")
            return entries
        except Exception as e:
            logger.exception(
                f"Error retrieving entries for session {session_id} from Firestore: {e!s}",
            )
            return []
    async def delete_session(self, session_id: str) -> bool:
        """Delete conversation session and all entries from Firestore."""
        try:
            doc_ref = self._session_ref(session_id)
            # Delete all entries first
            entries_ref = doc_ref.collection(self.entries_subcollection)
            # Subcollection documents are not removed automatically with the
            # parent, so each entry is deleted explicitly.
            async for doc in entries_ref.stream():
                await doc.reference.delete()
            # Delete session document
            await doc_ref.delete()
            logger.debug(f"Deleted session from Firestore: {session_id}")
            return True
        except Exception as e:
            logger.exception(
                f"Error deleting session {session_id} from Firestore: {e!s}",
            )
            return False
    async def update_pantalla_contexto(
        self, session_id: str, pantalla_contexto: str | None,
    ) -> bool:
        """Update the pantallaContexto field for a conversation session.
        Args:
            session_id: Session ID to update
            pantalla_contexto: New pantalla contexto value
        Returns:
            True if update was successful, False otherwise
        """
        try:
            doc_ref = self._session_ref(session_id)
            doc = await doc_ref.get()
            if not doc.exists:
                logger.warning(
                    f"Session {session_id} not found in Firestore. Cannot update pantallaContexto",
                )
                return False
            await doc_ref.update(
                {
                    "pantallaContexto": pantalla_contexto,
                    # Naive local time, matching the other writes in this class.
                    "lastModified": datetime.now(),
                },
            )
            logger.debug(
                f"Updated pantallaContexto for session {session_id} in Firestore",
            )
            return True
        except Exception as e:
            logger.exception(
                f"Error updating pantallaContexto for session {session_id} in Firestore: {e!s}",
            )
            return False
    # ====== Notification Methods ======
    def _notification_ref(self, notification_id: str):
        """Get Firestore document reference for notification."""
        return self.db.collection(self.notifications_collection).document(
            notification_id,
        )
    async def save_or_append_notification(self, new_entry: Notification) -> None:
        """Save or append notification entry to Firestore.
        Args:
            new_entry: Notification entry to save
        Raises:
            ValueError: If phone number is missing
        """
        phone_number = new_entry.telefono
        if not phone_number or not phone_number.strip():
            msg = "Phone number is required to manage notification entries"
            raise ValueError(msg)
        # Use phone number as document ID
        notification_session_id = phone_number
        try:
            doc_ref = self._notification_ref(notification_session_id)
            doc = await doc_ref.get()
            entry_dict = new_entry.model_dump()
            if doc.exists:
                # Append to existing session
                # ArrayUnion appends atomically server-side (and skips exact
                # duplicates of the same element).
                await doc_ref.update(
                    {
                        "notificaciones": firestore.ArrayUnion([entry_dict]),
                        "ultimaActualizacion": datetime.now(),
                    },
                )
                logger.info(
                    f"Successfully appended notification entry to session {notification_session_id} in Firestore",
                )
            else:
                # Create new notification session
                new_session_data = {
                    "sessionId": notification_session_id,
                    "telefono": phone_number,
                    "fechaCreacion": datetime.now(),
                    "ultimaActualizacion": datetime.now(),
                    "notificaciones": [entry_dict],
                }
                await doc_ref.set(new_session_data)
                logger.info(
                    f"Successfully created new notification session {notification_session_id} in Firestore",
                )
        except Exception as e:
            # Unlike the session helpers, persistence errors propagate here.
            logger.error(
                f"Error saving notification to Firestore for phone {phone_number}: {e!s}",
                exc_info=True,
            )
            raise
    async def update_notification_status(self, session_id: str, status: str) -> None:
        """Update the status of all notifications in a session.
        Args:
            session_id: Notification session ID (phone number)
            status: New status value
        """
        try:
            doc_ref = self._notification_ref(session_id)
            doc = await doc_ref.get()
            if not doc.exists:
                logger.warning(
                    f"Notification session {session_id} not found in Firestore. Cannot update status",
                )
                return
            session_data = doc.to_dict()
            notifications = session_data.get("notificaciones", [])
            # Update status for all notifications
            # Array elements cannot be patched in place, so the whole array
            # is rewritten with the new status on every element.
            updated_notifications = [
                {**notif, "status": status} for notif in notifications
            ]
            await doc_ref.update(
                {
                    "notificaciones": updated_notifications,
                    "ultimaActualizacion": datetime.now(),
                },
            )
            logger.info(
                f"Successfully updated notification status to '{status}' for session {session_id} in Firestore",
            )
        except Exception as e:
            logger.error(
                f"Error updating notification status in Firestore for session {session_id}: {e!s}",
                exc_info=True,
            )
            raise
    async def delete_notification(self, notification_id: str) -> bool:
        """Delete notification session from Firestore."""
        try:
            logger.info(
                f"Deleting notification session {notification_id} from Firestore",
            )
            doc_ref = self._notification_ref(notification_id)
            await doc_ref.delete()
            logger.info(
                f"Successfully deleted notification session {notification_id} from Firestore",
            )
            return True
        except Exception as e:
            logger.error(
                f"Error deleting notification session {notification_id} from Firestore: {e!s}",
                exc_info=True,
            )
            return False

View File

@@ -0,0 +1,125 @@
import asyncio
import logging
from uuid import uuid4
from capa_de_integracion.config import Settings
from capa_de_integracion.models.notification import (
ExternalNotificationRequest,
Notification,
)
from .dlp_service import DLPService
from .firestore_service import FirestoreService
from .redis_service import RedisService
logger = logging.getLogger(__name__)
PREFIX_PO_PARAM = "notification_po_"
class NotificationManagerService:
    """Manages notification processing and integration with conversations.

    Handles push notifications from external systems: obfuscates their text
    via DLP, caches them in Redis, and persists them to Firestore in a
    background task.
    """
    def __init__(
        self,
        settings: Settings,
        redis_service: RedisService,
        firestore_service: FirestoreService,
        dlp_service: DLPService,
    ) -> None:
        """Initialize notification manager.

        Args:
            settings: Application settings
            redis_service: Redis caching service
            firestore_service: Firestore persistence service
            dlp_service: Data Loss Prevention service
        """
        self.settings = settings
        self.redis_service = redis_service
        self.firestore_service = firestore_service
        self.dlp_service = dlp_service
        self.event_name = "notificacion"
        self.default_language_code = "es"
        # Strong references to fire-and-forget tasks: the event loop keeps
        # only weak references to tasks, so an unreferenced task may be
        # garbage-collected before it completes (see asyncio.create_task docs).
        self._background_tasks: set[asyncio.Task] = set()
        logger.info("NotificationManagerService initialized")
    async def process_notification(
        self, external_request: ExternalNotificationRequest,
    ) -> None:
        """Process a push notification from an external system.

        Flow:
        1. Obfuscate sensitive data in the notification text (DLP)
        2. Prefix hidden parameters with the ``notification_po_`` marker
        3. Create the notification entry
        4. Save it to Redis synchronously
        5. Persist it to Firestore in a background task (fire-and-forget)

        Args:
            external_request: External notification request

        Raises:
            ValueError: If the phone number is missing (raised downstream
                when saving the notification entry)
        """
        telefono = external_request.telefono
        # Obfuscate sensitive data using DLP
        obfuscated_text = await self.dlp_service.get_obfuscated_string(
            external_request.texto,
            self.settings.dlp_template_complete_flow,
        )
        # Namespace hidden parameters so they are recognizable downstream.
        parameters = {}
        if external_request.parametros_ocultos:
            for key, value in external_request.parametros_ocultos.items():
                parameters[f"{PREFIX_PO_PARAM}{key}"] = value
        # Create notification entry
        new_notification_id = str(uuid4())
        new_notification_entry = Notification.create(
            id_notificacion=new_notification_id,
            telefono=telefono,
            texto=obfuscated_text,
            nombre_evento_dialogflow=self.event_name,
            codigo_idioma_dialogflow=self.default_language_code,
            parametros=parameters,
            status="active",
        )
        # Save notification to Redis (with async Firestore write-back)
        await self.redis_service.save_or_append_notification(new_notification_entry)
        logger.info(
            f"Notification for phone {telefono} cached. Kicking off async Firestore write-back",
        )
        async def save_notification_to_firestore() -> None:
            # Background persistence; errors are logged, never propagated.
            try:
                await self.firestore_service.save_or_append_notification(
                    new_notification_entry,
                )
                logger.debug(
                    f"Notification entry persisted to Firestore for phone {telefono}",
                )
            except Exception as e:
                logger.error(
                    f"Background: Error during notification persistence to Firestore for phone {telefono}: {e}",
                    exc_info=True,
                )
        # Fire and forget, but keep a strong reference so the task cannot be
        # garbage-collected before it runs to completion.
        task = asyncio.create_task(save_notification_to_firestore())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

View File

@@ -0,0 +1,104 @@
import json
import logging
from capa_de_integracion.config import Settings
from capa_de_integracion.models.quick_replies import (
QuickReplyQuestions,
QuickReplyScreen,
)
logger = logging.getLogger(__name__)
class QuickReplyContentService:
    """Service for loading quick reply screen content from JSON files."""
    def __init__(self, settings: Settings) -> None:
        """Initialize quick reply content service.

        Args:
            settings: Application settings
        """
        self.settings = settings
        # Screen JSON files live under <resources>/quick_replies/<id>.json.
        self.quick_replies_path = settings.base_path / "quick_replies"
        logger.info(
            f"QuickReplyContentService initialized with path: {self.quick_replies_path}",
        )
    async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
        """Load quick reply screen content by ID.

        Args:
            screen_id: Screen identifier (e.g., "pagos", "home")

        Returns:
            Quick reply DTO

        Raises:
            ValueError: If the quick reply file is missing, malformed, or
                cannot be loaded
        """
        if not screen_id or not screen_id.strip():
            logger.warning("screen_id is null or empty. Returning empty quick replies")
            return QuickReplyScreen(
                header="empty",
                body=None,
                button=None,
                header_section=None,
                preguntas=[],
            )
        file_path = self.quick_replies_path / f"{screen_id}.json"
        # Check existence BEFORE the try block: the original raised this
        # ValueError inside it, so the generic `except Exception` re-wrapped
        # it and the specific "file not found" message was lost.
        if not file_path.exists():
            logger.warning(f"Quick reply file not found: {file_path}")
            msg = f"Quick reply file not found for screen_id: {screen_id}"
            raise ValueError(msg)
        try:
            data = json.loads(file_path.read_text(encoding="utf-8"))
            # Parse questions; missing keys fall back to safe defaults.
            preguntas = [
                QuickReplyQuestions(
                    titulo=q.get("titulo", ""),
                    descripcion=q.get("descripcion"),
                    respuesta=q.get("respuesta", ""),
                )
                for q in data.get("preguntas", [])
            ]
            quick_reply = QuickReplyScreen(
                header=data.get("header"),
                body=data.get("body"),
                button=data.get("button"),
                header_section=data.get("header_section"),
                preguntas=preguntas,
            )
            logger.info(
                f"Successfully loaded {len(preguntas)} quick replies for screen: {screen_id}",
            )
            return quick_reply
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing JSON file {file_path}: {e}", exc_info=True)
            msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
            raise ValueError(msg) from e
        except Exception as e:
            # Catch-all for I/O and model-validation errors.
            logger.error(
                f"Error loading quick replies for screen {screen_id}: {e}",
                exc_info=True,
            )
            msg = f"Error loading quick replies for screen_id: {screen_id}"
            raise ValueError(msg) from e

View File

@@ -0,0 +1,139 @@
import logging
import httpx
from pydantic import BaseModel, Field
from capa_de_integracion.config import Settings
logger = logging.getLogger(__name__)
class Message(BaseModel):
    """OpenAI-style message format."""
    # Roles produced by this service are "system", "user" and "assistant".
    role: str = Field(..., description="Role: system, user, or assistant")
    content: str = Field(..., description="Message content")
class RAGRequest(BaseModel):
    """Request model for RAG endpoint."""
    # OpenAI-style message list; serialized verbatim as the POST payload.
    messages: list[Message] = Field(..., description="Conversation history")
class RAGResponse(BaseModel):
    """Response model from RAG endpoint."""
    # The endpoint is expected to answer with {"response": "<text>"}.
    response: str = Field(..., description="Generated response from RAG")
class RAGService:
    """Highly concurrent HTTP client for calling RAG endpoints.
    Uses httpx AsyncClient with connection pooling for optimal performance
    when handling multiple concurrent requests.
    """
    def __init__(
        self,
        settings: Settings,
        max_connections: int = 100,
        max_keepalive_connections: int = 20,
        timeout: float = 30.0,
    ) -> None:
        """Initialize RAG service with connection pooling.
        Args:
            settings: Application settings
            max_connections: Maximum number of concurrent connections
            max_keepalive_connections: Maximum number of idle connections to keep alive
            timeout: Request timeout in seconds
        """
        self.settings = settings
        self.rag_endpoint_url = settings.rag_endpoint_url
        self.timeout = timeout
        # Configure connection limits for high concurrency
        limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive_connections,
        )
        # Create async client with connection pooling
        # NOTE(review): http2=True requires the optional `h2` package
        # (install httpx[http2]); httpx raises ImportError without it —
        # confirm the dependency is present in the runtime image.
        self._client = httpx.AsyncClient(
            limits=limits,
            timeout=httpx.Timeout(timeout),
            http2=True,  # Enable HTTP/2 for better performance
        )
        logger.info(
            f"RAGService initialized with endpoint: {self.rag_endpoint_url}, "
            f"max_connections: {max_connections}, timeout: {timeout}s",
        )
    async def query(self, messages: list[dict[str, str]]) -> str:
        """Send conversation history to RAG endpoint and get response.
        Args:
            messages: OpenAI-style conversation history
                e.g., [{"role": "user", "content": "Hello"}, ...]
        Returns:
            Response string from RAG endpoint
        Raises:
            httpx.HTTPError: If HTTP request fails
            ValueError: If response format is invalid
        """
        try:
            # Validate and construct request
            # Pydantic validation rejects malformed message dicts up front.
            message_objects = [Message(**msg) for msg in messages]
            request = RAGRequest(messages=message_objects)
            # Make async HTTP POST request
            logger.debug(f"Sending RAG request with {len(messages)} messages")
            response = await self._client.post(
                self.rag_endpoint_url,
                json=request.model_dump(),
                headers={"Content-Type": "application/json"},
            )
            # Raise exception for HTTP errors
            response.raise_for_status()
            # Parse response
            response_data = response.json()
            rag_response = RAGResponse(**response_data)
            logger.debug(f"RAG response received: {len(rag_response.response)} chars")
            return rag_response.response
        except httpx.HTTPStatusError as e:
            # Non-2xx status: log status and body, then propagate.
            logger.exception(
                f"HTTP error calling RAG endpoint: {e.response.status_code} - {e.response.text}",
            )
            raise
        except httpx.RequestError as e:
            # Transport-level failure (DNS, connect, timeout, ...).
            logger.exception(f"Request error calling RAG endpoint: {e!s}")
            raise
        except Exception as e:
            logger.error(
                f"Unexpected error calling RAG endpoint: {e!s}", exc_info=True,
            )
            raise
    async def close(self) -> None:
        """Close the HTTP client and release connections."""
        await self._client.aclose()
        logger.info("RAGService client closed")
    async def __aenter__(self):
        """Async context manager entry."""
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

View File

@@ -0,0 +1,378 @@
import json
import logging
from datetime import datetime
from redis.asyncio import Redis
from capa_de_integracion.config import Settings
from capa_de_integracion.models import ConversationSession
from capa_de_integracion.models.notification import Notification, NotificationSession
logger = logging.getLogger(__name__)
class RedisService:
"""Service for Redis operations on conversation sessions."""
def __init__(self, settings: Settings) -> None:
"""Initialize Redis client."""
self.settings = settings
self.redis: Redis | None = None
self.session_ttl = 2592000 # 30 days in seconds
self.notification_ttl = 2592000 # 30 days in seconds
self.qr_session_ttl = 86400 # 24 hours in seconds
async def connect(self) -> None:
"""Connect to Redis."""
self.redis = Redis(
host=self.settings.redis_host,
port=self.settings.redis_port,
password=self.settings.redis_pwd,
decode_responses=True,
)
logger.info(
f"Connected to Redis at {self.settings.redis_host}:{self.settings.redis_port}",
)
async def close(self) -> None:
"""Close Redis connection."""
if self.redis:
await self.redis.close()
logger.info("Redis connection closed")
def _session_key(self, session_id: str) -> str:
"""Generate Redis key for conversation session."""
return f"conversation:session:{session_id}"
def _phone_to_session_key(self, phone: str) -> str:
"""Generate Redis key for phone-to-session mapping."""
return f"conversation:phone:{phone}"
async def get_session(self, session_id_or_phone: str) -> ConversationSession | None:
"""Retrieve conversation session from Redis by session ID or phone number.
Args:
session_id_or_phone: Either a session ID or phone number
Returns:
Conversation session or None if not found
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
# First try as phone number (lookup session ID)
phone_key = self._phone_to_session_key(session_id_or_phone)
mapped_session_id = await self.redis.get(phone_key)
if mapped_session_id:
# Found phone mapping, get the actual session
session_id = mapped_session_id
else:
# Try as direct session ID
session_id = session_id_or_phone
# Get session by ID
key = self._session_key(session_id)
data = await self.redis.get(key)
if not data:
logger.debug(f"Session not found in Redis: {session_id_or_phone}")
return None
try:
session_dict = json.loads(data)
session = ConversationSession.model_validate(session_dict)
logger.debug(f"Retrieved session from Redis: {session_id}")
return session
except Exception as e:
logger.exception(f"Error deserializing session {session_id}: {e!s}")
return None
async def save_session(self, session: ConversationSession) -> bool:
"""Save conversation session to Redis with TTL.
Also stores phone-to-session mapping for lookup by phone number.
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session.sessionId)
phone_key = self._phone_to_session_key(session.telefono)
try:
# Save session data
data = session.model_dump_json(by_alias=False)
await self.redis.setex(key, self.session_ttl, data)
# Save phone-to-session mapping
await self.redis.setex(phone_key, self.session_ttl, session.sessionId)
logger.debug(
f"Saved session to Redis: {session.sessionId} for phone: {session.telefono}",
)
return True
except Exception as e:
logger.exception(f"Error saving session {session.sessionId} to Redis: {e!s}")
return False
async def delete_session(self, session_id: str) -> bool:
"""Delete conversation session from Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session_id)
try:
result = await self.redis.delete(key)
logger.debug(f"Deleted session from Redis: {session_id}")
return result > 0
except Exception as e:
logger.exception(f"Error deleting session {session_id} from Redis: {e!s}")
return False
async def exists(self, session_id: str) -> bool:
"""Check if session exists in Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._session_key(session_id)
return await self.redis.exists(key) > 0
# ====== Message Methods ======
def _messages_key(self, session_id: str) -> str:
"""Generate Redis key for conversation messages."""
return f"conversation:messages:{session_id}"
async def save_message(self, session_id: str, message) -> bool:
"""Save a conversation message to Redis sorted set.
Messages are stored in a sorted set with timestamp as score.
Args:
session_id: The session ID
message: ConversationEntry
Returns:
True if successful, False otherwise
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._messages_key(session_id)
try:
# Convert message to JSON
message_data = message.model_dump_json(by_alias=False)
# Use timestamp as score (in milliseconds)
score = message.timestamp.timestamp() * 1000
# Add to sorted set
await self.redis.zadd(key, {message_data: score})
# Set TTL on the messages key to match session TTL
await self.redis.expire(key, self.session_ttl)
logger.debug(f"Saved message to Redis: {session_id}")
return True
except Exception as e:
logger.exception(
f"Error saving message to Redis for session {session_id}: {e!s}",
)
return False
async def get_messages(self, session_id: str) -> list:
"""Retrieve all conversation messages for a session from Redis.
Returns messages ordered by timestamp (oldest first).
Args:
session_id: The session ID
Returns:
List of message dictionaries (parsed from JSON)
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._messages_key(session_id)
try:
# Get all messages from sorted set (ordered by score/timestamp)
message_strings = await self.redis.zrange(key, 0, -1)
if not message_strings:
logger.debug(f"No messages found in Redis for session: {session_id}")
return []
# Parse JSON strings to dictionaries
messages = []
for msg_str in message_strings:
try:
messages.append(json.loads(msg_str))
except json.JSONDecodeError as e:
logger.exception(f"Error parsing message JSON: {e!s}")
continue
logger.debug(
f"Retrieved {len(messages)} messages from Redis for session: {session_id}",
)
return messages
except Exception as e:
logger.exception(
f"Error retrieving messages from Redis for session {session_id}: {e!s}",
)
return []
# ====== Notification Methods ======
def _notification_key(self, session_id: str) -> str:
"""Generate Redis key for notification session."""
return f"notification:{session_id}"
def _phone_to_notification_key(self, phone: str) -> str:
"""Generate Redis key for phone-to-notification mapping."""
return f"notification:phone_to_notification:{phone}"
async def save_or_append_notification(self, new_entry: Notification) -> None:
"""Save or append notification entry to session.
Args:
new_entry: Notification entry to save
Raises:
ValueError: If phone number is missing
"""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
phone_number = new_entry.telefono
if not phone_number or not phone_number.strip():
msg = "Phone number is required to manage notification entries"
raise ValueError(msg)
# Use phone number as session ID for notifications
notification_session_id = phone_number
# Get existing session or create new one
existing_session = await self.get_notification_session(notification_session_id)
if existing_session:
# Append to existing session
updated_notifications = [*existing_session.notificaciones, new_entry]
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
fechaCreacion=existing_session.fechaCreacion,
ultimaActualizacion=datetime.now(),
notificaciones=updated_notifications,
)
else:
# Create new session
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
fechaCreacion=datetime.now(),
ultimaActualizacion=datetime.now(),
notificaciones=[new_entry],
)
# Save to Redis
await self._cache_notification_session(updated_session)
async def _cache_notification_session(self, session: NotificationSession) -> bool:
"""Cache notification session in Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._notification_key(session.sessionId)
phone_key = self._phone_to_notification_key(session.telefono)
try:
# Save notification session
data = session.model_dump_json(by_alias=False)
await self.redis.setex(key, self.notification_ttl, data)
# Save phone-to-session mapping
await self.redis.setex(phone_key, self.notification_ttl, session.sessionId)
logger.debug(f"Cached notification session: {session.sessionId}")
return True
except Exception as e:
logger.exception(
f"Error caching notification session {session.sessionId}: {e!s}",
)
return False
async def get_notification_session(
self, session_id: str,
) -> NotificationSession | None:
"""Retrieve notification session from Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._notification_key(session_id)
data = await self.redis.get(key)
if not data:
logger.debug(f"Notification session not found in Redis: {session_id}")
return None
try:
session_dict = json.loads(data)
session = NotificationSession.model_validate(session_dict)
logger.info(f"Notification session {session_id} retrieved from Redis")
return session
except Exception as e:
logger.exception(
f"Error deserializing notification session {session_id}: {e!s}",
)
return None
async def get_notification_id_for_phone(self, phone: str) -> str | None:
"""Get notification session ID for a phone number."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
key = self._phone_to_notification_key(phone)
session_id = await self.redis.get(key)
if session_id:
logger.info(f"Session ID {session_id} found for phone")
else:
logger.debug("Session ID not found for phone")
return session_id
async def delete_notification_session(self, phone_number: str) -> bool:
"""Delete notification session from Redis."""
if not self.redis:
msg = "Redis client not connected"
raise RuntimeError(msg)
notification_key = self._notification_key(phone_number)
phone_key = self._phone_to_notification_key(phone_number)
try:
logger.info(f"Deleting notification session for phone {phone_number}")
await self.redis.delete(notification_key)
await self.redis.delete(phone_key)
return True
except Exception as e:
logger.exception(
f"Error deleting notification session for phone {phone_number}: {e!s}",
)
return False