259 lines
6.6 KiB
Go
259 lines
6.6 KiB
Go
package observability
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/ajac-zero/latticelm/internal/api"
|
|
"github.com/ajac-zero/latticelm/internal/conversation"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// InstrumentedStore wraps a conversation store with metrics and tracing.
|
|
type InstrumentedStore struct {
|
|
base conversation.Store
|
|
registry *prometheus.Registry
|
|
tracer trace.Tracer
|
|
backend string
|
|
}
|
|
|
|
// NewInstrumentedStore wraps a conversation store with observability.
|
|
func NewInstrumentedStore(s conversation.Store, backend string, registry *prometheus.Registry, tp *sdktrace.TracerProvider) conversation.Store {
|
|
var tracer trace.Tracer
|
|
if tp != nil {
|
|
tracer = tp.Tracer("llm-gateway")
|
|
}
|
|
|
|
// Initialize gauge with current size
|
|
if registry != nil {
|
|
conversationActiveCount.WithLabelValues(backend).Set(float64(s.Size()))
|
|
}
|
|
|
|
return &InstrumentedStore{
|
|
base: s,
|
|
registry: registry,
|
|
tracer: tracer,
|
|
backend: backend,
|
|
}
|
|
}
|
|
|
|
// Get wraps the store's Get method with metrics and tracing.
|
|
func (s *InstrumentedStore) Get(id string) (*conversation.Conversation, error) {
|
|
ctx := context.Background()
|
|
|
|
// Start span if tracing is enabled
|
|
if s.tracer != nil {
|
|
var span trace.Span
|
|
ctx, span = s.tracer.Start(ctx, "conversation.get",
|
|
trace.WithAttributes(
|
|
attribute.String("conversation.id", id),
|
|
attribute.String("conversation.backend", s.backend),
|
|
),
|
|
)
|
|
defer span.End()
|
|
}
|
|
|
|
// Record start time
|
|
start := time.Now()
|
|
|
|
// Call underlying store
|
|
conv, err := s.base.Get(id)
|
|
|
|
// Record metrics
|
|
duration := time.Since(start).Seconds()
|
|
status := "success"
|
|
if err != nil {
|
|
status = "error"
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
}
|
|
} else {
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
if conv != nil {
|
|
span.SetAttributes(
|
|
attribute.Int("conversation.message_count", len(conv.Messages)),
|
|
attribute.String("conversation.model", conv.Model),
|
|
)
|
|
}
|
|
span.SetStatus(codes.Ok, "")
|
|
}
|
|
}
|
|
|
|
if s.registry != nil {
|
|
conversationOperationsTotal.WithLabelValues("get", s.backend, status).Inc()
|
|
conversationOperationDuration.WithLabelValues("get", s.backend).Observe(duration)
|
|
}
|
|
|
|
return conv, err
|
|
}
|
|
|
|
// Create wraps the store's Create method with metrics and tracing.
|
|
func (s *InstrumentedStore) Create(id string, model string, messages []api.Message) (*conversation.Conversation, error) {
|
|
ctx := context.Background()
|
|
|
|
// Start span if tracing is enabled
|
|
if s.tracer != nil {
|
|
var span trace.Span
|
|
ctx, span = s.tracer.Start(ctx, "conversation.create",
|
|
trace.WithAttributes(
|
|
attribute.String("conversation.id", id),
|
|
attribute.String("conversation.backend", s.backend),
|
|
attribute.String("conversation.model", model),
|
|
attribute.Int("conversation.initial_messages", len(messages)),
|
|
),
|
|
)
|
|
defer span.End()
|
|
}
|
|
|
|
// Record start time
|
|
start := time.Now()
|
|
|
|
// Call underlying store
|
|
conv, err := s.base.Create(id, model, messages)
|
|
|
|
// Record metrics
|
|
duration := time.Since(start).Seconds()
|
|
status := "success"
|
|
if err != nil {
|
|
status = "error"
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
}
|
|
} else {
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
span.SetStatus(codes.Ok, "")
|
|
}
|
|
}
|
|
|
|
if s.registry != nil {
|
|
conversationOperationsTotal.WithLabelValues("create", s.backend, status).Inc()
|
|
conversationOperationDuration.WithLabelValues("create", s.backend).Observe(duration)
|
|
// Update active count
|
|
conversationActiveCount.WithLabelValues(s.backend).Set(float64(s.base.Size()))
|
|
}
|
|
|
|
return conv, err
|
|
}
|
|
|
|
// Append wraps the store's Append method with metrics and tracing.
|
|
func (s *InstrumentedStore) Append(id string, messages ...api.Message) (*conversation.Conversation, error) {
|
|
ctx := context.Background()
|
|
|
|
// Start span if tracing is enabled
|
|
if s.tracer != nil {
|
|
var span trace.Span
|
|
ctx, span = s.tracer.Start(ctx, "conversation.append",
|
|
trace.WithAttributes(
|
|
attribute.String("conversation.id", id),
|
|
attribute.String("conversation.backend", s.backend),
|
|
attribute.Int("conversation.appended_messages", len(messages)),
|
|
),
|
|
)
|
|
defer span.End()
|
|
}
|
|
|
|
// Record start time
|
|
start := time.Now()
|
|
|
|
// Call underlying store
|
|
conv, err := s.base.Append(id, messages...)
|
|
|
|
// Record metrics
|
|
duration := time.Since(start).Seconds()
|
|
status := "success"
|
|
if err != nil {
|
|
status = "error"
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
}
|
|
} else {
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
if conv != nil {
|
|
span.SetAttributes(
|
|
attribute.Int("conversation.total_messages", len(conv.Messages)),
|
|
)
|
|
}
|
|
span.SetStatus(codes.Ok, "")
|
|
}
|
|
}
|
|
|
|
if s.registry != nil {
|
|
conversationOperationsTotal.WithLabelValues("append", s.backend, status).Inc()
|
|
conversationOperationDuration.WithLabelValues("append", s.backend).Observe(duration)
|
|
}
|
|
|
|
return conv, err
|
|
}
|
|
|
|
// Delete wraps the store's Delete method with metrics and tracing.
|
|
func (s *InstrumentedStore) Delete(id string) error {
|
|
ctx := context.Background()
|
|
|
|
// Start span if tracing is enabled
|
|
if s.tracer != nil {
|
|
var span trace.Span
|
|
ctx, span = s.tracer.Start(ctx, "conversation.delete",
|
|
trace.WithAttributes(
|
|
attribute.String("conversation.id", id),
|
|
attribute.String("conversation.backend", s.backend),
|
|
),
|
|
)
|
|
defer span.End()
|
|
}
|
|
|
|
// Record start time
|
|
start := time.Now()
|
|
|
|
// Call underlying store
|
|
err := s.base.Delete(id)
|
|
|
|
// Record metrics
|
|
duration := time.Since(start).Seconds()
|
|
status := "success"
|
|
if err != nil {
|
|
status = "error"
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
}
|
|
} else {
|
|
if s.tracer != nil {
|
|
span := trace.SpanFromContext(ctx)
|
|
span.SetStatus(codes.Ok, "")
|
|
}
|
|
}
|
|
|
|
if s.registry != nil {
|
|
conversationOperationsTotal.WithLabelValues("delete", s.backend, status).Inc()
|
|
conversationOperationDuration.WithLabelValues("delete", s.backend).Observe(duration)
|
|
// Update active count
|
|
conversationActiveCount.WithLabelValues(s.backend).Set(float64(s.base.Size()))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Size returns the size of the underlying store.
|
|
func (s *InstrumentedStore) Size() int {
|
|
return s.base.Size()
|
|
}
|
|
|
|
// Close wraps the store's Close method.
|
|
func (s *InstrumentedStore) Close() error {
|
|
return s.base.Close()
|
|
}
|