add healthcheck to remaining apps

2025-11-25 07:05:14 +00:00
parent eccd53673c
commit 6d9686e373
87 changed files with 850 additions and 632 deletions

View File

@@ -0,0 +1,3 @@
from .main import Agent
__all__ = ["Agent"]

View File

@@ -0,0 +1,108 @@
from pathlib import Path
from typing import Any
from langchain_core.messages import AIMessageChunk
from pydantic import BaseModel, Field
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel
from langchain_azure_ai.embeddings import AzureAIEmbeddingsModel
from banortegpt.vector.qdrant import AsyncQdrant
from api import context
from api.config import config
parent = Path(__file__).parent
SYSTEM_PROMPT = (parent / "system_prompt.md").read_text()
AZURE_AI_URI = "https://eastus2.api.cognitive.microsoft.com"
class get_information(BaseModel):
"""Search a private repository for information."""
question: str = Field(..., description="The user question")
class Agent:
system_prompt = SYSTEM_PROMPT
generation_config = {
"temperature": config.model_temperature,
}
embedding_model = config.embedding_model
message_limit = config.message_limit
index = config.vector_index
limit = config.search_limit
search = AsyncQdrant.from_config(config)
llm = AzureAIChatCompletionsModel(
endpoint=f"{AZURE_AI_URI}/openai/deployments/{config.model}",
credential=config.openai_api_key,
).bind_tools([get_information])
embedder = AzureAIEmbeddingsModel(
endpoint=f"{AZURE_AI_URI}/openai/deployments/{config.embedding_model}",
credential=config.openai_api_key,
)
def __init__(self) -> None:
self.tool_map = {
"get_information": self.get_information
}
def build_response(self, payloads, fallback):
template = "<FAQ {index}>\n\n{content}\n\n</FAQ {index}>"
filled_templates = [
template.format(index=idx, content=payload["content"])
for idx, payload in enumerate(payloads)
]
filled_templates.append(f"<FALLBACK>\n{fallback}\n</FALLBACK>")
return "\n".join(filled_templates)
async def get_information(self, question: str):
embedding = await self.embedder.aembed_query(question)
payloads = await self.search.semantic_search(
embedding=embedding,
collection=self.index,
limit=self.limit,
)
fallback_messages = {}
images = []
for idx, payload in enumerate(payloads):
fallback_message = payload.get("fallback_message", "None")
fallback_messages[fallback_message] = fallback_messages.get(fallback_message, 0) + 1
# Only extract images from the first payload
if idx == 0 and "images" in payload:
images.extend(payload["images"])
fallback = max(fallback_messages, key=fallback_messages.get) # type: ignore
response = self.build_response(payloads, fallback)
return str(response), images[:3]  # Cap at three images
def _generation_config_overwrite(self, overwrites: dict | None) -> dict[str, Any]:
if not overwrites:
return self.generation_config.copy()
return {**self.generation_config, **overwrites}
async def stream(self, history, overwrites: dict | None = None):
generation_config = self._generation_config_overwrite(overwrites)
async for delta in self.llm.astream(input=history, **generation_config):
assert isinstance(delta, AIMessageChunk)
if call := delta.tool_call_chunks:
if tool_id := call[0].get("id"):
context.tool_id.set(tool_id)
if name := call[0].get("name"):
context.tool_name.set(name)
if args := call[0].get("args"):
context.tool_buffer.set(context.tool_buffer.get() + args)
elif delta.content:
assert isinstance(delta.content, str)
context.buffer.set(context.buffer.get() + delta.content)
yield delta.content
async def generate(self, history, overwrites: dict | None = None):
generation_config = self._generation_config_overwrite(overwrites)
return await self.llm.ainvoke(input=history, **generation_config)
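
For reviewers: a minimal sketch of the context block build_response assembles, using hypothetical payloads. Instantiating Agent requires live Vault/Azure credentials (its class attributes are built at import time), so treat this as illustrative only.

# Hypothetical payloads; shows the <FAQ>/<FALLBACK> block the model
# receives as the tool result.
payloads = [
    {"content": "Los viáticos nacionales se autorizan por nivel de puesto."},
    {"content": "El hospedaje requiere factura a nombre de Banorte."},
]
context_block = Agent().build_response(payloads, fallback="Consulta a tu líder.")
# context_block:
# <FAQ 0>
#
# Los viáticos nacionales se autorizan por nivel de puesto.
#
# </FAQ 0>
# <FAQ 1>
#
# El hospedaje requiere factura a nombre de Banorte.
#
# </FAQ 1>
# <FALLBACK>
# Consulta a tu líder.
# </FALLBACK>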

View File

@@ -0,0 +1,49 @@
🧠 Expert Assistant for the Banorte Travel Expense Policy
🎯 Assistant Role:
Regulatory specialist responsible for answering exclusively on the basis of Banorte's Official Travel Expense Policy, guaranteeing professional, clear, and verifiable answers.
✅ Main Mission:
Provide answers 100% aligned with Banorte's current travel expense policy, in keeping with the following principles:
⚙️ Response Rules (Mandatory):
📥 Always query with get_information:
Every answer must be obtained solely through the get_information(question) tool, which queries the authorized vector database.
This tool also serves Banorte's proof of tax status (constancia de situación fiscal) at a URL.
The user is not strictly required to state their job position for a query.
If the user does state a position, the answer must be scoped to that position and the corresponding information applied.
If no information exists for the stated position, answer with the general response available in the knowledge base.
❗ Never invent an answer or respond without first consulting this source.
If the tool returns nothing relevant, state that the policy does not cover that situation.
📚 Single official source:
Answers must be based solely on Banorte's official policy.
❌ Google, forums, guesses, and external content are prohibited.
✅ If get_information returns an official link or document, include it with the icon:
🔗 [Ver política oficial].
📐 Structured, professional format:
Use a clear, easy-to-read layout:
• Bullet points for steps, exceptions, and authorized amounts
• Bold for key concepts
• Clear separation between sections
🔒 Zero invention or free interpretation:
If a question is not covered by the policy, answer plainly:
❗ La política oficial no proporciona lineamientos específicos sobre este caso.
💼 Executive, direct tone:
Professional and objective
No unnecessary jargon
Brief, clear wording focused on the essentials

View File

@@ -0,0 +1,59 @@
from hvac import Client
from pydantic import Field
from pydantic_settings import BaseSettings
client = Client(url="https://vault.ia-innovacion.work")
if not client.is_authenticated():
raise Exception("Vault authentication failed")
secret_map = client.secrets.kv.v2.read_secret_version(
path="banortegpt", mount_point="secret"
)["data"]["data"]
class Settings(BaseSettings):
"""
Esta clase obtiene sus valores de variables de ambiente.
Si no estan en el ambiente, los jala de nuestra Vault.
"""
# Config
model: str = "gpt-4o"
model_temperature: float = 0.0
message_limit: int = 10
host: str = "0.0.0.0"
port: int = 8000
vector_index: str = "chat-egresos-3"
search_limit: int = 3
embedding_model: str = "text-embedding-3-large"
# API Keys
azure_endpoint: str = Field(default_factory=lambda: secret_map["azure_endpoint"])
openai_api_key: str = Field(default_factory=lambda: secret_map["openai_api_key"])
openai_api_version: str = Field(
default_factory=lambda: secret_map["openai_api_version"]
)
mongodb_url: str = Field(
default_factory=lambda: secret_map["cosmosdb_connection_string"]
)
qdrant_url: str = Field(default_factory=lambda: secret_map["qdrant_api_url"])
qdrant_api_key: str | None = Field(
default_factory=lambda: secret_map["qdrant_api_key"]
)
async def init_mongo_db(self):
"""Este helper inicia la conexion enter el MongoDB ORM y nuestra instancia"""
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from banortegpt.database.mongo_memory.models import Conversation
await init_beanie(
database=AsyncIOMotorClient(self.mongodb_url).banortegptdos,
document_models=[Conversation],
)
config = Settings()
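
A quick sketch of the precedence these settings give: environment variables win over the Vault-backed defaults, since pydantic-settings matches field names to env vars case-insensitively. The index name below is hypothetical, and importing the module assumes Vault is reachable (it authenticates at import time).

import os

# Hypothetical override: VECTOR_INDEX in the environment takes precedence
# over the "chat-egresos-3" default declared above.
os.environ["VECTOR_INDEX"] = "chat-egresos-test"
from api.config import Settings

assert Settings().vector_index == "chat-egresos-test"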

View File

@@ -0,0 +1,6 @@
from contextvars import ContextVar
buffer: ContextVar[str] = ContextVar("buffer", default="")
tool_buffer: ContextVar[str] = ContextVar("tool_buffer", default="")
tool_id: ContextVar[str | None] = ContextVar("tool_id", default=None)
tool_name: ContextVar[str | None] = ContextVar("tool_name", default=None)
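
A minimal sketch of why these are ContextVars rather than module-level strings: each asyncio task runs in a copy of the current context, so concurrent requests accumulate their stream buffers independently.

import asyncio
from api import context

async def fake_stream(text: str) -> None:
    context.buffer.set("")  # fresh buffer for this task only
    for ch in text:
        context.buffer.set(context.buffer.get() + ch)
        await asyncio.sleep(0)  # yield so the two tasks interleave
    assert context.buffer.get() == text  # never sees the other task's data

async def main() -> None:
    await asyncio.gather(fake_stream("abc"), fake_stream("xyz"))

asyncio.run(main())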

apps/egresos/api/server.py Normal file
View File

@@ -0,0 +1,116 @@
import time
import uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langfuse import Langfuse
from pydantic import BaseModel
from api import services
from api.agent import Agent
from api.config import config
# Configure Langfuse
langfuse = Langfuse(
public_key="pk-lf-49cb04b3-0c7d-475b-8105-ad8b8749ecdd",
secret_key="sk-lf-e02fa322-c709-4d80-bef2-9cb279846a0c",
host="https://ailogger.azurewebsites.net",
)
@asynccontextmanager
async def lifespan(_: FastAPI):
await config.init_mongo_db()
yield
app = FastAPI(lifespan=lifespan)
agent = Agent()
@app.post("/api/v1/conversation")
async def create_conversation():
conversation_id = uuid.uuid4()
await services.create_conversation(conversation_id, agent.system_prompt)
return {"conversation_id": conversation_id}
class Message(BaseModel):
conversation_id: uuid.UUID
prompt: str
@app.post("/api/v1/message")
async def send(message: Message):
# Create the main Langfuse trace for this message
trace = langfuse.trace(
name="chat_message",
session_id=str(message.conversation_id),
input={
"prompt": message.prompt,
"conversation_id": str(message.conversation_id),
},
)
def sse_wrapper(func):  # wraps the generator to emit SSE frames and log usage
async def wrapper(*args, **kwargs):
response_parts = []
start_time = time.time()
async for chunk in func(*args, **kwargs):
if chunk.type == "text" and chunk.content:
response_parts.append(str(chunk.content))
content = chunk.model_dump_json()
data = f"data: {content}\n\n"
yield data
end_time = time.time()
latency_ms = round((end_time - start_time) * 1000)
full_response = "".join(response_parts)
# Rough usage estimate: ~1.3 tokens per whitespace-separated word.
input_tokens = len(message.prompt.split()) * 1.3
output_tokens = len(full_response.split()) * 1.3
total_tokens = int(input_tokens + output_tokens)
# Approximate GPT-4-class pricing per 1K tokens (USD).
cost_per_1k_input = 0.03
cost_per_1k_output = 0.06
total_cost = (input_tokens / 1000 * cost_per_1k_input) + (
output_tokens / 1000 * cost_per_1k_output
)
trace.update(
output={"response": full_response},
usage={
"input": int(input_tokens),
"output": int(output_tokens),
"total": total_tokens,
"unit": "TOKENS",
},
)
langfuse.score(
trace_id=trace.id,
name="latency",
value=latency_ms,
comment=f"Response time: {latency_ms}ms",
)
langfuse.score(
trace_id=trace.id,
name="cost",
value=round(total_cost, 4),
comment=f"Estimated cost: ${round(total_cost, 4)}",
)
return wrapper
sse_stream = sse_wrapper(services.stream)
generator = sse_stream(agent, message.prompt, message.conversation_id)
return StreamingResponse(generator, media_type="text/event-stream")
@app.get("/")
async def health():
return {"status": "ok"}

View File

@@ -0,0 +1,8 @@
from banortegpt.database.mongo_memory.crud import create_conversation
from .stream_response import stream
__all__ = [
"stream",
"create_conversation",
]

View File

@@ -0,0 +1,86 @@
import json
from enum import StrEnum
from typing import TypeAlias
from uuid import UUID
from pydantic import BaseModel
import api.context as ctx
from api.agent import Agent
from banortegpt.database.mongo_memory import crud
class ChunkType(StrEnum):
START = "start"
TEXT = "text"
REFERENCE = "reference"
IMAGE = "image"
TOOL = "tool"
END = "end"
ERROR = "error"
ContentType: TypeAlias = str | int
class ResponseChunk(BaseModel):
type: ChunkType
content: ContentType | list[ContentType] | None
images: list[str] | None = None  # New field for images
async def stream(agent: Agent, prompt: str, conversation_id: UUID):
yield ResponseChunk(type=ChunkType.START, content="")
conversation = await crud.get_conversation(conversation_id)
if conversation is None:
raise ValueError("Conversation not found")
conversation.add(role="user", content=prompt)
history = conversation.to_openai_format(agent.message_limit, langchain_compat=True)
async for content in agent.stream(history):
yield ResponseChunk(type=ChunkType.TEXT, content=content)
if (tool_id := ctx.tool_id.get()) is not None:
tool_buffer = ctx.tool_buffer.get()
assert tool_buffer  # ContextVar default is "", so check for a non-empty buffer
tool_name = ctx.tool_name.get()
assert tool_name is not None
yield ResponseChunk(type=ChunkType.TOOL, content=None)
buffer_dict = json.loads(tool_buffer)
result, images = await agent.tool_map[tool_name](**buffer_dict)
# Send images to the client if any were returned
if images:
yield ResponseChunk(type=ChunkType.IMAGE, content=images)
conversation.add(
role="assistant",
tool_calls=[
{
"id": tool_id,
"type": "function",
"function": {
"name": tool_name,
"arguments": tool_buffer,
},
}
],
)
conversation.add(role="tool", content=result, tool_call_id=tool_id)
history = conversation.to_openai_format(agent.message_limit, langchain_compat=True)
async for content in agent.stream(history, {"tools": None}):
yield ResponseChunk(type=ChunkType.TEXT, content=content)
conversation.add(role="assistant", content=ctx.buffer.get())
await conversation.replace()
yield ResponseChunk(type=ChunkType.END, content="")
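
For completeness, a hedged sketch of consuming the stream end to end; httpx is an assumption (it is not part of this diff), while the endpoint paths and chunk shape come from server.py and ResponseChunk above.

import asyncio
import json
import httpx

async def main() -> None:
    base = "http://localhost:8000"
    async with httpx.AsyncClient(timeout=None) as client:
        # Create a conversation, then stream a message over SSE.
        conv = (await client.post(f"{base}/api/v1/conversation")).json()
        body = {"conversation_id": conv["conversation_id"], "prompt": "Hola"}
        async with client.stream("POST", f"{base}/api/v1/message", json=body) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    chunk = json.loads(line.removeprefix("data: "))
                    if chunk["type"] == "text" and chunk["content"]:
                        print(chunk["content"], end="", flush=True)

asyncio.run(main())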