This commit is contained in:
2026-02-19 21:36:58 +00:00
parent 6f629c53a6
commit 03292a635c
37 changed files with 625 additions and 2200 deletions

View File

@@ -0,0 +1,53 @@
from pathlib import Path
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application configuration from environment variables.

    Values are read from the process environment with a local ``.env`` file
    as fallback. Fields declared without an explicit ``alias`` are matched
    case-insensitively against an environment variable of the same name
    (e.g. ``gcp_project_id`` <- ``GCP_PROJECT_ID``).
    """

    model_config = SettingsConfigDict(env_file=".env")

    # GCP General (required; no defaults)
    gcp_project_id: str
    gcp_location: str

    # RAG
    rag_endpoint_url: str

    # Firestore
    firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID")
    firestore_host: str = Field(
        default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST"
    )
    firestore_port: int = Field(default=443, alias="GCP_FIRESTORE_PORT")
    # NOTE(review): env var name is GCP_FIRESTORE_IMPORTER_ENABLE (no trailing
    # "D") -- presumably matches the deployed environment; confirm before renaming.
    firestore_importer_enabled: bool = Field(
        default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE"
    )

    # Redis
    redis_host: str
    redis_port: int
    redis_pwd: str | None = None  # None means no password is sent

    # DLP
    dlp_template_complete_flow: str

    # Conversation Context -- windowing limits for conversation history
    conversation_context_message_limit: int = Field(
        default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT"
    )
    conversation_context_days_limit: int = Field(
        default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT"
    )

    # Logging
    log_level: str = Field(default="INFO", alias="LOGGING_LEVEL_ROOT")

    @property
    def base_path(self) -> Path:
        """Get base path for resources."""
        # <package root>/resources, resolved relative to this module file.
        return Path(__file__).parent.parent / "resources"


# Module-level singleton shared by the application.
# NOTE(review): model_validate({}) bypasses BaseSettings.__init__; confirm the
# environment/.env settings sources are still applied -- Settings() is the
# conventional pydantic-settings constructor.
settings = Settings.model_validate({})

View File

@@ -1,5 +0,0 @@
"""Configuration module."""
from .settings import Settings, get_settings
__all__ = ["Settings", "get_settings"]

View File

@@ -1,113 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Application configuration settings.
"""
from functools import lru_cache
from pathlib import Path
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application configuration from environment variables.

    Loaded from the process environment with a ``.env`` file as fallback.
    Every field maps to the environment variable named in its ``alias``;
    fields without an alias are matched case-insensitively on the field name.
    """

    model_config = SettingsConfigDict(
        env_file=".env", env_file_encoding="utf-8", case_sensitive=False
    )

    # GCP General
    gcp_project_id: str = Field(..., alias="GCP_PROJECT_ID")
    gcp_location: str = Field(default="us-central1", alias="GCP_LOCATION")

    # Firestore
    firestore_database_id: str = Field(..., alias="GCP_FIRESTORE_DATABASE_ID")
    firestore_host: str = Field(
        default="firestore.googleapis.com", alias="GCP_FIRESTORE_HOST"
    )
    firestore_port: int = Field(default=443, alias="GCP_FIRESTORE_PORT")
    # NOTE(review): env var name has no trailing "D" (GCP_FIRESTORE_IMPORTER_ENABLE).
    firestore_importer_enabled: bool = Field(
        default=False, alias="GCP_FIRESTORE_IMPORTER_ENABLE"
    )

    # Redis
    redis_host: str = Field(..., alias="REDIS_HOST")
    redis_port: int = Field(default=6379, alias="REDIS_PORT")
    redis_password: str | None = Field(default=None, alias="REDIS_PWD")
    redis_ssl: bool = Field(default=False)  # no alias: matches REDIS_SSL by name

    # Dialogflow CX
    dialogflow_project_id: str = Field(..., alias="DIALOGFLOW_CX_PROJECT_ID")
    dialogflow_location: str = Field(..., alias="DIALOGFLOW_CX_LOCATION")
    dialogflow_agent_id: str = Field(..., alias="DIALOGFLOW_CX_AGENT_ID")
    dialogflow_default_language: str = Field(
        default="es", alias="DIALOGFLOW_DEFAULT_LANGUAGE_CODE"
    )

    # Gemini
    gemini_model_name: str = Field(
        default="gemini-2.0-flash-001", alias="GEMINI_MODEL_NAME"
    )

    # Message Filter (Gemini) -- generation parameters for the filter model
    message_filter_model: str = Field(
        default="gemini-2.0-flash-001", alias="MESSAGE_FILTER_GEMINI_MODEL"
    )
    message_filter_temperature: float = Field(
        default=0.1, alias="MESSAGE_FILTER_TEMPERATURE"
    )
    message_filter_max_tokens: int = Field(
        default=10, alias="MESSAGE_FILTER_MAX_OUTPUT_TOKENS"
    )
    message_filter_top_p: float = Field(default=0.1, alias="MESSAGE_FILTER_TOP_P")
    # Prompt file path, relative to base_path
    message_filter_prompt_path: str = Field(default="prompts/message_filter_prompt.txt")

    # Notification Context Resolver (Gemini)
    notification_context_model: str = Field(
        default="gemini-2.0-flash-001", alias="NOTIFICATION_CONTEXT_GEMINI_MODEL"
    )
    notification_context_temperature: float = Field(
        default=0.1, alias="NOTIFICATION_CONTEXT_TEMPERATURE"
    )
    notification_context_max_tokens: int = Field(
        default=1024, alias="NOTIFICATION_CONTEXT_MAX_OUTPUT_TOKENS"
    )
    notification_context_top_p: float = Field(
        default=0.1, alias="NOTIFICATION_CONTEXT_TOP_P"
    )
    notification_context_prompt_path: str = Field(
        default="prompts/notification_context_resolver.txt"
    )

    # DLP
    dlp_template_complete_flow: str = Field(..., alias="DLP_TEMPLATE_COMPLETE_FLOW")

    # Conversation Context -- windowing limits for conversation history
    conversation_context_message_limit: int = Field(
        default=60, alias="CONVERSATION_CONTEXT_MESSAGE_LIMIT"
    )
    conversation_context_days_limit: int = Field(
        default=30, alias="CONVERSATION_CONTEXT_DAYS_LIMIT"
    )

    # Logging
    log_level: str = Field(default="INFO", alias="LOGGING_LEVEL_ROOT")

    @property
    def dialogflow_endpoint(self) -> str:
        """Get Dialogflow regional endpoint."""
        # Regional endpoints follow "<location>-dialogflow.googleapis.com".
        return f"{self.dialogflow_location}-dialogflow.googleapis.com:443"

    @property
    def base_path(self) -> Path:
        """Get base path for resources."""
        # <package root>/resources, resolved relative to this module file.
        return Path(__file__).parent.parent / "resources"
@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Return the process-wide Settings instance.

    The first call constructs Settings (reading the environment and any
    .env file); every later call returns the same cached object.
    """
    return Settings()

View File

@@ -1,44 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Data purge API endpoints.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException
from ..services.data_purge import DataPurgeService
from ..services.redis_service import RedisService
from ..dependencies import get_redis_service, get_settings
from ..config import Settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/data-purge", tags=["data-purge"])
@router.delete("/all")
async def purge_all_data(
    redis_service: RedisService = Depends(get_redis_service),
    settings: Settings = Depends(get_settings),
) -> None:
    """
    Purge all data from Redis and Firestore.

    WARNING: This is a destructive operation that will delete all conversation
    and notification data from both Redis and Firestore.

    Raises:
        HTTPException: 500 if the purge fails for any reason.
    """
    logger.warning(
        "Received request to purge all data. This is a destructive operation."
    )
    purge_service = DataPurgeService(settings, redis_service)
    try:
        await purge_service.purge_all_data()
        logger.info("Successfully purged all data")
    except Exception as e:
        logger.error(f"Error purging all data: {str(e)}", exc_info=True)
        # Chain the original exception so the real cause survives in tracebacks.
        raise HTTPException(status_code=500, detail="Internal server error") from e
    finally:
        # Always release the purge service's resources -- the original code
        # skipped close() whenever purge_all_data() raised (resource leak).
        await purge_service.close()

View File

@@ -1,99 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
LLM webhook API endpoints for Dialogflow CX integration.
"""
import logging
from fastapi import APIRouter, Depends
from ..models.llm_webhook import WebhookRequestDTO, WebhookResponseDTO, SessionInfoDTO
from ..services.llm_response_tuner import LlmResponseTunerService
from ..dependencies import get_llm_response_tuner
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/llm", tags=["llm-webhook"])
@router.post("/tune-response", response_model=WebhookResponseDTO)
async def tune_response(
    request: WebhookRequestDTO,
    llm_tuner: LlmResponseTunerService = Depends(get_llm_response_tuner),
) -> WebhookResponseDTO:
    """
    Dialogflow CX webhook to retrieve pre-generated LLM responses.

    Dialogflow calls this endpoint when an intent requires an LLM-generated
    answer. The caller puts a UUID in the session parameters; the matching
    pre-computed response is looked up in Redis (1-hour TTL) and returned
    back through the session parameters.

    Args:
        request: Webhook request containing session info with UUID

    Returns:
        Webhook response with response text or error
    """
    try:
        # Guard: the UUID must be present in the session parameters.
        uuid = request.sessionInfo.parameters.get("uuid")
        if not uuid:
            logger.error("No UUID provided in webhook request")
            return _create_error_response("UUID parameter is required", is_error=True)

        # Guard: the pre-computed response must still exist in Redis.
        response_text = await llm_tuner.get_value(uuid)
        if not response_text:
            logger.warning(f"No response found for UUID: {uuid}")
            return _create_error_response(
                "No response found for the given UUID.", is_error=False
            )

        # Happy path: hand the stored text back via session parameters.
        logger.info(f"Successfully retrieved response for UUID: {uuid}")
        success_params = {
            "webhook_success": True,
            "response": response_text,
        }
        return WebhookResponseDTO(
            sessionInfo=SessionInfoDTO(parameters=success_params)
        )
    except Exception as e:
        logger.error(f"Error in tune-response webhook: {e}", exc_info=True)
        return _create_error_response("An internal error occurred.", is_error=True)
def _create_error_response(error_message: str, is_error: bool) -> WebhookResponseDTO:
    """
    Build a failure response for the webhook.

    Args:
        error_message: Human-readable error description for Dialogflow
        is_error: Whether the failure is critical (informational only --
            the body does not branch on it, so the response shape is the
            same either way)

    Returns:
        Webhook response whose session parameters flag the failure
    """
    failure_params = {
        "webhook_success": False,
        "error_message": error_message,
    }
    return WebhookResponseDTO(sessionInfo=SessionInfoDTO(parameters=failure_params))

View File

@@ -1,112 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Quick Replies API endpoints.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException
from ..models.quick_replies import QuickReplyScreenRequestDTO
from ..models import DetectIntentResponseDTO
from ..services.quick_reply_content import QuickReplyContentService
from ..services.redis_service import RedisService
from ..services.firestore_service import FirestoreService
from ..utils.session_id import generate_session_id
from ..models.conversation import ConversationSessionDTO, ConversationEntryDTO
from ..dependencies import get_redis_service, get_firestore_service, get_settings
from ..config import Settings
from datetime import datetime
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"])
@router.post("/screen", response_model=DetectIntentResponseDTO)
async def start_quick_reply_session(
    request: QuickReplyScreenRequestDTO,
    redis_service: RedisService = Depends(get_redis_service),
    firestore_service: FirestoreService = Depends(get_firestore_service),
    settings: Settings = Depends(get_settings),
) -> DetectIntentResponseDTO:
    """
    Start a quick reply FAQ session for a specific screen.

    Creates a conversation session with pantalla_contexto set,
    loads the quick reply questions for the screen, and returns them.

    Args:
        request: Quick reply screen request

    Returns:
        Detect intent response with quick reply questions

    Raises:
        HTTPException: 400 for validation errors (missing phone number,
            unknown screen); 500 for unexpected failures.
    """
    try:
        telefono = request.telefono
        if not telefono or not telefono.strip():
            raise ValueError("Phone number is required")
        # Generate session ID
        session_id = generate_session_id()
        # Derive a deterministic user id from the phone number
        # (spaces and dashes stripped).
        user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
        # Create system entry
        system_entry = ConversationEntryDTO(
            entity="SISTEMA",
            type="INICIO",
            timestamp=datetime.now(),
            text=f"Pantalla: {request.pantalla_contexto} Agregada a la conversacion",
            parameters=None,
            intent=None,
        )
        # Create new session with pantalla_contexto
        new_session = ConversationSessionDTO(
            sessionId=session_id,
            userId=user_id,
            telefono=telefono,
            createdAt=datetime.now(),
            lastModified=datetime.now(),
            lastMessage=system_entry.text,
            pantallaContexto=request.pantalla_contexto,
        )
        # Save session and entry
        # NOTE(review): the session is written to Redis before the screen is
        # validated below, so an unknown screen still leaves a session behind
        # -- confirm whether that is intended.
        await redis_service.save_session(new_session)
        logger.info(
            f"Created quick reply session {session_id} for screen: {request.pantalla_contexto}"
        )
        # Load quick replies
        content_service = QuickReplyContentService(settings)
        quick_reply_dto = await content_service.get_quick_replies(
            request.pantalla_contexto
        )
        if not quick_reply_dto:
            raise ValueError(
                f"Quick reply screen not found: {request.pantalla_contexto}"
            )
        # Background save to Firestore
        # Firestore persistence is best-effort: failures are logged, not raised.
        try:
            await firestore_service.save_session(new_session)
            await firestore_service.save_entry(session_id, system_entry)
        except Exception as e:
            logger.error(f"Background Firestore save failed: {e}")
        return DetectIntentResponseDTO(
            responseId=session_id,
            queryResult=None,
            quick_replies=quick_reply_dto,
        )
    except ValueError as e:
        logger.error(f"Validation error: {e}", exc_info=True)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Error starting quick reply session: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -1,196 +1,62 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
FastAPI dependency injection.
"""
from .config import get_settings, Settings
from functools import lru_cache
from .config import settings
from .services import (
DialogflowClientService,
GeminiClientService,
ConversationManagerService,
MessageEntryFilter,
NotificationManagerService,
NotificationContextResolver,
QuickReplyContentService,
DLPService,
LlmResponseTunerService,
)
from .services.redis_service import RedisService
from .services.firestore_service import FirestoreService
from .services.rag_service import RAGService
# Global service instances (initialized at startup)
_dialogflow_client: DialogflowClientService | None = None
_gemini_client: GeminiClientService | None = None
_message_filter: MessageEntryFilter | None = None
_notification_context_resolver: NotificationContextResolver | None = None
_dlp_service: DLPService | None = None
_redis_service: RedisService | None = None
_firestore_service: FirestoreService | None = None
_conversation_manager: ConversationManagerService | None = None
_notification_manager: NotificationManagerService | None = None
_llm_response_tuner: LlmResponseTunerService | None = None
def init_services(settings: Settings):
"""Initialize all services at startup."""
global \
_dialogflow_client, \
_gemini_client, \
_message_filter, \
_notification_context_resolver, \
_dlp_service, \
_redis_service, \
_firestore_service, \
_conversation_manager, \
_notification_manager, \
_llm_response_tuner
_dialogflow_client = DialogflowClientService(settings)
_gemini_client = GeminiClientService(settings)
_message_filter = MessageEntryFilter(settings, _gemini_client)
_notification_context_resolver = NotificationContextResolver(
settings, _gemini_client
)
_dlp_service = DLPService(settings)
_redis_service = RedisService(settings)
_firestore_service = FirestoreService(settings)
# Note: LlmResponseTunerService will be initialized after Redis connects
_llm_response_tuner = None
# Initialize notification manager (without llm_response_tuner)
_notification_manager = NotificationManagerService(
settings=settings,
dialogflow_client=_dialogflow_client,
redis_service=_redis_service,
firestore_service=_firestore_service,
dlp_service=_dlp_service,
)
# Note: ConversationManagerService will be fully initialized after Redis connects
# For now, initialize with placeholder for llm_response_tuner
_conversation_manager = None
async def startup_services():
"""Connect services at startup."""
global \
_redis_service, \
_llm_response_tuner, \
_conversation_manager, \
_dialogflow_client, \
_message_filter, \
_notification_context_resolver, \
_dlp_service, \
_firestore_service
settings = get_settings()
if _redis_service:
await _redis_service.connect()
# Initialize LLM Response Tuner after Redis connects
if _redis_service.redis:
_llm_response_tuner = LlmResponseTunerService(_redis_service.redis)
# Now initialize ConversationManagerService with all dependencies
_conversation_manager = ConversationManagerService(
settings=settings,
dialogflow_client=_dialogflow_client,
redis_service=_redis_service,
firestore_service=_firestore_service,
dlp_service=_dlp_service,
message_filter=_message_filter,
notification_context_resolver=_notification_context_resolver,
llm_response_tuner=_llm_response_tuner,
)
async def shutdown_services():
"""Clean up services at shutdown."""
global _dialogflow_client, _redis_service, _firestore_service, _dlp_service
if _dialogflow_client:
await _dialogflow_client.close()
if _redis_service:
await _redis_service.close()
if _firestore_service:
await _firestore_service.close()
if _dlp_service:
await _dlp_service.close()
def get_conversation_manager() -> ConversationManagerService:
"""Get conversation manager instance."""
if _conversation_manager is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _conversation_manager
def get_dialogflow_client() -> DialogflowClientService:
"""Get Dialogflow client instance."""
if _dialogflow_client is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _dialogflow_client
@lru_cache(maxsize=1)
def get_redis_service() -> RedisService:
"""Get Redis service instance."""
if _redis_service is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _redis_service
return RedisService(settings)
@lru_cache(maxsize=1)
def get_firestore_service() -> FirestoreService:
"""Get Firestore service instance."""
if _firestore_service is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _firestore_service
def get_gemini_client() -> GeminiClientService:
"""Get Gemini client instance."""
if _gemini_client is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _gemini_client
def get_message_filter() -> MessageEntryFilter:
"""Get message filter instance."""
if _message_filter is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _message_filter
def get_notification_manager() -> NotificationManagerService:
"""Get notification manager instance."""
if _notification_manager is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _notification_manager
return FirestoreService(settings)
@lru_cache(maxsize=1)
def get_dlp_service() -> DLPService:
"""Get DLP service instance."""
if _dlp_service is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _dlp_service
return DLPService(settings)
@lru_cache(maxsize=1)
def get_quick_reply_content_service() -> QuickReplyContentService:
"""Get quick reply content service instance."""
return QuickReplyContentService(settings)
def get_notification_context_resolver() -> NotificationContextResolver:
"""Get notification context resolver instance."""
if _notification_context_resolver is None:
raise RuntimeError("Services not initialized. Call init_services first.")
return _notification_context_resolver
@lru_cache(maxsize=1)
def get_notification_manager() -> NotificationManagerService:
"""Get notification manager instance."""
return NotificationManagerService(
settings,
redis_service=get_redis_service(),
firestore_service=get_firestore_service(),
dlp_service=get_dlp_service(),
)
@lru_cache(maxsize=1)
def get_rag_service() -> RAGService:
"""Get RAG service instance."""
return RAGService(settings)
def get_llm_response_tuner() -> LlmResponseTunerService:
"""Get LLM response tuner instance."""
if _llm_response_tuner is None:
raise RuntimeError("Services not initialized. Call startup_services first.")
return _llm_response_tuner
@lru_cache(maxsize=1)
def get_conversation_manager() -> ConversationManagerService:
"""Get conversation manager instance."""
return ConversationManagerService(
settings,
redis_service=get_redis_service(),
firestore_service=get_firestore_service(),
dlp_service=get_dlp_service(),
rag_service=get_rag_service(),
)

View File

@@ -0,0 +1,17 @@
class FirestorePersistenceException(Exception):
"""
Exception raised when Firestore operations fail.
This is typically caught and logged without failing the request.
"""
def __init__(self, message: str, cause: Exception | None = None):
"""
Initialize Firestore persistence exception.
Args:
message: Error message
cause: Original exception that caused this error
"""
super().__init__(message)
self.cause = cause

View File

@@ -1,25 +1,11 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Main FastAPI application.
"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import get_settings
from .controllers import (
conversation_router,
notification_router,
llm_webhook_router,
quick_replies_router,
data_purge_router,
)
from .config import settings
from .routers import conversation_router, notification_router, quick_replies_router
from .dependencies import init_services, startup_services, shutdown_services
@@ -32,10 +18,9 @@ logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
async def lifespan(_: FastAPI):
"""Application lifespan manager."""
# Startup
settings = get_settings()
logger.info("Initializing services...")
init_services(settings)
await startup_services()
@@ -49,8 +34,6 @@ async def lifespan(app: FastAPI):
logger.info("Application shutdown complete")
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
app = FastAPI(
title="Capa de Integración - Orchestrator Service",
description="Conversational AI orchestrator for Dialogflow CX, Gemini, and Vertex AI",
@@ -59,8 +42,10 @@ def create_app() -> FastAPI:
)
# CORS middleware
# Note: Type checker reports false positive for CORSMiddleware
# This is the correct FastAPI pattern per official documentation
app.add_middleware(
CORSMiddleware,
CORSMiddleware, # ty: ignore
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
@@ -70,21 +55,13 @@ def create_app() -> FastAPI:
# Register routers
app.include_router(conversation_router)
app.include_router(notification_router)
app.include_router(llm_webhook_router)
app.include_router(quick_replies_router)
app.include_router(data_purge_router)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "capa-de-integracion"}
return app
# Create app instance
app = create_app()
def main():
"""Entry point for CLI."""

View File

@@ -12,13 +12,11 @@ from .conversation import (
EventInputDTO,
QueryParamsDTO,
QueryResultDTO,
MessageType,
ConversationEntryType,
)
from .notification import (
ExternalNotRequestDTO,
NotificationSessionDTO,
NotificationDTO,
ExternalNotificationRequest,
NotificationSession,
Notification,
)
__all__ = [
@@ -34,10 +32,8 @@ __all__ = [
"EventInputDTO",
"QueryParamsDTO",
"QueryResultDTO",
"MessageType",
"ConversationEntryType",
# Notification
"ExternalNotRequestDTO",
"NotificationSessionDTO",
"NotificationDTO",
"ExternalNotificationRequest",
"NotificationSession",
"Notification",
]

View File

@@ -1,31 +1,8 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Conversation-related data models.
"""
from datetime import datetime
from enum import Enum
from typing import Any
from typing import Any, Literal
from pydantic import BaseModel, Field, field_validator
class MessageType(str, Enum):
"""Message type enumeration."""
USER = "USER"
AGENT = "AGENT"
class ConversationEntryType(str, Enum):
"""Conversation entry type enumeration."""
INICIO = "INICIO"
CONVERSACION = "CONVERSACION"
LLM = "LLM"
class UsuarioDTO(BaseModel):
"""User information."""
@@ -102,7 +79,6 @@ class ExternalConvRequestDTO(BaseModel):
mensaje: str = Field(..., alias="mensaje")
usuario: UsuarioDTO = Field(..., alias="usuario")
canal: str = Field(..., alias="canal")
tipo: ConversationEntryType = Field(..., alias="tipo")
pantalla_contexto: str | None = Field(None, alias="pantallaContexto")
model_config = {"populate_by_name": True}
@@ -123,7 +99,7 @@ class ConversationMessageDTO(BaseModel):
class ConversationEntryDTO(BaseModel):
"""Single conversation entry."""
entity: str = Field(..., alias="entity") # "USUARIO", "AGENTE", "SISTEMA", "LLM"
entity: Literal['user', 'assistant']
type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM"
timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
text: str = Field(..., alias="text")
@@ -148,7 +124,12 @@ class ConversationSessionDTO(BaseModel):
@classmethod
def create(
cls, session_id: str, user_id: str, telefono: str
cls,
session_id: str,
user_id: str,
telefono: str,
pantalla_contexto: str | None = None,
last_message: str | None = None,
) -> "ConversationSessionDTO":
"""Create a new conversation session."""
now = datetime.now()
@@ -158,6 +139,8 @@ class ConversationSessionDTO(BaseModel):
telefono=telefono,
createdAt=now,
lastModified=now,
pantallaContexto=pantalla_contexto,
lastMessage=last_message,
)
def with_last_message(self, last_message: str) -> "ConversationSessionDTO":

View File

@@ -1,34 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
LLM webhook data models for Dialogflow CX webhook integration.
"""
from typing import Any
from pydantic import BaseModel, Field
class SessionInfoDTO(BaseModel):
    """Session info containing parameters."""

    # Arbitrary Dialogflow session parameters (e.g. the "uuid" key used by
    # the tune-response webhook); defaults to an empty dict.
    parameters: dict[str, Any] = Field(default_factory=dict)


class WebhookRequestDTO(BaseModel):
    """Dialogflow CX webhook request."""

    # Dialogflow sends this key as "sessionInfo"; populate_by_name below also
    # allows constructing the model from the Python attribute name.
    sessionInfo: SessionInfoDTO = Field(
        default_factory=SessionInfoDTO, alias="sessionInfo"
    )
    model_config = {"populate_by_name": True}


class WebhookResponseDTO(BaseModel):
    """Dialogflow CX webhook response."""

    # Required: responses always carry session parameters back to Dialogflow.
    sessionInfo: SessionInfoDTO = Field(..., alias="sessionInfo")
    model_config = {"populate_by_name": True}

View File

@@ -1,17 +1,9 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Notification-related data models.
"""
from datetime import datetime
from typing import Any
from pydantic import BaseModel, Field
class NotificationDTO(BaseModel):
class Notification(BaseModel):
"""
Individual notification event record.
@@ -49,8 +41,45 @@ class NotificationDTO(BaseModel):
model_config = {"populate_by_name": True}
@classmethod
def create(
cls,
id_notificacion: str,
telefono: str,
texto: str,
nombre_evento_dialogflow: str = "notificacion",
codigo_idioma_dialogflow: str = "es",
parametros: dict[str, Any] | None = None,
status: str = "active",
) -> "Notification":
"""
Create a new Notification with auto-filled timestamp.
class NotificationSessionDTO(BaseModel):
Args:
id_notificacion: Unique notification ID
telefono: User phone number
texto: Notification text content
nombre_evento_dialogflow: Dialogflow event name
codigo_idioma_dialogflow: Dialogflow language code
parametros: Session parameters for Dialogflow
status: Notification status
Returns:
New Notification instance with current timestamp
"""
return cls(
idNotificacion=id_notificacion,
telefono=telefono,
timestampCreacion=datetime.now(),
texto=texto,
nombreEventoDialogflow=nombre_evento_dialogflow,
codigoIdiomaDialogflow=codigo_idioma_dialogflow,
parametros=parametros or {},
status=status,
)
class NotificationSession(BaseModel):
"""Notification session containing multiple notifications for a phone number."""
sessionId: str = Field(..., alias="sessionId", description="Session identifier")
@@ -65,7 +94,7 @@ class NotificationSessionDTO(BaseModel):
alias="ultimaActualizacion",
description="Last update time",
)
notificaciones: list[NotificationDTO] = Field(
notificaciones: list[Notification] = Field(
default_factory=list,
alias="notificaciones",
description="List of notification events",
@@ -74,10 +103,10 @@ class NotificationSessionDTO(BaseModel):
model_config = {"populate_by_name": True}
class ExternalNotRequestDTO(BaseModel):
class ExternalNotificationRequest(BaseModel):
"""External notification push request from client."""
texto: str = Field(..., alias="texto", description="Notification text")
texto: str = Field(..., min_length=1)
telefono: str = Field(..., alias="telefono", description="User phone number")
parametros_ocultos: dict[str, Any] | None = Field(
None, alias="parametrosOcultos", description="Hidden parameters (metadata)"

View File

@@ -1,15 +1,7 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Quick Replies data models.
"""
from pydantic import BaseModel, Field
class QuestionDTO(BaseModel):
class QuickReplyQuestions(BaseModel):
"""Individual FAQ question."""
titulo: str
@@ -17,27 +9,11 @@ class QuestionDTO(BaseModel):
respuesta: str
class QuickReplyDTO(BaseModel):
class QuickReplyScreen(BaseModel):
"""Quick reply screen with questions."""
header: str | None = None
body: str | None = None
button: str | None = None
header_section: str | None = None
preguntas: list[QuestionDTO] = Field(default_factory=list)
class QuickReplyScreenRequestDTO(BaseModel):
"""Request to load a quick reply screen."""
usuario: dict = Field(..., alias="usuario")
canal: str = Field(..., alias="canal")
tipo: str = Field(..., alias="tipo")
pantalla_contexto: str = Field(..., alias="pantallaContexto")
model_config = {"populate_by_name": True}
@property
def telefono(self) -> str:
"""Extract phone number from usuario."""
return self.usuario.get("telefono", "")
preguntas: list[QuickReplyQuestions] = Field(default_factory=list)

View File

@@ -1,93 +0,0 @@
Hay un sistema de conversaciones entre un agente y un usuario. Durante
la conversación, una notificación puede entrar a la conversación de forma
abrupta, de tal forma que la siguiente interacción del usuario después
de la notificación puede corresponder a la conversación que estaba
sucediendo o puede ser un seguimiento a la notificación.
Tu tarea es identificar si la siguiente interacción del usuario es un
seguimiento a la notificación o una continuación de la conversación.
Recibirás esta información:
- HISTORIAL_CONVERSACION: El diálogo entre el agente y el usuario antes
de la notificación.
- INTERRUPCION_NOTIFICACION: La notificación. Esta puede o no traer parámetros
los cuales refieren a detalles específicos de la notificación. Por ejemplo:
{ "vigencia": "12 de septiembre de 2025", "credito_tipo": "platinum" }
- INTERACCION_USUARIO: La siguiente interacción del usuario después de
la notificación.
Reglas:
- Solo debes responder una palabra: NOTIFICATION o CONVERSATION. No agregues
o inventes otra palabra.
- Clasifica como NOTIFICATION si la siguiente interacción del usuario
es una clara respuesta o seguimiento a la notificación.
- Clasifica como CONVERSATION si la siguiente interacción del usuario
es un claro seguimiento al histórico de la conversación.
- Si la siguiente interacción del usuario es ambigua, clasifica
como CONVERSATION.
Ejemplos:
Ejemplo 1:
HISTORIAL_CONVERSACION:
Agente: Claro, para un crédito de vehículo, las tasas actuales inician en el 1.2%% mensual.
Usuario: Entiendo, ¿y el plazo máximo de cuánto sería?
INTERRUPCION_NOTIFICACION:
Tu pago de la tarjeta de crédito por $1,500.00 ha sido procesado.
INTERACCION_USUARIO:
perfecto, cuando es la fecha de corte?
Clasificación: NOTIFICATION
Ejemplo 2:
HISTORIAL_CONVERSACION:
Agente: No es necesario, puedes completar todo el proceso para abrir tu cuenta desde nuestra app.
Usuario: Ok
Agente: ¿Necesitas algo más?
INTERRUPCION_NOTIFICACION:
Tu estado de cuenta de Julio ya está disponible.
Parametros: {"fecha_corte": "30 de Agosto del 2025", "tipo_cuenta": "credito"}
INTERACCION_USUARIO:
que documentos necesito?
Clasificación: CONVERSATION
Ejemplo 3:
HISTORIAL_CONVERSACION:
Agente: Ese fondo de inversión tiene un perfil de alto riesgo, pero históricamente ha dado un rendimiento superior al 15%% anual.
Usuario: ok, entiendo
INTERRUPCION_NOTIFICACION:
Alerta: Tu cuenta de ahorros tiene un saldo bajo de $50.00.
Parametros: {"fecha_retiro": "5 de septiembre del 2025", "tipo_cuenta": "ahorros"}
INTERACCION_USUARIO:
cuando fue el ultimo retiro?
Clasificación: NOTIFICATION
Ejemplo 4:
HISTORIAL_CONVERSACION:
Usuario: Que es el CAT?
Agente: El CAT (Costo Anual Total) es un indicador financiero, expresado en un porcentaje anual, que refleja el costo total de un crédito, incluyendo no solo la tasa de interés, sino también todas las comisiones, gastos y otros cobros que genera.
INTERRUPCION_NOTIFICACION:
Alerta: Se realizó un retiro en efectivo por $100.
INTERACCION_USUARIO:
y este se aplica solo si dejo de pagar?
Clasificación: CONVERSATION
Ejemplo 5:
HISTORIAL_CONVERSACION:
Usuario: Cual es la tasa de hipoteca que manejan?
Agente: La tasa de una hipoteca depende tanto de factores económicos generales (inflación, tasas de referencia del banco central) como de factores individuales del solicitante (historial crediticio, monto del pago inicial, ingresos, endeudamiento, etc.)
INTERRUPCION_NOTIFICACION:
Hola, [Alias]: Pasó algo con la captura de tu INE y no se completó tu solicitud de tarjeta de crédito con folio 3421.
Parametros: {"solicitud_tarjeta_credito_vigencia": "12 de septiembre de 2025", "solicitud_tarjeta_credito_error": "Error con el formato de la captura", "solicitud_tarjeta_credito_tipo": "platinum"}
INTERACCION_USUARIO:
cual fue el error?
Clasificación: NOTIFICATION
Tarea:
HISTORIAL_CONVERSACION:
%s
INTERRUPCION_NOTIFICACION:
%s
INTERACCION_USUARIO:
%s
Clasificación:

View File

@@ -1,84 +0,0 @@
Eres un agente conversacional de soporte al usuario, amable, servicial y conciso.
Recibirás cuatro piezas de información:
1. HISTORIAL_CONVERSACION: El diálogo previo con el usuario. Úsalo para entender el contexto y evitar repetir información.
2. NOTIFICACION: El texto del mensaje que el usuario acaba de recibir.
3. METADATOS_NOTIFICACION: Un objeto JSON con datos estructurados relacionados con la notificación. Esta es tu fuente de verdad principal.
4. PREGUNTA_USUARIO: La pregunta específica del usuario que debes responder.
Tu objetivo es sintetizar la información de estas fuentes para dar la respuesta más directa y útil posible.
**Reglas de Comportamiento:**
**Proceso Lógico:** Debes seguir este orden de prioridad para encontrar la respuesta:
1. Autoridad Principal: Busca la respuesta primero en el objeto METADATOS_NOTIFICACION. Los datos aquí tienen la máxima autoridad.
2. Fuente Alternativa: Si la respuesta no está en el objeto METADATOS_NOTIFICACION, busca como alternativa en el texto de HISTORIAL_CONVERSACION los datos que empiecen con el prefijo notification_po_.
3. Contexto: Utiliza el HISTORIAL_CONVERSACION únicamente para dar contexto y asegurarte de no repetir algo que ya se dijo
**Manejo de Datos Faltantes:** Si la respuesta a la PREGUNTA_USUARIO no se encuentra en METADATOS_NOTIFICACION ni en el HISTORIAL_CONVERSACION (con el prefijo notification_po_), entonces debes responder exactamente con la palabra DIALOGFLOW. No intentes adivinar ni disculparte.
**Concisión y Tono:** Tu respuesta debe ser directa, clara y resolver la pregunta. Mantén un tono profesional, amable y servicial.
**Idioma:** Responde siempre en el mismo idioma de la PREGUNTA_USUARIO.
Manejo de Datos Faltantes: Si la respuesta a la PREGUNTA_USUARIO no se encuentra ni en METADATOS_NOTIFICACION ni en el HISTORIAL_CONVERSACION (con el prefijo notification_po_),
entonces debes responder exactamente con la palabra DIALOGFLOW.
No intentes adivinar ni disculparte.
Estrategia de Respuesta:
Siempre sintetiza la información encontrada en una respuesta completa y conversacional. No devuelvas solo el dato. Utiliza el dato para construir una frase que sea útil y siga el tono. Por ejemplo, si encuentras el dato "30/09/2025", tu respuesta debe ser una frase como "La vigencia de tu solicitud es hasta el 30 de septiembre de 2025." o similar.
**Ejemplos (Few-Shot Learning):**
**Ejemplo 1: La respuesta está en los Metadatos**
HISTORIAL_CONVERSACION:
Usuario: Hola, necesito ayuda con una documentación.
Agente: Claro, ¿en qué puedo ayudarte?
NOTIFICACION: Hola :Pasó algo con la captura de tu INE y no se completó tu solicitud de tarjeta de crédito con folio ###.¡Reinténtalo cuando quieras! Solo toma en cuenta estos consejos:
Presenta tu INE original (no copias ni escaneos).📅Revisa que esté vigente y sin tachaduras.📷 Confirma que la fotografía sea clara.🏠 Asegúrate de que la dirección sea legible.
Estamos listos para recibirte.
METADATOS_NOTIFICACION: {
"parametrosOcultos": {
"vigencia": "30/09/2025"
}
}
PREGUNTA_USUARIO: ¿Hasta cuando esta disponible esta solicitud?
Respuesta: Tienes hasta el 30 de septiembre de 2025 para revisarlos.
**Ejemplo 2: Poca información encontrada en texto de Notificación**
HISTORIAL_CONVERSACION:
Usuario: Hola.
Agente: ¡Qué onda! Soy Beto, tu asistente virtual de Sigma. ¿Como te puedo ayudar hoy? 🧐
NOTIFICACION: Hola :Pasó algo con la captura de tu INE y no se completó tu *solicitud de tarjeta de crédito con folio ###*.
¡Reinténtalo cuando quieras! Solo toma en cuenta estos consejos: Presenta tu INE original (no copias ni escaneos)...
Estamos listos para recibirte.
METADATOS_NOTIFICACION: {
"parametrosOcultos": {
"vigencia": "30/09/2025"
}
}
PREGUNTA_USUARIO: Mi INE tiene algunas tachaduras y en general esta en mal estado
Respuesta: DIALOGFLOW
**Ejemplo 3: Información no encontrada en ninguna fuente**
HISTORIAL_CONVERSACION:
Usuario: ¿Cómo van mis trámites?
Agente: Veo que tienes una cita de mantenimiento programada.
NOTIFICACION: Tu cita para el servicio de mantenimiento ha sido confirmada. Por favor, llega 15 minutos antes.
METADATOS_NOTIFICACION: {
"tipo_servicio": "mantenimiento rutinario",
"ubicacion": "Sucursal Centro",
"id_cita": "C-182736"
}
PREGUNTA_USUARIO: Perfecto, ¿cuál será el costo del mantenimiento?
Respuesta: DIALOGFLOW
Historial de Conversación:
%s
Notificación:
%s
Metadatos de la Notificación:
%s
Pregunta del Usuario:
%s

View File

@@ -1,15 +1,11 @@
"""Controllers module."""
"""Routers module."""
from .conversation import router as conversation_router
from .notification import router as notification_router
from .llm_webhook import router as llm_webhook_router
from .quick_replies import router as quick_replies_router
from .data_purge import router as data_purge_router
__all__ = [
"conversation_router",
"notification_router",
"llm_webhook_router",
"quick_replies_router",
"data_purge_router",
]

View File

@@ -1,11 +1,3 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Conversation API endpoints.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException
@@ -15,6 +7,7 @@ from ..dependencies import get_conversation_manager
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"])

View File

@@ -1,15 +1,7 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Notification API endpoints.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException
from ..models.notification import ExternalNotRequestDTO
from ..models.notification import ExternalNotificationRequest
from ..services.notification_manager import NotificationManagerService
from ..dependencies import get_notification_manager
@@ -20,7 +12,7 @@ router = APIRouter(prefix="/api/v1/dialogflow", tags=["notifications"])
@router.post("/notification", status_code=200)
async def process_notification(
request: ExternalNotRequestDTO,
request: ExternalNotificationRequest,
notification_manager: NotificationManagerService = Depends(
get_notification_manager
),

View File

@@ -0,0 +1,81 @@
import logging
from fastapi import APIRouter, Depends, HTTPException
from uuid import uuid4
from pydantic import BaseModel
from ..models.quick_replies import QuickReplyScreen
from ..services.quick_reply_content import QuickReplyContentService
from ..services.redis_service import RedisService
from ..services.firestore_service import FirestoreService
from ..dependencies import get_redis_service, get_firestore_service, get_quick_reply_content_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"])
class QuickReplyUser(BaseModel):
    """User identification payload for a quick-reply request."""

    telefono: str  # user's phone number, used to look up an existing session
    nombre: str  # user's display name
class QuickReplyScreenRequest(BaseModel):
    """Request body to start a quick-reply FAQ session for one screen."""

    usuario: QuickReplyUser  # who is asking
    pantallaContexto: str  # screen context key selecting the FAQ set

    # Accept population by field name as well as by alias.
    model_config = {"populate_by_name": True}
class QuickReplyScreenResponse(BaseModel):
    """Response carrying the session id and the screen's quick replies."""

    responseId: str  # session id created/reused for this FAQ flow
    quick_replies: QuickReplyScreen  # questions to show for the screen
@router.post("/screen")
async def start_quick_reply_session(
request: QuickReplyScreenRequest,
redis_service: RedisService = Depends(get_redis_service),
firestore_service: FirestoreService = Depends(get_firestore_service),
quick_reply_content_service: QuickReplyContentService = Depends(get_quick_reply_content_service)
) -> QuickReplyScreenResponse:
"""
Start a quick reply FAQ session for a specific screen.
Creates a conversation session with pantalla_contexto set,
loads the quick reply questions for the screen, and returns them.
Args:
request: Quick reply screen request
Returns:
Detect intent response with quick reply questions
"""
try:
telefono = request.usuario.telefono
pantalla_contexto = request.pantallaContexto
if not telefono or not telefono.strip():
raise ValueError("Phone number is required")
session = await firestore_service.get_session_by_phone(telefono)
if session:
session_id = session.sessionId
await firestore_service.update_pantalla_contexto(session_id, pantalla_contexto)
session.pantallaContexto = pantalla_contexto
else:
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
session = await firestore_service.create_session(session_id, user_id, telefono, pantalla_contexto)
# Cache session
await redis_service.save_session(session)
logger.info(f"Created quick reply session {session_id} for screen: {pantalla_contexto}")
# Load quick replies
quick_replies = await quick_reply_content_service.get_quick_replies(pantalla_contexto)
return QuickReplyScreenResponse(responseId=session_id, quick_replies=quick_replies)
except ValueError as e:
logger.error(f"Validation error: {e}", exc_info=True)
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error starting quick reply session: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -1,25 +1,16 @@
"""Services module."""
from .dialogflow_client import DialogflowClientService
from .gemini_client import GeminiClientService, GeminiClientException
from .conversation_manager import ConversationManagerService
from .message_filter import MessageEntryFilter
from .notification_manager import NotificationManagerService
from .notification_context_resolver import NotificationContextResolver
from .dlp_service import DLPService
from .llm_response_tuner import LlmResponseTunerService
from .quick_reply_content import QuickReplyContentService
from .mappers import NotificationContextMapper, ConversationContextMapper
__all__ = [
"DialogflowClientService",
"GeminiClientService",
"GeminiClientException",
"QuickReplyContentService",
"ConversationManagerService",
"MessageEntryFilter",
"NotificationManagerService",
"NotificationContextResolver",
"DLPService",
"LlmResponseTunerService",
"NotificationContextMapper",
"ConversationContextMapper",
]

View File

@@ -1,14 +1,7 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Conversation manager service - central orchestrator for conversations.
"""
import logging
import uuid
from datetime import datetime, timedelta
from typing import Any
from ..config import Settings
from ..models import (
@@ -21,14 +14,10 @@ from ..models import (
TextInputDTO,
QueryParamsDTO,
)
from ..utils import SessionIdGenerator
from .dialogflow_client import DialogflowClientService
from .redis_service import RedisService
from .firestore_service import FirestoreService
from .dlp_service import DLPService
from .message_filter import MessageEntryFilter
from .notification_context_resolver import NotificationContextResolver
from .llm_response_tuner import LlmResponseTunerService
from .rag_service import RAGService
from .mappers import NotificationContextMapper, ConversationContextMapper
from .quick_reply_content import QuickReplyContentService
@@ -62,23 +51,17 @@ class ConversationManagerService:
def __init__(
self,
settings: Settings,
dialogflow_client: DialogflowClientService,
rag_service: RAGService,
redis_service: RedisService,
firestore_service: FirestoreService,
dlp_service: DLPService,
message_filter: MessageEntryFilter,
notification_context_resolver: NotificationContextResolver,
llm_response_tuner: LlmResponseTunerService,
):
"""Initialize conversation manager."""
self.settings = settings
self.dialogflow_client = dialogflow_client
self.rag_service = rag_service
self.redis_service = redis_service
self.firestore_service = firestore_service
self.dlp_service = dlp_service
self.message_filter = message_filter
self.notification_context_resolver = notification_context_resolver
self.llm_response_tuner = llm_response_tuner
# Initialize mappers
self.notification_mapper = NotificationContextMapper()
@@ -117,18 +100,10 @@ class ConversationManagerService:
request.mensaje,
self.settings.dlp_template_complete_flow,
)
obfuscated_request = ExternalConvRequestDTO(
mensaje=obfuscated_message,
usuario=request.usuario,
canal=request.canal,
tipo=request.tipo,
pantalla_contexto=request.pantalla_contexto,
)
request.mensaje = obfuscated_message
# Step 2: Check for pantallaContexto in existing session
telefono = request.usuario.telefono
existing_session = await self.redis_service.get_session(telefono)
existing_session = await self.redis_service.get_session(request.usuario.telefono)
if existing_session and existing_session.pantallaContexto:
# Check if pantallaContexto is stale (10 minutes)
@@ -137,16 +112,14 @@ class ConversationManagerService:
f"Detected 'pantallaContexto' in session: {existing_session.pantallaContexto}. "
f"Delegating to QuickReplies flow."
)
return await self._manage_quick_reply_conversation(
obfuscated_request, existing_session
)
return await self._manage_quick_reply_conversation(request, existing_session)
else:
logger.info(
"Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow."
)
# Step 3: Continue with standard conversation flow
return await self._continue_managing_conversation(obfuscated_request)
return await self._continue_managing_conversation(request)
except Exception as e:
logger.error(f"Error managing conversation: {str(e)}", exc_info=True)
@@ -183,7 +156,10 @@ class ConversationManagerService:
)
# Add pantallaContexto to parameters
if dialogflow_request.query_params:
if (
dialogflow_request.query_params
and dialogflow_request.query_params.parameters
):
dialogflow_request.query_params.parameters["pantalla_contexto"] = (
session.pantallaContexto
)
@@ -395,6 +371,10 @@ class ConversationManagerService:
dialogflow_request = self._build_dialogflow_request(
request, new_session, request.mensaje
)
if (
dialogflow_request.query_params
and dialogflow_request.query_params.parameters
):
dialogflow_request.query_params.parameters[self.CONV_HISTORY_PARAM] = (
conversation_history
)
@@ -409,7 +389,7 @@ class ConversationManagerService:
async def _start_notification_conversation(
self,
request: ExternalConvRequestDTO,
notification: any,
notification: Any,
session: ConversationSessionDTO,
conversation_entries: list[ConversationEntryDTO],
) -> DetectIntentResponseDTO:
@@ -490,13 +470,23 @@ class ConversationManagerService:
conversation_history = self.conversation_mapper.to_text_with_limits(
session, firestore_entries
)
dialogflow_request.query_params.parameters[self.CONV_HISTORY_PARAM] = (
conversation_history
)
if (
dialogflow_request.query_params
and dialogflow_request.query_params.parameters
):
dialogflow_request.query_params.parameters[
self.CONV_HISTORY_PARAM
] = conversation_history
# Always add notification parameters
if notification.parametros:
dialogflow_request.query_params.parameters.update(notification.parametros)
if (
notification.parametros
and dialogflow_request.query_params
and dialogflow_request.query_params.parameters
):
dialogflow_request.query_params.parameters.update(
notification.parametros
)
response = await self.dialogflow_client.detect_intent(
session.sessionId, dialogflow_request
@@ -579,6 +569,10 @@ class ConversationManagerService:
dialogflow_request = self._build_dialogflow_request(
request, new_session, request.mensaje
)
if (
dialogflow_request.query_params
and dialogflow_request.query_params.parameters
):
dialogflow_request.query_params.parameters[self.CONV_HISTORY_PARAM] = (
conversation_history
)
@@ -720,12 +714,10 @@ class ConversationManagerService:
# Create conversation entry
response_text = ""
intent = None
parameters = None
if response.queryResult:
response_text = response.queryResult.text or ""
intent = response.queryResult.intent
response_text = response.queryResult.responseText or ""
parameters = response.queryResult.parameters
user_entry = ConversationEntryDTO(
@@ -734,7 +726,6 @@ class ConversationManagerService:
timestamp=datetime.now(),
text=user_message,
parameters=None,
intent=None,
)
agent_entry = ConversationEntryDTO(
@@ -743,7 +734,6 @@ class ConversationManagerService:
timestamp=datetime.now(),
text=response_text,
parameters=parameters,
intent=intent,
)
# Save to Redis (fast, blocking)
@@ -757,8 +747,12 @@ class ConversationManagerService:
async def save_to_firestore():
try:
await self.firestore_service.save_session(updated_session)
await self.firestore_service.save_entry(session.sessionId, user_entry)
await self.firestore_service.save_entry(session.sessionId, agent_entry)
await self.firestore_service.save_entry(
session.sessionId, user_entry
)
await self.firestore_service.save_entry(
session.sessionId, agent_entry
)
logger.debug(
f"Asynchronously (Write-Back): Entry successfully saved to Firestore for session: {session.sessionId}"
)
@@ -800,8 +794,7 @@ class ConversationManagerService:
type="CONVERSACION",
timestamp=datetime.now(),
text=user_message,
parameters=notification.parametros,
intent=None,
parameters=None,
)
llm_entry = ConversationEntryDTO(
@@ -810,7 +803,6 @@ class ConversationManagerService:
timestamp=datetime.now(),
text=llm_response,
parameters=None,
intent=None,
)
# Save to Redis (fast, blocking)
@@ -824,8 +816,12 @@ class ConversationManagerService:
async def save_to_firestore():
try:
await self.firestore_service.save_session(updated_session)
await self.firestore_service.save_entry(session.sessionId, user_entry)
await self.firestore_service.save_entry(session.sessionId, llm_entry)
await self.firestore_service.save_entry(
session.sessionId, user_entry
)
await self.firestore_service.save_entry(
session.sessionId, llm_entry
)
logger.debug(
f"Asynchronously (Write-Back): LLM entry successfully saved to Firestore for session: {session.sessionId}"
)

View File

@@ -1,133 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Data purge service for Redis and Firestore.
"""
import logging
from google.cloud import firestore
from ..config import Settings
from .redis_service import RedisService
logger = logging.getLogger(__name__)
class DataPurgeService:
    """Service for purging all data from Redis and Firestore.

    Intended for test/reset environments: flushes the entire Redis database
    and deletes the conversation and notification collections (including the
    'mensajes' sub-collections) from Firestore.
    """

    def __init__(self, settings: Settings, redis_service: RedisService):
        """Initialize data purge service.

        Args:
            settings: Application settings (project and database ids).
            redis_service: Connected Redis service whose database is flushed.
        """
        self.settings = settings
        self.redis_service = redis_service
        self.db = firestore.AsyncClient(
            project=settings.gcp_project_id,
            database=settings.firestore_database_id,
        )

    async def purge_all_data(self) -> None:
        """Purge all data from Redis and Firestore.

        Raises:
            Exception: Propagates any failure from either backend.
        """
        try:
            await self._purge_redis()
            await self._purge_firestore()
            logger.info("Successfully purged all data from Redis and Firestore")
        except Exception as e:
            logger.error(f"Error purging data: {str(e)}", exc_info=True)
            raise

    async def _purge_redis(self) -> None:
        """Flush the entire Redis database."""
        logger.info("Starting Redis data purge")
        try:
            if not self.redis_service.redis:
                raise RuntimeError("Redis client not connected")
            await self.redis_service.redis.flushdb()
            logger.info("Successfully purged all data from Redis")
        except Exception as e:
            logger.error(f"Error purging data from Redis: {str(e)}", exc_info=True)
            raise

    async def _purge_firestore(self) -> None:
        """Delete conversation and notification collections from Firestore."""
        logger.info("Starting Firestore data purge")
        try:
            app_id = self.settings.gcp_project_id
            conversations_path = f"artifacts/{app_id}/conversations"
            notifications_path = f"artifacts/{app_id}/notifications"

            # Sub-collections must be deleted explicitly: deleting a parent
            # document does not remove its sub-collections.
            logger.info(
                f"Deleting 'mensajes' sub-collections from '{conversations_path}'"
            )
            try:
                conversations_ref = self.db.collection(conversations_path)
                async for doc in conversations_ref.stream():
                    await self._delete_collection(
                        doc.reference.collection("mensajes"), 50
                    )
            except Exception as e:
                if "NOT_FOUND" in str(e):
                    logger.warning(
                        f"Collection '{conversations_path}' not found, skipping"
                    )
                else:
                    raise

            # Top-level collections share the same NOT_FOUND-tolerant logic.
            await self._delete_collection_if_exists(conversations_path)
            await self._delete_collection_if_exists(notifications_path)

            logger.info("Successfully purged Firestore collections")
        except Exception as e:
            logger.error(
                f"Error purging Firestore collections: {str(e)}", exc_info=True
            )
            raise

    async def _delete_collection_if_exists(self, path: str) -> None:
        """Delete a top-level collection, tolerating a missing collection.

        Args:
            path: Firestore collection path to delete.
        """
        logger.info(f"Deleting collection: {path}")
        try:
            await self._delete_collection(self.db.collection(path), 50)
        except Exception as e:
            if "NOT_FOUND" in str(e):
                logger.warning(f"Collection '{path}' not found, skipping")
            else:
                raise

    async def _delete_collection(self, coll_ref, batch_size: int) -> None:
        """Delete a Firestore collection in batches.

        Iterative rather than recursive so that very large collections cannot
        exhaust Python's recursion limit (the original implementation recursed
        once per batch).

        Args:
            coll_ref: Firestore collection reference to delete.
            batch_size: Number of documents deleted per batch commit.
        """
        while True:
            docs = [doc async for doc in coll_ref.limit(batch_size).stream()]
            if not docs:
                return
            batch = self.db.batch()
            for doc in docs:
                batch.delete(doc.reference)
            await batch.commit()
            # A short batch means the collection is exhausted.
            if len(docs) < batch_size:
                return

    async def close(self):
        """Close the Firestore client."""
        await self.db.close()

View File

@@ -1,285 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Dialogflow CX client service for intent detection.
"""
import logging
from google.cloud.dialogflowcx_v3 import SessionsAsyncClient
from google.cloud.dialogflowcx_v3.types import (
DetectIntentRequest,
QueryInput,
TextInput,
EventInput,
QueryParameters,
)
from google.api_core.exceptions import (
GoogleAPIError,
InternalServerError,
ServiceUnavailable,
)
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from ..config import Settings
from ..models import DetectIntentRequestDTO, DetectIntentResponseDTO, QueryResultDTO
logger = logging.getLogger(__name__)
class DialogflowClientService:
    """Service for interacting with the Dialogflow CX Sessions API.

    Wraps the async SessionsClient, translating between application DTOs and
    Dialogflow CX request/response protos. Transient server errors
    (InternalServerError, ServiceUnavailable) are retried with exponential
    backoff.
    """

    def __init__(self, settings: Settings):
        """Initialize Dialogflow client from application settings."""
        self.settings = settings
        self.project_id = settings.dialogflow_project_id
        self.location = settings.dialogflow_location
        self.agent_id = settings.dialogflow_agent_id
        self.default_language = settings.dialogflow_default_language
        # Initialize async client; regional agents require an explicit
        # regional endpoint, so it is taken from settings.
        endpoint = settings.dialogflow_endpoint
        client_options = {"api_endpoint": endpoint}
        self.client = SessionsAsyncClient(client_options=client_options)
        logger.info(
            f"Dialogflow CX SessionsClient initialized for endpoint: {endpoint}"
        )
        logger.info(f"Agent ID: {self.agent_id}")

    def _build_session_path(self, session_id: str) -> str:
        """Build the fully-qualified Dialogflow session resource path."""
        return self.client.session_path(
            project=self.project_id,
            location=self.location,
            agent=self.agent_id,
            session=session_id,
        )

    def _map_query_input(self, query_input_dto) -> QueryInput:
        """Map QueryInputDTO to a Dialogflow QueryInput (text or event).

        Raises:
            ValueError: If neither text nor event input is provided.
        """
        language_code = query_input_dto.language_code or self.default_language
        if query_input_dto.text and query_input_dto.text.text:
            return QueryInput(
                text=TextInput(text=query_input_dto.text.text),
                language_code=language_code,
            )
        elif query_input_dto.event and query_input_dto.event.event:
            return QueryInput(
                event=EventInput(event=query_input_dto.event.event),
                language_code=language_code,
            )
        else:
            raise ValueError("Either text or event input must be provided")

    def _map_query_params(self, query_params_dto) -> QueryParameters | None:
        """Map QueryParamsDTO to Dialogflow QueryParameters (None if empty)."""
        if not query_params_dto or not query_params_dto.parameters:
            return None
        return QueryParameters(parameters=query_params_dto.parameters)

    def _extract_response_text(self, response) -> str:
        """Concatenate all text fragments from the response messages."""
        texts = []
        for msg in response.query_result.response_messages:
            if hasattr(msg, "text") and msg.text.text:
                texts.extend(msg.text.text)
        return " ".join(texts) if texts else ""

    def _build_response_dto(self, response) -> DetectIntentResponseDTO:
        """Map a raw Dialogflow response into the application DTO.

        Shared by detect_intent and detect_intent_event so the
        response-mapping logic is defined in exactly one place.
        """
        query_result = response.query_result
        query_result_dto = QueryResultDTO(
            responseText=self._extract_response_text(response),
            parameters=dict(query_result.parameters)
            if query_result.parameters
            else None,
        )
        return DetectIntentResponseDTO(
            responseId=response.response_id,
            queryResult=query_result_dto,
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((InternalServerError, ServiceUnavailable)),
        reraise=True,
    )
    async def detect_intent(
        self, session_id: str, request_dto: DetectIntentRequestDTO
    ) -> DetectIntentResponseDTO:
        """
        Detect intent from user input using Dialogflow CX.

        Args:
            session_id: Unique session identifier
            request_dto: Detect intent request

        Returns:
            Detect intent response with query results

        Raises:
            ValueError: If session_id or request_dto is missing.
            GoogleAPIError: If Dialogflow API call fails
        """
        if not session_id:
            raise ValueError("Session ID cannot be empty")
        if not request_dto:
            raise ValueError("Request DTO cannot be None")
        logger.info(f"Initiating detectIntent for session: {session_id}")
        try:
            # Build request
            session_path = self._build_session_path(session_id)
            query_input = self._map_query_input(request_dto.query_input)
            query_params = self._map_query_params(request_dto.query_params)
            detect_request = DetectIntentRequest(
                session=session_path,
                query_input=query_input,
                query_params=query_params,
            )
            # Call Dialogflow
            logger.debug(
                f"Calling Dialogflow CX detectIntent for session: {session_id}"
            )
            response = await self.client.detect_intent(request=detect_request)
            result = self._build_response_dto(response)
            logger.info(
                f"Successfully processed detectIntent for session: {session_id}"
            )
            return result
        except GoogleAPIError as e:
            logger.error(
                f"Dialogflow CX API error for session {session_id}: {e.message}",
                exc_info=True,
            )
            raise
        except Exception as e:
            logger.error(
                f"Unexpected error in detectIntent for session {session_id}: {str(e)}",
                exc_info=True,
            )
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((InternalServerError, ServiceUnavailable)),
        reraise=True,
    )
    async def detect_intent_event(
        self,
        session_id: str,
        event_name: str,
        parameters: dict | None = None,
        language_code: str | None = None,
    ) -> DetectIntentResponseDTO:
        """
        Trigger Dialogflow event detection.

        Used for notification events and system-triggered flows.

        Args:
            session_id: Unique session identifier
            event_name: Dialogflow event name (e.g., "notificacion")
            parameters: Event parameters
            language_code: Language code (defaults to settings)

        Returns:
            Detect intent response

        Raises:
            ValueError: If session_id or event_name is missing.
            GoogleAPIError: If Dialogflow API call fails
        """
        if not session_id:
            raise ValueError("Session ID cannot be empty")
        if not event_name:
            raise ValueError("Event name cannot be empty")
        lang_code = language_code or self.default_language
        logger.info(
            f"Triggering Dialogflow event '{event_name}' for session: {session_id}"
        )
        try:
            # Build request
            session_path = self._build_session_path(session_id)
            query_input = QueryInput(
                event=EventInput(event=event_name),
                language_code=lang_code,
            )
            query_params = None
            if parameters:
                query_params = QueryParameters(parameters=parameters)
            detect_request = DetectIntentRequest(
                session=session_path,
                query_input=query_input,
                query_params=query_params,
            )
            # Call Dialogflow
            logger.debug(
                f"Calling Dialogflow CX for event '{event_name}' in session: {session_id}"
            )
            response = await self.client.detect_intent(request=detect_request)
            result = self._build_response_dto(response)
            logger.info(
                f"Successfully processed event '{event_name}' for session: {session_id}"
            )
            return result
        except GoogleAPIError as e:
            logger.error(
                f"Dialogflow CX API error for event '{event_name}' in session {session_id}: {e.message}",
                exc_info=True,
            )
            raise
        except Exception as e:
            logger.error(
                f"Unexpected error triggering event '{event_name}' for session {session_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def close(self):
        """Close the Dialogflow client."""
        await self.client.transport.close()
        logger.info("Dialogflow CX SessionsClient closed")

View File

@@ -53,9 +53,6 @@ class DLPService:
Raises:
Exception: If DLP API call fails (returns original text on error)
"""
if not text or not text.strip():
return text
try:
# Build content item
byte_content_item = types.ByteContentItem(

View File

@@ -1,18 +1,10 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Firestore service for persistent conversation storage.
"""
import logging
from datetime import datetime
from google.cloud import firestore
from ..config import Settings
from ..models import ConversationSessionDTO, ConversationEntryDTO
from ..models.notification import NotificationDTO
from ..models.notification import Notification
logger = logging.getLogger(__name__)
@@ -41,7 +33,7 @@ class FirestoreService:
async def close(self):
"""Close Firestore client."""
await self.db.close()
self.db.close()
logger.info("Firestore client closed")
def _session_ref(self, session_id: str):
@@ -83,7 +75,7 @@ class FirestoreService:
"""
try:
query = (
self.db.collection(self.sessions_collection)
self.db.collection(self.conversations_collection)
.where("telefono", "==", telefono)
.order_by("lastModified", direction=firestore.Query.DESCENDING)
.limit(1)
@@ -122,6 +114,44 @@ class FirestoreService:
)
return False
async def create_session(
    self,
    session_id: str,
    user_id: str,
    telefono: str,
    pantalla_contexto: str | None = None,
    last_message: str | None = None,
) -> ConversationSessionDTO:
    """Create a new conversation session and persist it to Firestore.

    Args:
        session_id: Unique session identifier
        user_id: User identifier
        telefono: User phone number
        pantalla_contexto: Optional screen context for the conversation
        last_message: Optional last message in the conversation

    Returns:
        The created session

    Raises:
        Exception: If session creation or save fails
    """
    new_session = ConversationSessionDTO.create(
        session_id=session_id,
        user_id=user_id,
        telefono=telefono,
        pantalla_contexto=pantalla_contexto,
        last_message=last_message,
    )
    # merge=True keeps the write idempotent if the document already exists.
    await self._session_ref(new_session.sessionId).set(
        new_session.model_dump(), merge=True
    )
    logger.info(f"Created new session in Firestore: {session_id}")
    return new_session
async def save_entry(self, session_id: str, entry: ConversationEntryDTO) -> bool:
"""Save conversation entry to Firestore subcollection."""
try:
@@ -196,6 +226,46 @@ class FirestoreService:
)
return False
async def update_pantalla_contexto(
    self, session_id: str, pantalla_contexto: str | None
) -> bool:
    """Update the pantallaContexto field for a conversation session.

    Args:
        session_id: Session ID to update
        pantalla_contexto: New pantalla contexto value

    Returns:
        True if update was successful, False otherwise
    """
    try:
        doc_ref = self._session_ref(session_id)
        snapshot = await doc_ref.get()
        if not snapshot.exists:
            logger.warning(
                f"Session {session_id} not found in Firestore. Cannot update pantallaContexto"
            )
            return False
        # Touch lastModified alongside the context so recency queries stay correct.
        update_payload = {
            "pantallaContexto": pantalla_contexto,
            "lastModified": datetime.now(),
        }
        await doc_ref.update(update_payload)
        logger.debug(
            f"Updated pantallaContexto for session {session_id} in Firestore"
        )
        return True
    except Exception as e:
        logger.error(
            f"Error updating pantallaContexto for session {session_id} in Firestore: {str(e)}"
        )
        return False
# ====== Notification Methods ======
def _notification_ref(self, notification_id: str):
@@ -204,7 +274,7 @@ class FirestoreService:
notification_id
)
async def save_or_append_notification(self, new_entry: NotificationDTO) -> None:
async def save_or_append_notification(self, new_entry: Notification) -> None:
"""
Save or append notification entry to Firestore.

View File

@@ -1,100 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Gemini client service for LLM operations.
"""
import logging
import google.generativeai as genai
from ..config import Settings
logger = logging.getLogger(__name__)
class GeminiClientException(Exception):
    """Raised when a Gemini API call fails or produces no usable content."""
class GeminiClientService:
    """Service for interacting with Google Gemini API."""

    def __init__(self, settings: Settings):
        """Initialize Gemini client.

        Args:
            settings: Application settings (kept for parity with sibling
                services; credentials come from the environment).
        """
        self.settings = settings
        # Credentials/config are picked up from the environment.
        genai.configure()
        logger.info("Gemini client initialized successfully")

    async def generate_content(
        self,
        prompt: str,
        temperature: float,
        max_output_tokens: int,
        model_name: str,
        top_p: float,
    ) -> str:
        """
        Generate content using Gemini API.

        Args:
            prompt: The prompt text to send to Gemini
            temperature: Sampling temperature (0.0 to 1.0)
            max_output_tokens: Maximum number of tokens to generate
            model_name: Gemini model name (e.g., "gemini-2.0-flash-exp")
            top_p: Top-p sampling parameter

        Returns:
            Generated text response from Gemini

        Raises:
            GeminiClientException: If the API call fails or yields no text.
                Note that the "no content" case raised inside the try block
                is deliberately re-wrapped by the generic handler below, so
                callers always see the "An error occurred..." message.
        """
        try:
            logger.debug(f"Sending request to Gemini model '{model_name}'")

            # Build the model with its generation config in one expression.
            model = genai.GenerativeModel(
                model_name=model_name,
                generation_config=genai.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_output_tokens,
                    top_p=top_p,
                ),
            )

            response = await model.generate_content_async(prompt)

            if not (response and response.text):
                logger.warning(
                    f"Gemini returned no content or unexpected response structure for model '{model_name}'"
                )
                raise GeminiClientException(
                    "No content generated or unexpected response structure"
                )

            logger.debug(
                f"Received response from Gemini: {len(response.text)} characters"
            )
            return response.text

        except Exception as e:
            logger.error(
                f"Error during Gemini content generation for model '{model_name}': {e}",
                exc_info=True,
            )
            raise GeminiClientException(
                f"An error occurred during content generation: {str(e)}"
            ) from e

View File

@@ -1,105 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
LLM Response Tuner service for storing/retrieving pre-generated responses.
"""
import logging
from redis.asyncio import Redis
logger = logging.getLogger(__name__)
class LlmResponseTunerService:
    """
    Service for managing pre-generated LLM responses in Redis.

    Used as a webhook bridge where:
    1. LLM responses are pre-generated and stored with a UUID
    2. Dialogflow webhook calls this service with the UUID
    3. Service retrieves and returns the response
    """

    def __init__(self, redis: Redis):
        """
        Initialize LLM response tuner service.

        Args:
            redis: Redis client instance
        """
        self.redis = redis
        self.collection_prefix = "llm-pre-response:"
        self.ttl = 3600  # 1 hour in seconds
        logger.info("LlmResponseTunerService initialized")

    def _get_key(self, uuid: str) -> str:
        """Generate Redis key for UUID."""
        return self.collection_prefix + uuid

    async def get_value(self, uuid: str) -> str | None:
        """
        Retrieve pre-generated response by UUID.

        Args:
            uuid: Unique identifier for the response

        Returns:
            Response text, or None when the UUID is blank, absent from
            Redis, or a Redis error occurs (errors are logged, not raised).
        """
        if not (uuid and uuid.strip()):
            logger.warning("UUID is null or blank")
            return None

        redis_key = self._get_key(uuid)
        try:
            stored = await self.redis.get(redis_key)
        except Exception as e:
            logger.error(
                f"Error retrieving LLM response for UUID {uuid}: {e}", exc_info=True
            )
            return None

        if not stored:
            logger.warning(f"No response found for UUID: {uuid}")
            return None

        logger.info(f"Retrieved LLM response for UUID: {uuid}")
        return stored

    async def set_value(self, uuid: str, value: str) -> bool:
        """
        Store pre-generated response with UUID.

        Args:
            uuid: Unique identifier for the response
            value: Response text to store

        Returns:
            True if successful, False otherwise (blank inputs or Redis error)
        """
        if not (uuid and uuid.strip()):
            logger.warning("UUID is null or blank")
            return False
        if not value:
            logger.warning("Value is null or empty")
            return False

        redis_key = self._get_key(uuid)
        try:
            # setex writes the value and its expiry atomically.
            await self.redis.setex(redis_key, self.ttl, value)
        except Exception as e:
            logger.error(
                f"Error storing LLM response for UUID {uuid}: {e}", exc_info=True
            )
            return False

        logger.info(f"Stored LLM response for UUID: {uuid} with TTL: {self.ttl}s")
        return True

View File

@@ -14,7 +14,7 @@ from ..models import (
ConversationSessionDTO,
ConversationEntryDTO,
)
from ..models.notification import NotificationDTO
from ..models.notification import Notification
logger = logging.getLogger(__name__)
@@ -24,7 +24,7 @@ class NotificationContextMapper:
"""Maps notifications to text format for Gemini classification."""
@staticmethod
def to_text(notification: NotificationDTO) -> str:
def to_text(notification: Notification) -> str:
"""
Convert a notification to text format.
@@ -39,7 +39,7 @@ class NotificationContextMapper:
return notification.texto
@staticmethod
def to_text_multiple(notifications: list[NotificationDTO]) -> str:
def to_text_multiple(notifications: list[Notification]) -> str:
"""
Convert multiple notifications to text format.
@@ -56,7 +56,7 @@ class NotificationContextMapper:
return "\n".join(texts)
@staticmethod
def to_json(notification: NotificationDTO) -> str:
def to_json(notification: Notification) -> str:
"""
Convert notification to JSON string for Gemini.

View File

@@ -1,156 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Message classification service using Gemini LLM.
"""
import logging
from ..config import Settings
from .gemini_client import GeminiClientService, GeminiClientException
logger = logging.getLogger(__name__)
class MessageEntryFilter:
    """
    Classifies a user's text input into a predefined category using Gemini.

    Analyzes user queries in the context of conversation history and
    notifications to determine if the message is part of ongoing dialogue
    or an interruption. Classification is used to route requests to the
    appropriate handler.
    """

    # Classification categories returned by classify_message.
    CATEGORY_CONVERSATION = "CONVERSATION"
    CATEGORY_NOTIFICATION = "NOTIFICATION"
    CATEGORY_UNKNOWN = "UNKNOWN"
    # Declared for completeness; classify_message itself never returns ERROR.
    CATEGORY_ERROR = "ERROR"

    def __init__(self, settings: Settings, gemini_service: GeminiClientService):
        """
        Initialize message filter.

        Args:
            settings: Application settings
            gemini_service: Gemini client service

        Raises:
            RuntimeError: If the prompt template file cannot be loaded.
        """
        self.settings = settings
        self.gemini_service = gemini_service
        # Loaded once at construction; a load failure is fatal by design.
        self.prompt_template: str = self._load_prompt_template()
        logger.info("MessageEntryFilter initialized successfully")

    def _load_prompt_template(self) -> str:
        """Load the prompt template from resources.

        Raises:
            RuntimeError: If the template file cannot be read.
        """
        # NOTE(review): assumes Settings exposes message_filter_prompt_path —
        # confirm, as the Settings class shown elsewhere does not declare it.
        prompt_path = self.settings.base_path / self.settings.message_filter_prompt_path
        try:
            with open(prompt_path, "r", encoding="utf-8") as f:
                prompt_template = f.read()
            logger.info(f"Successfully loaded prompt template from '{prompt_path}'")
            return prompt_template
        except Exception as e:
            logger.error(
                f"Failed to load prompt template from '{prompt_path}': {e}",
                exc_info=True,
            )
            raise RuntimeError("Could not load prompt template") from e

    async def classify_message(
        self,
        query_input_text: str,
        notifications_json: str | None = None,
        conversation_json: str | None = None,
    ) -> str:
        """
        Classify a user message as CONVERSATION, NOTIFICATION, or UNKNOWN.

        Args:
            query_input_text: The user's input text to classify
            notifications_json: JSON string of interrupting notifications (optional)
            conversation_json: JSON string of conversation history (optional)

        Returns:
            Classification category (CONVERSATION, NOTIFICATION, or UNKNOWN).
            UNKNOWN is also the fallback on blank input, blank/unrecognized
            Gemini output, or any Gemini error — this method never raises.
        """
        if not query_input_text or not query_input_text.strip():
            logger.warning(
                f"Query input text for classification is null or blank. Returning {self.CATEGORY_UNKNOWN}"
            )
            return self.CATEGORY_UNKNOWN

        # Prepare context strings: blank/None inputs are replaced with
        # explicit sentinel sentences so the template is always fully filled.
        interrupting_notification = (
            notifications_json
            if notifications_json and notifications_json.strip()
            else "No interrupting notification."
        )
        conversation_history = (
            conversation_json
            if conversation_json and conversation_json.strip()
            else "No conversation history."
        )

        # Format the classification prompt. printf-style interpolation: the
        # argument order (history, notification, query) must match the
        # placeholder order in the template file exactly.
        classification_prompt = self.prompt_template % (
            conversation_history,
            interrupting_notification,
            query_input_text,
        )

        logger.debug(
            f"Sending classification request to Gemini for input (first 100 chars): "
            f"'{query_input_text[:100]}...'"
        )

        try:
            # Call Gemini API.
            # NOTE(review): assumes Settings exposes message_filter_temperature,
            # message_filter_max_tokens, message_filter_model and
            # message_filter_top_p — confirm against the configuration class.
            gemini_response = await self.gemini_service.generate_content(
                prompt=classification_prompt,
                temperature=self.settings.message_filter_temperature,
                max_output_tokens=self.settings.message_filter_max_tokens,
                model_name=self.settings.message_filter_model,
                top_p=self.settings.message_filter_top_p,
            )

            # Parse and validate response
            if not gemini_response:
                logger.warning(
                    f"Gemini returned null/blank response. Returning {self.CATEGORY_UNKNOWN}"
                )
                return self.CATEGORY_UNKNOWN

            # Exact (case-insensitive, trimmed) match is required; anything
            # else from the model is treated as unrecognized.
            response_upper = gemini_response.strip().upper()
            if response_upper == self.CATEGORY_CONVERSATION:
                logger.info(f"Classified as {self.CATEGORY_CONVERSATION}")
                return self.CATEGORY_CONVERSATION
            elif response_upper == self.CATEGORY_NOTIFICATION:
                logger.info(f"Classified as {self.CATEGORY_NOTIFICATION}")
                return self.CATEGORY_NOTIFICATION
            else:
                logger.warning(
                    f"Gemini returned unrecognized classification: '{gemini_response}'. "
                    f"Expected '{self.CATEGORY_CONVERSATION}' or '{self.CATEGORY_NOTIFICATION}'. "
                    f"Returning {self.CATEGORY_UNKNOWN}"
                )
                return self.CATEGORY_UNKNOWN

        except GeminiClientException as e:
            logger.error(
                f"Error during Gemini content generation for message classification: {e}",
                exc_info=True,
            )
            return self.CATEGORY_UNKNOWN
        except Exception as e:
            logger.error(
                f"Unexpected error during message classification: {e}",
                exc_info=True,
            )
            return self.CATEGORY_UNKNOWN

View File

@@ -1,192 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Notification context resolver using Gemini LLM.
"""
import logging
from ..config import Settings
from .gemini_client import GeminiClientService, GeminiClientException
logger = logging.getLogger(__name__)
class NotificationContextResolver:
    """
    Resolves conversational context using LLM to answer notification-related questions.

    Evaluates a user's question in the context of a notification and conversation
    history. Decides if the query can be answered by the LLM (using notification
    metadata) or should be delegated to Dialogflow.
    """

    # Sentinel returned when the query must be delegated to Dialogflow.
    CATEGORY_DIALOGFLOW = "DIALOGFLOW"

    def __init__(self, settings: Settings, gemini_service: GeminiClientService):
        """
        Initialize notification context resolver.

        Args:
            settings: Application settings
            gemini_service: Gemini client service

        Raises:
            RuntimeError: If the prompt template file cannot be loaded.
        """
        self.settings = settings
        self.gemini_service = gemini_service

        # Load settings (with defaults matching Java). getattr is used so the
        # resolver still works if Settings does not declare these fields.
        self.model_name: str = getattr(
            settings, "notification_context_model", "gemini-2.0-flash-001"
        )
        self.temperature: float = getattr(settings, "notification_context_temperature", 0.1)
        self.max_tokens: int = getattr(settings, "notification_context_max_tokens", 1024)
        self.top_p: float = getattr(settings, "notification_context_top_p", 0.1)
        self.prompt_path: str = getattr(
            settings,
            "notification_context_prompt_path",
            "prompts/notification_context_resolver.txt",
        )

        self.prompt_template: str = self._load_prompt_template()
        logger.info("NotificationContextResolver initialized successfully")

    def _load_prompt_template(self) -> str:
        """Load the prompt template from resources.

        Raises:
            RuntimeError: If the template file cannot be read.
        """
        prompt_path = self.settings.base_path / self.prompt_path
        try:
            with open(prompt_path, "r", encoding="utf-8") as f:
                prompt_template = f.read()
            logger.info(f"Successfully loaded prompt template from '{prompt_path}'")
            return prompt_template
        except Exception as e:
            logger.error(
                f"Failed to load prompt template from '{prompt_path}': {e}",
                exc_info=True,
            )
            raise RuntimeError("Could not load prompt template") from e

    async def resolve_context(
        self,
        query_input_text: str,
        notifications_json: str | None = None,
        conversation_json: str | None = None,
        metadata: str | None = None,
        user_id: str | None = None,
        session_id: str | None = None,
        user_phone_number: str | None = None,
    ) -> str:
        """
        Resolve context and generate response for notification-related question.

        Uses Gemini to analyze the question against notification metadata and
        conversation history. Returns either:
        - A direct answer generated by the LLM
        - "DIALOGFLOW" to delegate to standard Dialogflow flow

        Priority order for finding answers:
        1. METADATOS_NOTIFICACION (highest authority)
        2. HISTORIAL_CONVERSACION parameters with "notification_po_" prefix
        3. If not found, return "DIALOGFLOW"

        Args:
            query_input_text: User's question
            notifications_json: JSON string of notifications
            conversation_json: JSON string of conversation history
            metadata: Structured notification metadata
            user_id: User identifier (optional, for logging)
            session_id: Session identifier (optional, for logging)
            user_phone_number: User phone number (optional, for logging)

        Returns:
            Either a direct LLM-generated answer or "DIALOGFLOW" (also the
            fallback on blank input, blank responses, or any Gemini error —
            this method never raises).
        """
        # NOTE(review): user_id, session_id and user_phone_number are not
        # referenced in this body; retained for interface compatibility.
        logger.debug(
            f"resolveContext -> queryInputText: {query_input_text}, "
            f"notificationsJson: {notifications_json}, "
            f"conversationJson: {conversation_json}, "
            f"metadata: {metadata}"
        )

        if not query_input_text or not query_input_text.strip():
            logger.warning(
                f"Query input text for context resolution is null or blank. "
                f"Returning {self.CATEGORY_DIALOGFLOW}"
            )
            return self.CATEGORY_DIALOGFLOW

        # Prepare context strings: blank/None inputs are replaced with
        # explicit sentinels so the template is always fully populated.
        notification_content = (
            notifications_json
            if notifications_json and notifications_json.strip()
            else "No metadata in notification."
        )
        conversation_history = (
            conversation_json
            if conversation_json and conversation_json.strip()
            else "No conversation history."
        )
        notification_metadata = metadata if metadata and metadata.strip() else "{}"

        # Format the context resolution prompt. printf-style interpolation:
        # argument order (history, notification, metadata, query) must match
        # the placeholder order in the template file exactly.
        context_prompt = self.prompt_template % (
            conversation_history,
            notification_content,
            notification_metadata,
            query_input_text,
        )

        logger.debug(
            f"Sending context resolution request to Gemini for input (first 100 chars): "
            f"'{query_input_text[:100]}...'"
        )

        try:
            # Call Gemini API
            gemini_response = await self.gemini_service.generate_content(
                prompt=context_prompt,
                temperature=self.temperature,
                max_output_tokens=self.max_tokens,
                model_name=self.model_name,
                top_p=self.top_p,
            )

            if gemini_response and gemini_response.strip():
                # Check if response is delegation to Dialogflow
                if gemini_response.strip().upper() == self.CATEGORY_DIALOGFLOW:
                    logger.debug(
                        f"Resolved to {self.CATEGORY_DIALOGFLOW}. Input: '{query_input_text}'"
                    )
                    return self.CATEGORY_DIALOGFLOW
                else:
                    # LLM provided a direct answer; returned verbatim
                    # (untrimmed) to the caller.
                    logger.debug(
                        f"Resolved to a specific response. Input: '{query_input_text}'"
                    )
                    return gemini_response
            else:
                logger.warning(
                    f"Gemini returned a null or blank response. "
                    f"Returning {self.CATEGORY_DIALOGFLOW}"
                )
                return self.CATEGORY_DIALOGFLOW

        except GeminiClientException as e:
            logger.error(
                f"Error during Gemini content generation for context resolution: {e}",
                exc_info=True,
            )
            return self.CATEGORY_DIALOGFLOW
        except Exception as e:
            logger.error(
                f"Unexpected error during context resolution: {e}",
                exc_info=True,
            )
            return self.CATEGORY_DIALOGFLOW

View File

@@ -1,20 +1,9 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Notification manager service for processing push notifications.
"""
import asyncio
import logging
from datetime import datetime
from uuid import uuid4
from ..config import Settings
from ..models import DetectIntentResponseDTO
from ..models.notification import ExternalNotRequestDTO, NotificationDTO
from ..models.conversation import ConversationSessionDTO, ConversationEntryDTO
from ..utils.session_id import generate_session_id
from .dialogflow_client import DialogflowClientService
from ..models.notification import ExternalNotificationRequest, Notification
from .redis_service import RedisService
from .firestore_service import FirestoreService
from .dlp_service import DLPService
@@ -36,7 +25,6 @@ class NotificationManagerService:
def __init__(
self,
settings: Settings,
dialogflow_client: DialogflowClientService,
redis_service: RedisService,
firestore_service: FirestoreService,
dlp_service: DLPService,
@@ -52,18 +40,17 @@ class NotificationManagerService:
dlp_service: Data Loss Prevention service
"""
self.settings = settings
self.dialogflow_client = dialogflow_client
self.redis_service = redis_service
self.firestore_service = firestore_service
self.dlp_service = dlp_service
self.default_language_code = settings.dialogflow_default_language
self.event_name = "notificacion"
self.default_language_code = "es"
logger.info("NotificationManagerService initialized")
async def process_notification(
self, external_request: ExternalNotRequestDTO
) -> DetectIntentResponseDTO:
self, external_request: ExternalNotificationRequest
) -> None:
"""
Process a push notification from external system.
@@ -87,10 +74,6 @@ class NotificationManagerService:
"""
telefono = external_request.telefono
if not telefono or not telefono.strip():
logger.warning("No phone number provided in notification request")
raise ValueError("Phone number is required")
# Obfuscate sensitive data using DLP
obfuscated_text = await self.dlp_service.get_obfuscated_string(
external_request.texto,
@@ -104,14 +87,13 @@ class NotificationManagerService:
parameters[f"{PREFIX_PO_PARAM}{key}"] = value
# Create notification entry
new_notification_id = generate_session_id()
new_notification_entry = NotificationDTO(
idNotificacion=new_notification_id,
new_notification_id = str(uuid4())
new_notification_entry = Notification.create(
id_notificacion=new_notification_id,
telefono=telefono,
timestampCreacion=datetime.now(),
texto=obfuscated_text,
nombreEventoDialogflow=self.event_name,
codigoIdiomaDialogflow=self.default_language_code,
nombre_evento_dialogflow=self.event_name,
codigo_idioma_dialogflow=self.default_language_code,
parametros=parameters,
status="active",
)
@@ -122,8 +104,8 @@ class NotificationManagerService:
f"Notification for phone {telefono} cached. Kicking off async Firestore write-back"
)
# Fire-and-forget Firestore write
# In production, consider using asyncio.create_task() with proper error handling
# Fire-and-forget Firestore write (matching Java's .subscribe() behavior)
async def save_notification_to_firestore():
try:
await self.firestore_service.save_or_append_notification(
new_notification_entry
@@ -137,123 +119,5 @@ class NotificationManagerService:
exc_info=True,
)
# Get or create conversation session
session = await self._get_or_create_conversation_session(
telefono, obfuscated_text, parameters
)
# Send notification event to Dialogflow
logger.info(
f"Sending notification text to Dialogflow using conversation session: {session.sessionId}"
)
response = await self.dialogflow_client.detect_intent_event(
session_id=session.sessionId,
event_name=self.event_name,
parameters=parameters,
language_code=self.default_language_code,
)
logger.info(
f"Finished processing notification. Dialogflow response received for phone {telefono}"
)
return response
async def _get_or_create_conversation_session(
self, telefono: str, notification_text: str, parameters: dict
) -> ConversationSessionDTO:
"""
Get existing conversation session or create a new one.
Also persists system entry for the notification.
Args:
telefono: User phone number
notification_text: Notification text content
parameters: Notification parameters
Returns:
Conversation session
"""
# Try to get existing session by phone
# TODO: Need to implement get_session_by_telefono in Redis service
# For now, we'll create a new session
new_session_id = generate_session_id()
user_id = f"user_by_phone_{telefono}"
logger.info(
f"Creating new conversation session {new_session_id} for notification (phone: {telefono})"
)
# Create system entry for notification
system_entry = ConversationEntryDTO(
entity="SISTEMA",
type="SISTEMA",
timestamp=datetime.now(),
text=notification_text,
parameters=parameters,
intent=None,
)
# Create new session
new_session = ConversationSessionDTO(
sessionId=new_session_id,
userId=user_id,
telefono=telefono,
createdAt=datetime.now(),
lastModified=datetime.now(),
lastMessage=notification_text,
pantallaContexto=None,
)
# Persist conversation turn (session + system entry)
await self._persist_conversation_turn(new_session, system_entry)
return new_session
async def _persist_conversation_turn(
self, session: ConversationSessionDTO, entry: ConversationEntryDTO
) -> None:
"""
Persist conversation turn to Redis and Firestore.
Uses write-through caching: writes to Redis first, then async to Firestore.
Args:
session: Conversation session
entry: Conversation entry to persist
"""
logger.debug(
f"Starting Write-Back persistence for notification session {session.sessionId}. "
f"Type: {entry.type}. Writing to Redis first"
)
# Update session with last message
updated_session = ConversationSessionDTO(
**session.model_dump(),
lastMessage=entry.text,
lastModified=datetime.now(),
)
# Save to Redis
await self.redis_service.save_session(updated_session)
logger.info(
f"Entry saved to Redis for notification session {session.sessionId}. "
f"Type: {entry.type}. Kicking off async Firestore write-back"
)
# Fire-and-forget Firestore writes
try:
await self.firestore_service.save_session(updated_session)
await self.firestore_service.save_entry(session.sessionId, entry)
logger.debug(
f"Asynchronously (Write-Back): Entry successfully saved to Firestore "
f"for notification session {session.sessionId}. Type: {entry.type}"
)
except Exception as e:
logger.error(
f"Asynchronously (Write-Back): Failed to save entry to Firestore "
f"for notification session {session.sessionId}. Type: {entry.type}: {e}",
exc_info=True,
)
# Fire and forget - don't await
asyncio.create_task(save_notification_to_firestore())

View File

@@ -1,16 +1,8 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Quick Reply content service for loading FAQ screens.
"""
import json
import logging
from ..config import Settings
from ..models.quick_replies import QuickReplyDTO, QuestionDTO
from ..models.quick_replies import QuickReplyScreen, QuickReplyQuestions
logger = logging.getLogger(__name__)
@@ -33,7 +25,7 @@ class QuickReplyContentService:
f"QuickReplyContentService initialized with path: {self.quick_replies_path}"
)
async def get_quick_replies(self, screen_id: str) -> QuickReplyDTO | None:
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
"""
Load quick reply screen content by ID.
@@ -41,11 +33,14 @@ class QuickReplyContentService:
screen_id: Screen identifier (e.g., "pagos", "home")
Returns:
Quick reply DTO or None if not found
Quick reply DTO
Raises:
ValueError: If the quick reply file is not found
"""
if not screen_id or not screen_id.strip():
logger.warning("screen_id is null or empty. Returning empty quick replies")
return QuickReplyDTO(
return QuickReplyScreen(
header="empty",
body=None,
button=None,
@@ -58,7 +53,7 @@ class QuickReplyContentService:
try:
if not file_path.exists():
logger.warning(f"Quick reply file not found: {file_path}")
return None
raise ValueError(f"Quick reply file not found for screen_id: {screen_id}")
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
@@ -66,7 +61,7 @@ class QuickReplyContentService:
# Parse questions
preguntas_data = data.get("preguntas", [])
preguntas = [
QuestionDTO(
QuickReplyQuestions(
titulo=q.get("titulo", ""),
descripcion=q.get("descripcion"),
respuesta=q.get("respuesta", ""),
@@ -74,7 +69,7 @@ class QuickReplyContentService:
for q in preguntas_data
]
quick_reply = QuickReplyDTO(
quick_reply = QuickReplyScreen(
header=data.get("header"),
body=data.get("body"),
button=data.get("button"),
@@ -89,10 +84,10 @@ class QuickReplyContentService:
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON file {file_path}: {e}", exc_info=True)
return None
raise ValueError(f"Invalid JSON format in quick reply file for screen_id: {screen_id}") from e
except Exception as e:
logger.error(
f"Error loading quick replies for screen {screen_id}: {e}",
exc_info=True,
)
return None
raise ValueError(f"Error loading quick replies for screen_id: {screen_id}") from e

View File

@@ -0,0 +1,138 @@
import logging
import httpx
from pydantic import BaseModel, Field
from ..config import Settings
logger = logging.getLogger(__name__)
class Message(BaseModel):
    """OpenAI-style message format (one turn of the conversation)."""

    # Expected values per the description below; not validated here beyond str.
    role: str = Field(..., description="Role: system, user, or assistant")
    content: str = Field(..., description="Message content")


class RAGRequest(BaseModel):
    """Request model for RAG endpoint."""

    messages: list[Message] = Field(..., description="Conversation history")


class RAGResponse(BaseModel):
    """Response model from RAG endpoint.

    Parsing raises pydantic's ValidationError if the endpoint omits 'response'.
    """

    response: str = Field(..., description="Generated response from RAG")
class RAGService:
    """
    Highly concurrent HTTP client for calling RAG endpoints.

    Uses httpx AsyncClient with connection pooling for optimal performance
    when handling multiple concurrent requests. The underlying client is
    shared by all calls and must be released via close() or `async with`.
    """

    def __init__(
        self,
        settings: Settings,
        max_connections: int = 100,
        max_keepalive_connections: int = 20,
        timeout: float = 30.0,
    ):
        """
        Initialize RAG service with connection pooling.

        Args:
            settings: Application settings (provides rag_endpoint_url)
            max_connections: Maximum number of concurrent connections
            max_keepalive_connections: Maximum number of idle connections to keep alive
            timeout: Request timeout in seconds (applied to all timeout phases)
        """
        self.settings = settings
        self.rag_endpoint_url: str = settings.rag_endpoint_url
        self.timeout = timeout

        # Configure connection limits for high concurrency
        limits = httpx.Limits(
            max_connections=max_connections,
            max_keepalive_connections=max_keepalive_connections,
        )

        # Create async client with connection pooling
        self._client = httpx.AsyncClient(
            limits=limits,
            timeout=httpx.Timeout(timeout),
            # NOTE(review): http2=True makes httpx require the optional 'h2'
            # package at client construction time — confirm it is installed.
            http2=True,  # Enable HTTP/2 for better performance
        )

        logger.info(
            f"RAGService initialized with endpoint: {self.rag_endpoint_url}, "
            f"max_connections: {max_connections}, timeout: {timeout}s"
        )

    async def query(self, messages: list[dict[str, str]]) -> str:
        """
        Send conversation history to RAG endpoint and get response.

        Args:
            messages: OpenAI-style conversation history
                      e.g., [{"role": "user", "content": "Hello"}, ...]

        Returns:
            Response string from RAG endpoint

        Raises:
            httpx.HTTPError: If HTTP request fails
            ValueError: If response format is invalid
        """
        try:
            # Validate and construct request; malformed dicts raise here
            # (pydantic validation) and surface via the generic handler below.
            message_objects = [Message(**msg) for msg in messages]
            request = RAGRequest(messages=message_objects)

            # Make async HTTP POST request
            logger.debug(f"Sending RAG request with {len(messages)} messages")
            response = await self._client.post(
                self.rag_endpoint_url,
                json=request.model_dump(),
                headers={"Content-Type": "application/json"},
            )

            # Raise exception for HTTP errors (4xx/5xx)
            response.raise_for_status()

            # Parse response; missing/invalid 'response' key raises here
            response_data = response.json()
            rag_response = RAGResponse(**response_data)

            logger.debug(f"RAG response received: {len(rag_response.response)} chars")
            return rag_response.response

        except httpx.HTTPStatusError as e:
            logger.error(
                f"HTTP error calling RAG endpoint: {e.response.status_code} - {e.response.text}"
            )
            raise
        except httpx.RequestError as e:
            logger.error(f"Request error calling RAG endpoint: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error calling RAG endpoint: {str(e)}", exc_info=True)
            raise

    async def close(self):
        """Close the HTTP client and release connections."""
        await self._client.aclose()
        logger.info("RAGService client closed")

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: always closes the shared client."""
        await self.close()

View File

@@ -1,11 +1,3 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Redis service for caching conversation sessions.
"""
import json
import logging
from datetime import datetime
@@ -13,7 +5,7 @@ from redis.asyncio import Redis
from ..config import Settings
from ..models import ConversationSessionDTO
from ..models.notification import NotificationSessionDTO, NotificationDTO
from ..models.notification import NotificationSession, Notification
logger = logging.getLogger(__name__)
@@ -28,14 +20,14 @@ class RedisService:
self.redis: Redis | None = None
self.session_ttl = 2592000 # 30 days in seconds
self.notification_ttl = 2592000 # 30 days in seconds
self.qr_session_ttl = 86400 # 24 hours in seconds
async def connect(self):
"""Connect to Redis."""
self.redis = Redis(
host=self.settings.redis_host,
port=self.settings.redis_port,
password=self.settings.redis_password,
ssl=self.settings.redis_ssl,
password=self.settings.redis_pwd,
decode_responses=True,
)
logger.info(
@@ -188,7 +180,9 @@ class RedisService:
logger.debug(f"Saved message to Redis: {session_id}")
return True
except Exception as e:
logger.error(f"Error saving message to Redis for session {session_id}: {str(e)}")
logger.error(
f"Error saving message to Redis for session {session_id}: {str(e)}"
)
return False
async def get_messages(self, session_id: str) -> list:
@@ -225,10 +219,14 @@ class RedisService:
logger.error(f"Error parsing message JSON: {str(e)}")
continue
logger.debug(f"Retrieved {len(messages)} messages from Redis for session: {session_id}")
logger.debug(
f"Retrieved {len(messages)} messages from Redis for session: {session_id}"
)
return messages
except Exception as e:
logger.error(f"Error retrieving messages from Redis for session {session_id}: {str(e)}")
logger.error(
f"Error retrieving messages from Redis for session {session_id}: {str(e)}"
)
return []
# ====== Notification Methods ======
@@ -241,7 +239,7 @@ class RedisService:
"""Generate Redis key for phone-to-notification mapping."""
return f"notification:phone_to_notification:{phone}"
async def save_or_append_notification(self, new_entry: NotificationDTO) -> None:
async def save_or_append_notification(self, new_entry: Notification) -> None:
"""
Save or append notification entry to session.
@@ -267,20 +265,20 @@ class RedisService:
if existing_session:
# Append to existing session
updated_notifications = existing_session.notificaciones + [new_entry]
updated_session = NotificationSessionDTO(
session_id=notification_session_id,
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
fecha_creacion=existing_session.fecha_creacion,
ultima_actualizacion=datetime.now(),
fechaCreacion=existing_session.fechaCreacion,
ultimaActualizacion=datetime.now(),
notificaciones=updated_notifications,
)
else:
# Create new session
updated_session = NotificationSessionDTO(
session_id=notification_session_id,
updated_session = NotificationSession(
sessionId=notification_session_id,
telefono=phone_number,
fecha_creacion=datetime.now(),
ultima_actualizacion=datetime.now(),
fechaCreacion=datetime.now(),
ultimaActualizacion=datetime.now(),
notificaciones=[new_entry],
)
@@ -288,7 +286,7 @@ class RedisService:
await self._cache_notification_session(updated_session)
async def _cache_notification_session(
self, session: NotificationSessionDTO
self, session: NotificationSession
) -> bool:
"""Cache notification session in Redis."""
if not self.redis:
@@ -315,7 +313,7 @@ class RedisService:
async def get_notification_session(
self, session_id: str
) -> NotificationSessionDTO | None:
) -> NotificationSession | None:
"""Retrieve notification session from Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")
@@ -329,7 +327,7 @@ class RedisService:
try:
session_dict = json.loads(data)
session = NotificationSessionDTO.model_validate(session_dict)
session = NotificationSession.model_validate(session_dict)
logger.info(f"Notification session {session_id} retrieved from Redis")
return session
except Exception as e:

View File

@@ -1,5 +0,0 @@
"""Utilities module."""
from .session_id import SessionIdGenerator
__all__ = ["SessionIdGenerator"]

View File

@@ -1,23 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Session ID generator utility.
"""
import uuid
class SessionIdGenerator:
    """Generate unique session IDs."""

    @staticmethod
    def generate() -> str:
        """Return a freshly generated random UUID4 string."""
        return f"{uuid.uuid4()}"


def generate_session_id() -> str:
    """Generate a new unique session ID (convenience function)."""
    return SessionIdGenerator.generate()