This commit is contained in:
2026-02-20 04:14:16 +00:00
parent 03292a635c
commit 14ed21a1f9
13 changed files with 492 additions and 1102 deletions

View File

@@ -11,14 +11,12 @@ from .services.firestore_service import FirestoreService
from .services.rag_service import RAGService
@lru_cache(maxsize=1)
def get_redis_service() -> RedisService:
"""Get Redis service instance."""
return RedisService(settings)
@lru_cache(maxsize=1)
def get_firestore_service() -> FirestoreService:
"""Get Firestore service instance."""
@@ -30,11 +28,13 @@ def get_dlp_service() -> DLPService:
"""Get DLP service instance."""
return DLPService(settings)
@lru_cache(maxsize=1)
def get_quick_reply_content_service() -> QuickReplyContentService:
"""Get quick reply content service instance."""
return QuickReplyContentService(settings)
@lru_cache(maxsize=1)
def get_notification_manager() -> NotificationManagerService:
"""Get notification manager instance."""
@@ -45,11 +45,13 @@ def get_notification_manager() -> NotificationManagerService:
dlp_service=get_dlp_service(),
)
@lru_cache(maxsize=1)
def get_rag_service() -> RAGService:
"""Get RAG service instance."""
return RAGService(settings)
@lru_cache(maxsize=1)
def get_conversation_manager() -> ConversationManagerService:
"""Get conversation manager instance."""
@@ -60,3 +62,38 @@ def get_conversation_manager() -> ConversationManagerService:
dlp_service=get_dlp_service(),
rag_service=get_rag_service(),
)
# Lifecycle management functions
def init_services(settings) -> None:
"""Initialize services (placeholder for compatibility)."""
# Services are lazy-loaded via lru_cache, no explicit init needed
pass
async def startup_services() -> None:
"""Connect to external services on startup."""
# Connect to Redis
redis = get_redis_service()
await redis.connect()
async def shutdown_services() -> None:
"""Close all service connections on shutdown."""
# Close Redis
redis = get_redis_service()
await redis.close()
# Close Firestore
firestore = get_firestore_service()
await firestore.close()
# Close DLP
dlp = get_dlp_service()
await dlp.close()
# Close RAG
rag = get_rag_service()
await rag.close()

View File

@@ -57,6 +57,7 @@ app.include_router(conversation_router)
app.include_router(notification_router)
app.include_router(quick_replies_router)
@app.get("/health")
async def health_check():
"""Health check endpoint."""

View File

@@ -1,17 +1,12 @@
"""Data models module."""
from .conversation import (
ConversationSessionDTO,
ConversationEntryDTO,
ConversationMessageDTO,
ExternalConvRequestDTO,
DetectIntentRequestDTO,
DetectIntentResponseDTO,
QueryInputDTO,
TextInputDTO,
EventInputDTO,
QueryParamsDTO,
QueryResultDTO,
User,
ConversationSession,
ConversationEntry,
ConversationRequest,
DetectIntentResponse,
QueryResult,
)
from .notification import (
ExternalNotificationRequest,
@@ -21,17 +16,12 @@ from .notification import (
__all__ = [
# Conversation
"ConversationSessionDTO",
"ConversationEntryDTO",
"ConversationMessageDTO",
"ExternalConvRequestDTO",
"DetectIntentRequestDTO",
"DetectIntentResponseDTO",
"QueryInputDTO",
"TextInputDTO",
"EventInputDTO",
"QueryParamsDTO",
"QueryResultDTO",
"User",
"ConversationSession",
"ConversationEntry",
"ConversationRequest",
"DetectIntentResponse",
"QueryResult",
# Notification
"ExternalNotificationRequest",
"NotificationSession",

View File

@@ -1,60 +1,16 @@
from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field
class UsuarioDTO(BaseModel):
class User(BaseModel):
"""User information."""
telefono: str = Field(..., min_length=1)
nickname: str | None = None
class TextInputDTO(BaseModel):
"""Text input for queries."""
text: str
class EventInputDTO(BaseModel):
"""Event input for queries."""
event: str
class QueryParamsDTO(BaseModel):
"""Query parameters for Dialogflow requests."""
parameters: dict[str, Any] | None = None
class QueryInputDTO(BaseModel):
"""Query input combining text or event."""
text: TextInputDTO | None = None
event: EventInputDTO | None = None
language_code: str = "es"
@field_validator("text", "event")
@classmethod
def check_at_least_one(cls, v, info):
"""Ensure either text or event is provided."""
if info.field_name == "event" and v is None:
# Check if text was provided
if not info.data.get("text"):
raise ValueError("Either text or event must be provided")
return v
class DetectIntentRequestDTO(BaseModel):
"""Dialogflow detect intent request."""
query_input: QueryInputDTO
query_params: QueryParamsDTO | None = None
class QueryResultDTO(BaseModel):
class QueryResult(BaseModel):
"""Query result from Dialogflow."""
responseText: str | None = Field(None, alias="responseText")
@@ -63,43 +19,31 @@ class QueryResultDTO(BaseModel):
model_config = {"populate_by_name": True}
class DetectIntentResponseDTO(BaseModel):
class DetectIntentResponse(BaseModel):
"""Dialogflow detect intent response."""
responseId: str | None = Field(None, alias="responseId")
queryResult: QueryResultDTO | None = Field(None, alias="queryResult")
quick_replies: Any | None = None # QuickReplyDTO from quick_replies module
queryResult: QueryResult | None = Field(None, alias="queryResult")
quick_replies: Any | None = None # QuickReplyScreen from quick_replies module
model_config = {"populate_by_name": True}
class ExternalConvRequestDTO(BaseModel):
class ConversationRequest(BaseModel):
"""External conversation request from client."""
mensaje: str = Field(..., alias="mensaje")
usuario: UsuarioDTO = Field(..., alias="usuario")
usuario: User = Field(..., alias="usuario")
canal: str = Field(..., alias="canal")
pantalla_contexto: str | None = Field(None, alias="pantallaContexto")
model_config = {"populate_by_name": True}
class ConversationMessageDTO(BaseModel):
"""Single conversation message."""
type: str = Field(..., alias="type") # Maps to MessageType
timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
text: str = Field(..., alias="text")
parameters: dict[str, Any] | None = Field(None, alias="parameters")
canal: str | None = Field(None, alias="canal")
model_config = {"populate_by_name": True}
class ConversationEntryDTO(BaseModel):
class ConversationEntry(BaseModel):
"""Single conversation entry."""
entity: Literal['user', 'assistant']
entity: Literal["user", "assistant"]
type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM"
timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
text: str = Field(..., alias="text")
@@ -109,7 +53,7 @@ class ConversationEntryDTO(BaseModel):
model_config = {"populate_by_name": True}
class ConversationSessionDTO(BaseModel):
class ConversationSession(BaseModel):
"""Conversation session metadata."""
sessionId: str = Field(..., alias="sessionId")
@@ -130,7 +74,7 @@ class ConversationSessionDTO(BaseModel):
telefono: str,
pantalla_contexto: str | None = None,
last_message: str | None = None,
) -> "ConversationSessionDTO":
) -> "ConversationSession":
"""Create a new conversation session."""
now = datetime.now()
return cls(
@@ -142,15 +86,3 @@ class ConversationSessionDTO(BaseModel):
pantallaContexto=pantalla_contexto,
lastMessage=last_message,
)
def with_last_message(self, last_message: str) -> "ConversationSessionDTO":
"""Create copy with updated last message."""
return self.model_copy(
update={"lastMessage": last_message, "lastModified": datetime.now()}
)
def with_pantalla_contexto(
self, pantalla_contexto: str
) -> "ConversationSessionDTO":
"""Create copy with updated pantalla contexto."""
return self.model_copy(update={"pantallaContexto": pantalla_contexto})

View File

@@ -1,7 +1,7 @@
import logging
from fastapi import APIRouter, Depends, HTTPException
from ..models import ExternalConvRequestDTO, DetectIntentResponseDTO
from ..models import ConversationRequest, DetectIntentResponse
from ..services import ConversationManagerService
from ..dependencies import get_conversation_manager
@@ -11,13 +11,13 @@ logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/dialogflow", tags=["conversation"])
@router.post("/detect-intent", response_model=DetectIntentResponseDTO)
@router.post("/detect-intent", response_model=DetectIntentResponse)
async def detect_intent(
request: ExternalConvRequestDTO,
request: ConversationRequest,
conversation_manager: ConversationManagerService = Depends(
get_conversation_manager
),
) -> DetectIntentResponseDTO:
) -> DetectIntentResponse:
"""
Detect user intent and manage conversation.

View File

@@ -7,22 +7,29 @@ from ..models.quick_replies import QuickReplyScreen
from ..services.quick_reply_content import QuickReplyContentService
from ..services.redis_service import RedisService
from ..services.firestore_service import FirestoreService
from ..dependencies import get_redis_service, get_firestore_service, get_quick_reply_content_service
from ..dependencies import (
get_redis_service,
get_firestore_service,
get_quick_reply_content_service,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"])
class QuickReplyUser(BaseModel):
telefono: str
nombre: str
class QuickReplyScreenRequest(BaseModel):
usuario: QuickReplyUser
pantallaContexto: str
model_config = {"populate_by_name": True}
class QuickReplyScreenResponse(BaseModel):
responseId: str
quick_replies: QuickReplyScreen
@@ -33,7 +40,9 @@ async def start_quick_reply_session(
request: QuickReplyScreenRequest,
redis_service: RedisService = Depends(get_redis_service),
firestore_service: FirestoreService = Depends(get_firestore_service),
quick_reply_content_service: QuickReplyContentService = Depends(get_quick_reply_content_service)
quick_reply_content_service: QuickReplyContentService = Depends(
get_quick_reply_content_service
),
) -> QuickReplyScreenResponse:
"""
Start a quick reply FAQ session for a specific screen.
@@ -56,21 +65,30 @@ async def start_quick_reply_session(
session = await firestore_service.get_session_by_phone(telefono)
if session:
session_id = session.sessionId
await firestore_service.update_pantalla_contexto(session_id, pantalla_contexto)
await firestore_service.update_pantalla_contexto(
session_id, pantalla_contexto
)
session.pantallaContexto = pantalla_contexto
else:
session_id = str(uuid4())
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
session = await firestore_service.create_session(session_id, user_id, telefono, pantalla_contexto)
session = await firestore_service.create_session(
session_id, user_id, telefono, pantalla_contexto
)
# Cache session
await redis_service.save_session(session)
logger.info(f"Created quick reply session {session_id} for screen: {pantalla_contexto}")
logger.info(
f"Created quick reply session {session_id} for screen: {pantalla_contexto}"
)
# Load quick replies
quick_replies = await quick_reply_content_service.get_quick_replies(pantalla_contexto)
return QuickReplyScreenResponse(responseId=session_id, quick_replies=quick_replies)
quick_replies = await quick_reply_content_service.get_quick_replies(
pantalla_contexto
)
return QuickReplyScreenResponse(
responseId=session_id, quick_replies=quick_replies
)
except ValueError as e:
logger.error(f"Validation error: {e}", exc_info=True)

View File

@@ -4,13 +4,10 @@ from .conversation_manager import ConversationManagerService
from .notification_manager import NotificationManagerService
from .dlp_service import DLPService
from .quick_reply_content import QuickReplyContentService
from .mappers import NotificationContextMapper, ConversationContextMapper
__all__ = [
"QuickReplyContentService",
"ConversationManagerService",
"NotificationManagerService",
"DLPService",
"NotificationContextMapper",
"ConversationContextMapper",
]

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,7 @@ from datetime import datetime
from google.cloud import firestore
from ..config import Settings
from ..models import ConversationSessionDTO, ConversationEntryDTO
from ..models import ConversationSession, ConversationEntry
from ..models.notification import Notification
@@ -40,7 +40,7 @@ class FirestoreService:
"""Get Firestore document reference for session."""
return self.db.collection(self.conversations_collection).document(session_id)
async def get_session(self, session_id: str) -> ConversationSessionDTO | None:
async def get_session(self, session_id: str) -> ConversationSession | None:
"""Retrieve conversation session from Firestore by session ID."""
try:
doc_ref = self._session_ref(session_id)
@@ -51,7 +51,7 @@ class FirestoreService:
return None
data = doc.to_dict()
session = ConversationSessionDTO.model_validate(data)
session = ConversationSession.model_validate(data)
logger.debug(f"Retrieved session from Firestore: {session_id}")
return session
@@ -61,9 +61,7 @@ class FirestoreService:
)
return None
async def get_session_by_phone(
self, telefono: str
) -> ConversationSessionDTO | None:
async def get_session_by_phone(self, telefono: str) -> ConversationSession | None:
"""
Retrieve most recent conversation session from Firestore by phone number.
@@ -84,7 +82,7 @@ class FirestoreService:
docs = query.stream()
async for doc in docs:
data = doc.to_dict()
session = ConversationSessionDTO.model_validate(data)
session = ConversationSession.model_validate(data)
logger.debug(
f"Retrieved session from Firestore for phone {telefono}: {session.sessionId}"
)
@@ -99,7 +97,7 @@ class FirestoreService:
)
return None
async def save_session(self, session: ConversationSessionDTO) -> bool:
async def save_session(self, session: ConversationSession) -> bool:
"""Save conversation session to Firestore."""
try:
doc_ref = self._session_ref(session.sessionId)
@@ -121,7 +119,7 @@ class FirestoreService:
telefono: str,
pantalla_contexto: str | None = None,
last_message: str | None = None,
) -> ConversationSessionDTO:
) -> ConversationSession:
"""Create and save a new conversation session to Firestore.
Args:
@@ -137,7 +135,7 @@ class FirestoreService:
Raises:
Exception: If session creation or save fails
"""
session = ConversationSessionDTO.create(
session = ConversationSession.create(
session_id=session_id,
user_id=user_id,
telefono=telefono,
@@ -152,7 +150,7 @@ class FirestoreService:
logger.info(f"Created new session in Firestore: {session_id}")
return session
async def save_entry(self, session_id: str, entry: ConversationEntryDTO) -> bool:
async def save_entry(self, session_id: str, entry: ConversationEntry) -> bool:
"""Save conversation entry to Firestore subcollection."""
try:
doc_ref = self._session_ref(session_id)
@@ -175,7 +173,7 @@ class FirestoreService:
async def get_entries(
self, session_id: str, limit: int = 10
) -> list[ConversationEntryDTO]:
) -> list[ConversationEntry]:
"""Retrieve recent conversation entries from Firestore."""
try:
doc_ref = self._session_ref(session_id)
@@ -191,7 +189,7 @@ class FirestoreService:
async for doc in docs:
entry_data = doc.to_dict()
entry = ConversationEntryDTO.model_validate(entry_data)
entry = ConversationEntry.model_validate(entry_data)
entries.append(entry)
# Reverse to get chronological order

View File

@@ -1,229 +0,0 @@
"""
Copyright 2025 Google. This software is provided as-is, without warranty or
representation for any use or purpose. Your use of it is subject to your
agreement with Google.
Mappers for converting DTOs to text format for Gemini API.
"""
import json
import logging
from datetime import datetime, timedelta
from ..models import (
ConversationSessionDTO,
ConversationEntryDTO,
)
from ..models.notification import Notification
logger = logging.getLogger(__name__)
class NotificationContextMapper:
"""Maps notifications to text format for Gemini classification."""
@staticmethod
def to_text(notification: Notification) -> str:
"""
Convert a notification to text format.
Args:
notification: Notification DTO
Returns:
Notification text
"""
if not notification or not notification.texto:
return ""
return notification.texto
@staticmethod
def to_text_multiple(notifications: list[Notification]) -> str:
"""
Convert multiple notifications to text format.
Args:
notifications: List of notification DTOs
Returns:
Notifications joined by newlines
"""
if not notifications:
return ""
texts = [n.texto for n in notifications if n.texto and n.texto.strip()]
return "\n".join(texts)
@staticmethod
def to_json(notification: Notification) -> str:
"""
Convert notification to JSON string for Gemini.
Args:
notification: Notification DTO
Returns:
JSON representation
"""
if not notification:
return "{}"
data = {
"texto": notification.texto,
"parametros": notification.parametros or {},
"timestamp": notification.timestampCreacion.isoformat(),
}
return json.dumps(data, ensure_ascii=False)
class ConversationContextMapper:
"""Maps conversation history to text format for Gemini."""
# Business rules for conversation history limits
MESSAGE_LIMIT = 60 # Maximum 60 messages
DAYS_LIMIT = 30 # Maximum 30 days
MAX_HISTORY_BYTES = 50 * 1024 # 50 KB maximum size
NOTIFICATION_TEXT_PARAM = "notification_text"
def __init__(self, message_limit: int = 60, days_limit: int = 30):
"""
Initialize conversation context mapper.
Args:
message_limit: Maximum number of messages to include
days_limit: Maximum age of messages in days
"""
self.message_limit = message_limit
self.days_limit = days_limit
def to_text_from_entries(self, entries: list[ConversationEntryDTO]) -> str:
"""
Convert conversation entries to text format.
Args:
entries: List of conversation entries
Returns:
Formatted conversation history
"""
if not entries:
return ""
formatted = [self._format_entry(entry) for entry in entries]
return "\n".join(formatted)
def to_text_with_limits(
self,
session: ConversationSessionDTO,
entries: list[ConversationEntryDTO],
) -> str:
"""
Convert conversation to text with business rule limits applied.
Applies:
- Days limit (30 days)
- Message limit (60 messages)
- Size limit (50 KB)
Args:
session: Conversation session
entries: List of conversation entries
Returns:
Formatted conversation history with limits applied
"""
if not entries:
return ""
# Filter by date (30 days)
cutoff_date = datetime.now() - timedelta(days=self.days_limit)
recent_entries = [
e for e in entries if e.timestamp and e.timestamp >= cutoff_date
]
# Sort by timestamp (oldest first) and limit count
recent_entries.sort(key=lambda e: e.timestamp)
limited_entries = recent_entries[-self.message_limit :]
# Apply size truncation (50 KB)
return self._to_text_with_truncation(limited_entries)
def _to_text_with_truncation(self, entries: list[ConversationEntryDTO]) -> str:
"""
Format entries with size truncation (50 KB max).
Args:
entries: List of conversation entries
Returns:
Formatted text, truncated if necessary
"""
if not entries:
return ""
# Format all messages
formatted_messages = [self._format_entry(entry) for entry in entries]
# Build from newest to oldest, stopping at 50KB
text_block = []
current_size = 0
# Iterate from newest to oldest
for message in reversed(formatted_messages):
message_line = message + "\n"
message_bytes = len(message_line.encode("utf-8"))
if current_size + message_bytes > self.MAX_HISTORY_BYTES:
break
text_block.insert(0, message_line)
current_size += message_bytes
return "".join(text_block).strip()
def _format_entry(self, entry: ConversationEntryDTO) -> str:
"""
Format a single conversation entry.
Args:
entry: Conversation entry
Returns:
Formatted string (e.g., "User: hello", "Agent: hi there")
"""
prefix = "User: "
content = entry.text
# Determine prefix based on entity
if entry.entity == "AGENTE":
prefix = "Agent: "
# Clean JSON artifacts from agent messages
content = self._clean_agent_message(content)
elif entry.entity == "SISTEMA":
prefix = "System: "
# Check if this is a notification in parameters
if entry.parameters and self.NOTIFICATION_TEXT_PARAM in entry.parameters:
param_text = entry.parameters[self.NOTIFICATION_TEXT_PARAM]
if param_text and str(param_text).strip():
content = str(param_text)
elif entry.entity == "LLM":
prefix = "System: "
return prefix + content
def _clean_agent_message(self, message: str) -> str:
"""
Clean agent message by removing JSON artifacts at the end.
Args:
message: Original message
Returns:
Cleaned message
"""
# Remove trailing {...} patterns
import re
return re.sub(r"\s*\{.*\}\s*$", "", message).strip()

View File

@@ -53,7 +53,9 @@ class QuickReplyContentService:
try:
if not file_path.exists():
logger.warning(f"Quick reply file not found: {file_path}")
raise ValueError(f"Quick reply file not found for screen_id: {screen_id}")
raise ValueError(
f"Quick reply file not found for screen_id: {screen_id}"
)
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
@@ -84,10 +86,14 @@ class QuickReplyContentService:
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON file {file_path}: {e}", exc_info=True)
raise ValueError(f"Invalid JSON format in quick reply file for screen_id: {screen_id}") from e
raise ValueError(
f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
) from e
except Exception as e:
logger.error(
f"Error loading quick replies for screen {screen_id}: {e}",
exc_info=True,
)
raise ValueError(f"Error loading quick replies for screen_id: {screen_id}") from e
raise ValueError(
f"Error loading quick replies for screen_id: {screen_id}"
) from e

View File

@@ -121,7 +121,9 @@ class RAGService:
logger.error(f"Request error calling RAG endpoint: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error calling RAG endpoint: {str(e)}", exc_info=True)
logger.error(
f"Unexpected error calling RAG endpoint: {str(e)}", exc_info=True
)
raise
async def close(self):

View File

@@ -4,7 +4,7 @@ from datetime import datetime
from redis.asyncio import Redis
from ..config import Settings
from ..models import ConversationSessionDTO
from ..models import ConversationSession
from ..models.notification import NotificationSession, Notification
@@ -48,9 +48,7 @@ class RedisService:
"""Generate Redis key for phone-to-session mapping."""
return f"conversation:phone:{phone}"
async def get_session(
self, session_id_or_phone: str
) -> ConversationSessionDTO | None:
async def get_session(self, session_id_or_phone: str) -> ConversationSession | None:
"""
Retrieve conversation session from Redis by session ID or phone number.
@@ -84,14 +82,14 @@ class RedisService:
try:
session_dict = json.loads(data)
session = ConversationSessionDTO.model_validate(session_dict)
session = ConversationSession.model_validate(session_dict)
logger.debug(f"Retrieved session from Redis: {session_id}")
return session
except Exception as e:
logger.error(f"Error deserializing session {session_id}: {str(e)}")
return None
async def save_session(self, session: ConversationSessionDTO) -> bool:
async def save_session(self, session: ConversationSession) -> bool:
"""
Save conversation session to Redis with TTL.
@@ -156,7 +154,7 @@ class RedisService:
Args:
session_id: The session ID
message: ConversationMessageDTO or ConversationEntryDTO
message: ConversationEntry
Returns:
True if successful, False otherwise
@@ -285,9 +283,7 @@ class RedisService:
# Save to Redis
await self._cache_notification_session(updated_session)
async def _cache_notification_session(
self, session: NotificationSession
) -> bool:
async def _cache_notification_session(self, session: NotificationSession) -> bool:
"""Cache notification session in Redis."""
if not self.redis:
raise RuntimeError("Redis client not connected")