import os
from pathlib import Path
from typing import Any, AsyncGenerator

from dotenv import load_dotenv
from langchain_core.messages import AIMessageChunk
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel
from langchain_openai import AzureChatOpenAI
from langfuse.callback import CallbackHandler

from api import context
from api.config import config

load_dotenv()

parent = Path(__file__).parent
SYSTEM_PROMPT = (parent / "system_prompt.md").read_text()
AZURE_AI_URI = "https://eastus2.api.cognitive.microsoft.com"

# Langfuse callback handler for tracing LLM calls; credentials come from the environment.
handler = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST"),
)


class Agent:
    system_prompt = SYSTEM_PROMPT
    generation_config = {
        "temperature": config.model_temperature,
    }
    message_limit = config.message_limit

    # Default chat model, served through an Azure AI deployment.
    llm = AzureAIChatCompletionsModel(
        endpoint=f"{AZURE_AI_URI}/openai/deployments/{config.model}",
        credential=config.openai_api_key,
        model=config.model,
    )
    # Separate Azure OpenAI model used for deep-research requests.
    llm_deep_research = AzureChatOpenAI(
        azure_endpoint=os.getenv("AZURE_ENDPOINT"),
        model=os.getenv("MODEL"),
        api_version=os.getenv("OPENAI_API_VERSION"),
        api_key=os.getenv("OPENAI_API_KEY"),  # type: ignore
    )

    def __init__(self) -> None:
        self.tool_map = {}

    def _generation_config_overwrite(self, overwrites: dict | None) -> dict[str, Any]:
        """Return a copy of the class generation config with any overrides applied."""
        generation_config_copy = self.generation_config.copy()
        if overwrites:
            generation_config_copy.update(overwrites)
        return generation_config_copy

    async def stream(
        self,
        history: list,
        with_deep_research: bool,
        overwrites: dict | None = None,
    ) -> AsyncGenerator[str, None]:
        """Call an LLM and stream the response back in chunks; store tool calls in the app context.

        Args:
            history: List of messages in the OpenAI format
                (e.g. [{"role": "user", "content": "Hello"}]).
            with_deep_research: If True, route the request to the deep-research model.
            overwrites: Generation settings to override (e.g. {"temperature": 0.5}).

        Returns:
            AsyncGenerator[str, None]: Async generator that yields the model's
            response chunks in real time.

        Usage:
            >>> async for content in agent.stream(history, with_deep_research=False):
            >>>     print(content)
        """
        generation_config = self._generation_config_overwrite(overwrites)

        async def process_stream(stream):
            async for delta in stream:
                assert isinstance(delta, AIMessageChunk)
                if call := delta.tool_call_chunks:
                    # Accumulate the tool call (id, name, streamed args) in the app context.
                    if tool_id := call[0].get("id"):
                        context.tool_id.set(tool_id)
                    if name := call[0].get("name"):
                        context.tool_name.set(name)
                    if args := call[0].get("args"):
                        context.tool_buffer.set(context.tool_buffer.get() + args)
                else:
                    # Plain text content: buffer it in the app context and stream it out.
                    if buffer := delta.content:
                        assert isinstance(buffer, str)
                        context.buffer.set(context.buffer.get() + buffer)
                        yield buffer

        if with_deep_research:
            stream = self.llm_deep_research.astream(input=history)
            async for buffer in process_stream(stream):
                yield buffer
            return

        stream = self.llm.astream(
            input=history,
            config={"callbacks": [handler]},
            **generation_config,
        )
        async for buffer in process_stream(stream):
            yield buffer
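

# Minimal smoke-test entry point: a sketch, not part of the public API. It assumes
# the Azure/Langfuse environment variables above are set and that the context vars
# in `api.context` (buffer, tool_buffer, etc.) are initialized before streaming.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        agent = Agent()
        history = [{"role": "user", "content": "Hello"}]
        # Stream chunks from the default model (no deep research) and print them live.
        async for chunk in agent.stream(history, with_deep_research=False):
            print(chunk, end="", flush=True)
        print()

    asyncio.run(_demo())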