Files
Rogelio 325f1ef439 ic
2025-10-13 18:16:25 +00:00

103 lines
3.6 KiB
Python

import os
from pathlib import Path
from typing import Any, AsyncGenerator
from dotenv import load_dotenv
from langchain_core.messages import AIMessageChunk
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel
from langchain_openai import AzureChatOpenAI
from langfuse.callback import CallbackHandler
from api import context
from api.config import config
# Load variables from a .env file (if present) before the os.getenv() lookups below.
load_dotenv()

# Directory that contains this module; the prompt file is resolved relative to it.
parent = Path(__file__).parent

# System prompt text, read once at import time. The encoding is pinned to UTF-8 so
# the decoded text does not depend on the platform's locale default.
SYSTEM_PROMPT = (parent / "system_prompt.md").read_text(encoding="utf-8")

# Base URL from which the Azure AI deployment endpoint is built.
AZURE_AI_URI = "https://eastus2.api.cognitive.microsoft.com"

# Langfuse callback handler, configured entirely from environment variables.
handler = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST"),
)
class Agent:
    """Chat agent that streams replies from one of two Azure-hosted chat models.

    The model clients and default settings are class attributes, created once
    when the class body is executed.
    """

    # Prompt text loaded from system_prompt.md at module import.
    system_prompt = SYSTEM_PROMPT
    # Default generation settings; copied (never mutated) for each stream call.
    generation_config = {
        "temperature": config.model_temperature,
    }
    # Taken from project config; how it is enforced is not visible in this class.
    message_limit = config.message_limit
    # Default model client.
    llm = AzureAIChatCompletionsModel(
        endpoint=f"{AZURE_AI_URI}/openai/deployments/{config.model}",
        credential=config.openai_api_key,
        model=config.model
    )
    # Alternate model client used when deep research is requested; configured
    # purely from environment variables.
    llm_deep_research = AzureChatOpenAI(
        azure_endpoint=os.getenv("AZURE_ENDPOINT"),
        model=os.getenv("MODEL"),
        api_version=os.getenv("OPENAI_API_VERSION"),
        api_key=os.getenv("OPENAI_API_KEY") #type: ignore
    )

    def __init__(self) -> None:
        # Starts empty; nothing in this class populates it.
        self.tool_map = {}

    def _generation_config_overwrite(self, overwrites: dict | None) -> dict[str, Any]:
        """Return a copy of the default generation settings with overrides applied.

        Args:
            overwrites: settings that replace or extend the defaults; ``None``
                or an empty dict leaves the defaults as they are.

        Returns:
            A new dict; the class-level ``generation_config`` is not modified.
        """
        merged = self.generation_config.copy()
        if overwrites:
            merged.update(overwrites)
        return merged

    async def stream(
        self,
        history: list,
        with_deep_research: bool,
        overwrites: dict | None = None,
    ) -> AsyncGenerator[str, None]:
        """Call an LLM and yield its reply in pieces; tool calls go to the app context.

        Text content is appended to ``context.buffer`` and yielded. Chunks that
        carry tool-call fragments are not yielded: the id, name and arguments
        of the first fragment are stored in ``context.tool_id``,
        ``context.tool_name`` and ``context.tool_buffer``.

        Args:
            history: list of messages in OpenAI format
                (e.g. [{"role": "user", "content": "Hello"}]).
            with_deep_research: when exactly ``True``, the deep-research model
                is used; it is called without the callback handler and without
                the generation settings.
            overwrites: generation settings to override
                (e.g. {"temperature": 0.5}); only applied to the default model.

        Yields:
            The text pieces of the model reply, as they arrive.

        Usage:
            >>> async for content in agent.stream(history, with_deep_research=False):
            ...     print(content)
        """
        settings = self._generation_config_overwrite(overwrites)

        async def relay(chunks):
            # Route each chunk: tool-call fragments to the context, text to the caller.
            async for chunk in chunks:
                assert isinstance(chunk, AIMessageChunk)
                tool_chunks = chunk.tool_call_chunks
                if tool_chunks:
                    # Only the first fragment of the chunk is inspected.
                    first = tool_chunks[0]
                    if tool_id := first.get("id"):
                        context.tool_id.set(tool_id)
                    if name := first.get("name"):
                        context.tool_name.set(name)
                    if args := first.get("args"):
                        context.tool_buffer.set(context.tool_buffer.get() + args)
                    continue
                text = chunk.content
                if not text:
                    continue
                assert isinstance(text, str)
                context.buffer.set(context.buffer.get() + text)
                yield text

        if with_deep_research is True:
            chunks = self.llm_deep_research.astream(input=history)
        else:
            chunks = self.llm.astream(
                input=history,
                config={"callbacks": [handler]},
                **settings
            )
        async for piece in relay(chunks):
            yield piece