Fix lint errors
This commit is contained in:
@@ -41,4 +41,4 @@ dev = [
|
|||||||
|
|
||||||
[tool.ruff.lint]
|
[tool.ruff.lint]
|
||||||
select = ['ALL']
|
select = ['ALL']
|
||||||
ignore = []
|
ignore = ['D203', 'D213']
|
||||||
|
|||||||
@@ -1,9 +1,4 @@
|
|||||||
"""Copyright 2025 Google. This software is provided as-is,
|
"""Capa de Integración - Conversational AI Orchestrator Service."""
|
||||||
without warranty or representation for any use or purpose.
|
|
||||||
Your use of it is subject to your agreement with Google.
|
|
||||||
|
|
||||||
Capa de Integración - Conversational AI Orchestrator Service
|
|
||||||
"""
|
|
||||||
|
|
||||||
from .main import app, main
|
from .main import app, main
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
"""Configuration settings for the application."""
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
|
"""Dependency injection and service lifecycle management."""
|
||||||
|
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
from .config import settings
|
from .config import Settings, settings
|
||||||
from .services import (
|
from .services import (
|
||||||
ConversationManagerService,
|
ConversationManagerService,
|
||||||
DLPService,
|
DLPService,
|
||||||
@@ -68,7 +70,7 @@ def get_conversation_manager() -> ConversationManagerService:
|
|||||||
# Lifecycle management functions
|
# Lifecycle management functions
|
||||||
|
|
||||||
|
|
||||||
def init_services(settings) -> None:
|
def init_services(settings: Settings) -> None:
|
||||||
"""Initialize services (placeholder for compatibility)."""
|
"""Initialize services (placeholder for compatibility)."""
|
||||||
# Services are lazy-loaded via lru_cache, no explicit init needed
|
# Services are lazy-loaded via lru_cache, no explicit init needed
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,7 @@
|
|||||||
class FirestorePersistenceException(Exception):
|
"""Custom exceptions for the application."""
|
||||||
|
|
||||||
|
|
||||||
|
class FirestorePersistenceError(Exception):
|
||||||
"""Exception raised when Firestore operations fail.
|
"""Exception raised when Firestore operations fail.
|
||||||
|
|
||||||
This is typically caught and logged without failing the request.
|
This is typically caught and logged without failing the request.
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
|
"""Main application entry point and FastAPI app configuration."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from collections.abc import AsyncIterator
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
import uvicorn
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
@@ -17,7 +21,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(_: FastAPI):
|
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
|
||||||
"""Application lifespan manager."""
|
"""Application lifespan manager."""
|
||||||
# Startup
|
# Startup
|
||||||
logger.info("Initializing services...")
|
logger.info("Initializing services...")
|
||||||
@@ -35,7 +39,9 @@ async def lifespan(_: FastAPI):
|
|||||||
|
|
||||||
app = FastAPI(
|
app = FastAPI(
|
||||||
title="Capa de Integración - Orchestrator Service",
|
title="Capa de Integración - Orchestrator Service",
|
||||||
description="Conversational AI orchestrator for Dialogflow CX, Gemini, and Vertex AI",
|
description=(
|
||||||
|
"Conversational AI orchestrator for Dialogflow CX, Gemini, and Vertex AI"
|
||||||
|
),
|
||||||
version="0.1.0",
|
version="0.1.0",
|
||||||
lifespan=lifespan,
|
lifespan=lifespan,
|
||||||
)
|
)
|
||||||
@@ -58,18 +64,16 @@ app.include_router(quick_replies_router)
|
|||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
async def health_check():
|
async def health_check() -> dict[str, str]:
|
||||||
"""Health check endpoint."""
|
"""Health check endpoint."""
|
||||||
return {"status": "healthy", "service": "capa-de-integracion"}
|
return {"status": "healthy", "service": "capa-de-integracion"}
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
"""Entry point for CLI."""
|
"""Entry point for CLI."""
|
||||||
import uvicorn
|
|
||||||
|
|
||||||
uvicorn.run(
|
uvicorn.run(
|
||||||
"capa_de_integracion.main:app",
|
"capa_de_integracion.main:app",
|
||||||
host="0.0.0.0",
|
host="0.0.0.0", # noqa: S104
|
||||||
port=8080,
|
port=8080,
|
||||||
reload=True,
|
reload=True,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from datetime import datetime
|
"""Conversation models and data structures."""
|
||||||
|
|
||||||
|
from datetime import UTC, datetime
|
||||||
from typing import Any, Literal
|
from typing import Any, Literal
|
||||||
|
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
@@ -14,7 +16,7 @@ class User(BaseModel):
|
|||||||
class QueryResult(BaseModel):
|
class QueryResult(BaseModel):
|
||||||
"""Query result from Dialogflow."""
|
"""Query result from Dialogflow."""
|
||||||
|
|
||||||
responseText: str | None = Field(None, alias="responseText")
|
response_text: str | None = Field(None, alias="responseText")
|
||||||
parameters: dict[str, Any] | None = Field(None, alias="parameters")
|
parameters: dict[str, Any] | None = Field(None, alias="parameters")
|
||||||
|
|
||||||
model_config = {"populate_by_name": True}
|
model_config = {"populate_by_name": True}
|
||||||
@@ -23,8 +25,8 @@ class QueryResult(BaseModel):
|
|||||||
class DetectIntentResponse(BaseModel):
|
class DetectIntentResponse(BaseModel):
|
||||||
"""Dialogflow detect intent response."""
|
"""Dialogflow detect intent response."""
|
||||||
|
|
||||||
responseId: str | None = Field(None, alias="responseId")
|
response_id: str | None = Field(None, alias="responseId")
|
||||||
queryResult: QueryResult | None = Field(None, alias="queryResult")
|
query_result: QueryResult | None = Field(None, alias="queryResult")
|
||||||
quick_replies: Any | None = None # QuickReplyScreen from quick_replies module
|
quick_replies: Any | None = None # QuickReplyScreen from quick_replies module
|
||||||
|
|
||||||
model_config = {"populate_by_name": True}
|
model_config = {"populate_by_name": True}
|
||||||
@@ -46,7 +48,9 @@ class ConversationEntry(BaseModel):
|
|||||||
|
|
||||||
entity: Literal["user", "assistant"]
|
entity: Literal["user", "assistant"]
|
||||||
type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM"
|
type: str = Field(..., alias="type") # "INICIO", "CONVERSACION", "LLM"
|
||||||
timestamp: datetime = Field(default_factory=datetime.now, alias="timestamp")
|
timestamp: datetime = Field(
|
||||||
|
default_factory=lambda: datetime.now(UTC), alias="timestamp",
|
||||||
|
)
|
||||||
text: str = Field(..., alias="text")
|
text: str = Field(..., alias="text")
|
||||||
parameters: dict[str, Any] | None = Field(None, alias="parameters")
|
parameters: dict[str, Any] | None = Field(None, alias="parameters")
|
||||||
canal: str | None = Field(None, alias="canal")
|
canal: str | None = Field(None, alias="canal")
|
||||||
@@ -57,13 +61,17 @@ class ConversationEntry(BaseModel):
|
|||||||
class ConversationSession(BaseModel):
|
class ConversationSession(BaseModel):
|
||||||
"""Conversation session metadata."""
|
"""Conversation session metadata."""
|
||||||
|
|
||||||
sessionId: str = Field(..., alias="sessionId")
|
session_id: str = Field(..., alias="sessionId")
|
||||||
userId: str = Field(..., alias="userId")
|
user_id: str = Field(..., alias="userId")
|
||||||
telefono: str = Field(..., alias="telefono")
|
telefono: str = Field(..., alias="telefono")
|
||||||
createdAt: datetime = Field(default_factory=datetime.now, alias="createdAt")
|
created_at: datetime = Field(
|
||||||
lastModified: datetime = Field(default_factory=datetime.now, alias="lastModified")
|
default_factory=lambda: datetime.now(UTC), alias="createdAt",
|
||||||
lastMessage: str | None = Field(None, alias="lastMessage")
|
)
|
||||||
pantallaContexto: str | None = Field(None, alias="pantallaContexto")
|
last_modified: datetime = Field(
|
||||||
|
default_factory=lambda: datetime.now(UTC), alias="lastModified",
|
||||||
|
)
|
||||||
|
last_message: str | None = Field(None, alias="lastMessage")
|
||||||
|
pantalla_contexto: str | None = Field(None, alias="pantallaContexto")
|
||||||
|
|
||||||
model_config = {"populate_by_name": True}
|
model_config = {"populate_by_name": True}
|
||||||
|
|
||||||
@@ -77,7 +85,7 @@ class ConversationSession(BaseModel):
|
|||||||
last_message: str | None = None,
|
last_message: str | None = None,
|
||||||
) -> "ConversationSession":
|
) -> "ConversationSession":
|
||||||
"""Create a new conversation session."""
|
"""Create a new conversation session."""
|
||||||
now = datetime.now()
|
now = datetime.now(UTC)
|
||||||
return cls(
|
return cls(
|
||||||
sessionId=session_id,
|
sessionId=session_id,
|
||||||
userId=user_id,
|
userId=user_id,
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from datetime import datetime
|
"""Notification models and data structures."""
|
||||||
|
|
||||||
|
from datetime import UTC, datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
@@ -10,22 +12,22 @@ class Notification(BaseModel):
|
|||||||
Represents a notification to be stored in Firestore and cached in Redis.
|
Represents a notification to be stored in Firestore and cached in Redis.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
idNotificacion: str = Field(
|
id_notificacion: str = Field(
|
||||||
..., alias="idNotificacion", description="Unique notification ID",
|
..., alias="idNotificacion", description="Unique notification ID",
|
||||||
)
|
)
|
||||||
telefono: str = Field(..., alias="telefono", description="User phone number")
|
telefono: str = Field(..., alias="telefono", description="User phone number")
|
||||||
timestampCreacion: datetime = Field(
|
timestamp_creacion: datetime = Field(
|
||||||
default_factory=datetime.now,
|
default_factory=lambda: datetime.now(UTC),
|
||||||
alias="timestampCreacion",
|
alias="timestampCreacion",
|
||||||
description="Notification creation timestamp",
|
description="Notification creation timestamp",
|
||||||
)
|
)
|
||||||
texto: str = Field(..., alias="texto", description="Notification text content")
|
texto: str = Field(..., alias="texto", description="Notification text content")
|
||||||
nombreEventoDialogflow: str = Field(
|
nombre_evento_dialogflow: str = Field(
|
||||||
default="notificacion",
|
default="notificacion",
|
||||||
alias="nombreEventoDialogflow",
|
alias="nombreEventoDialogflow",
|
||||||
description="Dialogflow event name",
|
description="Dialogflow event name",
|
||||||
)
|
)
|
||||||
codigoIdiomaDialogflow: str = Field(
|
codigo_idioma_dialogflow: str = Field(
|
||||||
default="es",
|
default="es",
|
||||||
alias="codigoIdiomaDialogflow",
|
alias="codigoIdiomaDialogflow",
|
||||||
description="Dialogflow language code",
|
description="Dialogflow language code",
|
||||||
@@ -42,7 +44,7 @@ class Notification(BaseModel):
|
|||||||
model_config = {"populate_by_name": True}
|
model_config = {"populate_by_name": True}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(
|
def create( # noqa: PLR0913
|
||||||
cls,
|
cls,
|
||||||
id_notificacion: str,
|
id_notificacion: str,
|
||||||
telefono: str,
|
telefono: str,
|
||||||
@@ -70,7 +72,7 @@ class Notification(BaseModel):
|
|||||||
return cls(
|
return cls(
|
||||||
idNotificacion=id_notificacion,
|
idNotificacion=id_notificacion,
|
||||||
telefono=telefono,
|
telefono=telefono,
|
||||||
timestampCreacion=datetime.now(),
|
timestampCreacion=datetime.now(UTC),
|
||||||
texto=texto,
|
texto=texto,
|
||||||
nombreEventoDialogflow=nombre_evento_dialogflow,
|
nombreEventoDialogflow=nombre_evento_dialogflow,
|
||||||
codigoIdiomaDialogflow=codigo_idioma_dialogflow,
|
codigoIdiomaDialogflow=codigo_idioma_dialogflow,
|
||||||
@@ -82,15 +84,15 @@ class Notification(BaseModel):
|
|||||||
class NotificationSession(BaseModel):
|
class NotificationSession(BaseModel):
|
||||||
"""Notification session containing multiple notifications for a phone number."""
|
"""Notification session containing multiple notifications for a phone number."""
|
||||||
|
|
||||||
sessionId: str = Field(..., alias="sessionId", description="Session identifier")
|
session_id: str = Field(..., alias="sessionId", description="Session identifier")
|
||||||
telefono: str = Field(..., alias="telefono", description="User phone number")
|
telefono: str = Field(..., alias="telefono", description="User phone number")
|
||||||
fechaCreacion: datetime = Field(
|
fecha_creacion: datetime = Field(
|
||||||
default_factory=datetime.now,
|
default_factory=lambda: datetime.now(UTC),
|
||||||
alias="fechaCreacion",
|
alias="fechaCreacion",
|
||||||
description="Session creation time",
|
description="Session creation time",
|
||||||
)
|
)
|
||||||
ultimaActualizacion: datetime = Field(
|
ultima_actualizacion: datetime = Field(
|
||||||
default_factory=datetime.now,
|
default_factory=lambda: datetime.now(UTC),
|
||||||
alias="ultimaActualizacion",
|
alias="ultimaActualizacion",
|
||||||
description="Last update time",
|
description="Last update time",
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
"""Models for quick reply functionality."""
|
||||||
|
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
"""Conversation router for detect-intent endpoints."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
|
|
||||||
@@ -23,6 +25,7 @@ async def detect_intent(
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
request: External conversation request from client
|
request: External conversation request from client
|
||||||
|
conversation_manager: Conversation manager service instance
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dialogflow detect intent response
|
Dialogflow detect intent response
|
||||||
@@ -32,12 +35,12 @@ async def detect_intent(
|
|||||||
logger.info("Received detect-intent request")
|
logger.info("Received detect-intent request")
|
||||||
response = await conversation_manager.manage_conversation(request)
|
response = await conversation_manager.manage_conversation(request)
|
||||||
logger.info("Successfully processed detect-intent request")
|
logger.info("Successfully processed detect-intent request")
|
||||||
return response
|
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
logger.error(f"Validation error: {e!s}", exc_info=True)
|
logger.exception("Validation error")
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing detect-intent: {e!s}", exc_info=True)
|
logger.exception("Error processing detect-intent")
|
||||||
raise HTTPException(status_code=500, detail="Internal server error")
|
raise HTTPException(status_code=500, detail="Internal server error") from e
|
||||||
|
else:
|
||||||
|
return response
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
"""Notification router for processing push notifications."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
|
|
||||||
@@ -31,6 +33,7 @@ async def process_notification(
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
request: External notification request with text, phone, and parameters
|
request: External notification request with text, phone, and parameters
|
||||||
|
notification_manager: Notification manager service instance
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
None (204 No Content)
|
None (204 No Content)
|
||||||
@@ -46,9 +49,9 @@ async def process_notification(
|
|||||||
# Match Java behavior: process but don't return response body
|
# Match Java behavior: process but don't return response body
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
logger.error(f"Validation error: {e!s}", exc_info=True)
|
logger.exception("Validation error")
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing notification: {e!s}", exc_info=True)
|
logger.exception("Error processing notification")
|
||||||
raise HTTPException(status_code=500, detail="Internal server error")
|
raise HTTPException(status_code=500, detail="Internal server error") from e
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
|
"""Quick replies router for FAQ session management."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
from capa_de_integracion.dependencies import (
|
from capa_de_integracion.dependencies import (
|
||||||
get_firestore_service,
|
get_firestore_service,
|
||||||
@@ -20,19 +22,25 @@ router = APIRouter(prefix="/api/v1/quick-replies", tags=["quick-replies"])
|
|||||||
|
|
||||||
|
|
||||||
class QuickReplyUser(BaseModel):
|
class QuickReplyUser(BaseModel):
|
||||||
|
"""User information for quick reply requests."""
|
||||||
|
|
||||||
telefono: str
|
telefono: str
|
||||||
nombre: str
|
nombre: str
|
||||||
|
|
||||||
|
|
||||||
class QuickReplyScreenRequest(BaseModel):
|
class QuickReplyScreenRequest(BaseModel):
|
||||||
|
"""Request model for quick reply screen."""
|
||||||
|
|
||||||
usuario: QuickReplyUser
|
usuario: QuickReplyUser
|
||||||
pantallaContexto: str
|
pantalla_contexto: str = Field(alias="pantallaContexto")
|
||||||
|
|
||||||
model_config = {"populate_by_name": True}
|
model_config = {"populate_by_name": True}
|
||||||
|
|
||||||
|
|
||||||
class QuickReplyScreenResponse(BaseModel):
|
class QuickReplyScreenResponse(BaseModel):
|
||||||
responseId: str
|
"""Response model for quick reply screen."""
|
||||||
|
|
||||||
|
response_id: str = Field(alias="responseId")
|
||||||
quick_replies: QuickReplyScreen
|
quick_replies: QuickReplyScreen
|
||||||
|
|
||||||
|
|
||||||
@@ -52,25 +60,31 @@ async def start_quick_reply_session(
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
request: Quick reply screen request
|
request: Quick reply screen request
|
||||||
|
redis_service: Redis service instance
|
||||||
|
firestore_service: Firestore service instance
|
||||||
|
quick_reply_content_service: Quick reply content service instance
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Detect intent response with quick reply questions
|
Detect intent response with quick reply questions
|
||||||
|
|
||||||
"""
|
"""
|
||||||
try:
|
def _validate_phone(phone: str) -> None:
|
||||||
telefono = request.usuario.telefono
|
if not phone or not phone.strip():
|
||||||
pantalla_contexto = request.pantallaContexto
|
|
||||||
if not telefono or not telefono.strip():
|
|
||||||
msg = "Phone number is required"
|
msg = "Phone number is required"
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
telefono = request.usuario.telefono
|
||||||
|
pantalla_contexto = request.pantalla_contexto
|
||||||
|
_validate_phone(telefono)
|
||||||
|
|
||||||
session = await firestore_service.get_session_by_phone(telefono)
|
session = await firestore_service.get_session_by_phone(telefono)
|
||||||
if session:
|
if session:
|
||||||
session_id = session.sessionId
|
session_id = session.session_id
|
||||||
await firestore_service.update_pantalla_contexto(
|
await firestore_service.update_pantalla_contexto(
|
||||||
session_id, pantalla_contexto,
|
session_id, pantalla_contexto,
|
||||||
)
|
)
|
||||||
session.pantallaContexto = pantalla_contexto
|
session.pantalla_contexto = pantalla_contexto
|
||||||
else:
|
else:
|
||||||
session_id = str(uuid4())
|
session_id = str(uuid4())
|
||||||
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
user_id = f"user_by_phone_{telefono.replace(' ', '').replace('-', '')}"
|
||||||
@@ -81,7 +95,9 @@ async def start_quick_reply_session(
|
|||||||
# Cache session
|
# Cache session
|
||||||
await redis_service.save_session(session)
|
await redis_service.save_session(session)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Created quick reply session {session_id} for screen: {pantalla_contexto}",
|
"Created quick reply session %s for screen: %s",
|
||||||
|
session_id,
|
||||||
|
pantalla_contexto,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Load quick replies
|
# Load quick replies
|
||||||
@@ -89,13 +105,13 @@ async def start_quick_reply_session(
|
|||||||
pantalla_contexto,
|
pantalla_contexto,
|
||||||
)
|
)
|
||||||
return QuickReplyScreenResponse(
|
return QuickReplyScreenResponse(
|
||||||
responseId=session_id, quick_replies=quick_replies,
|
response_id=session_id, quick_replies=quick_replies,
|
||||||
)
|
)
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
logger.error(f"Validation error: {e}", exc_info=True)
|
logger.exception("Validation error")
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error starting quick reply session: {e}", exc_info=True)
|
logger.exception("Error starting quick reply session")
|
||||||
raise HTTPException(status_code=500, detail="Internal server error")
|
raise HTTPException(status_code=500, detail="Internal server error") from e
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
|
"""Conversation manager service for orchestrating user conversations."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timedelta
|
import re
|
||||||
|
from datetime import UTC, datetime, timedelta
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from capa_de_integracion.config import Settings
|
from capa_de_integracion.config import Settings
|
||||||
@@ -10,6 +13,7 @@ from capa_de_integracion.models import (
|
|||||||
DetectIntentResponse,
|
DetectIntentResponse,
|
||||||
QueryResult,
|
QueryResult,
|
||||||
)
|
)
|
||||||
|
from capa_de_integracion.models.notification import NotificationSession
|
||||||
|
|
||||||
from .dlp_service import DLPService
|
from .dlp_service import DLPService
|
||||||
from .firestore_service import FirestoreService
|
from .firestore_service import FirestoreService
|
||||||
@@ -46,10 +50,10 @@ class ConversationManagerService:
|
|||||||
|
|
||||||
logger.info("ConversationManagerService initialized successfully")
|
logger.info("ConversationManagerService initialized successfully")
|
||||||
|
|
||||||
async def manage_conversation(
|
async def manage_conversation( # noqa: PLR0915
|
||||||
self, request: ConversationRequest,
|
self, request: ConversationRequest,
|
||||||
) -> DetectIntentResponse:
|
) -> DetectIntentResponse:
|
||||||
"""Main entry point for managing conversations.
|
"""Manage conversation flow and return response.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
request: External conversation request from client
|
request: External conversation request from client
|
||||||
@@ -82,10 +86,11 @@ class ConversationManagerService:
|
|||||||
# Step 2: Check for pantallaContexto in existing session
|
# Step 2: Check for pantallaContexto in existing session
|
||||||
if session.pantallaContexto:
|
if session.pantallaContexto:
|
||||||
# Check if pantallaContexto is stale (10 minutes)
|
# Check if pantallaContexto is stale (10 minutes)
|
||||||
if self._is_pantalla_context_valid(session.lastModified):
|
if self._is_pantalla_context_valid(session.last_modified):
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Detected 'pantallaContexto' in session: {session.pantallaContexto}. "
|
"Detected 'pantallaContexto' in session: %s. "
|
||||||
f"Delegating to QuickReplies flow.",
|
"Delegating to QuickReplies flow.",
|
||||||
|
session.pantallaContexto,
|
||||||
)
|
)
|
||||||
response = await self._manage_quick_reply_conversation(
|
response = await self._manage_quick_reply_conversation(
|
||||||
request, session.pantallaContexto,
|
request, session.pantallaContexto,
|
||||||
@@ -95,62 +100,64 @@ class ConversationManagerService:
|
|||||||
user_entry = ConversationEntry(
|
user_entry = ConversationEntry(
|
||||||
entity="user",
|
entity="user",
|
||||||
type="CONVERSACION",
|
type="CONVERSACION",
|
||||||
timestamp=datetime.now(),
|
timestamp=datetime.now(UTC),
|
||||||
text=request.mensaje,
|
text=request.mensaje,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=getattr(request, "canal", None),
|
canal=getattr(request, "canal", None),
|
||||||
)
|
)
|
||||||
await self.firestore_service.save_entry(
|
await self.firestore_service.save_entry(
|
||||||
session.sessionId, user_entry,
|
session.session_id, user_entry,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save quick reply response to Firestore
|
# Save quick reply response to Firestore
|
||||||
response_text = (
|
response_text = (
|
||||||
response.queryResult.responseText
|
response.query_result.response_text
|
||||||
if response.queryResult
|
if response.query_result
|
||||||
else ""
|
else ""
|
||||||
) or ""
|
) or ""
|
||||||
assistant_entry = ConversationEntry(
|
assistant_entry = ConversationEntry(
|
||||||
entity="assistant",
|
entity="assistant",
|
||||||
type="CONVERSACION",
|
type="CONVERSACION",
|
||||||
timestamp=datetime.now(),
|
timestamp=datetime.now(UTC),
|
||||||
text=response_text,
|
text=response_text,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=getattr(request, "canal", None),
|
canal=getattr(request, "canal", None),
|
||||||
)
|
)
|
||||||
await self.firestore_service.save_entry(
|
await self.firestore_service.save_entry(
|
||||||
session.sessionId, assistant_entry,
|
session.session_id, assistant_entry,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update session with last message and timestamp
|
# Update session with last message and timestamp
|
||||||
session.lastMessage = response_text
|
session.last_message = response_text
|
||||||
session.lastModified = datetime.now()
|
session.last_modified = datetime.now(UTC)
|
||||||
await self.firestore_service.save_session(session)
|
await self.firestore_service.save_session(session)
|
||||||
await self.redis_service.save_session(session)
|
await self.redis_service.save_session(session)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
else:
|
else:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Detected STALE 'pantallaContexto'. Ignoring and proceeding with normal flow.",
|
"Detected STALE 'pantallaContexto'. "
|
||||||
|
"Ignoring and proceeding with normal flow.",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 3: Continue with standard conversation flow
|
# Step 3: Continue with standard conversation flow
|
||||||
nickname = request.usuario.nickname
|
nickname = request.usuario.nickname
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Primary Check (Redis): Looking up session for phone: {telefono}",
|
"Primary Check (Redis): Looking up session for phone: %s",
|
||||||
|
telefono,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 3a: Load conversation history from Firestore
|
# Step 3a: Load conversation history from Firestore
|
||||||
entries = await self.firestore_service.get_entries(
|
entries = await self.firestore_service.get_entries(
|
||||||
session.sessionId,
|
session.session_id,
|
||||||
limit=self.settings.conversation_context_message_limit,
|
limit=self.settings.conversation_context_message_limit,
|
||||||
)
|
)
|
||||||
logger.info(f"Loaded {len(entries)} conversation entries from Firestore")
|
logger.info("Loaded %s conversation entries from Firestore", len(entries))
|
||||||
|
|
||||||
# Step 3b: Retrieve active notifications for this user
|
# Step 3b: Retrieve active notifications for this user
|
||||||
notifications = await self._get_active_notifications(telefono)
|
notifications = await self._get_active_notifications(telefono)
|
||||||
logger.info(f"Retrieved {len(notifications)} active notifications")
|
logger.info("Retrieved %s active notifications", len(notifications))
|
||||||
|
|
||||||
# Step 3c: Prepare context for RAG service
|
# Step 3c: Prepare context for RAG service
|
||||||
messages = await self._prepare_rag_messages(
|
messages = await self._prepare_rag_messages(
|
||||||
@@ -165,36 +172,37 @@ class ConversationManagerService:
|
|||||||
logger.info("Sending query to RAG service")
|
logger.info("Sending query to RAG service")
|
||||||
assistant_response = await self.rag_service.query(messages)
|
assistant_response = await self.rag_service.query(messages)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Received response from RAG service: {assistant_response[:100]}...",
|
"Received response from RAG service: %s...",
|
||||||
|
assistant_response[:100],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 3e: Save user message to Firestore
|
# Step 3e: Save user message to Firestore
|
||||||
user_entry = ConversationEntry(
|
user_entry = ConversationEntry(
|
||||||
entity="user",
|
entity="user",
|
||||||
type="CONVERSACION",
|
type="CONVERSACION",
|
||||||
timestamp=datetime.now(),
|
timestamp=datetime.now(UTC),
|
||||||
text=request.mensaje,
|
text=request.mensaje,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=getattr(request, "canal", None),
|
canal=getattr(request, "canal", None),
|
||||||
)
|
)
|
||||||
await self.firestore_service.save_entry(session.sessionId, user_entry)
|
await self.firestore_service.save_entry(session.session_id, user_entry)
|
||||||
logger.info("Saved user message to Firestore")
|
logger.info("Saved user message to Firestore")
|
||||||
|
|
||||||
# Step 3f: Save assistant response to Firestore
|
# Step 3f: Save assistant response to Firestore
|
||||||
assistant_entry = ConversationEntry(
|
assistant_entry = ConversationEntry(
|
||||||
entity="assistant",
|
entity="assistant",
|
||||||
type="LLM",
|
type="LLM",
|
||||||
timestamp=datetime.now(),
|
timestamp=datetime.now(UTC),
|
||||||
text=assistant_response,
|
text=assistant_response,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
canal=getattr(request, "canal", None),
|
canal=getattr(request, "canal", None),
|
||||||
)
|
)
|
||||||
await self.firestore_service.save_entry(session.sessionId, assistant_entry)
|
await self.firestore_service.save_entry(session.session_id, assistant_entry)
|
||||||
logger.info("Saved assistant response to Firestore")
|
logger.info("Saved assistant response to Firestore")
|
||||||
|
|
||||||
# Step 3g: Update session with last message and timestamp
|
# Step 3g: Update session with last message and timestamp
|
||||||
session.lastMessage = assistant_response
|
session.last_message = assistant_response
|
||||||
session.lastModified = datetime.now()
|
session.last_modified = datetime.now(UTC)
|
||||||
await self.firestore_service.save_session(session)
|
await self.firestore_service.save_session(session)
|
||||||
await self.redis_service.save_session(session)
|
await self.redis_service.save_session(session)
|
||||||
logger.info("Updated session in Firestore and Redis")
|
logger.info("Updated session in Firestore and Redis")
|
||||||
@@ -202,26 +210,26 @@ class ConversationManagerService:
|
|||||||
# Step 3h: Mark notifications as processed if any were included
|
# Step 3h: Mark notifications as processed if any were included
|
||||||
if notifications:
|
if notifications:
|
||||||
await self._mark_notifications_as_processed(telefono)
|
await self._mark_notifications_as_processed(telefono)
|
||||||
logger.info(f"Marked {len(notifications)} notifications as processed")
|
logger.info("Marked %s notifications as processed", len(notifications))
|
||||||
|
|
||||||
# Step 3i: Return response object
|
# Step 3i: Return response object
|
||||||
return DetectIntentResponse(
|
return DetectIntentResponse(
|
||||||
responseId=str(uuid4()),
|
response_id=str(uuid4()),
|
||||||
queryResult=QueryResult(
|
query_result=QueryResult(
|
||||||
responseText=assistant_response,
|
response_text=assistant_response,
|
||||||
parameters=None,
|
parameters=None,
|
||||||
),
|
),
|
||||||
quick_replies=None,
|
quick_replies=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(f"Error managing conversation: {e!s}", exc_info=True)
|
logger.exception("Error managing conversation")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
|
def _is_pantalla_context_valid(self, last_modified: datetime) -> bool:
|
||||||
"""Check if pantallaContexto is still valid (not stale)."""
|
"""Check if pantallaContexto is still valid (not stale)."""
|
||||||
time_diff = datetime.now() - last_modified
|
time_diff = datetime.now(UTC) - last_modified
|
||||||
return time_diff < timedelta(minutes=self.SCREEN_CONTEXT_TIMEOUT_MINUTES)
|
return time_diff < timedelta(minutes=self.SCREEN_CONTEXT_TIMEOUT_MINUTES)
|
||||||
|
|
||||||
async def _manage_quick_reply_conversation(
|
async def _manage_quick_reply_conversation(
|
||||||
@@ -234,7 +242,7 @@ class ConversationManagerService:
|
|||||||
|
|
||||||
# If no questions available, delegate to normal conversation flow
|
# If no questions available, delegate to normal conversation flow
|
||||||
if not quick_reply_screen.preguntas:
|
if not quick_reply_screen.preguntas:
|
||||||
logger.warning(f"No quick replies found for screen: {screen_id}.")
|
logger.warning("No quick replies found for screen: %s.", screen_id)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Match user message to a quick reply question
|
# Match user message to a quick reply question
|
||||||
@@ -245,19 +253,20 @@ class ConversationManagerService:
|
|||||||
# Simple matching: check if question title matches user message
|
# Simple matching: check if question title matches user message
|
||||||
if pregunta.titulo.lower().strip() == user_message_lower:
|
if pregunta.titulo.lower().strip() == user_message_lower:
|
||||||
matched_answer = pregunta.respuesta
|
matched_answer = pregunta.respuesta
|
||||||
logger.info(f"Matched quick reply: {pregunta.titulo}")
|
logger.info("Matched quick reply: %s", pregunta.titulo)
|
||||||
break
|
break
|
||||||
|
|
||||||
# If no match, use first question as default or delegate to normal flow
|
# If no match, use first question as default or delegate to normal flow
|
||||||
if not matched_answer:
|
if not matched_answer:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"No matching quick reply found for message: '{request.mensaje}'.",
|
"No matching quick reply found for message: '%s'.",
|
||||||
|
request.mensaje,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create response with the matched quick reply answer
|
# Create response with the matched quick reply answer
|
||||||
return DetectIntentResponse(
|
return DetectIntentResponse(
|
||||||
responseId=str(uuid4()),
|
response_id=str(uuid4()),
|
||||||
queryResult=QueryResult(responseText=matched_answer, parameters=None),
|
query_result=QueryResult(response_text=matched_answer, parameters=None),
|
||||||
quick_replies=quick_reply_screen,
|
quick_replies=quick_reply_screen,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -281,8 +290,6 @@ class ConversationManagerService:
|
|||||||
# If not in Redis, try Firestore
|
# If not in Redis, try Firestore
|
||||||
if not notification_session:
|
if not notification_session:
|
||||||
# Firestore uses phone as document ID for notifications
|
# Firestore uses phone as document ID for notifications
|
||||||
from capa_de_integracion.models.notification import NotificationSession
|
|
||||||
|
|
||||||
doc_ref = self.firestore_service.db.collection(
|
doc_ref = self.firestore_service.db.collection(
|
||||||
self.firestore_service.notifications_collection,
|
self.firestore_service.notifications_collection,
|
||||||
).document(telefono)
|
).document(telefono)
|
||||||
@@ -294,20 +301,19 @@ class ConversationManagerService:
|
|||||||
|
|
||||||
# Filter for active notifications only
|
# Filter for active notifications only
|
||||||
if notification_session and notification_session.notificaciones:
|
if notification_session and notification_session.notificaciones:
|
||||||
return [
|
active_notifications = [
|
||||||
notif
|
notif
|
||||||
for notif in notification_session.notificaciones
|
for notif in notification_session.notificaciones
|
||||||
if notif.status == "active"
|
if notif.status == "active"
|
||||||
]
|
]
|
||||||
|
else:
|
||||||
|
active_notifications = []
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error retrieving notifications for %s", telefono)
|
||||||
return []
|
return []
|
||||||
|
else:
|
||||||
except Exception as e:
|
return active_notifications
|
||||||
logger.error(
|
|
||||||
f"Error retrieving notifications for {telefono}: {e!s}",
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
return []
|
|
||||||
|
|
||||||
async def _prepare_rag_messages(
|
async def _prepare_rag_messages(
|
||||||
self,
|
self,
|
||||||
@@ -339,7 +345,10 @@ class ConversationManagerService:
|
|||||||
messages.append(
|
messages.append(
|
||||||
{
|
{
|
||||||
"role": "system",
|
"role": "system",
|
||||||
"content": f"Historial de conversación:\n{conversation_context}",
|
"content": (
|
||||||
|
f"Historial de conversación:\n"
|
||||||
|
f"{conversation_context}"
|
||||||
|
),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -353,7 +362,10 @@ class ConversationManagerService:
|
|||||||
messages.append(
|
messages.append(
|
||||||
{
|
{
|
||||||
"role": "system",
|
"role": "system",
|
||||||
"content": f"Notificaciones pendientes para el usuario:\n{notifications_text}",
|
"content": (
|
||||||
|
f"Notificaciones pendientes para el usuario:\n"
|
||||||
|
f"{notifications_text}"
|
||||||
|
),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -382,17 +394,17 @@ class ConversationManagerService:
|
|||||||
# Update or delete from Redis
|
# Update or delete from Redis
|
||||||
await self.redis_service.delete_notification_session(telefono)
|
await self.redis_service.delete_notification_session(telefono)
|
||||||
|
|
||||||
logger.info(f"Marked notifications as processed for {telefono}")
|
logger.info("Marked notifications as processed for %s", telefono)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(
|
logger.exception(
|
||||||
f"Error marking notifications as processed for {telefono}: {e!s}",
|
"Error marking notifications as processed for %s",
|
||||||
exc_info=True,
|
telefono,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _format_conversation_history(
|
def _format_conversation_history(
|
||||||
self,
|
self,
|
||||||
session: ConversationSession,
|
session: ConversationSession, # noqa: ARG002
|
||||||
entries: list[ConversationEntry],
|
entries: list[ConversationEntry],
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Format conversation history with business rule limits.
|
"""Format conversation history with business rule limits.
|
||||||
@@ -414,7 +426,7 @@ class ConversationManagerService:
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
# Filter by date (30 days)
|
# Filter by date (30 days)
|
||||||
cutoff_date = datetime.now() - timedelta(
|
cutoff_date = datetime.now(UTC) - timedelta(
|
||||||
days=self.settings.conversation_context_days_limit,
|
days=self.settings.conversation_context_days_limit,
|
||||||
)
|
)
|
||||||
recent_entries = [
|
recent_entries = [
|
||||||
@@ -445,7 +457,7 @@ class ConversationManagerService:
|
|||||||
if not entries:
|
if not entries:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
MAX_BYTES = 50 * 1024 # 50KB
|
max_bytes = 50 * 1024 # 50KB
|
||||||
formatted_messages = [self._format_entry(entry) for entry in entries]
|
formatted_messages = [self._format_entry(entry) for entry in entries]
|
||||||
|
|
||||||
# Build from newest to oldest
|
# Build from newest to oldest
|
||||||
@@ -456,7 +468,7 @@ class ConversationManagerService:
|
|||||||
message_line = message + "\n"
|
message_line = message + "\n"
|
||||||
message_bytes = len(message_line.encode("utf-8"))
|
message_bytes = len(message_line.encode("utf-8"))
|
||||||
|
|
||||||
if current_size + message_bytes > MAX_BYTES:
|
if current_size + message_bytes > max_bytes:
|
||||||
break
|
break
|
||||||
|
|
||||||
text_block.insert(0, message_line)
|
text_block.insert(0, message_line)
|
||||||
@@ -481,8 +493,6 @@ class ConversationManagerService:
|
|||||||
content = entry.text
|
content = entry.text
|
||||||
if entry.entity == "assistant":
|
if entry.entity == "assistant":
|
||||||
# Remove trailing JSON artifacts like {...}
|
# Remove trailing JSON artifacts like {...}
|
||||||
import re
|
|
||||||
|
|
||||||
content = re.sub(r"\s*\{.*\}\s*$", "", content).strip()
|
content = re.sub(r"\s*\{.*\}\s*$", "", content).strip()
|
||||||
|
|
||||||
return prefix + content
|
return prefix + content
|
||||||
|
|||||||
@@ -1,9 +1,4 @@
|
|||||||
"""Copyright 2025 Google. This software is provided as-is, without warranty or
|
"""DLP service for detecting and obfuscating sensitive data."""
|
||||||
representation for any use or purpose. Your use of it is subject to your
|
|
||||||
agreement with Google.
|
|
||||||
|
|
||||||
Data Loss Prevention service for obfuscating sensitive information.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
@@ -15,6 +10,11 @@ from capa_de_integracion.config import Settings
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# DLP likelihood threshold for filtering findings
|
||||||
|
LIKELIHOOD_THRESHOLD = 3 # POSSIBLE (values: 0=VERY_UNLIKELY to 5=VERY_LIKELY)
|
||||||
|
# Minimum length for last 4 characters extraction
|
||||||
|
MIN_LENGTH_FOR_LAST_FOUR = 4
|
||||||
|
|
||||||
|
|
||||||
class DLPService:
|
class DLPService:
|
||||||
"""Service for detecting and obfuscating sensitive data using Google Cloud DLP.
|
"""Service for detecting and obfuscating sensitive data using Google Cloud DLP.
|
||||||
@@ -71,7 +71,10 @@ class DLPService:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Build request
|
# Build request
|
||||||
inspect_template_name = f"projects/{self.project_id}/locations/{self.location}/inspectTemplates/{template_id}"
|
inspect_template_name = (
|
||||||
|
f"projects/{self.project_id}/locations/{self.location}/"
|
||||||
|
f"inspectTemplates/{template_id}"
|
||||||
|
)
|
||||||
parent = f"projects/{self.project_id}/locations/{self.location}"
|
parent = f"projects/{self.project_id}/locations/{self.location}"
|
||||||
|
|
||||||
request = types.InspectContentRequest(
|
request = types.InspectContentRequest(
|
||||||
@@ -85,18 +88,18 @@ class DLPService:
|
|||||||
response = await self.dlp_client.inspect_content(request=request)
|
response = await self.dlp_client.inspect_content(request=request)
|
||||||
|
|
||||||
findings_count = len(response.result.findings)
|
findings_count = len(response.result.findings)
|
||||||
logger.info(f"DLP {template_id} Findings: {findings_count}")
|
logger.info("DLP %s Findings: %s", template_id, findings_count)
|
||||||
|
|
||||||
if findings_count > 0:
|
if findings_count > 0:
|
||||||
return self._obfuscate_text(response, text)
|
obfuscated_text = self._obfuscate_text(response, text)
|
||||||
return text
|
else:
|
||||||
|
obfuscated_text = text
|
||||||
|
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(
|
logger.exception("Error during DLP inspection. Returning original text.")
|
||||||
f"Error during DLP inspection: {e}. Returning original text.",
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
return text
|
return text
|
||||||
|
else:
|
||||||
|
return obfuscated_text
|
||||||
|
|
||||||
def _obfuscate_text(self, response: types.InspectContentResponse, text: str) -> str:
|
def _obfuscate_text(self, response: types.InspectContentResponse, text: str) -> str:
|
||||||
"""Obfuscate sensitive findings in text.
|
"""Obfuscate sensitive findings in text.
|
||||||
@@ -109,11 +112,11 @@ class DLPService:
|
|||||||
Text with sensitive data obfuscated
|
Text with sensitive data obfuscated
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# Filter findings by likelihood (> POSSIBLE, which is value 3)
|
# Filter findings by likelihood (> POSSIBLE)
|
||||||
findings = [
|
findings = [
|
||||||
finding
|
finding
|
||||||
for finding in response.result.findings
|
for finding in response.result.findings
|
||||||
if finding.likelihood.value > 3
|
if finding.likelihood.value > LIKELIHOOD_THRESHOLD
|
||||||
]
|
]
|
||||||
|
|
||||||
# Sort by likelihood (descending)
|
# Sort by likelihood (descending)
|
||||||
@@ -124,7 +127,9 @@ class DLPService:
|
|||||||
info_type = finding.info_type.name
|
info_type = finding.info_type.name
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"InfoType: {info_type} | Likelihood: {finding.likelihood.value}",
|
"InfoType: %s | Likelihood: %s",
|
||||||
|
info_type,
|
||||||
|
finding.likelihood.value,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Obfuscate based on info type
|
# Obfuscate based on info type
|
||||||
@@ -176,13 +181,15 @@ class DLPService:
|
|||||||
def _get_last4(self, quote: str) -> str:
|
def _get_last4(self, quote: str) -> str:
|
||||||
"""Extract last 4 characters from quote (removing spaces)."""
|
"""Extract last 4 characters from quote (removing spaces)."""
|
||||||
clean_quote = quote.strip().replace(" ", "")
|
clean_quote = quote.strip().replace(" ", "")
|
||||||
if len(clean_quote) >= 4:
|
if len(clean_quote) >= MIN_LENGTH_FOR_LAST_FOUR:
|
||||||
return clean_quote[-4:]
|
return clean_quote[-4:]
|
||||||
return clean_quote
|
return clean_quote
|
||||||
|
|
||||||
def _clean_direccion(self, text: str) -> str:
|
def _clean_direccion(self, text: str) -> str:
|
||||||
"""Clean up consecutive [DIRECCION] tags."""
|
"""Clean up consecutive [DIRECCION] tags.
|
||||||
# Replace multiple [DIRECCION] tags separated by commas or spaces with single tag
|
|
||||||
|
Replace multiple [DIRECCION] tags separated by commas or spaces.
|
||||||
|
"""
|
||||||
pattern = r"\[DIRECCION\](?:(?:,\s*|\s+)\[DIRECCION\])*"
|
pattern = r"\[DIRECCION\](?:(?:,\s*|\s+)\[DIRECCION\])*"
|
||||||
return re.sub(pattern, "[DIRECCION]", text).strip()
|
return re.sub(pattern, "[DIRECCION]", text).strip()
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
|
"""Firestore service for conversation and notification persistence."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
from google.cloud import firestore
|
from google.cloud import firestore
|
||||||
|
|
||||||
@@ -28,7 +30,8 @@ class FirestoreService:
|
|||||||
f"artifacts/{settings.gcp_project_id}/notifications"
|
f"artifacts/{settings.gcp_project_id}/notifications"
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Firestore client initialized for project: {settings.gcp_project_id}",
|
"Firestore client initialized for project: %s",
|
||||||
|
settings.gcp_project_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
@@ -36,7 +39,7 @@ class FirestoreService:
|
|||||||
self.db.close()
|
self.db.close()
|
||||||
logger.info("Firestore client closed")
|
logger.info("Firestore client closed")
|
||||||
|
|
||||||
def _session_ref(self, session_id: str):
|
def _session_ref(self, session_id: str) -> firestore.DocumentReference:
|
||||||
"""Get Firestore document reference for session."""
|
"""Get Firestore document reference for session."""
|
||||||
return self.db.collection(self.conversations_collection).document(session_id)
|
return self.db.collection(self.conversations_collection).document(session_id)
|
||||||
|
|
||||||
@@ -47,19 +50,20 @@ class FirestoreService:
|
|||||||
doc = await doc_ref.get()
|
doc = await doc_ref.get()
|
||||||
|
|
||||||
if not doc.exists:
|
if not doc.exists:
|
||||||
logger.debug(f"Session not found in Firestore: {session_id}")
|
logger.debug("Session not found in Firestore: %s", session_id)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
data = doc.to_dict()
|
data = doc.to_dict()
|
||||||
session = ConversationSession.model_validate(data)
|
session = ConversationSession.model_validate(data)
|
||||||
logger.debug(f"Retrieved session from Firestore: {session_id}")
|
logger.debug("Retrieved session from Firestore: %s", session_id)
|
||||||
return session
|
except Exception:
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error retrieving session {session_id} from Firestore: {e!s}",
|
"Error retrieving session %s from Firestore:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
else:
|
||||||
|
return session
|
||||||
|
|
||||||
async def get_session_by_phone(self, telefono: str) -> ConversationSession | None:
|
async def get_session_by_phone(self, telefono: str) -> ConversationSession | None:
|
||||||
"""Retrieve most recent conversation session from Firestore by phone number.
|
"""Retrieve most recent conversation session from Firestore by phone number.
|
||||||
@@ -84,33 +88,37 @@ class FirestoreService:
|
|||||||
data = doc.to_dict()
|
data = doc.to_dict()
|
||||||
session = ConversationSession.model_validate(data)
|
session = ConversationSession.model_validate(data)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Retrieved session from Firestore for phone {telefono}: {session.sessionId}",
|
"Retrieved session from Firestore for phone %s: %s",
|
||||||
|
telefono,
|
||||||
|
session.session_id,
|
||||||
)
|
)
|
||||||
return session
|
return session
|
||||||
|
|
||||||
logger.debug(f"No session found in Firestore for phone: {telefono}")
|
logger.debug("No session found in Firestore for phone: %s", telefono)
|
||||||
return None
|
except Exception:
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error querying session by phone {telefono} from Firestore: {e!s}",
|
"Error querying session by phone %s from Firestore:",
|
||||||
|
telefono,
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
async def save_session(self, session: ConversationSession) -> bool:
|
async def save_session(self, session: ConversationSession) -> bool:
|
||||||
"""Save conversation session to Firestore."""
|
"""Save conversation session to Firestore."""
|
||||||
try:
|
try:
|
||||||
doc_ref = self._session_ref(session.sessionId)
|
doc_ref = self._session_ref(session.session_id)
|
||||||
data = session.model_dump()
|
data = session.model_dump()
|
||||||
await doc_ref.set(data, merge=True)
|
await doc_ref.set(data, merge=True)
|
||||||
logger.debug(f"Saved session to Firestore: {session.sessionId}")
|
logger.debug("Saved session to Firestore: %s", session.session_id)
|
||||||
return True
|
except Exception:
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error saving session {session.sessionId} to Firestore: {e!s}",
|
"Error saving session %s to Firestore:",
|
||||||
|
session.session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
async def create_session(
|
async def create_session(
|
||||||
self,
|
self,
|
||||||
@@ -144,11 +152,11 @@ class FirestoreService:
|
|||||||
last_message=last_message,
|
last_message=last_message,
|
||||||
)
|
)
|
||||||
|
|
||||||
doc_ref = self._session_ref(session.sessionId)
|
doc_ref = self._session_ref(session.session_id)
|
||||||
data = session.model_dump()
|
data = session.model_dump()
|
||||||
await doc_ref.set(data, merge=True)
|
await doc_ref.set(data, merge=True)
|
||||||
|
|
||||||
logger.info(f"Created new session in Firestore: {session_id}")
|
logger.info("Created new session in Firestore: %s", session_id)
|
||||||
return session
|
return session
|
||||||
|
|
||||||
async def save_entry(self, session_id: str, entry: ConversationEntry) -> bool:
|
async def save_entry(self, session_id: str, entry: ConversationEntry) -> bool:
|
||||||
@@ -163,14 +171,15 @@ class FirestoreService:
|
|||||||
|
|
||||||
data = entry.model_dump()
|
data = entry.model_dump()
|
||||||
await entry_doc.set(data)
|
await entry_doc.set(data)
|
||||||
logger.debug(f"Saved entry to Firestore for session: {session_id}")
|
logger.debug("Saved entry to Firestore for session: %s", session_id)
|
||||||
return True
|
except Exception:
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error saving entry for session {session_id} to Firestore: {e!s}",
|
"Error saving entry for session %s to Firestore:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
async def get_entries(
|
async def get_entries(
|
||||||
self, session_id: str, limit: int = 10,
|
self, session_id: str, limit: int = 10,
|
||||||
@@ -195,14 +204,17 @@ class FirestoreService:
|
|||||||
|
|
||||||
# Reverse to get chronological order
|
# Reverse to get chronological order
|
||||||
entries.reverse()
|
entries.reverse()
|
||||||
logger.debug(f"Retrieved {len(entries)} entries for session: {session_id}")
|
logger.debug(
|
||||||
return entries
|
"Retrieved %s entries for session: %s", len(entries), session_id,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error retrieving entries for session {session_id} from Firestore: {e!s}",
|
"Error retrieving entries for session %s from Firestore:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
else:
|
||||||
|
return entries
|
||||||
|
|
||||||
async def delete_session(self, session_id: str) -> bool:
|
async def delete_session(self, session_id: str) -> bool:
|
||||||
"""Delete conversation session and all entries from Firestore."""
|
"""Delete conversation session and all entries from Firestore."""
|
||||||
@@ -216,14 +228,15 @@ class FirestoreService:
|
|||||||
|
|
||||||
# Delete session document
|
# Delete session document
|
||||||
await doc_ref.delete()
|
await doc_ref.delete()
|
||||||
logger.debug(f"Deleted session from Firestore: {session_id}")
|
logger.debug("Deleted session from Firestore: %s", session_id)
|
||||||
return True
|
except Exception:
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error deleting session {session_id} from Firestore: {e!s}",
|
"Error deleting session %s from Firestore:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
async def update_pantalla_contexto(
|
async def update_pantalla_contexto(
|
||||||
self, session_id: str, pantalla_contexto: str | None,
|
self, session_id: str, pantalla_contexto: str | None,
|
||||||
@@ -244,31 +257,34 @@ class FirestoreService:
|
|||||||
|
|
||||||
if not doc.exists:
|
if not doc.exists:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Session {session_id} not found in Firestore. Cannot update pantallaContexto",
|
"Session %s not found in Firestore. Cannot update pantallaContexto",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
await doc_ref.update(
|
await doc_ref.update(
|
||||||
{
|
{
|
||||||
"pantallaContexto": pantalla_contexto,
|
"pantallaContexto": pantalla_contexto,
|
||||||
"lastModified": datetime.now(),
|
"lastModified": datetime.now(UTC),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Updated pantallaContexto for session {session_id} in Firestore",
|
"Updated pantallaContexto for session %s in Firestore",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return True
|
except Exception:
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error updating pantallaContexto for session {session_id} in Firestore: {e!s}",
|
"Error updating pantallaContexto for session %s in Firestore:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
# ====== Notification Methods ======
|
# ====== Notification Methods ======
|
||||||
|
|
||||||
def _notification_ref(self, notification_id: str):
|
def _notification_ref(self, notification_id: str) -> firestore.DocumentReference:
|
||||||
"""Get Firestore document reference for notification."""
|
"""Get Firestore document reference for notification."""
|
||||||
return self.db.collection(self.notifications_collection).document(
|
return self.db.collection(self.notifications_collection).document(
|
||||||
notification_id,
|
notification_id,
|
||||||
@@ -303,30 +319,33 @@ class FirestoreService:
|
|||||||
await doc_ref.update(
|
await doc_ref.update(
|
||||||
{
|
{
|
||||||
"notificaciones": firestore.ArrayUnion([entry_dict]),
|
"notificaciones": firestore.ArrayUnion([entry_dict]),
|
||||||
"ultimaActualizacion": datetime.now(),
|
"ultima_actualizacion": datetime.now(UTC),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully appended notification entry to session {notification_session_id} in Firestore",
|
"Successfully appended notification entry "
|
||||||
|
"to session %s in Firestore",
|
||||||
|
notification_session_id,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Create new notification session
|
# Create new notification session
|
||||||
new_session_data = {
|
new_session_data = {
|
||||||
"sessionId": notification_session_id,
|
"session_id": notification_session_id,
|
||||||
"telefono": phone_number,
|
"telefono": phone_number,
|
||||||
"fechaCreacion": datetime.now(),
|
"fecha_creacion": datetime.now(UTC),
|
||||||
"ultimaActualizacion": datetime.now(),
|
"ultima_actualizacion": datetime.now(UTC),
|
||||||
"notificaciones": [entry_dict],
|
"notificaciones": [entry_dict],
|
||||||
}
|
}
|
||||||
await doc_ref.set(new_session_data)
|
await doc_ref.set(new_session_data)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully created new notification session {notification_session_id} in Firestore",
|
"Successfully created new notification session %s in Firestore",
|
||||||
|
notification_session_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(
|
logger.exception(
|
||||||
f"Error saving notification to Firestore for phone {phone_number}: {e!s}",
|
"Error saving notification to Firestore for phone %s",
|
||||||
exc_info=True,
|
phone_number,
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@@ -344,7 +363,9 @@ class FirestoreService:
|
|||||||
|
|
||||||
if not doc.exists:
|
if not doc.exists:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Notification session {session_id} not found in Firestore. Cannot update status",
|
"Notification session %s not found in Firestore. "
|
||||||
|
"Cannot update status",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -359,18 +380,21 @@ class FirestoreService:
|
|||||||
await doc_ref.update(
|
await doc_ref.update(
|
||||||
{
|
{
|
||||||
"notificaciones": updated_notifications,
|
"notificaciones": updated_notifications,
|
||||||
"ultimaActualizacion": datetime.now(),
|
"ultima_actualizacion": datetime.now(UTC),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully updated notification status to '{status}' for session {session_id} in Firestore",
|
"Successfully updated notification status to '%s' "
|
||||||
|
"for session %s in Firestore",
|
||||||
|
status,
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(
|
logger.exception(
|
||||||
f"Error updating notification status in Firestore for session {session_id}: {e!s}",
|
"Error updating notification status in Firestore for session %s",
|
||||||
exc_info=True,
|
session_id,
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@@ -378,18 +402,20 @@ class FirestoreService:
|
|||||||
"""Delete notification session from Firestore."""
|
"""Delete notification session from Firestore."""
|
||||||
try:
|
try:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Deleting notification session {notification_id} from Firestore",
|
"Deleting notification session %s from Firestore",
|
||||||
|
notification_id,
|
||||||
)
|
)
|
||||||
doc_ref = self._notification_ref(notification_id)
|
doc_ref = self._notification_ref(notification_id)
|
||||||
await doc_ref.delete()
|
await doc_ref.delete()
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully deleted notification session {notification_id} from Firestore",
|
"Successfully deleted notification session %s from Firestore",
|
||||||
|
notification_id,
|
||||||
)
|
)
|
||||||
return True
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
except Exception as e:
|
"Error deleting notification session %s from Firestore",
|
||||||
logger.error(
|
notification_id,
|
||||||
f"Error deleting notification session {notification_id} from Firestore: {e!s}",
|
|
||||||
exc_info=True,
|
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
"""Notification manager service for processing push notifications."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
@@ -103,7 +105,8 @@ class NotificationManagerService:
|
|||||||
# Save notification to Redis (with async Firestore write-back)
|
# Save notification to Redis (with async Firestore write-back)
|
||||||
await self.redis_service.save_or_append_notification(new_notification_entry)
|
await self.redis_service.save_or_append_notification(new_notification_entry)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Notification for phone {telefono} cached. Kicking off async Firestore write-back",
|
"Notification for phone %s cached. Kicking off async Firestore write-back",
|
||||||
|
telefono,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fire-and-forget Firestore write (matching Java's .subscribe() behavior)
|
# Fire-and-forget Firestore write (matching Java's .subscribe() behavior)
|
||||||
@@ -113,13 +116,17 @@ class NotificationManagerService:
|
|||||||
new_notification_entry,
|
new_notification_entry,
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Notification entry persisted to Firestore for phone {telefono}",
|
"Notification entry persisted to Firestore for phone %s",
|
||||||
|
telefono,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(
|
logger.exception(
|
||||||
f"Background: Error during notification persistence to Firestore for phone {telefono}: {e}",
|
"Background: Error during notification persistence "
|
||||||
exc_info=True,
|
"to Firestore for phone %s",
|
||||||
|
telefono,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fire and forget - don't await
|
# Fire and forget - don't await
|
||||||
asyncio.create_task(save_notification_to_firestore())
|
_task = asyncio.create_task(save_notification_to_firestore())
|
||||||
|
# Store reference to prevent premature garbage collection
|
||||||
|
del _task
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
|
"""Quick reply content service for loading FAQ screens."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from capa_de_integracion.config import Settings
|
from capa_de_integracion.config import Settings
|
||||||
from capa_de_integracion.models.quick_replies import (
|
from capa_de_integracion.models.quick_replies import (
|
||||||
@@ -24,9 +27,17 @@ class QuickReplyContentService:
|
|||||||
self.quick_replies_path = settings.base_path / "quick_replies"
|
self.quick_replies_path = settings.base_path / "quick_replies"
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"QuickReplyContentService initialized with path: {self.quick_replies_path}",
|
"QuickReplyContentService initialized with path: %s",
|
||||||
|
self.quick_replies_path,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _validate_file(self, file_path: Path, screen_id: str) -> None:
|
||||||
|
"""Validate that the quick reply file exists."""
|
||||||
|
if not file_path.exists():
|
||||||
|
logger.warning("Quick reply file not found: %s", file_path)
|
||||||
|
msg = f"Quick reply file not found for screen_id: {screen_id}"
|
||||||
|
raise ValueError(msg)
|
||||||
|
|
||||||
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
|
async def get_quick_replies(self, screen_id: str) -> QuickReplyScreen:
|
||||||
"""Load quick reply screen content by ID.
|
"""Load quick reply screen content by ID.
|
||||||
|
|
||||||
@@ -53,15 +64,11 @@ class QuickReplyContentService:
|
|||||||
file_path = self.quick_replies_path / f"{screen_id}.json"
|
file_path = self.quick_replies_path / f"{screen_id}.json"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not file_path.exists():
|
self._validate_file(file_path, screen_id)
|
||||||
logger.warning(f"Quick reply file not found: {file_path}")
|
|
||||||
msg = f"Quick reply file not found for screen_id: {screen_id}"
|
|
||||||
raise ValueError(
|
|
||||||
msg,
|
|
||||||
)
|
|
||||||
|
|
||||||
with open(file_path, encoding="utf-8") as f:
|
# Use Path.read_text() for async-friendly file reading
|
||||||
data = json.load(f)
|
content = file_path.read_text(encoding="utf-8")
|
||||||
|
data = json.loads(content)
|
||||||
|
|
||||||
# Parse questions
|
# Parse questions
|
||||||
preguntas_data = data.get("preguntas", [])
|
preguntas_data = data.get("preguntas", [])
|
||||||
@@ -83,22 +90,17 @@ class QuickReplyContentService:
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully loaded {len(preguntas)} quick replies for screen: {screen_id}",
|
"Successfully loaded %s quick replies for screen: %s",
|
||||||
|
len(preguntas),
|
||||||
|
screen_id,
|
||||||
)
|
)
|
||||||
return quick_reply
|
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"Error parsing JSON file {file_path}: {e}", exc_info=True)
|
logger.exception("Error parsing JSON file %s", file_path)
|
||||||
msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
|
msg = f"Invalid JSON format in quick reply file for screen_id: {screen_id}"
|
||||||
raise ValueError(
|
raise ValueError(msg) from e
|
||||||
msg,
|
|
||||||
) from e
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.exception("Error loading quick replies for screen %s", screen_id)
|
||||||
f"Error loading quick replies for screen {screen_id}: {e}",
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
msg = f"Error loading quick replies for screen_id: {screen_id}"
|
msg = f"Error loading quick replies for screen_id: {screen_id}"
|
||||||
raise ValueError(
|
raise ValueError(msg) from e
|
||||||
msg,
|
else:
|
||||||
) from e
|
return quick_reply
|
||||||
|
|||||||
@@ -1,4 +1,8 @@
|
|||||||
|
"""RAG service for calling RAG endpoints with high concurrency."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from types import TracebackType
|
||||||
|
from typing import Self
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
@@ -68,8 +72,11 @@ class RAGService:
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"RAGService initialized with endpoint: {self.rag_endpoint_url}, "
|
"RAGService initialized with endpoint: %s, "
|
||||||
f"max_connections: {max_connections}, timeout: {timeout}s",
|
"max_connections: %s, timeout: %ss",
|
||||||
|
self.rag_endpoint_url,
|
||||||
|
max_connections,
|
||||||
|
timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def query(self, messages: list[dict[str, str]]) -> str:
|
async def query(self, messages: list[dict[str, str]]) -> str:
|
||||||
@@ -93,7 +100,7 @@ class RAGService:
|
|||||||
request = RAGRequest(messages=message_objects)
|
request = RAGRequest(messages=message_objects)
|
||||||
|
|
||||||
# Make async HTTP POST request
|
# Make async HTTP POST request
|
||||||
logger.debug(f"Sending RAG request with {len(messages)} messages")
|
logger.debug("Sending RAG request with %s messages", len(messages))
|
||||||
|
|
||||||
response = await self._client.post(
|
response = await self._client.post(
|
||||||
self.rag_endpoint_url,
|
self.rag_endpoint_url,
|
||||||
@@ -108,32 +115,37 @@ class RAGService:
|
|||||||
response_data = response.json()
|
response_data = response.json()
|
||||||
rag_response = RAGResponse(**response_data)
|
rag_response = RAGResponse(**response_data)
|
||||||
|
|
||||||
logger.debug(f"RAG response received: {len(rag_response.response)} chars")
|
logger.debug("RAG response received: %s chars", len(rag_response.response))
|
||||||
return rag_response.response
|
|
||||||
|
|
||||||
except httpx.HTTPStatusError as e:
|
except httpx.HTTPStatusError as e:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"HTTP error calling RAG endpoint: {e.response.status_code} - {e.response.text}",
|
"HTTP error calling RAG endpoint: %s - %s",
|
||||||
|
e.response.status_code,
|
||||||
|
e.response.text,
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
except httpx.RequestError as e:
|
except httpx.RequestError:
|
||||||
logger.exception(f"Request error calling RAG endpoint: {e!s}")
|
logger.exception("Request error calling RAG endpoint:")
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error(
|
logger.exception("Unexpected error calling RAG endpoint")
|
||||||
f"Unexpected error calling RAG endpoint: {e!s}", exc_info=True,
|
|
||||||
)
|
|
||||||
raise
|
raise
|
||||||
|
else:
|
||||||
|
return rag_response.response
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
"""Close the HTTP client and release connections."""
|
"""Close the HTTP client and release connections."""
|
||||||
await self._client.aclose()
|
await self._client.aclose()
|
||||||
logger.info("RAGService client closed")
|
logger.info("RAGService client closed")
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self) -> Self:
|
||||||
"""Async context manager entry."""
|
"""Async context manager entry."""
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
async def __aexit__(
|
||||||
|
self,
|
||||||
|
exc_type: type[BaseException] | None,
|
||||||
|
exc_val: BaseException | None,
|
||||||
|
exc_tb: TracebackType | None,
|
||||||
|
) -> None:
|
||||||
"""Async context manager exit."""
|
"""Async context manager exit."""
|
||||||
await self.close()
|
await self.close()
|
||||||
|
|||||||
@@ -1,11 +1,13 @@
|
|||||||
|
"""Redis service for caching conversation sessions and notifications."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import UTC, datetime
|
||||||
|
|
||||||
from redis.asyncio import Redis
|
from redis.asyncio import Redis
|
||||||
|
|
||||||
from capa_de_integracion.config import Settings
|
from capa_de_integracion.config import Settings
|
||||||
from capa_de_integracion.models import ConversationSession
|
from capa_de_integracion.models import ConversationEntry, ConversationSession
|
||||||
from capa_de_integracion.models.notification import Notification, NotificationSession
|
from capa_de_integracion.models.notification import Notification, NotificationSession
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -31,7 +33,9 @@ class RedisService:
|
|||||||
decode_responses=True,
|
decode_responses=True,
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Connected to Redis at {self.settings.redis_host}:{self.settings.redis_port}",
|
"Connected to Redis at %s:%s",
|
||||||
|
self.settings.redis_host,
|
||||||
|
self.settings.redis_port,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
@@ -66,29 +70,26 @@ class RedisService:
|
|||||||
phone_key = self._phone_to_session_key(session_id_or_phone)
|
phone_key = self._phone_to_session_key(session_id_or_phone)
|
||||||
mapped_session_id = await self.redis.get(phone_key)
|
mapped_session_id = await self.redis.get(phone_key)
|
||||||
|
|
||||||
if mapped_session_id:
|
# Use mapped session ID if found, otherwise use input directly
|
||||||
# Found phone mapping, get the actual session
|
session_id = mapped_session_id or session_id_or_phone
|
||||||
session_id = mapped_session_id
|
|
||||||
else:
|
|
||||||
# Try as direct session ID
|
|
||||||
session_id = session_id_or_phone
|
|
||||||
|
|
||||||
# Get session by ID
|
# Get session by ID
|
||||||
key = self._session_key(session_id)
|
key = self._session_key(session_id)
|
||||||
data = await self.redis.get(key)
|
data = await self.redis.get(key)
|
||||||
|
|
||||||
if not data:
|
if not data:
|
||||||
logger.debug(f"Session not found in Redis: {session_id_or_phone}")
|
logger.debug("Session not found in Redis: %s", session_id_or_phone)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
session_dict = json.loads(data)
|
session_dict = json.loads(data)
|
||||||
session = ConversationSession.model_validate(session_dict)
|
session = ConversationSession.model_validate(session_dict)
|
||||||
logger.debug(f"Retrieved session from Redis: {session_id}")
|
logger.debug("Retrieved session from Redis: %s", session_id)
|
||||||
return session
|
except Exception:
|
||||||
except Exception as e:
|
logger.exception("Error deserializing session %s:", session_id)
|
||||||
logger.exception(f"Error deserializing session {session_id}: {e!s}")
|
|
||||||
return None
|
return None
|
||||||
|
else:
|
||||||
|
return session
|
||||||
|
|
||||||
async def save_session(self, session: ConversationSession) -> bool:
|
async def save_session(self, session: ConversationSession) -> bool:
|
||||||
"""Save conversation session to Redis with TTL.
|
"""Save conversation session to Redis with TTL.
|
||||||
@@ -99,7 +100,7 @@ class RedisService:
|
|||||||
msg = "Redis client not connected"
|
msg = "Redis client not connected"
|
||||||
raise RuntimeError(msg)
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
key = self._session_key(session.sessionId)
|
key = self._session_key(session.session_id)
|
||||||
phone_key = self._phone_to_session_key(session.telefono)
|
phone_key = self._phone_to_session_key(session.telefono)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -108,15 +109,18 @@ class RedisService:
|
|||||||
await self.redis.setex(key, self.session_ttl, data)
|
await self.redis.setex(key, self.session_ttl, data)
|
||||||
|
|
||||||
# Save phone-to-session mapping
|
# Save phone-to-session mapping
|
||||||
await self.redis.setex(phone_key, self.session_ttl, session.sessionId)
|
await self.redis.setex(phone_key, self.session_ttl, session.session_id)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Saved session to Redis: {session.sessionId} for phone: {session.telefono}",
|
"Saved session to Redis: %s for phone: %s",
|
||||||
|
session.session_id,
|
||||||
|
session.telefono,
|
||||||
)
|
)
|
||||||
return True
|
except Exception:
|
||||||
except Exception as e:
|
logger.exception("Error saving session %s to Redis:", session.session_id)
|
||||||
logger.exception(f"Error saving session {session.sessionId} to Redis: {e!s}")
|
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
async def delete_session(self, session_id: str) -> bool:
|
async def delete_session(self, session_id: str) -> bool:
|
||||||
"""Delete conversation session from Redis."""
|
"""Delete conversation session from Redis."""
|
||||||
@@ -128,11 +132,12 @@ class RedisService:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
result = await self.redis.delete(key)
|
result = await self.redis.delete(key)
|
||||||
logger.debug(f"Deleted session from Redis: {session_id}")
|
logger.debug("Deleted session from Redis: %s", session_id)
|
||||||
return result > 0
|
except Exception:
|
||||||
except Exception as e:
|
logger.exception("Error deleting session %s from Redis:", session_id)
|
||||||
logger.exception(f"Error deleting session {session_id} from Redis: {e!s}")
|
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return result > 0
|
||||||
|
|
||||||
async def exists(self, session_id: str) -> bool:
|
async def exists(self, session_id: str) -> bool:
|
||||||
"""Check if session exists in Redis."""
|
"""Check if session exists in Redis."""
|
||||||
@@ -149,7 +154,7 @@ class RedisService:
|
|||||||
"""Generate Redis key for conversation messages."""
|
"""Generate Redis key for conversation messages."""
|
||||||
return f"conversation:messages:{session_id}"
|
return f"conversation:messages:{session_id}"
|
||||||
|
|
||||||
async def save_message(self, session_id: str, message) -> bool:
|
async def save_message(self, session_id: str, message: ConversationEntry) -> bool:
|
||||||
"""Save a conversation message to Redis sorted set.
|
"""Save a conversation message to Redis sorted set.
|
||||||
|
|
||||||
Messages are stored in a sorted set with timestamp as score.
|
Messages are stored in a sorted set with timestamp as score.
|
||||||
@@ -179,13 +184,15 @@ class RedisService:
|
|||||||
# Set TTL on the messages key to match session TTL
|
# Set TTL on the messages key to match session TTL
|
||||||
await self.redis.expire(key, self.session_ttl)
|
await self.redis.expire(key, self.session_ttl)
|
||||||
|
|
||||||
logger.debug(f"Saved message to Redis: {session_id}")
|
logger.debug("Saved message to Redis: %s", session_id)
|
||||||
return True
|
except Exception:
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error saving message to Redis for session {session_id}: {e!s}",
|
"Error saving message to Redis for session %s:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
async def get_messages(self, session_id: str) -> list:
|
async def get_messages(self, session_id: str) -> list:
|
||||||
"""Retrieve all conversation messages for a session from Redis.
|
"""Retrieve all conversation messages for a session from Redis.
|
||||||
@@ -210,7 +217,7 @@ class RedisService:
|
|||||||
message_strings = await self.redis.zrange(key, 0, -1)
|
message_strings = await self.redis.zrange(key, 0, -1)
|
||||||
|
|
||||||
if not message_strings:
|
if not message_strings:
|
||||||
logger.debug(f"No messages found in Redis for session: {session_id}")
|
logger.debug("No messages found in Redis for session: %s", session_id)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Parse JSON strings to dictionaries
|
# Parse JSON strings to dictionaries
|
||||||
@@ -218,19 +225,23 @@ class RedisService:
|
|||||||
for msg_str in message_strings:
|
for msg_str in message_strings:
|
||||||
try:
|
try:
|
||||||
messages.append(json.loads(msg_str))
|
messages.append(json.loads(msg_str))
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError:
|
||||||
logger.exception(f"Error parsing message JSON: {e!s}")
|
logger.exception("Error parsing message JSON:")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Retrieved {len(messages)} messages from Redis for session: {session_id}",
|
"Retrieved %s messages from Redis for session: %s",
|
||||||
|
len(messages),
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return messages
|
except Exception:
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error retrieving messages from Redis for session {session_id}: {e!s}",
|
"Error retrieving messages from Redis for session %s:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
else:
|
||||||
|
return messages
|
||||||
|
|
||||||
# ====== Notification Methods ======
|
# ====== Notification Methods ======
|
||||||
|
|
||||||
@@ -273,17 +284,17 @@ class RedisService:
|
|||||||
updated_session = NotificationSession(
|
updated_session = NotificationSession(
|
||||||
sessionId=notification_session_id,
|
sessionId=notification_session_id,
|
||||||
telefono=phone_number,
|
telefono=phone_number,
|
||||||
fechaCreacion=existing_session.fechaCreacion,
|
fechaCreacion=existing_session.fecha_creacion,
|
||||||
ultimaActualizacion=datetime.now(),
|
ultimaActualizacion=datetime.now(UTC),
|
||||||
notificaciones=updated_notifications,
|
notificaciones=updated_notifications,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Create new session
|
# Create new session
|
||||||
updated_session = NotificationSession(
|
updated_session = NotificationSession(
|
||||||
sessionId=notification_session_id,
|
session_id=notification_session_id,
|
||||||
telefono=phone_number,
|
telefono=phone_number,
|
||||||
fechaCreacion=datetime.now(),
|
fecha_creacion=datetime.now(UTC),
|
||||||
ultimaActualizacion=datetime.now(),
|
ultima_actualizacion=datetime.now(UTC),
|
||||||
notificaciones=[new_entry],
|
notificaciones=[new_entry],
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -296,7 +307,7 @@ class RedisService:
|
|||||||
msg = "Redis client not connected"
|
msg = "Redis client not connected"
|
||||||
raise RuntimeError(msg)
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
key = self._notification_key(session.sessionId)
|
key = self._notification_key(session.session_id)
|
||||||
phone_key = self._phone_to_notification_key(session.telefono)
|
phone_key = self._phone_to_notification_key(session.telefono)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -305,15 +316,17 @@ class RedisService:
|
|||||||
await self.redis.setex(key, self.notification_ttl, data)
|
await self.redis.setex(key, self.notification_ttl, data)
|
||||||
|
|
||||||
# Save phone-to-session mapping
|
# Save phone-to-session mapping
|
||||||
await self.redis.setex(phone_key, self.notification_ttl, session.sessionId)
|
await self.redis.setex(phone_key, self.notification_ttl, session.session_id)
|
||||||
|
|
||||||
logger.debug(f"Cached notification session: {session.sessionId}")
|
logger.debug("Cached notification session: %s", session.session_id)
|
||||||
return True
|
except Exception:
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error caching notification session {session.sessionId}: {e!s}",
|
"Error caching notification session %s:",
|
||||||
|
session.session_id,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
async def get_notification_session(
|
async def get_notification_session(
|
||||||
self, session_id: str,
|
self, session_id: str,
|
||||||
@@ -327,19 +340,21 @@ class RedisService:
|
|||||||
data = await self.redis.get(key)
|
data = await self.redis.get(key)
|
||||||
|
|
||||||
if not data:
|
if not data:
|
||||||
logger.debug(f"Notification session not found in Redis: {session_id}")
|
logger.debug("Notification session not found in Redis: %s", session_id)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
session_dict = json.loads(data)
|
session_dict = json.loads(data)
|
||||||
session = NotificationSession.model_validate(session_dict)
|
session = NotificationSession.model_validate(session_dict)
|
||||||
logger.info(f"Notification session {session_id} retrieved from Redis")
|
logger.info("Notification session %s retrieved from Redis", session_id)
|
||||||
return session
|
except Exception:
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error deserializing notification session {session_id}: {e!s}",
|
"Error deserializing notification session %s:",
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
else:
|
||||||
|
return session
|
||||||
|
|
||||||
async def get_notification_id_for_phone(self, phone: str) -> str | None:
|
async def get_notification_id_for_phone(self, phone: str) -> str | None:
|
||||||
"""Get notification session ID for a phone number."""
|
"""Get notification session ID for a phone number."""
|
||||||
@@ -351,7 +366,7 @@ class RedisService:
|
|||||||
session_id = await self.redis.get(key)
|
session_id = await self.redis.get(key)
|
||||||
|
|
||||||
if session_id:
|
if session_id:
|
||||||
logger.info(f"Session ID {session_id} found for phone")
|
logger.info("Session ID %s found for phone", session_id)
|
||||||
else:
|
else:
|
||||||
logger.debug("Session ID not found for phone")
|
logger.debug("Session ID not found for phone")
|
||||||
|
|
||||||
@@ -367,12 +382,14 @@ class RedisService:
|
|||||||
phone_key = self._phone_to_notification_key(phone_number)
|
phone_key = self._phone_to_notification_key(phone_number)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.info(f"Deleting notification session for phone {phone_number}")
|
logger.info("Deleting notification session for phone %s", phone_number)
|
||||||
await self.redis.delete(notification_key)
|
await self.redis.delete(notification_key)
|
||||||
await self.redis.delete(phone_key)
|
await self.redis.delete(phone_key)
|
||||||
return True
|
except Exception:
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error deleting notification session for phone {phone_number}: {e!s}",
|
"Error deleting notification session for phone %s:",
|
||||||
|
phone_number,
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|||||||
Reference in New Issue
Block a user