Files
int-layer/src/capa_de_integracion/models/conversation.py
2026-02-20 19:32:54 +00:00

157 lines
4.7 KiB
Python

from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, Field, field_validator
class UsuarioDTO(BaseModel):
"""User information."""
telefono: str = Field(..., min_length=1)
nickname: str | None = None
class TextInputDTO(BaseModel):
"""Text input for queries."""
text: str
class EventInputDTO(BaseModel):
"""Event input for queries."""
event: str
class QueryParamsDTO(BaseModel):
"""Query parameters for Dialogflow requests."""
parameters: dict[str, Any] | None = None
class QueryInputDTO(BaseModel):
"""Query input combining text or event."""
text: TextInputDTO | None = None
event: EventInputDTO | None = None
language_code: str = "es"
@field_validator("text", "event")
@classmethod
def check_at_least_one(cls, v, info):
"""Ensure either text or event is provided."""
if info.field_name == "event" and v is None:
# Check if text was provided
if not info.data.get("text"):
raise ValueError("Either text or event must be provided")
return v
class DetectIntentRequestDTO(BaseModel):
"""Dialogflow detect intent request."""
query_input: QueryInputDTO
query_params: QueryParamsDTO | None = None
class QueryResultDTO(BaseModel):
"""Query result from Dialogflow."""
responseText: str | None = Field(None, alias="responseText")
parameters: dict[str, Any] | None = Field(None, alias="parameters")
model_config = {"populate_by_name": True}
class DetectIntentResponseDTO(BaseModel):
"""Dialogflow detect intent response."""
responseId: str | None = Field(None, alias="responseId")
queryResult: QueryResultDTO | None = Field(None, alias="queryResult")
quick_replies: Any | None = None # QuickReplyDTO from quick_replies module
model_config = {"populate_by_name": True}
class ExternalConvRequestDTO(BaseModel):
"""External conversation request from client."""
mensaje: str = Field(..., alias="mensaje")
usuario: UsuarioDTO = Field(..., alias="usuario")
canal: str = Field(..., alias="canal")
pantalla_contexto: str | None = Field(None, alias="pantallaContexto")
model_config = {"populate_by_name": True}
class ConversationMessageDTO(BaseModel):
"""Single conversation message."""
type: str = Field(..., alias="type") # Maps to MessageType
timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
text: str = Field(..., alias="text")
parameters: dict[str, Any] | None = Field(None, alias="parameters")
canal: str | None = Field(None, alias="canal")
model_config = {"populate_by_name": True}
class ConversationEntryDTO(BaseModel):
"""Single conversation entry."""
entity: Literal['user', 'assistant']
type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM"
timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
text: str = Field(..., alias="text")
parameters: dict[str, Any] | None = Field(None, alias="parameters")
canal: str | None = Field(None, alias="canal")
model_config = {"populate_by_name": True}
class ConversationSessionDTO(BaseModel):
"""Conversation session metadata."""
sessionId: str = Field(..., alias="sessionId")
userId: str = Field(..., alias="userId")
telefono: str = Field(..., alias="telefono")
createdAt: datetime = Field(default_factory=datetime.now, alias="createdAt")
lastModified: datetime = Field(default_factory=datetime.now, alias="lastModified")
lastMessage: str | None = Field(None, alias="lastMessage")
pantallaContexto: str | None = Field(None, alias="pantallaContexto")
model_config = {"populate_by_name": True}
@classmethod
def create(
cls,
session_id: str,
user_id: str,
telefono: str,
pantalla_contexto: str | None = None,
last_message: str | None = None,
) -> "ConversationSessionDTO":
"""Create a new conversation session."""
now = datetime.now()
return cls(
sessionId=session_id,
userId=user_id,
telefono=telefono,
createdAt=now,
lastModified=now,
pantallaContexto=pantalla_contexto,
lastMessage=last_message,
)
def with_last_message(self, last_message: str) -> "ConversationSessionDTO":
"""Create copy with updated last message."""
return self.model_copy(
update={"lastMessage": last_message, "lastModified": datetime.now()}
)
def with_pantalla_contexto(
self, pantalla_contexto: str
) -> "ConversationSessionDTO":
"""Create copy with updated pantalla contexto."""
return self.model_copy(update={"pantallaContexto": pantalla_contexto})