This commit is contained in:
Rogelio
2025-10-13 18:16:25 +00:00
parent 739f087cef
commit 325f1ef439
415 changed files with 46870 additions and 0 deletions

13
apps/ocp/api/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
import os
import logging
import logfire
# Configure logfire for this service; send_to_logfire=False is passed so the
# SDK is set up without the hosted export being switched on.
logfire.configure(service_name="ChatOCP", send_to_logfire=False)

# Route standard-library logging records through logfire's logging handler.
logging.basicConfig(handlers=[logfire.LogfireLoggingHandler()])

# LOG_LEVEL holds a level *name* (case-insensitive, e.g. "debug" or "INFO").
# It is resolved to the numeric constant defined on the logging module.
log_level = os.environ.get("LOG_LEVEL", "WARNING")
numeric_level = getattr(logging, log_level.upper(), None)

if isinstance(numeric_level, int):
    # Only the "api" logger hierarchy is adjusted; the root logger is untouched.
    logging.getLogger("api").setLevel(numeric_level)
else:
    # Unknown names (or names that resolve to a non-integer attribute of the
    # logging module) abort the import instead of being silently ignored.
    raise ValueError("Invalid log level: %s" % log_level)

View File

@@ -0,0 +1,5 @@
from .main import MayaOCP
# Single agent instance, built once when this package is first imported and
# shared by every module that does `from api.agent import agent`.
agent = MayaOCP()
# Only the instance is exported from the package, not the MayaOCP class.
__all__ = ["agent"]

130
apps/ocp/api/agent/main.py Normal file
View File

@@ -0,0 +1,130 @@
import logging
from pathlib import Path
from typing import Any
from langchain_core.messages.ai import AIMessageChunk
from pydantic import BaseModel, Field
from banortegpt.storage.azure_storage import AzureStorage
from banortegpt.vector.qdrant import AsyncQdrant
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel
from langchain_azure_ai.embeddings import AzureAIEmbeddingsModel
import api.context as ctx
from api.config import config
logger = logging.getLogger(__name__)
# Directory containing this module; the prompt file is expected to sit next to it.
parent = Path(__file__).parent
# The system prompt is read once, at import time. The encoding is passed
# explicitly so decoding of the (Spanish) prompt does not depend on the host's
# locale default encoding.
SYSTEM_PROMPT = (parent / "system_prompt.md").read_text(encoding="utf-8")
# Tool schema handed to the chat model through `bind_tools` in MayaOCP.
# NOTE(review): the class name and docstring presumably become the tool's name
# and description as seen by the model (they match the JSON tool spec shipped
# in the same commit) — treat both as runtime text, not as documentation.
class get_information(BaseModel):
    """Search a private repository for information."""

    # Question text; MayaOCP.get_information embeds it and runs the search.
    question: str = Field(..., description="The user question")
# Base URL of the Azure AI resource; the chat and embedding deployment paths
# are appended to it when the clients are constructed.
AZURE_AI_URI = "https://eastus2.api.cognitive.microsoft.com"
class MayaOCP:
    """Tool-calling chat agent backed by a vector search over internal documents.

    All clients and settings are class attributes, so they are created once
    when the class body is evaluated (i.e. at import time) and shared by every
    instance. Streaming state (text buffer, tool id/name/arguments) is written
    to the context variables in ``api.context`` rather than stored on ``self``.
    """

    # Prompt text and generation defaults.
    system_prompt = SYSTEM_PROMPT
    generation_config = {
        "temperature": config.model_temperature,
    }

    # Values copied from the application settings.
    message_limit = config.message_limit
    index = config.vector_index
    limit = config.search_limit
    bucket = config.storage_bucket

    # External clients.
    search = AsyncQdrant.from_config(config)
    llm = AzureAIChatCompletionsModel(
        endpoint=f"{AZURE_AI_URI}/openai/deployments/{config.model}",
        credential=config.openai_api_key,
    ).bind_tools([get_information])
    embedder = AzureAIEmbeddingsModel(
        endpoint=f"{AZURE_AI_URI}/openai/deployments/{config.embedding_model}",
        credential=config.openai_api_key,
    )
    storage = AzureStorage.from_config(config)

    def __init__(self) -> None:
        # Dispatch table: tool name requested by the model -> coroutine to run.
        self.tool_map = {"get_information": self.get_information}

    def build_response(self, payloads):
        """Render search payloads as the text returned to the model.

        The output starts with a citation reminder, followed by one numbered
        section per payload. Numbering follows ``enumerate`` and therefore
        starts at 0. A payload without a ``"content"`` key contributes an
        empty section body.
        """
        template = "------ REFERENCIA {index} ----- \n\n{content}"
        sections = ["Recuerda citar las referencias en el formato: texto[1]."]
        for position, item in enumerate(payloads):
            sections.append(
                template.format(index=position, content=item.get("content", ""))
            )
        return "\n".join(sections)

    async def get_information(self, question: str):
        """Embed *question*, search the vector index and format the hits.

        Returns:
            A ``(tool_response, results)`` tuple: the formatted text for the
            model and the raw search results it was built from.
        """
        logger.info(
            f"Embedding question: {question} with model {self.embedder.model_name}"
        )
        vector = await self.embedder.aembed_query(question)
        hits = await self.search.semantic_search(
            embedding=vector, collection=self.index, limit=self.limit
        )
        return self.build_response(hits), hits

    async def _signed_url(self, filename, image: bool):
        """Ask the storage client for a URL to *filename* in the agent's bucket."""
        # NOTE(review): minute_duration=20 is presumably the link lifetime in
        # minutes — confirm against AzureStorage.get_file_url.
        return await self.storage.get_file_url(
            filename=filename,
            bucket=self.bucket,
            minute_duration=20,
            image=image,
        )

    async def get_shareable_urls(self, metadatas: list):
        """Resolve the ``"file"`` / ``"image"`` entries of each metadata dict to URLs.

        Entries whose value is missing or falsy are skipped, so the two
        returned lists may be shorter than *metadatas*.

        Returns:
            A ``(reference_urls, image_urls)`` tuple of lists.
        """
        reference_urls, image_urls = [], []
        for entry in metadatas:
            document = entry.get("file")
            if document:
                reference_urls.append(await self._signed_url(document, image=False))
            picture = entry.get("image")
            if picture:
                image_urls.append(await self._signed_url(picture, image=True))
        return reference_urls, image_urls

    def _generation_config_overwrite(self, overwrites: dict | None) -> dict[str, Any]:
        """Return a copy of the default generation config with *overwrites* applied."""
        merged = dict(self.generation_config)
        if overwrites:
            merged.update(overwrites)
        return merged

    async def stream(self, history, overwrites: dict | None = None):
        """Stream the model's answer for *history*, yielding text fragments.

        Tool-call chunks are not yielded: the id, name and argument text of the
        first tool call in each chunk are recorded in the ``api.context``
        variables. Text chunks are appended to ``ctx.buffer`` and yielded.
        """
        options = self._generation_config_overwrite(overwrites)
        async for chunk in self.llm.astream(input=history, **options):
            assert isinstance(chunk, AIMessageChunk)

            tool_chunks = chunk.tool_call_chunks
            if tool_chunks:
                # Only the first tool call of a chunk is considered.
                head = tool_chunks[0]
                if tool_id := head.get("id"):
                    ctx.tool_id.set(tool_id)
                if name := head.get("name"):
                    ctx.tool_name.set(name)
                if args := head.get("args"):
                    # Arguments arrive in pieces; concatenate them as they come.
                    ctx.tool_buffer.set(ctx.tool_buffer.get() + args)
                continue

            text = chunk.content
            if not text:
                continue
            assert isinstance(text, str)
            ctx.buffer.set(ctx.buffer.get() + text)
            yield text

    async def generate(self, history, overwrites: dict | None = None):
        """Run a single non-streaming completion for *history* and return it."""
        options = self._generation_config_overwrite(overwrites)
        return await self.llm.ainvoke(input=history, **options)

View File

@@ -0,0 +1,4 @@
Eres ChatOCP, un amigable y profesional asistente virtual de la Oficina Corporativa de Proyectos (OCP) de Banorte.
Tu objetivo es responder preguntas de usuarios de manera informativa y detallada.
Para responder TODAS las preguntas, utiliza la herramienta 'get_information' para obtener referencias relevantes a la pregunta de nuestro repositorio interno de documentos.
Utiliza las referencias para responder la pregunta del usuario, y cita tu respuesta con el numero de referencia. Ejemplo: este es un texto[1], este es otro texto[2].

View File

@@ -0,0 +1,19 @@
[
{
"type": "function",
"function": {
"name": "get_information",
"description": "Search a private repository for information.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The user question"
}
},
"required": ["question"]
}
}
}
]

66
apps/ocp/api/config.py Normal file
View File

@@ -0,0 +1,66 @@
from hvac import Client
from pydantic import Field
from pydantic_settings import BaseSettings
# Vault client used once, at import time, to fetch the application secrets.
# NOTE(review): no token is passed here; presumably hvac picks credentials up
# from the environment — confirm for the deployment.
client = Client(url="https://vault.ia-innovacion.work")
# Fail fast: without an authenticated client none of the secrets can be read.
if not client.is_authenticated():
    raise Exception("Vault authentication failed")
# Read the KV v2 secret at path "banortegpt" under the "secret" mount; the
# ["data"]["data"] lookup pulls the key/value mapping out of the response.
secret_map = client.secrets.kv.v2.read_secret_version(
    path="banortegpt", mount_point="secret"
)["data"]["data"]
class Settings(BaseSettings):
    """Application settings.

    Plain fields carry literal defaults. Secret fields take their default from
    ``secret_map`` (the mapping read from Vault at import time) through a
    ``default_factory``, so the lookup only happens when no other value was
    supplied for the field.
    """

    # Config
    log_level: str = "warning"
    service_name: str = "MayaOCP"
    model: str = "gpt-4o"
    # Temperature is a fractional quantity; typing it as int would make any
    # non-integer override (e.g. 0.2) fail validation.
    model_temperature: float = 0.0
    embedding_model: str = "text-embedding-3-large"
    message_limit: int = 10
    storage_bucket: str = "ocpreferences"
    vector_index: str = "MayaOCP"
    search_limit: int = 3
    host: str = "0.0.0.0"
    port: int = 8000

    # API Keys (required: a missing Vault key raises KeyError)
    azure_endpoint: str = Field(default_factory=lambda: secret_map["azure_endpoint"])
    openai_api_key: str = Field(default_factory=lambda: secret_map["openai_api_key"])
    openai_api_version: str = Field(
        default_factory=lambda: secret_map["openai_api_version"]
    )
    azure_blob_connection_string: str = Field(
        default_factory=lambda: secret_map["azure_blob_connection_string"]
    )
    qdrant_url: str = Field(default_factory=lambda: secret_map["qdrant_api_url"])
    mongodb_url: str = Field(
        default_factory=lambda: secret_map["cosmosdb_connection_string"]
    )

    # Optional secrets: these fields are typed `str | None`, so an absent Vault
    # key yields None instead of aborting the import with a KeyError.
    qdrant_api_key: str | None = Field(
        default_factory=lambda: secret_map.get("qdrant_api_key")
    )
    otel_exporter_otlp_endpoint: str | None = Field(
        default_factory=lambda: secret_map.get("otel_exporter_otlp_endpoint")
    )
    otel_exporter_otlp_headers: str | None = Field(
        default_factory=lambda: secret_map.get("otel_exporter_otlp_headers")
    )

    async def init_mongo_db(self):
        """Initialise Beanie with the ``Conversation`` document model.

        Connects with ``mongodb_url`` and binds the models to the
        ``banortegptdos`` database. Imports are local so the database stack is
        only loaded when this method is actually called.
        """
        from banortegpt.database.mongo_memory.models import Conversation
        from beanie import init_beanie
        from motor.motor_asyncio import AsyncIOMotorClient

        # Named differently from the module-level Vault `client` to avoid
        # shadowing it inside this method.
        mongo_client = AsyncIOMotorClient(self.mongodb_url)
        await init_beanie(
            database=mongo_client.banortegptdos,
            document_models=[Conversation],
        )
# Settings instance built once at import time and imported across the app.
config = Settings()  # type: ignore

6
apps/ocp/api/context.py Normal file
View File

@@ -0,0 +1,6 @@
from contextvars import ContextVar
# Context-local scratch state written by the agent and read by the services.

# Assistant text accumulated so far.
buffer: ContextVar[str] = ContextVar("buffer", default="")
# Tool-call argument text accumulated so far (later parsed as JSON).
tool_buffer: ContextVar[str] = ContextVar("tool_buffer", default="")
# Id of the tool call requested by the model; None when no tool call was seen.
tool_id: ContextVar[str | None] = ContextVar("tool_id", default=None)
# Name of the requested tool; None when no tool call was seen.
tool_name: ContextVar[str | None] = ContextVar("tool_name", default=None)

57
apps/ocp/api/server.py Normal file
View File

@@ -0,0 +1,57 @@
import logging
import uuid
from contextlib import asynccontextmanager
import logfire
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from . import services
from .config import config
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Run the database initialisation before the app starts serving.

    Nothing follows the ``yield``, so no shutdown work is performed here.
    """
    await config.init_mongo_db()
    yield
# FastAPI application; `lifespan` performs the database setup on startup.
app = FastAPI(lifespan=lifespan)
# Attach logfire's FastAPI instrumentation to the app.
logfire.instrument_fastapi(app)
# Request body of POST /api/v1/message.
class Message(BaseModel):
    # Conversation to continue.
    conversation_id: uuid.UUID
    # The user's message text.
    prompt: str
@app.post("/api/v1/conversation")
async def create_conversation():
    # Generate a random conversation id, register the conversation through the
    # service layer, and hand the id back to the client.
    new_id = uuid.uuid4()
    await services.create_conversation(new_id)
    return dict(conversation_id=new_id)
@app.post("/api/v1/message")
async def send(message: Message, stream: bool = False):
    # Non-streaming mode: run the whole exchange and return the final response.
    if stream is not True:
        return await services.generate(message.prompt, message.conversation_id)

    # Streaming mode: wrap every chunk produced by the service in a
    # server-sent-events "data:" frame carrying the chunk as JSON.
    async def sse_events():
        async for chunk in services.stream(message.prompt, message.conversation_id):
            data = f"data: {chunk.model_dump_json()}\n\n"
            logger.info(f"Yielding Event: {data}")
            yield data

    return StreamingResponse(sse_events(), media_type="text/event-stream")

View File

@@ -0,0 +1,9 @@
from .create_conversation import create_conversation
from .generate_response import generate
from .stream_response import stream
# Service entry points re-exported by this package.
__all__ = [
    "stream",
    "generate",
    "create_conversation",
]

View File

@@ -0,0 +1,9 @@
from uuid import UUID
from banortegpt.database.mongo_memory import crud
from api.agent import agent
async def create_conversation(user_id: UUID) -> None:
    """Create a conversation through the CRUD layer with the agent's system prompt.

    Args:
        user_id: Identifier forwarded as the first argument of
            ``crud.create_conversation``.
            NOTE(review): the HTTP layer passes a freshly generated
            conversation id here, so the parameter name looks misleading —
            confirm and rename in a coordinated change.
    """
    await crud.create_conversation(user_id, agent.system_prompt)

View File

@@ -0,0 +1,92 @@
import json
from typing import Any
from uuid import UUID
from banortegpt.database.mongo_memory import crud
from langfuse.decorators import langfuse_context, observe
from pydantic import BaseModel
import api.context as ctx
from api.agent import agent
# Result of the non-streaming `generate` service.
class Response(BaseModel):
    # Final assistant text.
    content: str
    # Reference URLs followed by image URLs.
    urls: list[str]
@observe(capture_input=False, capture_output=False)
async def generate(
    prompt: str,
    conversation_id: UUID,
) -> Response:
    """Answer *prompt* within an existing conversation, without streaming.

    The user message is appended to the conversation and sent to the agent.
    If the model asks for a tool, the tool is executed, the tool call and its
    result are appended to the conversation, and the model is invoked a second
    time with tools disabled to produce the final answer. The final answer is
    appended to the conversation and the conversation is persisted.

    Args:
        prompt: The user's message.
        conversation_id: Id of the conversation to continue.

    Returns:
        The final assistant text plus the reference and image URLs resolved
        from the tool results (empty when no tool was called).

    Raises:
        ValueError: If the conversation does not exist, or if the model
            requested a tool call without naming the tool.
    """
    conversation = await crud.get_conversation(conversation_id)
    if conversation is None:
        raise ValueError(f"Conversation with ID {conversation_id} not found")

    conversation.add(role="user", content=prompt)
    response = await agent.generate(conversation.to_openai_format(agent.message_limit))

    reference_urls, image_urls = [], []

    # NOTE(review): tool calls are read with attribute access (`.id`,
    # `.function.name`), whereas the streaming path reads dict-style chunks.
    # Confirm that the object returned by `agent.generate` has this shape.
    if call := response.tool_calls:
        first_call = call[0]
        if call_id := first_call.id:
            ctx.tool_id.set(call_id)
        if name := first_call.function.name:
            ctx.tool_name.set(name)
        ctx.tool_buffer.set(first_call.function.arguments)
    else:
        assert response.content is not None
        ctx.buffer.set(response.content)

    tool_buffer = ctx.tool_buffer.get()
    tool_id = ctx.tool_id.get()
    tool_name = ctx.tool_name.get()

    if tool_id is not None:
        # Validate the tool name *before* it is used as a lookup key.
        if tool_name is None:
            raise ValueError("Tool call received without a tool name")

        # If tool_buffer is a JSON string, convert it to a dictionary; text
        # that is not valid JSON is treated as the question itself.
        if isinstance(tool_buffer, str):
            try:
                tool_args = json.loads(tool_buffer)
            except json.JSONDecodeError:
                tool_args = {"question": tool_buffer}
        else:
            tool_args = tool_buffer

        tool_response, payloads = await agent.tool_map[tool_name](**tool_args)  # type: ignore

        tool_call: dict[str, Any] = agent.llm.build_tool_call(
            tool_id, tool_name, tool_buffer
        )
        tool_call_id: dict[str, Any] = agent.llm.build_tool_call_id(tool_id)

        conversation.add("assistant", **tool_call)
        conversation.add("tool", content=tool_response, **tool_call_id)

        # Second pass: tools are switched off so the model has to answer.
        response = await agent.generate(
            conversation.to_openai_format(agent.message_limit), {"tools": None}
        )
        assert response.content is not None
        ctx.buffer.set(response.content)

        reference_urls, image_urls = await agent.get_shareable_urls(payloads)  # type: ignore

    buffer = ctx.buffer.get()
    if buffer is None:
        raise ValueError("No buffer found")

    conversation.add(role="assistant", content=buffer)
    # Persist the updated history, as the streaming path does; without this the
    # messages added above would not be saved for the next request.
    await conversation.replace()

    langfuse_context.update_current_trace(
        name=agent.__class__.__name__,
        session_id=str(conversation_id),
        input=prompt,
        output=buffer,
    )

    return Response(content=buffer, urls=reference_urls + image_urls)

View File

@@ -0,0 +1,110 @@
import json
import logging
from enum import StrEnum
from typing import TypeAlias
from uuid import UUID
from banortegpt.database.mongo_memory import crud
from langfuse.decorators import langfuse_context, observe
from pydantic import BaseModel
import api.context as ctx
from api.agent import agent
logger = logging.getLogger(__name__)
class ChunkType(StrEnum):
    """Kinds of chunks that make up a streamed response."""

    START = "start"  # first chunk of every stream
    TEXT = "text"  # fragment of assistant text
    REFERENCE = "reference"  # list of reference URLs
    IMAGE = "image"  # list of image URLs
    TOOL = "tool"  # the model requested a tool call
    END = "end"  # emitted after the conversation has been saved
    ERROR = "error"  # not emitted by `stream` in this module
# Scalar payload types a chunk may carry.
ContentType: TypeAlias = str | int
# One event of a streamed response.
class ResponseChunk(BaseModel):
    # Kind of event.
    type: ChunkType
    # A text fragment, a list of URLs, or None (TOOL chunks carry no payload).
    content: ContentType | list[ContentType] | None
@observe(capture_input=False, capture_output=False)
async def stream(prompt: str, conversation_id: UUID):
    """Answer *prompt* within an existing conversation as a stream of chunks.

    Chunk order: START, then TEXT fragments as the model produces them. If the
    model requests a tool, a TOOL chunk is emitted, the tool is executed, and
    the model is streamed again with tools disabled (more TEXT chunks),
    followed by REFERENCE / IMAGE chunks when URLs were resolved. Finally the
    assistant text is appended to the conversation, the conversation is saved,
    and END is emitted.

    Args:
        prompt: The user's message.
        conversation_id: Id of the conversation to continue.

    Yields:
        ``ResponseChunk`` objects.

    Raises:
        ValueError: If the conversation does not exist. The START chunk has
            already been yielded at that point.
    """
    logger.info("Starting stream")
    yield ResponseChunk(type=ChunkType.START, content="")

    logger.info(f"Fetching conversation {conversation_id}")
    conversation = await crud.get_conversation(conversation_id)
    # Explicit check instead of an assert: asserts are stripped under -O and
    # previously made this ValueError unreachable.
    if conversation is None:
        raise ValueError("Conversation not found")
    logger.info(f"Conversation messages: {conversation.messages}")

    conversation.add(role="user", content=prompt)
    history = conversation.to_openai_format(agent.message_limit, langchain_compat=True)

    async for content in agent.stream(history):
        yield ResponseChunk(type=ChunkType.TEXT, content=content)

    # The agent records any requested tool call in the context variables.
    if (tool_id := ctx.tool_id.get()) is not None:
        tool_buffer = ctx.tool_buffer.get()
        assert tool_buffer is not None
        tool_name = ctx.tool_name.get()
        assert tool_name is not None

        yield ResponseChunk(type=ChunkType.TOOL, content=None)

        buffer_dict = json.loads(tool_buffer)
        response, payloads = await agent.tool_map[tool_name](**buffer_dict)

        # Record the assistant's tool call and the tool's answer in the history.
        conversation.add(
            role="assistant",
            tool_calls=[
                {
                    "id": tool_id,
                    "function": {
                        "name": tool_name,
                        "arguments": tool_buffer,
                    },
                    "type": "function",
                }
            ],
        )
        conversation.add(role="tool", content=response, tool_call_id=tool_id)

        history = conversation.to_openai_format(
            agent.message_limit, langchain_compat=True
        )
        # Second pass: tools are switched off so the model has to answer.
        async for content in agent.stream(history, {"tools": None}):
            yield ResponseChunk(type=ChunkType.TEXT, content=content)

        ref_urls, image_urls = await agent.get_shareable_urls(payloads)  # type: ignore
        if ref_urls:
            yield ResponseChunk(type=ChunkType.REFERENCE, content=ref_urls)
        if image_urls:
            yield ResponseChunk(type=ChunkType.IMAGE, content=image_urls)

    buffer = ctx.buffer.get()
    conversation.add(role="assistant", content=buffer)
    await conversation.replace()

    yield ResponseChunk(type=ChunkType.END, content="")

    langfuse_context.update_current_trace(
        name=agent.__class__.__name__,
        session_id=str(conversation_id),
        input=prompt,
        output=buffer,
    )