Make gateway Open Responses compliant

This commit is contained in:
2026-03-02 04:21:29 +00:00
parent 47d517c913
commit 3e645a3525
6 changed files with 858 additions and 453 deletions

View File

@@ -1,81 +1,307 @@
package api package api
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
) )
// ResponseRequest models the Open Responses create request payload. // ============================================================
// Request Types (CreateResponseBody)
// ============================================================
// ResponseRequest models the OpenResponses CreateResponseBody.
type ResponseRequest struct { type ResponseRequest struct {
Model string `json:"model"` Model string `json:"model"`
Provider string `json:"provider,omitempty"` Input InputUnion `json:"input"`
MaxOutputTokens int `json:"max_output_tokens,omitempty"` Instructions *string `json:"instructions,omitempty"`
MaxOutputTokens *int `json:"max_output_tokens,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"` Metadata map[string]string `json:"metadata,omitempty"`
Input []Message `json:"input"`
Stream bool `json:"stream,omitempty"` Stream bool `json:"stream,omitempty"`
PreviousResponseID string `json:"previous_response_id,omitempty"` PreviousResponseID *string `json:"previous_response_id,omitempty"`
Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"`
FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"`
PresencePenalty *float64 `json:"presence_penalty,omitempty"`
TopLogprobs *int `json:"top_logprobs,omitempty"`
Truncation *string `json:"truncation,omitempty"`
ToolChoice json.RawMessage `json:"tool_choice,omitempty"`
Tools json.RawMessage `json:"tools,omitempty"`
ParallelToolCalls *bool `json:"parallel_tool_calls,omitempty"`
Store *bool `json:"store,omitempty"`
Text json.RawMessage `json:"text,omitempty"`
Reasoning json.RawMessage `json:"reasoning,omitempty"`
Include []string `json:"include,omitempty"`
ServiceTier *string `json:"service_tier,omitempty"`
Background *bool `json:"background,omitempty"`
StreamOptions json.RawMessage `json:"stream_options,omitempty"`
MaxToolCalls *int `json:"max_tool_calls,omitempty"`
// Non-spec extension: allows client to select a specific provider.
Provider string `json:"provider,omitempty"`
} }
// Message captures user, assistant, or system roles. // InputUnion handles the polymorphic "input" field: string or []InputItem.
type InputUnion struct {
	// String holds the value when "input" was supplied as a plain JSON string.
	String *string
	// Items holds the value when "input" was supplied as a JSON array of items.
	Items []InputItem
}

// UnmarshalJSON decodes the polymorphic "input" field.
//
// A JSON string populates String and a JSON array populates Items. Whichever
// variant is decoded, the other one is cleared, so decoding into a value that
// was already populated never leaves both variants set (MarshalJSON would
// otherwise prefer the stale String). JSON null is a no-op and leaves the
// receiver untouched. Any other JSON value yields an error and also leaves the
// receiver untouched.
func (u *InputUnion) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		return nil
	}

	var s string
	if err := json.Unmarshal(data, &s); err == nil {
		u.String, u.Items = &s, nil
		return nil
	}

	var items []InputItem
	if err := json.Unmarshal(data, &items); err != nil {
		// Keep the message generic: the underlying decode error mentions Go
		// type names that are not meaningful to API clients.
		return fmt.Errorf("input must be a string or array of items")
	}
	u.String, u.Items = nil, items
	return nil
}

// MarshalJSON encodes the union back to its wire form: the string when String
// is set, otherwise the item array when Items is non-nil, otherwise JSON null.
func (u InputUnion) MarshalJSON() ([]byte, error) {
	if u.String != nil {
		return json.Marshal(*u.String)
	}
	if u.Items != nil {
		return json.Marshal(u.Items)
	}
	return []byte("null"), nil
}

// InputItem is a discriminated union on "type".
// Valid types: message, item_reference, function_call, function_call_output, reasoning.
//
// All variants share this flat struct; every field except Type is omitted from
// JSON when empty. Content is kept as raw JSON so that it can be decoded later
// as either a plain string or an array of content blocks.
type InputItem struct {
	Type      string          `json:"type"`
	Role      string          `json:"role,omitempty"`
	Content   json.RawMessage `json:"content,omitempty"`
	ID        string          `json:"id,omitempty"`
	CallID    string          `json:"call_id,omitempty"`
	Name      string          `json:"name,omitempty"`
	Arguments string          `json:"arguments,omitempty"`
	Output    string          `json:"output,omitempty"`
	Status    string          `json:"status,omitempty"`
}
// ============================================================
// Internal Types (providers + conversation store)
// ============================================================
// Message is the normalized internal message representation.
type Message struct { type Message struct {
Role string `json:"role"` Role string `json:"role"`
Content []ContentBlock `json:"content"` Content []ContentBlock `json:"content"`
} }
// ContentBlock represents a typed content element (text, data, tool call, etc.). // ContentBlock is a typed content element.
type ContentBlock struct { type ContentBlock struct {
Type string `json:"type"` Type string `json:"type"`
Text string `json:"text,omitempty"` Text string `json:"text,omitempty"`
} }
// Response is a simplified Open Responses response payload. // NormalizeInput converts the request Input into messages for providers.
// Does NOT include instructions (the server prepends those separately).
func (r *ResponseRequest) NormalizeInput() []Message {
	// Plain-string input becomes a single user message with one text block.
	if text := r.Input.String; text != nil {
		block := ContentBlock{Type: "input_text", Text: *text}
		return []Message{{Role: "user", Content: []ContentBlock{block}}}
	}

	var out []Message
	for _, it := range r.Input.Items {
		// Tool results are surfaced as a "tool" role message carrying the output text.
		if it.Type == "function_call_output" {
			out = append(out, Message{
				Role:    "tool",
				Content: []ContentBlock{{Type: "input_text", Text: it.Output}},
			})
			continue
		}

		// Only message items (an empty type is treated as a message) are
		// converted; every other item type is dropped.
		if it.Type != "message" && it.Type != "" {
			continue
		}

		m := Message{Role: it.Role}
		if it.Content != nil {
			var plain string
			if json.Unmarshal(it.Content, &plain) == nil {
				// String content: assistant text is tagged as output, everything else as input.
				kind := "input_text"
				if it.Role == "assistant" {
					kind = "output_text"
				}
				m.Content = []ContentBlock{{Type: kind, Text: plain}}
			} else {
				// Otherwise try an array of content blocks. This is best
				// effort: a decode error is ignored and whatever was decoded
				// (possibly nothing) is used as the content.
				var parts []ContentBlock
				_ = json.Unmarshal(it.Content, &parts)
				m.Content = parts
			}
		}
		out = append(out, m)
	}
	return out
}
// ============================================================
// Response Types (ResponseResource)
// ============================================================
// Response is the spec-compliant ResponseResource.
type Response struct { type Response struct {
ID string `json:"id"` ID string `json:"id"`
Object string `json:"object"` Object string `json:"object"`
Created int64 `json:"created"` CreatedAt int64 `json:"created_at"`
CompletedAt *int64 `json:"completed_at"`
Status string `json:"status"`
IncompleteDetails *IncompleteDetails `json:"incomplete_details"`
Model string `json:"model"` Model string `json:"model"`
Provider string `json:"provider"` PreviousResponseID *string `json:"previous_response_id"`
Output []Message `json:"output"` Instructions *string `json:"instructions"`
Usage Usage `json:"usage"` Output []OutputItem `json:"output"`
Error *ResponseError `json:"error"`
Tools json.RawMessage `json:"tools"`
ToolChoice json.RawMessage `json:"tool_choice"`
Truncation string `json:"truncation"`
ParallelToolCalls bool `json:"parallel_tool_calls"`
Text json.RawMessage `json:"text"`
TopP float64 `json:"top_p"`
PresencePenalty float64 `json:"presence_penalty"`
FrequencyPenalty float64 `json:"frequency_penalty"`
TopLogprobs int `json:"top_logprobs"`
Temperature float64 `json:"temperature"`
Reasoning json.RawMessage `json:"reasoning"`
Usage *Usage `json:"usage"`
MaxOutputTokens *int `json:"max_output_tokens"`
MaxToolCalls *int `json:"max_tool_calls"`
Store bool `json:"store"`
Background bool `json:"background"`
ServiceTier string `json:"service_tier"`
Metadata map[string]string `json:"metadata"`
SafetyIdentifier *string `json:"safety_identifier"`
PromptCacheKey *string `json:"prompt_cache_key"`
// Non-spec extension
Provider string `json:"provider,omitempty"`
} }
// Usage captures token accounting. // OutputItem represents a typed item in the response output.
type OutputItem struct {
// ID, Type and Status are always serialized.
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
// Role and Content are omitted from the JSON when empty (omitempty).
Role string `json:"role,omitempty"`
Content []ContentPart `json:"content,omitempty"`
}
// ContentPart is a content block within an output item.
// None of its fields use omitempty, so all three keys are always present in
// the JSON; a nil Annotations slice is therefore encoded as null, not [].
type ContentPart struct {
Type string `json:"type"`
Text string `json:"text"`
// Set to a non-nil empty slice if the wire format must carry [] instead of null.
Annotations []Annotation `json:"annotations"`
}
// Annotation describes an annotation attached to output text content.
// Only the discriminating "type" key is modeled here.
type Annotation struct {
Type string `json:"type"`
}
// IncompleteDetails explains why a response is incomplete.
// It carries a single "reason" string, which is always serialized.
type IncompleteDetails struct {
Reason string `json:"reason"`
}
// ResponseError describes an error in the response.
type ResponseError struct {
Type string `json:"type"`
Message string `json:"message"`
// Code has no omitempty, so a nil pointer is serialized as "code": null.
Code *string `json:"code"`
}
// ============================================================
// Usage Types
// ============================================================
// Usage captures token accounting with sub-details.
type Usage struct { type Usage struct {
InputTokens int `json:"input_tokens"` InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"` OutputTokens int `json:"output_tokens"`
TotalTokens int `json:"total_tokens"` TotalTokens int `json:"total_tokens"`
InputTokensDetails InputTokensDetails `json:"input_tokens_details"`
OutputTokensDetails OutputTokensDetails `json:"output_tokens_details"`
} }
// StreamChunk represents a single Server-Sent Event in a streaming response. // InputTokensDetails breaks down input token usage.
type StreamChunk struct { type InputTokensDetails struct {
ID string `json:"id,omitempty"` CachedTokens int `json:"cached_tokens"`
Object string `json:"object"`
Created int64 `json:"created,omitempty"`
Model string `json:"model,omitempty"`
Provider string `json:"provider,omitempty"`
Delta *StreamDelta `json:"delta,omitempty"`
Usage *Usage `json:"usage,omitempty"`
Done bool `json:"done,omitempty"`
} }
// StreamDelta represents incremental content in a stream chunk. // OutputTokensDetails breaks down output token usage.
type StreamDelta struct { type OutputTokensDetails struct {
Role string `json:"role,omitempty"` ReasoningTokens int `json:"reasoning_tokens"`
Content []ContentBlock `json:"content,omitempty"`
} }
// ============================================================
// Streaming Types
// ============================================================
// StreamEvent represents a single SSE event in the streaming response.
// Fields are selectively populated based on the event Type.
// Type and SequenceNumber are always serialized; every other field is
// omitted from the JSON when it is empty or nil.
type StreamEvent struct {
Type string `json:"type"`
SequenceNumber int `json:"sequence_number"`
Response *Response `json:"response,omitempty"`
// OutputIndex is a pointer so that index 0 can still be emitted: omitempty
// drops only a nil pointer, not a pointer to zero.
OutputIndex *int `json:"output_index,omitempty"`
Item *OutputItem `json:"item,omitempty"`
ItemID string `json:"item_id,omitempty"`
// ContentIndex is a pointer for the same reason as OutputIndex.
ContentIndex *int `json:"content_index,omitempty"`
Part *ContentPart `json:"part,omitempty"`
// Delta and Text are plain strings, so an empty value is omitted.
Delta string `json:"delta,omitempty"`
Text string `json:"text,omitempty"`
}
// ============================================================
// Provider Result Types (internal, not exposed via HTTP)
// ============================================================
// ProviderResult is returned by Provider.Generate.
// It has no JSON tags: it is a plain in-process value carrying the upstream
// response ID, the model name, the generated text and token usage.
type ProviderResult struct {
ID string
Model string
Text string
Usage Usage
}
// ProviderStreamDelta is sent through the stream channel.
// It has no JSON tags and is not serialized directly.
type ProviderStreamDelta struct {
ID string
Model string
// Text is the incremental text carried by this delta.
Text string
// Done marks the final delta of a stream.
Done bool
// Usage is optional; nil when the delta carries no token accounting.
Usage *Usage
}
// ============================================================
// Models Endpoint Types
// ============================================================
// ModelInfo describes a single model available through the gateway. // ModelInfo describes a single model available through the gateway.
type ModelInfo struct { type ModelInfo struct {
ID string `json:"id"` ID string `json:"id"`
Provider string `json:"provider"` Provider string `json:"provider"`
} }
// ModelsResponse is returned by the GET /v1/models endpoint. // ModelsResponse is returned by GET /v1/models.
type ModelsResponse struct { type ModelsResponse struct {
Object string `json:"object"` Object string `json:"object"`
Data []ModelInfo `json:"data"` Data []ModelInfo `json:"data"`
} }
// ============================================================
// Validation
// ============================================================
// Validate performs basic structural validation. // Validate performs basic structural validation.
func (r *ResponseRequest) Validate() error { func (r *ResponseRequest) Validate() error {
if r == nil { if r == nil {
@@ -84,16 +310,8 @@ func (r *ResponseRequest) Validate() error {
if r.Model == "" { if r.Model == "" {
return errors.New("model is required") return errors.New("model is required")
} }
if len(r.Input) == 0 { if r.Input.String == nil && len(r.Input.Items) == 0 {
return errors.New("input messages are required") return errors.New("input is required")
}
for i, msg := range r.Input {
if msg.Role == "" {
return fmt.Errorf("input[%d] role is required", i)
}
if len(msg.Content) == 0 {
return fmt.Errorf("input[%d] content is required", i)
}
} }
return nil return nil
} }

View File

@@ -3,7 +3,6 @@ package anthropic
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/anthropics/anthropic-sdk-go" "github.com/anthropics/anthropic-sdk-go"
"github.com/anthropics/anthropic-sdk-go/option" "github.com/anthropics/anthropic-sdk-go/option"
@@ -60,8 +59,8 @@ func NewAzure(azureCfg config.AzureAnthropicConfig) *Provider {
func (p *Provider) Name() string { return Name } func (p *Provider) Name() string { return Name }
// Generate routes the Open Responses request to Anthropic's API. // Generate routes the request to Anthropic's API.
func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api.Response, error) { func (p *Provider) Generate(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (*api.ProviderResult, error) {
if p.cfg.APIKey == "" { if p.cfg.APIKey == "" {
return nil, fmt.Errorf("anthropic api key missing") return nil, fmt.Errorf("anthropic api key missing")
} }
@@ -69,13 +68,11 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
return nil, fmt.Errorf("anthropic client not initialized") return nil, fmt.Errorf("anthropic client not initialized")
} }
model := chooseModel(req.Model, p.cfg.Model) // Convert messages to Anthropic format
anthropicMsgs := make([]anthropic.MessageParam, 0, len(messages))
// Convert Open Responses messages to Anthropic format
messages := make([]anthropic.MessageParam, 0, len(req.Input))
var system string var system string
for _, msg := range req.Input { for _, msg := range messages {
var content string var content string
for _, block := range msg.Content { for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" { if block.Type == "input_text" || block.Type == "output_text" {
@@ -85,19 +82,24 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
switch msg.Role { switch msg.Role {
case "user": case "user":
messages = append(messages, anthropic.NewUserMessage(anthropic.NewTextBlock(content))) anthropicMsgs = append(anthropicMsgs, anthropic.NewUserMessage(anthropic.NewTextBlock(content)))
case "assistant": case "assistant":
messages = append(messages, anthropic.NewAssistantMessage(anthropic.NewTextBlock(content))) anthropicMsgs = append(anthropicMsgs, anthropic.NewAssistantMessage(anthropic.NewTextBlock(content)))
case "system": case "system", "developer":
system = content system = content
} }
} }
// Build request params // Build request params
maxTokens := int64(4096)
if req.MaxOutputTokens != nil {
maxTokens = int64(*req.MaxOutputTokens)
}
params := anthropic.MessageNewParams{ params := anthropic.MessageNewParams{
Model: anthropic.Model(model), Model: anthropic.Model(req.Model),
Messages: messages, Messages: anthropicMsgs,
MaxTokens: int64(4096), MaxTokens: maxTokens,
} }
if system != "" { if system != "" {
@@ -107,36 +109,31 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
params.System = systemBlocks params.System = systemBlocks
} }
if req.Temperature != nil {
params.Temperature = anthropic.Float(*req.Temperature)
}
if req.TopP != nil {
params.TopP = anthropic.Float(*req.TopP)
}
// Call Anthropic API // Call Anthropic API
resp, err := p.client.Messages.New(ctx, params) resp, err := p.client.Messages.New(ctx, params)
if err != nil { if err != nil {
return nil, fmt.Errorf("anthropic api error: %w", err) return nil, fmt.Errorf("anthropic api error: %w", err)
} }
// Convert Anthropic response to Open Responses format // Extract text from response
output := make([]api.Message, 0, 1)
var text string var text string
for _, block := range resp.Content { for _, block := range resp.Content {
if block.Type == "text" { if block.Type == "text" {
text += block.Text text += block.Text
} }
} }
output = append(output, api.Message{ return &api.ProviderResult{
Role: "assistant",
Content: []api.ContentBlock{
{Type: "output_text", Text: text},
},
})
return &api.Response{
ID: resp.ID, ID: resp.ID,
Object: "response",
Created: time.Now().Unix(),
Model: string(resp.Model), Model: string(resp.Model),
Provider: Name, Text: text,
Output: output,
Usage: api.Usage{ Usage: api.Usage{
InputTokens: int(resp.Usage.InputTokens), InputTokens: int(resp.Usage.InputTokens),
OutputTokens: int(resp.Usage.OutputTokens), OutputTokens: int(resp.Usage.OutputTokens),
@@ -146,12 +143,12 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
} }
// GenerateStream handles streaming requests to Anthropic. // GenerateStream handles streaming requests to Anthropic.
func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest) (<-chan *api.StreamChunk, <-chan error) { func (p *Provider) GenerateStream(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (<-chan *api.ProviderStreamDelta, <-chan error) {
chunkChan := make(chan *api.StreamChunk) deltaChan := make(chan *api.ProviderStreamDelta)
errChan := make(chan error, 1) errChan := make(chan error, 1)
go func() { go func() {
defer close(chunkChan) defer close(deltaChan)
defer close(errChan) defer close(errChan)
if p.cfg.APIKey == "" { if p.cfg.APIKey == "" {
@@ -163,13 +160,11 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
return return
} }
model := chooseModel(req.Model, p.cfg.Model) // Convert messages to Anthropic format
anthropicMsgs := make([]anthropic.MessageParam, 0, len(messages))
// Convert messages
messages := make([]anthropic.MessageParam, 0, len(req.Input))
var system string var system string
for _, msg := range req.Input { for _, msg := range messages {
var content string var content string
for _, block := range msg.Content { for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" { if block.Type == "input_text" || block.Type == "output_text" {
@@ -179,19 +174,24 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
switch msg.Role { switch msg.Role {
case "user": case "user":
messages = append(messages, anthropic.NewUserMessage(anthropic.NewTextBlock(content))) anthropicMsgs = append(anthropicMsgs, anthropic.NewUserMessage(anthropic.NewTextBlock(content)))
case "assistant": case "assistant":
messages = append(messages, anthropic.NewAssistantMessage(anthropic.NewTextBlock(content))) anthropicMsgs = append(anthropicMsgs, anthropic.NewAssistantMessage(anthropic.NewTextBlock(content)))
case "system": case "system", "developer":
system = content system = content
} }
} }
// Build params // Build params
maxTokens := int64(4096)
if req.MaxOutputTokens != nil {
maxTokens = int64(*req.MaxOutputTokens)
}
params := anthropic.MessageNewParams{ params := anthropic.MessageNewParams{
Model: anthropic.Model(model), Model: anthropic.Model(req.Model),
Messages: messages, Messages: anthropicMsgs,
MaxTokens: int64(4096), MaxTokens: maxTokens,
} }
if system != "" { if system != "" {
@@ -201,6 +201,13 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
params.System = systemBlocks params.System = systemBlocks
} }
if req.Temperature != nil {
params.Temperature = anthropic.Float(*req.Temperature)
}
if req.TopP != nil {
params.TopP = anthropic.Float(*req.TopP)
}
// Create stream // Create stream
stream := p.client.Messages.NewStreaming(ctx, params) stream := p.client.Messages.NewStreaming(ctx, params)
@@ -208,51 +215,30 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
for stream.Next() { for stream.Next() {
event := stream.Current() event := stream.Current()
delta := &api.StreamDelta{}
var text string
// Handle different event types
if event.Type == "content_block_delta" && event.Delta.Type == "text_delta" { if event.Type == "content_block_delta" && event.Delta.Type == "text_delta" {
text = event.Delta.Text
delta.Content = []api.ContentBlock{
{Type: "output_text", Text: text},
}
}
if event.Type == "message_start" {
delta.Role = "assistant"
}
streamChunk := &api.StreamChunk{
Object: "response.chunk",
Created: time.Now().Unix(),
Model: string(model),
Provider: Name,
Delta: delta,
}
select { select {
case chunkChan <- streamChunk: case deltaChan <- &api.ProviderStreamDelta{Text: event.Delta.Text}:
case <-ctx.Done(): case <-ctx.Done():
errChan <- ctx.Err() errChan <- ctx.Err()
return return
} }
} }
}
if err := stream.Err(); err != nil { if err := stream.Err(); err != nil {
errChan <- fmt.Errorf("anthropic stream error: %w", err) errChan <- fmt.Errorf("anthropic stream error: %w", err)
return return
} }
// Send final chunk // Send final delta
select { select {
case chunkChan <- &api.StreamChunk{Object: "response.chunk", Done: true}: case deltaChan <- &api.ProviderStreamDelta{Done: true}:
case <-ctx.Done(): case <-ctx.Done():
errChan <- ctx.Err() errChan <- ctx.Err()
} }
}() }()
return chunkChan, errChan return deltaChan, errChan
} }
func chooseModel(requested, defaultModel string) string { func chooseModel(requested, defaultModel string) string {

View File

@@ -3,7 +3,6 @@ package google
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"google.golang.org/genai" "google.golang.org/genai"
@@ -41,8 +40,8 @@ func New(cfg config.ProviderConfig) *Provider {
func (p *Provider) Name() string { return Name } func (p *Provider) Name() string { return Name }
// Generate routes the Open Responses request to Gemini. // Generate routes the request to Gemini and returns a ProviderResult.
func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api.Response, error) { func (p *Provider) Generate(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (*api.ProviderResult, error) {
if p.cfg.APIKey == "" { if p.cfg.APIKey == "" {
return nil, fmt.Errorf("google api key missing") return nil, fmt.Errorf("google api key missing")
} }
@@ -50,60 +49,18 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
return nil, fmt.Errorf("google client not initialized") return nil, fmt.Errorf("google client not initialized")
} }
model := chooseModel(req.Model, p.cfg.Model) model := req.Model
// Convert Open Responses messages to Gemini format contents, systemText := convertMessages(messages)
var contents []*genai.Content
var systemText string
for _, msg := range req.Input { config := buildConfig(systemText, req)
if msg.Role == "system" {
for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" {
systemText += block.Text
}
}
continue
}
var parts []*genai.Part
for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" {
parts = append(parts, genai.NewPartFromText(block.Text))
}
}
role := "user"
if msg.Role == "assistant" || msg.Role == "model" {
role = "model"
}
contents = append(contents, &genai.Content{
Role: role,
Parts: parts,
})
}
// Build config with system instruction if present
var config *genai.GenerateContentConfig
if systemText != "" {
config = &genai.GenerateContentConfig{
SystemInstruction: &genai.Content{
Parts: []*genai.Part{genai.NewPartFromText(systemText)},
},
}
}
// Generate content
resp, err := p.client.Models.GenerateContent(ctx, model, contents, config) resp, err := p.client.Models.GenerateContent(ctx, model, contents, config)
if err != nil { if err != nil {
return nil, fmt.Errorf("google api error: %w", err) return nil, fmt.Errorf("google api error: %w", err)
} }
// Convert Gemini response to Open Responses format
output := make([]api.Message, 0, 1)
var text string var text string
if len(resp.Candidates) > 0 && resp.Candidates[0].Content != nil { if len(resp.Candidates) > 0 && resp.Candidates[0].Content != nil {
for _, part := range resp.Candidates[0].Content.Parts { for _, part := range resp.Candidates[0].Content.Parts {
if part != nil { if part != nil {
@@ -112,27 +69,16 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
} }
} }
output = append(output, api.Message{
Role: "assistant",
Content: []api.ContentBlock{
{Type: "output_text", Text: text},
},
})
// Extract usage info if available
var inputTokens, outputTokens int var inputTokens, outputTokens int
if resp.UsageMetadata != nil { if resp.UsageMetadata != nil {
inputTokens = int(resp.UsageMetadata.PromptTokenCount) inputTokens = int(resp.UsageMetadata.PromptTokenCount)
outputTokens = int(resp.UsageMetadata.CandidatesTokenCount) outputTokens = int(resp.UsageMetadata.CandidatesTokenCount)
} }
return &api.Response{ return &api.ProviderResult{
ID: uuid.NewString(), ID: uuid.NewString(),
Object: "response",
Created: time.Now().Unix(),
Model: model, Model: model,
Provider: Name, Text: text,
Output: output,
Usage: api.Usage{ Usage: api.Usage{
InputTokens: inputTokens, InputTokens: inputTokens,
OutputTokens: outputTokens, OutputTokens: outputTokens,
@@ -142,12 +88,12 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
} }
// GenerateStream handles streaming requests to Google. // GenerateStream handles streaming requests to Google.
func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest) (<-chan *api.StreamChunk, <-chan error) { func (p *Provider) GenerateStream(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (<-chan *api.ProviderStreamDelta, <-chan error) {
chunkChan := make(chan *api.StreamChunk) deltaChan := make(chan *api.ProviderStreamDelta)
errChan := make(chan error, 1) errChan := make(chan error, 1)
go func() { go func() {
defer close(chunkChan) defer close(deltaChan)
defer close(errChan) defer close(errChan)
if p.cfg.APIKey == "" { if p.cfg.APIKey == "" {
@@ -159,54 +105,14 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
return return
} }
model := chooseModel(req.Model, p.cfg.Model) model := req.Model
// Convert messages contents, systemText := convertMessages(messages)
var contents []*genai.Content
var systemText string
for _, msg := range req.Input { config := buildConfig(systemText, req)
if msg.Role == "system" {
for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" {
systemText += block.Text
}
}
continue
}
var parts []*genai.Part
for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" {
parts = append(parts, genai.NewPartFromText(block.Text))
}
}
role := "user"
if msg.Role == "assistant" || msg.Role == "model" {
role = "model"
}
contents = append(contents, &genai.Content{
Role: role,
Parts: parts,
})
}
// Build config with system instruction if present
var config *genai.GenerateContentConfig
if systemText != "" {
config = &genai.GenerateContentConfig{
SystemInstruction: &genai.Content{
Parts: []*genai.Part{genai.NewPartFromText(systemText)},
},
}
}
// Create stream
stream := p.client.Models.GenerateContentStream(ctx, model, contents, config) stream := p.client.Models.GenerateContentStream(ctx, model, contents, config)
// Process stream
for resp, err := range stream { for resp, err := range stream {
if err != nil { if err != nil {
errChan <- fmt.Errorf("google stream error: %w", err) errChan <- fmt.Errorf("google stream error: %w", err)
@@ -222,38 +128,94 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
} }
} }
delta := &api.StreamDelta{}
if text != "" { if text != "" {
delta.Content = []api.ContentBlock{
{Type: "output_text", Text: text},
}
}
streamChunk := &api.StreamChunk{
Object: "response.chunk",
Created: time.Now().Unix(),
Model: model,
Provider: Name,
Delta: delta,
}
select { select {
case chunkChan <- streamChunk: case deltaChan <- &api.ProviderStreamDelta{Text: text}:
case <-ctx.Done(): case <-ctx.Done():
errChan <- ctx.Err() errChan <- ctx.Err()
return return
} }
} }
}
// Send final chunk
select { select {
case chunkChan <- &api.StreamChunk{Object: "response.chunk", Done: true}: case deltaChan <- &api.ProviderStreamDelta{Done: true}:
case <-ctx.Done(): case <-ctx.Done():
errChan <- ctx.Err() errChan <- ctx.Err()
} }
}() }()
return chunkChan, errChan return deltaChan, errChan
}
// convertMessages partitions normalized messages into Gemini conversation
// contents and a system-instruction string.
//
// Text from "system" and "developer" messages is concatenated (without any
// separator) into the returned string. Every other message becomes one
// genai.Content whose role is "model" for "assistant"/"model" messages and
// "user" for anything else. Only "input_text" and "output_text" blocks are
// carried over; other block types are ignored.
func convertMessages(messages []api.Message) ([]*genai.Content, string) {
	var (
		system string
		turns  []*genai.Content
	)

	// isText reports whether a content block type carries plain text.
	isText := func(kind string) bool {
		return kind == "input_text" || kind == "output_text"
	}

	for _, m := range messages {
		switch m.Role {
		case "system", "developer":
			for _, b := range m.Content {
				if isText(b.Type) {
					system += b.Text
				}
			}
		default:
			var parts []*genai.Part
			for _, b := range m.Content {
				if isText(b.Type) {
					parts = append(parts, genai.NewPartFromText(b.Text))
				}
			}

			role := "user"
			switch m.Role {
			case "assistant", "model":
				role = "model"
			}

			turns = append(turns, &genai.Content{Role: role, Parts: parts})
		}
	}

	return turns, system
}
// buildConfig constructs a GenerateContentConfig from system text and request params.
func buildConfig(systemText string, req *api.ResponseRequest) *genai.GenerateContentConfig {
var cfg *genai.GenerateContentConfig
needsCfg := systemText != "" || req.MaxOutputTokens != nil || req.Temperature != nil || req.TopP != nil
if !needsCfg {
return nil
}
cfg = &genai.GenerateContentConfig{}
if systemText != "" {
cfg.SystemInstruction = &genai.Content{
Parts: []*genai.Part{genai.NewPartFromText(systemText)},
}
}
if req.MaxOutputTokens != nil {
cfg.MaxOutputTokens = int32(*req.MaxOutputTokens)
}
if req.Temperature != nil {
t := float32(*req.Temperature)
cfg.Temperature = &t
}
if req.TopP != nil {
tp := float32(*req.TopP)
cfg.TopP = &tp
}
return cfg
} }
func chooseModel(requested, defaultModel string) string { func chooseModel(requested, defaultModel string) string {

View File

@@ -3,7 +3,6 @@ package openai
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/openai/openai-go" "github.com/openai/openai-go"
"github.com/openai/openai-go/azure" "github.com/openai/openai-go/azure"
@@ -64,8 +63,8 @@ func NewAzure(azureCfg config.AzureOpenAIConfig) *Provider {
// Name returns the provider identifier. // Name returns the provider identifier.
func (p *Provider) Name() string { return Name } func (p *Provider) Name() string { return Name }
// Generate routes the Open Responses request to OpenAI. // Generate routes the request to OpenAI and returns a ProviderResult.
func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api.Response, error) { func (p *Provider) Generate(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (*api.ProviderResult, error) {
if p.cfg.APIKey == "" { if p.cfg.APIKey == "" {
return nil, fmt.Errorf("openai api key missing") return nil, fmt.Errorf("openai api key missing")
} }
@@ -73,11 +72,9 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
return nil, fmt.Errorf("openai client not initialized") return nil, fmt.Errorf("openai client not initialized")
} }
model := chooseModel(req.Model, p.cfg.Model) // Convert messages to OpenAI format
oaiMessages := make([]openai.ChatCompletionMessageParamUnion, 0, len(messages))
// Convert Open Responses messages to OpenAI format for _, msg := range messages {
messages := make([]openai.ChatCompletionMessageParamUnion, 0, len(req.Input))
for _, msg := range req.Input {
var content string var content string
for _, block := range msg.Content { for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" { if block.Type == "input_text" || block.Type == "output_text" {
@@ -87,41 +84,45 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
switch msg.Role { switch msg.Role {
case "user": case "user":
messages = append(messages, openai.UserMessage(content)) oaiMessages = append(oaiMessages, openai.UserMessage(content))
case "assistant": case "assistant":
messages = append(messages, openai.AssistantMessage(content)) oaiMessages = append(oaiMessages, openai.AssistantMessage(content))
case "system": case "system":
messages = append(messages, openai.SystemMessage(content)) oaiMessages = append(oaiMessages, openai.SystemMessage(content))
case "developer":
oaiMessages = append(oaiMessages, openai.SystemMessage(content))
} }
} }
params := openai.ChatCompletionNewParams{
Model: openai.ChatModel(req.Model),
Messages: oaiMessages,
}
if req.MaxOutputTokens != nil {
params.MaxTokens = openai.Int(int64(*req.MaxOutputTokens))
}
if req.Temperature != nil {
params.Temperature = openai.Float(*req.Temperature)
}
if req.TopP != nil {
params.TopP = openai.Float(*req.TopP)
}
// Call OpenAI API // Call OpenAI API
resp, err := p.client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{ resp, err := p.client.Chat.Completions.New(ctx, params)
Model: openai.ChatModel(model),
Messages: messages,
})
if err != nil { if err != nil {
return nil, fmt.Errorf("openai api error: %w", err) return nil, fmt.Errorf("openai api error: %w", err)
} }
// Convert OpenAI response to Open Responses format var combinedText string
output := make([]api.Message, 0, len(resp.Choices))
for _, choice := range resp.Choices { for _, choice := range resp.Choices {
output = append(output, api.Message{ combinedText += choice.Message.Content
Role: "assistant",
Content: []api.ContentBlock{
{Type: "output_text", Text: choice.Message.Content},
},
})
} }
return &api.Response{ return &api.ProviderResult{
ID: resp.ID, ID: resp.ID,
Object: "response",
Created: time.Now().Unix(),
Model: resp.Model, Model: resp.Model,
Provider: Name, Text: combinedText,
Output: output,
Usage: api.Usage{ Usage: api.Usage{
InputTokens: int(resp.Usage.PromptTokens), InputTokens: int(resp.Usage.PromptTokens),
OutputTokens: int(resp.Usage.CompletionTokens), OutputTokens: int(resp.Usage.CompletionTokens),
@@ -131,12 +132,12 @@ func (p *Provider) Generate(ctx context.Context, req *api.ResponseRequest) (*api
} }
// GenerateStream handles streaming requests to OpenAI. // GenerateStream handles streaming requests to OpenAI.
func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest) (<-chan *api.StreamChunk, <-chan error) { func (p *Provider) GenerateStream(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (<-chan *api.ProviderStreamDelta, <-chan error) {
chunkChan := make(chan *api.StreamChunk) deltaChan := make(chan *api.ProviderStreamDelta)
errChan := make(chan error, 1) errChan := make(chan error, 1)
go func() { go func() {
defer close(chunkChan) defer close(deltaChan)
defer close(errChan) defer close(errChan)
if p.cfg.APIKey == "" { if p.cfg.APIKey == "" {
@@ -148,11 +149,9 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
return return
} }
model := chooseModel(req.Model, p.cfg.Model) // Convert messages to OpenAI format
oaiMessages := make([]openai.ChatCompletionMessageParamUnion, 0, len(messages))
// Convert messages for _, msg := range messages {
messages := make([]openai.ChatCompletionMessageParamUnion, 0, len(req.Input))
for _, msg := range req.Input {
var content string var content string
for _, block := range msg.Content { for _, block := range msg.Content {
if block.Type == "input_text" || block.Type == "output_text" { if block.Type == "input_text" || block.Type == "output_text" {
@@ -162,48 +161,48 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
switch msg.Role { switch msg.Role {
case "user": case "user":
messages = append(messages, openai.UserMessage(content)) oaiMessages = append(oaiMessages, openai.UserMessage(content))
case "assistant": case "assistant":
messages = append(messages, openai.AssistantMessage(content)) oaiMessages = append(oaiMessages, openai.AssistantMessage(content))
case "system": case "system":
messages = append(messages, openai.SystemMessage(content)) oaiMessages = append(oaiMessages, openai.SystemMessage(content))
case "developer":
oaiMessages = append(oaiMessages, openai.SystemMessage(content))
} }
} }
params := openai.ChatCompletionNewParams{
Model: openai.ChatModel(req.Model),
Messages: oaiMessages,
}
if req.MaxOutputTokens != nil {
params.MaxTokens = openai.Int(int64(*req.MaxOutputTokens))
}
if req.Temperature != nil {
params.Temperature = openai.Float(*req.Temperature)
}
if req.TopP != nil {
params.TopP = openai.Float(*req.TopP)
}
// Create streaming request // Create streaming request
stream := p.client.Chat.Completions.NewStreaming(ctx, openai.ChatCompletionNewParams{ stream := p.client.Chat.Completions.NewStreaming(ctx, params)
Model: openai.ChatModel(model),
Messages: messages,
})
// Process stream // Process stream
for stream.Next() { for stream.Next() {
chunk := stream.Current() chunk := stream.Current()
for _, choice := range chunk.Choices { for _, choice := range chunk.Choices {
delta := &api.StreamDelta{} if choice.Delta.Content == "" {
continue
if choice.Delta.Role != "" {
delta.Role = string(choice.Delta.Role)
}
if choice.Delta.Content != "" {
delta.Content = []api.ContentBlock{
{Type: "output_text", Text: choice.Delta.Content},
}
}
streamChunk := &api.StreamChunk{
ID: chunk.ID,
Object: "response.chunk",
Created: time.Now().Unix(),
Model: chunk.Model,
Provider: Name,
Delta: delta,
} }
select { select {
case chunkChan <- streamChunk: case deltaChan <- &api.ProviderStreamDelta{
ID: chunk.ID,
Model: chunk.Model,
Text: choice.Delta.Content,
}:
case <-ctx.Done(): case <-ctx.Done():
errChan <- ctx.Err() errChan <- ctx.Err()
return return
@@ -216,15 +215,15 @@ func (p *Provider) GenerateStream(ctx context.Context, req *api.ResponseRequest)
return return
} }
// Send final chunk // Send final delta
select { select {
case chunkChan <- &api.StreamChunk{Object: "response.chunk", Done: true}: case deltaChan <- &api.ProviderStreamDelta{Done: true}:
case <-ctx.Done(): case <-ctx.Done():
errChan <- ctx.Err() errChan <- ctx.Err()
} }
}() }()
return chunkChan, errChan return deltaChan, errChan
} }
func chooseModel(requested, defaultModel string) string { func chooseModel(requested, defaultModel string) string {

View File

@@ -14,8 +14,8 @@ import (
// Provider represents a unified interface that each LLM provider must implement. // Provider represents a unified interface that each LLM provider must implement.
type Provider interface { type Provider interface {
Name() string Name() string
Generate(ctx context.Context, req *api.ResponseRequest) (*api.Response, error) Generate(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (*api.ProviderResult, error)
GenerateStream(ctx context.Context, req *api.ResponseRequest) (<-chan *api.StreamChunk, <-chan error) GenerateStream(ctx context.Context, messages []api.Message, req *api.ResponseRequest) (<-chan *api.ProviderStreamDelta, <-chan error)
} }
// Registry keeps track of registered providers and model-to-provider mappings. // Registry keeps track of registered providers and model-to-provider mappings.

View File

@@ -5,6 +5,10 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"strings"
"time"
"github.com/google/uuid"
"github.com/yourusername/go-llm-gateway/internal/api" "github.com/yourusername/go-llm-gateway/internal/api"
"github.com/yourusername/go-llm-gateway/internal/conversation" "github.com/yourusername/go-llm-gateway/internal/conversation"
@@ -74,16 +78,34 @@ func (s *GatewayServer) handleResponses(w http.ResponseWriter, r *http.Request)
return return
} }
// Build full message history // Normalize input to internal messages
messages := s.buildMessageHistory(&req) inputMsgs := req.NormalizeInput()
if messages == nil {
// Build full message history from previous conversation
var historyMsgs []api.Message
if req.PreviousResponseID != nil && *req.PreviousResponseID != "" {
conv, ok := s.convs.Get(*req.PreviousResponseID)
if !ok {
http.Error(w, "conversation not found", http.StatusNotFound) http.Error(w, "conversation not found", http.StatusNotFound)
return return
} }
historyMsgs = conv.Messages
}
// Update request with full history for provider // Combined messages for conversation storage (history + new input, no instructions)
fullReq := req storeMsgs := make([]api.Message, 0, len(historyMsgs)+len(inputMsgs))
fullReq.Input = messages storeMsgs = append(storeMsgs, historyMsgs...)
storeMsgs = append(storeMsgs, inputMsgs...)
// Build provider messages: instructions + history + input
var providerMsgs []api.Message
if req.Instructions != nil && *req.Instructions != "" {
providerMsgs = append(providerMsgs, api.Message{
Role: "developer",
Content: []api.ContentBlock{{Type: "input_text", Text: *req.Instructions}},
})
}
providerMsgs = append(providerMsgs, storeMsgs...)
provider, err := s.resolveProvider(&req) provider, err := s.resolveProvider(&req)
if err != nil { if err != nil {
@@ -91,64 +113,44 @@ func (s *GatewayServer) handleResponses(w http.ResponseWriter, r *http.Request)
return return
} }
// Resolve provider_model_id (e.g., Azure deployment name) before sending to provider // Resolve provider_model_id (e.g., Azure deployment name)
fullReq.Model = s.registry.ResolveModelID(req.Model) resolvedReq := req
resolvedReq.Model = s.registry.ResolveModelID(req.Model)
// Handle streaming vs non-streaming
if req.Stream { if req.Stream {
s.handleStreamingResponse(w, r, provider, &fullReq, &req) s.handleStreamingResponse(w, r, provider, providerMsgs, &resolvedReq, &req, storeMsgs)
} else { } else {
s.handleSyncResponse(w, r, provider, &fullReq, &req) s.handleSyncResponse(w, r, provider, providerMsgs, &resolvedReq, &req, storeMsgs)
} }
} }
func (s *GatewayServer) buildMessageHistory(req *api.ResponseRequest) []api.Message { func (s *GatewayServer) handleSyncResponse(w http.ResponseWriter, r *http.Request, provider providers.Provider, providerMsgs []api.Message, resolvedReq *api.ResponseRequest, origReq *api.ResponseRequest, storeMsgs []api.Message) {
// If no previous_response_id, use input as-is result, err := provider.Generate(r.Context(), providerMsgs, resolvedReq)
if req.PreviousResponseID == "" {
return req.Input
}
// Load previous conversation
conv, ok := s.convs.Get(req.PreviousResponseID)
if !ok {
return nil
}
// Append new input to conversation history
messages := make([]api.Message, len(conv.Messages))
copy(messages, conv.Messages)
messages = append(messages, req.Input...)
return messages
}
func (s *GatewayServer) handleSyncResponse(w http.ResponseWriter, r *http.Request, provider providers.Provider, fullReq *api.ResponseRequest, origReq *api.ResponseRequest) {
resp, err := provider.Generate(r.Context(), fullReq)
if err != nil { if err != nil {
s.logger.Printf("provider %s error: %v", provider.Name(), err) s.logger.Printf("provider %s error: %v", provider.Name(), err)
http.Error(w, "provider error", http.StatusBadGateway) http.Error(w, "provider error", http.StatusBadGateway)
return return
} }
// Store conversation - use previous_response_id if continuing, otherwise use new ID responseID := generateID("resp_")
conversationID := origReq.PreviousResponseID
if conversationID == "" { // Build assistant message for conversation store
conversationID = resp.ID assistantMsg := api.Message{
Role: "assistant",
Content: []api.ContentBlock{{Type: "output_text", Text: result.Text}},
} }
allMsgs := append(storeMsgs, assistantMsg)
s.convs.Create(responseID, result.Model, allMsgs)
messages := append(fullReq.Input, resp.Output...) // Build spec-compliant response
s.convs.Create(conversationID, resp.Model, messages) resp := s.buildResponse(origReq, result, provider.Name(), responseID)
// Return the conversation ID (not the provider's response ID)
resp.ID = conversationID
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(resp) _ = json.NewEncoder(w).Encode(resp)
} }
func (s *GatewayServer) handleStreamingResponse(w http.ResponseWriter, r *http.Request, provider providers.Provider, fullReq *api.ResponseRequest, origReq *api.ResponseRequest) { func (s *GatewayServer) handleStreamingResponse(w http.ResponseWriter, r *http.Request, provider providers.Provider, providerMsgs []api.Message, resolvedReq *api.ResponseRequest, origReq *api.ResponseRequest, storeMsgs []api.Message) {
// Set headers for SSE
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
@@ -160,89 +162,322 @@ func (s *GatewayServer) handleStreamingResponse(w http.ResponseWriter, r *http.R
return return
} }
chunkChan, errChan := provider.GenerateStream(r.Context(), fullReq) responseID := generateID("resp_")
itemID := generateID("msg_")
seq := 0
outputIdx := 0
contentIdx := 0
// Build initial response snapshot (in_progress, no output yet)
initialResp := s.buildResponse(origReq, &api.ProviderResult{
Model: origReq.Model,
}, provider.Name(), responseID)
initialResp.Status = "in_progress"
initialResp.CompletedAt = nil
initialResp.Output = []api.OutputItem{}
initialResp.Usage = nil
// response.created
s.sendSSE(w, flusher, &seq, "response.created", &api.StreamEvent{
Type: "response.created",
Response: initialResp,
})
// response.in_progress
s.sendSSE(w, flusher, &seq, "response.in_progress", &api.StreamEvent{
Type: "response.in_progress",
Response: initialResp,
})
// response.output_item.added
inProgressItem := &api.OutputItem{
ID: itemID,
Type: "message",
Status: "in_progress",
Role: "assistant",
Content: []api.ContentPart{},
}
s.sendSSE(w, flusher, &seq, "response.output_item.added", &api.StreamEvent{
Type: "response.output_item.added",
OutputIndex: &outputIdx,
Item: inProgressItem,
})
// response.content_part.added
emptyPart := &api.ContentPart{
Type: "output_text",
Text: "",
Annotations: []api.Annotation{},
}
s.sendSSE(w, flusher, &seq, "response.content_part.added", &api.StreamEvent{
Type: "response.content_part.added",
ItemID: itemID,
OutputIndex: &outputIdx,
ContentIndex: &contentIdx,
Part: emptyPart,
})
// Start provider stream
deltaChan, errChan := provider.GenerateStream(r.Context(), providerMsgs, resolvedReq)
var responseID string
var fullText string var fullText string
var streamErr error
var providerModel string
loop:
for { for {
select { select {
case chunk, ok := <-chunkChan: case delta, ok := <-deltaChan:
if !ok { if !ok {
return break loop
} }
if delta.Model != "" && providerModel == "" {
// Capture response ID providerModel = delta.Model
if chunk.ID != "" && responseID == "" {
responseID = chunk.ID
} }
if delta.Text != "" {
// Override chunk ID with conversation ID fullText += delta.Text
if origReq.PreviousResponseID != "" { s.sendSSE(w, flusher, &seq, "response.output_text.delta", &api.StreamEvent{
chunk.ID = origReq.PreviousResponseID Type: "response.output_text.delta",
} else if responseID != "" { ItemID: itemID,
chunk.ID = responseID OutputIndex: &outputIdx,
ContentIndex: &contentIdx,
Delta: delta.Text,
})
} }
if delta.Done {
// Accumulate text from deltas break loop
if chunk.Delta != nil && len(chunk.Delta.Content) > 0 {
for _, block := range chunk.Delta.Content {
fullText += block.Text
} }
}
data, err := json.Marshal(chunk)
if err != nil {
s.logger.Printf("failed to marshal chunk: %v", err)
continue
}
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
if chunk.Done {
// Store conversation with a single consolidated assistant message
s.storeStreamConversation(fullReq, origReq, responseID, fullText)
return
}
case err := <-errChan: case err := <-errChan:
if err != nil { if err != nil {
s.logger.Printf("stream error: %v", err) streamErr = err
errData, _ := json.Marshal(map[string]string{"error": err.Error()})
fmt.Fprintf(w, "data: %s\n\n", errData)
flusher.Flush()
} }
// Store whatever we accumulated before the error break loop
s.storeStreamConversation(fullReq, origReq, responseID, fullText)
return
case <-r.Context().Done(): case <-r.Context().Done():
s.logger.Printf("client disconnected") s.logger.Printf("client disconnected")
return return
} }
} }
}
func (s *GatewayServer) storeStreamConversation(fullReq *api.ResponseRequest, origReq *api.ResponseRequest, responseID string, fullText string) { if streamErr != nil {
if responseID == "" || fullText == "" { s.logger.Printf("stream error: %v", streamErr)
failedResp := s.buildResponse(origReq, &api.ProviderResult{
Model: origReq.Model,
}, provider.Name(), responseID)
failedResp.Status = "failed"
failedResp.CompletedAt = nil
failedResp.Output = []api.OutputItem{}
failedResp.Error = &api.ResponseError{
Type: "server_error",
Message: streamErr.Error(),
}
s.sendSSE(w, flusher, &seq, "response.failed", &api.StreamEvent{
Type: "response.failed",
Response: failedResp,
})
return return
} }
// response.output_text.done
s.sendSSE(w, flusher, &seq, "response.output_text.done", &api.StreamEvent{
Type: "response.output_text.done",
ItemID: itemID,
OutputIndex: &outputIdx,
ContentIndex: &contentIdx,
Text: fullText,
})
// response.content_part.done
completedPart := &api.ContentPart{
Type: "output_text",
Text: fullText,
Annotations: []api.Annotation{},
}
s.sendSSE(w, flusher, &seq, "response.content_part.done", &api.StreamEvent{
Type: "response.content_part.done",
ItemID: itemID,
OutputIndex: &outputIdx,
ContentIndex: &contentIdx,
Part: completedPart,
})
// response.output_item.done
completedItem := &api.OutputItem{
ID: itemID,
Type: "message",
Status: "completed",
Role: "assistant",
Content: []api.ContentPart{*completedPart},
}
s.sendSSE(w, flusher, &seq, "response.output_item.done", &api.StreamEvent{
Type: "response.output_item.done",
OutputIndex: &outputIdx,
Item: completedItem,
})
// Build final completed response
model := origReq.Model
if providerModel != "" {
model = providerModel
}
finalResult := &api.ProviderResult{
Model: model,
Text: fullText,
}
completedResp := s.buildResponse(origReq, finalResult, provider.Name(), responseID)
completedResp.Output[0].ID = itemID
// response.completed
s.sendSSE(w, flusher, &seq, "response.completed", &api.StreamEvent{
Type: "response.completed",
Response: completedResp,
})
// Store conversation
if fullText != "" {
assistantMsg := api.Message{ assistantMsg := api.Message{
Role: "assistant", Role: "assistant",
Content: []api.ContentBlock{ Content: []api.ContentBlock{{Type: "output_text", Text: fullText}},
{Type: "output_text", Text: fullText},
},
} }
messages := append(fullReq.Input, assistantMsg) allMsgs := append(storeMsgs, assistantMsg)
s.convs.Create(responseID, model, allMsgs)
}
}
conversationID := origReq.PreviousResponseID func (s *GatewayServer) sendSSE(w http.ResponseWriter, flusher http.Flusher, seq *int, eventType string, event *api.StreamEvent) {
if conversationID == "" { event.SequenceNumber = *seq
conversationID = responseID *seq++
data, err := json.Marshal(event)
if err != nil {
s.logger.Printf("failed to marshal SSE event: %v", err)
return
}
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data)
flusher.Flush()
}
func (s *GatewayServer) buildResponse(req *api.ResponseRequest, result *api.ProviderResult, providerName string, responseID string) *api.Response {
now := time.Now().Unix()
model := result.Model
if model == "" {
model = req.Model
} }
s.convs.Create(conversationID, fullReq.Model, messages) // Build output item
itemID := generateID("msg_")
outputItem := api.OutputItem{
ID: itemID,
Type: "message",
Status: "completed",
Role: "assistant",
Content: []api.ContentPart{{
Type: "output_text",
Text: result.Text,
Annotations: []api.Annotation{},
}},
}
// Echo back request params with defaults
tools := req.Tools
if tools == nil {
tools = json.RawMessage(`[]`)
}
toolChoice := req.ToolChoice
if toolChoice == nil {
toolChoice = json.RawMessage(`"auto"`)
}
text := req.Text
if text == nil {
text = json.RawMessage(`{"format":{"type":"text"}}`)
}
truncation := "disabled"
if req.Truncation != nil {
truncation = *req.Truncation
}
temperature := 1.0
if req.Temperature != nil {
temperature = *req.Temperature
}
topP := 1.0
if req.TopP != nil {
topP = *req.TopP
}
presencePenalty := 0.0
if req.PresencePenalty != nil {
presencePenalty = *req.PresencePenalty
}
frequencyPenalty := 0.0
if req.FrequencyPenalty != nil {
frequencyPenalty = *req.FrequencyPenalty
}
topLogprobs := 0
if req.TopLogprobs != nil {
topLogprobs = *req.TopLogprobs
}
parallelToolCalls := true
if req.ParallelToolCalls != nil {
parallelToolCalls = *req.ParallelToolCalls
}
store := true
if req.Store != nil {
store = *req.Store
}
background := false
if req.Background != nil {
background = *req.Background
}
serviceTier := "default"
if req.ServiceTier != nil {
serviceTier = *req.ServiceTier
}
var reasoning json.RawMessage
if req.Reasoning != nil {
reasoning = req.Reasoning
}
metadata := req.Metadata
if metadata == nil {
metadata = map[string]string{}
}
var usage *api.Usage
if result.Text != "" {
usage = &result.Usage
}
return &api.Response{
ID: responseID,
Object: "response",
CreatedAt: now,
CompletedAt: &now,
Status: "completed",
IncompleteDetails: nil,
Model: model,
PreviousResponseID: req.PreviousResponseID,
Instructions: req.Instructions,
Output: []api.OutputItem{outputItem},
Error: nil,
Tools: tools,
ToolChoice: toolChoice,
Truncation: truncation,
ParallelToolCalls: parallelToolCalls,
Text: text,
TopP: topP,
PresencePenalty: presencePenalty,
FrequencyPenalty: frequencyPenalty,
TopLogprobs: topLogprobs,
Temperature: temperature,
Reasoning: reasoning,
Usage: usage,
MaxOutputTokens: req.MaxOutputTokens,
MaxToolCalls: req.MaxToolCalls,
Store: store,
Background: background,
ServiceTier: serviceTier,
Metadata: metadata,
SafetyIdentifier: nil,
PromptCacheKey: nil,
Provider: providerName,
}
} }
func (s *GatewayServer) resolveProvider(req *api.ResponseRequest) (providers.Provider, error) { func (s *GatewayServer) resolveProvider(req *api.ResponseRequest) (providers.Provider, error) {
@@ -254,3 +489,8 @@ func (s *GatewayServer) resolveProvider(req *api.ResponseRequest) (providers.Pro
} }
return s.registry.Default(req.Model) return s.registry.Default(req.Model)
} }
// generateID returns prefix followed by the first 24 characters of a freshly
// generated UUID string with its dashes removed.
func generateID(prefix string) string {
	const suffixLen = 24
	compact := strings.ReplaceAll(uuid.NewString(), "-", "")
	return prefix + compact[:suffixLen]
}