Add telemetry

This commit is contained in:
Alexander
2026-04-14 10:31:56 +02:00
parent 20049881ad
commit 9cc052c162
15 changed files with 580 additions and 62 deletions
+19 -5
View File
@@ -11,11 +11,12 @@ import (
)
type Config struct {
Port int `yaml:"port"`
APIKeys []string `yaml:"api_keys"`
ClaudeBinary string `yaml:"claude_binary"`
Sanitize SanitizeConfig `yaml:"sanitize"`
Logging LoggingConfig `yaml:"logging"`
Port int `yaml:"port"`
APIKeys []string `yaml:"api_keys"`
ClaudeBinary string `yaml:"claude_binary"`
Sanitize SanitizeConfig `yaml:"sanitize"`
Logging LoggingConfig `yaml:"logging"`
Telemetry TelemetryConfig `yaml:"telemetry"`
}
type SanitizeConfig struct {
@@ -34,6 +35,15 @@ type ReplaceRule struct {
Replace string `yaml:"replace"`
}
// TelemetryConfig is the `telemetry` section of the YAML configuration.
type TelemetryConfig struct {
	// Endpoint is the OTLP collector address. An empty value means that
	// nothing is exported (see ExportEnabled).
	Endpoint string `yaml:"endpoint"`
	// Insecure is read from the `insecure` key.
	Insecure bool `yaml:"insecure"`
	// ServiceName is read from the `service_name` key.
	ServiceName string `yaml:"service_name"`
	// Headers is an optional key/value map read from the `headers` key;
	// presumably extra metadata for the OTLP exporters — confirm with callers.
	Headers map[string]string `yaml:"headers"`
}

// ExportEnabled reports whether an export endpoint has been configured,
// i.e. whether Endpoint is a non-empty string.
func (tc TelemetryConfig) ExportEnabled() bool {
	return len(tc.Endpoint) > 0
}
type LoggingConfig struct {
Level string `yaml:"level"`
File string `yaml:"file"`
@@ -76,6 +86,10 @@ func Load(path string) (*Config, error) {
cfg.Logging.MaxAgeDays = 30
}
if cfg.Telemetry.ServiceName == "" {
cfg.Telemetry.ServiceName = "anthropic-proxy"
}
// Check for deprecated claude_credentials field
var rawCfg map[string]interface{}
if err := yaml.Unmarshal(data, &rawCfg); err == nil {
+20 -5
View File
@@ -3,6 +3,7 @@ package logging
import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"strings"
@@ -28,7 +29,9 @@ type Config struct {
// - File set: JSON → lumberjack rotating file
// - File empty + TTY: colored ConsoleWriter → stderr
// - File empty + not TTY: JSON → stderr (for systemd journal)
func Setup(cfg Config) zerolog.Logger {
// Extra writers (e.g., OTLP log bridge) are added via io.MultiWriter so logs
// are written to both the primary destination and any extra writers.
func Setup(cfg Config, extraWriters ...io.Writer) zerolog.Logger {
// Parse log level
level, err := zerolog.ParseLevel(cfg.Level)
if err != nil || cfg.Level == "" {
@@ -48,20 +51,32 @@ func Setup(cfg Config) zerolog.Logger {
MaxAge: cfg.MaxAgeDays,
Compress: cfg.Compress,
}
logger = zerolog.New(jack).With().Timestamp().Caller().Logger()
var w io.Writer = jack
if len(extraWriters) > 0 {
w = io.MultiWriter(append([]io.Writer{jack}, extraWriters...)...)
}
logger = zerolog.New(w).With().Timestamp().Caller().Logger()
} else {
fi, err := os.Stderr.Stat()
isTTY := err == nil && (fi.Mode()&os.ModeCharDevice) != 0
if isTTY {
// Dev mode: colored console
// Dev mode: colored console (extra writers get JSON, console gets pretty)
cw := zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: time.RFC3339,
}
logger = zerolog.New(cw).With().Timestamp().Caller().Logger()
var w io.Writer = cw
if len(extraWriters) > 0 {
w = io.MultiWriter(append([]io.Writer{cw}, extraWriters...)...)
}
logger = zerolog.New(w).With().Timestamp().Caller().Logger()
} else {
// Systemd journal: JSON to stderr
logger = zerolog.New(os.Stderr).With().Timestamp().Caller().Logger()
var w io.Writer = os.Stderr
if len(extraWriters) > 0 {
w = io.MultiWriter(append([]io.Writer{os.Stderr}, extraWriters...)...)
}
logger = zerolog.New(w).With().Timestamp().Caller().Logger()
}
}
+140 -8
View File
@@ -9,9 +9,12 @@ import (
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"github.com/tidwall/gjson"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"github.com/fujin/anthropic-proxy/internal/auth"
"github.com/fujin/anthropic-proxy/internal/logging"
"github.com/fujin/anthropic-proxy/internal/telemetry"
)
func HandleMessages(pool *auth.Pool, profile *SniffedProfile, getSanitizer func() *Sanitizer) gin.HandlerFunc {
@@ -55,10 +58,16 @@ func HandleMessages(pool *auth.Pool, profile *SniffedProfile, getSanitizer func(
func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte) {
startTime := time.Now()
respBody, headers, statusCode, err := upstream.Execute(c.Request.Context(), cred, body)
model := gjson.GetBytes(body, "model").String()
ctx := c.Request.Context()
telemetry.RequestBodySize.Record(ctx, int64(len(body)),
metric.WithAttributes(attribute.String("model", model), attribute.Bool("stream", false)))
respBody, headers, statusCode, err := upstream.Execute(ctx, cred, body)
latencyMs := float64(time.Since(startTime).Milliseconds())
if err != nil {
latencyMs := float64(time.Since(startTime).Milliseconds())
model := gjson.GetBytes(body, "model").String()
log.Error().
Err(err).
Str("credential", cred.Email).
@@ -69,14 +78,39 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p
Int("request_body_size", len(body)).
Float64("latency_ms", latencyMs).
Msg("upstream connection error")
telemetry.UpstreamErrors.Add(ctx, 1,
metric.WithAttributes(
attribute.String("error_type", "connection"),
attribute.String("credential", cred.Email),
attribute.Int("status_code", http.StatusBadGateway),
))
telemetry.RequestCounter.Add(ctx, 1,
metric.WithAttributes(
attribute.String("model", model),
attribute.Bool("stream", false),
attribute.Int("status_code", http.StatusBadGateway),
))
telemetry.RequestDuration.Record(ctx, latencyMs,
metric.WithAttributes(attribute.String("model", model), attribute.Bool("stream", false), attribute.Int("status_code", http.StatusBadGateway)))
c.JSON(http.StatusBadGateway, gin.H{"error": "upstream request failed"})
return
}
attrs := []attribute.KeyValue{
attribute.String("model", model),
attribute.Bool("stream", false),
attribute.Int("status_code", statusCode),
}
telemetry.RequestCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
telemetry.RequestDuration.Record(ctx, latencyMs, metric.WithAttributes(attrs...))
if statusCode >= 400 {
pool.MarkFailure(cred, statusCode)
latencyMs := float64(time.Since(startTime).Milliseconds())
model := gjson.GetBytes(body, "model").String()
telemetry.CredentialCooldowns.Add(ctx, 1,
metric.WithAttributes(attribute.Int("status_code", statusCode)))
errorType := gjson.GetBytes(respBody, "error.type").String()
errorMessage := gjson.GetBytes(respBody, "error.message").String()
log.Error().
@@ -94,9 +128,33 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p
Int("request_body_size", len(body)).
Str("request_headers", logging.RedactHeaders(c.Request.Header)).
Msg("upstream error")
telemetry.UpstreamErrors.Add(ctx, 1,
metric.WithAttributes(
attribute.Int("status_code", statusCode),
attribute.String("error_type", errorType),
attribute.String("credential", cred.Email),
))
} else {
pool.MarkSuccess(cred)
respBody = san.DesanitizeResponse(respBody)
inputTokens := gjson.GetBytes(respBody, "usage.input_tokens").Int()
outputTokens := gjson.GetBytes(respBody, "usage.output_tokens").Int()
tokenAttrs := metric.WithAttributes(
attribute.String("model", model),
attribute.String("credential", cred.Email),
)
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
log.Info().
Int("status", statusCode).
Float64("latency_ms", latencyMs).
Str("model", model).
Int64("input_tokens", inputTokens).
Int64("output_tokens", outputTokens).
Msg("request completed")
}
for _, h := range []string{"Content-Type", "X-Request-Id"} {
@@ -110,10 +168,16 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p
func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte) {
startTime := time.Now()
resp, err := upstream.ExecuteStream(c.Request.Context(), cred, body)
model := gjson.GetBytes(body, "model").String()
ctx := c.Request.Context()
telemetry.StreamRequests.Add(ctx, 1, metric.WithAttributes(attribute.String("model", model)))
telemetry.RequestBodySize.Record(ctx, int64(len(body)),
metric.WithAttributes(attribute.String("model", model), attribute.Bool("stream", true)))
resp, err := upstream.ExecuteStream(ctx, cred, body)
if err != nil {
latencyMs := float64(time.Since(startTime).Milliseconds())
model := gjson.GetBytes(body, "model").String()
log.Error().
Err(err).
Str("credential", cred.Email).
@@ -124,6 +188,22 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool
Int("request_body_size", len(body)).
Float64("latency_ms", latencyMs).
Msg("upstream connection error")
telemetry.UpstreamErrors.Add(ctx, 1,
metric.WithAttributes(
attribute.String("error_type", "connection"),
attribute.String("credential", cred.Email),
attribute.Int("status_code", http.StatusBadGateway),
))
telemetry.RequestCounter.Add(ctx, 1,
metric.WithAttributes(
attribute.String("model", model),
attribute.Bool("stream", true),
attribute.Int("status_code", http.StatusBadGateway),
))
telemetry.RequestDuration.Record(ctx, latencyMs,
metric.WithAttributes(attribute.String("model", model), attribute.Bool("stream", true), attribute.Int("status_code", http.StatusBadGateway)))
c.JSON(http.StatusBadGateway, gin.H{"error": "upstream stream request failed"})
return
}
@@ -131,9 +211,10 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool
if resp.StatusCode >= 400 {
pool.MarkFailure(cred, resp.StatusCode)
telemetry.CredentialCooldowns.Add(ctx, 1,
metric.WithAttributes(attribute.Int("status_code", resp.StatusCode)))
respBody, _ := io.ReadAll(resp.Body)
latencyMs := float64(time.Since(startTime).Milliseconds())
model := gjson.GetBytes(body, "model").String()
errorType := gjson.GetBytes(respBody, "error.type").String()
errorMessage := gjson.GetBytes(respBody, "error.message").String()
log.Error().
@@ -151,6 +232,21 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool
Int("request_body_size", len(body)).
Str("request_headers", logging.RedactHeaders(c.Request.Header)).
Msg("upstream error")
attrs := []attribute.KeyValue{
attribute.String("model", model),
attribute.Bool("stream", true),
attribute.Int("status_code", resp.StatusCode),
}
telemetry.RequestCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
telemetry.RequestDuration.Record(ctx, latencyMs, metric.WithAttributes(attrs...))
telemetry.UpstreamErrors.Add(ctx, 1,
metric.WithAttributes(
attribute.Int("status_code", resp.StatusCode),
attribute.String("error_type", errorType),
attribute.String("credential", cred.Email),
))
c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody)
return
}
@@ -169,14 +265,50 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool
return
}
var inputTokens, outputTokens int64
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
line := san.DesanitizeStreamEvent(scanner.Text())
c.Writer.WriteString(line + "\n")
flusher.Flush()
// Extract token usage from message_delta event
if len(line) > 5 && line[:5] == "data:" {
data := line[5:]
if gjson.Get(data, "type").String() == "message_delta" {
inputTokens = gjson.Get(data, "usage.input_tokens").Int()
outputTokens = gjson.Get(data, "usage.output_tokens").Int()
}
}
}
latencyMs := float64(time.Since(startTime).Milliseconds())
attrs := []attribute.KeyValue{
attribute.String("model", model),
attribute.Bool("stream", true),
attribute.Int("status_code", http.StatusOK),
}
telemetry.RequestCounter.Add(ctx, 1, metric.WithAttributes(attrs...))
telemetry.RequestDuration.Record(ctx, latencyMs, metric.WithAttributes(attrs...))
if inputTokens > 0 || outputTokens > 0 {
tokenAttrs := metric.WithAttributes(
attribute.String("model", model),
attribute.String("credential", cred.Email),
)
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
}
log.Info().
Float64("latency_ms", latencyMs).
Str("model", model).
Bool("stream", true).
Int64("input_tokens", inputTokens).
Int64("output_tokens", outputTokens).
Msg("stream completed")
if err := scanner.Err(); err != nil {
log.Error().Err(err).Msg("stream scan error")
}
+9
View File
@@ -51,6 +51,15 @@ func (u *UpstreamClient) applyHeaders(req *http.Request, token string, streaming
req.Header.Del("x-api-key")
if strings.HasPrefix(token, "sk-ant-oat") {
req.Header.Set("Authorization", "Bearer "+token)
// OAuth tokens require this beta flag — without it the API rejects with 401
existing := req.Header.Get("anthropic-beta")
if !strings.Contains(existing, "oauth-2025-04-20") {
if existing == "" {
req.Header.Set("anthropic-beta", "oauth-2025-04-20")
} else {
req.Header.Set("anthropic-beta", existing+",oauth-2025-04-20")
}
}
} else {
req.Header.Set("x-api-key", token)
}
+4
View File
@@ -14,6 +14,7 @@ import (
"github.com/fujin/anthropic-proxy/internal/config"
"github.com/fujin/anthropic-proxy/internal/logging"
"github.com/fujin/anthropic-proxy/internal/proxy"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
)
type Server struct {
@@ -37,6 +38,9 @@ func New(cfg *config.Config, pool *auth.Pool, profile *proxy.SniffedProfile) *Se
engine := gin.New()
engine.Use(gin.Recovery())
engine.Use(corsMiddleware())
if cfg.Telemetry.ExportEnabled() {
engine.Use(otelgin.Middleware(cfg.Telemetry.ServiceName))
}
engine.Use(s.authMiddleware())
engine.Use(logging.GinRequestLogger())
+81
View File
@@ -0,0 +1,81 @@
package telemetry
import (
"context"
"encoding/json"
"time"
otellog "go.opentelemetry.io/otel/log"
sdklog "go.opentelemetry.io/otel/sdk/log"
)
// LogBridge implements io.Writer and forwards zerolog JSON lines to the
// OTel LoggerProvider. It is used as an extra writer in zerolog's MultiWriter
// so that logs go to both the primary log destination and OTLP.
type LogBridge struct {
// provider is the SDK LoggerProvider that Write obtains its logger from.
provider *sdklog.LoggerProvider
}
// Write implements io.Writer. p is expected to hold one JSON-encoded log
// event; it is decoded, converted into an OTel log record and emitted through
// the bridge's LoggerProvider.
//
// Mapping:
//   - "message" (string) becomes the record body,
//   - "level" (string) becomes the severity number (via mapSeverity) and the
//     severity text,
//   - "time" is dropped; the record is stamped with the time of the call,
//   - every other field becomes an attribute: strings, numbers (float64) and
//     booleans keep their type, anything else is re-encoded as a JSON string.
//
// Write always reports len(p) bytes written and a nil error, even when the
// input cannot be decoded or the bridge has no provider. Reporting a short
// write or an error would make io.MultiWriter stop and surface the failure to
// the logger, so a telemetry problem would disturb the other log writers.
func (b *LogBridge) Write(p []byte) (n int, err error) {
	// A bridge without a provider has nowhere to send records. Treat the
	// bytes as consumed instead of panicking in the logging path.
	if b == nil || b.provider == nil {
		return len(p), nil
	}

	var entry map[string]interface{}
	if err := json.Unmarshal(p, &entry); err != nil {
		return len(p), nil // skip malformed lines
	}

	var rec otellog.Record
	rec.SetTimestamp(time.Now())

	if msg, ok := entry["message"].(string); ok {
		rec.SetBody(otellog.StringValue(msg))
	}
	if lvl, ok := entry["level"].(string); ok {
		rec.SetSeverity(mapSeverity(lvl))
		// Keep the original level name next to the numeric severity.
		rec.SetSeverityText(lvl)
	}

	// Forward all remaining fields as attributes.
	attrs := make([]otellog.KeyValue, 0, len(entry))
	for k, v := range entry {
		if k == "message" || k == "level" || k == "time" {
			continue
		}
		switch val := v.(type) {
		case string:
			attrs = append(attrs, otellog.String(k, val))
		case float64:
			attrs = append(attrs, otellog.Float64(k, val))
		case bool:
			attrs = append(attrs, otellog.Bool(k, val))
		default:
			// Nested objects, arrays and null: keep them as JSON text.
			// The error is ignored because val was itself produced by
			// json.Unmarshal and therefore always re-encodes.
			encoded, _ := json.Marshal(val)
			attrs = append(attrs, otellog.String(k, string(encoded)))
		}
	}
	rec.AddAttributes(attrs...)

	b.provider.Logger("zerolog").Emit(context.Background(), rec)
	return len(p), nil
}
// mapSeverity translates a textual log level into the corresponding OTel
// severity. "warn" and "warning" are treated alike, "panic" maps to
// SeverityFatal2, and "info" as well as any unrecognized level yield
// SeverityInfo.
func mapSeverity(level string) otellog.Severity {
	// Info doubles as the fallback, so start from it and only override for
	// the levels that differ.
	sev := otellog.SeverityInfo
	switch level {
	case "trace":
		sev = otellog.SeverityTrace
	case "debug":
		sev = otellog.SeverityDebug
	case "warn", "warning":
		sev = otellog.SeverityWarn
	case "error":
		sev = otellog.SeverityError
	case "fatal":
		sev = otellog.SeverityFatal
	case "panic":
		sev = otellog.SeverityFatal2
	}
	return sev
}
+50
View File
@@ -0,0 +1,50 @@
package telemetry
import (
"go.opentelemetry.io/otel/metric"
)
// Metric instruments used by the proxy. They are declared without an initial
// value and are assigned by InitMetrics, so each one is nil until InitMetrics
// has been called.
var (
// RequestCounter is the "proxy.request.count" counter: total proxy requests.
RequestCounter metric.Int64Counter
// RequestDuration is the "proxy.request.duration_ms" histogram: request
// latency in milliseconds.
RequestDuration metric.Float64Histogram
// RequestBodySize is the "proxy.request.body_size_bytes" histogram: request
// body size in bytes.
RequestBodySize metric.Int64Histogram
// UpstreamErrors is the "proxy.upstream.errors" counter: upstream error count.
UpstreamErrors metric.Int64Counter
// TokensInput is the "proxy.tokens.input" counter: input tokens consumed.
TokensInput metric.Int64Counter
// TokensOutput is the "proxy.tokens.output" counter: output tokens consumed.
TokensOutput metric.Int64Counter
// CredentialCooldowns is the "proxy.credential.cooldowns" counter:
// credential cooldown activations.
CredentialCooldowns metric.Int64Counter
// ActiveCredentials is the "proxy.credential.active" up/down counter:
// currently active (non-cooldown) credentials.
ActiveCredentials metric.Int64UpDownCounter
// StreamRequests is the "proxy.stream.requests" counter: streaming request count.
StreamRequests metric.Int64Counter
)
// InitMetrics builds every package-level metric instrument from meter and
// stores it in the corresponding variable. Errors returned while creating an
// instrument are discarded.
func InitMetrics(meter metric.Meter) {
	// newCounter covers the common case: an Int64 counter that carries
	// nothing but a description.
	newCounter := func(name, description string) metric.Int64Counter {
		c, _ := meter.Int64Counter(name, metric.WithDescription(description))
		return c
	}

	RequestCounter = newCounter("proxy.request.count", "Total proxy requests")

	RequestDuration, _ = meter.Float64Histogram(
		"proxy.request.duration_ms",
		metric.WithDescription("Request latency in milliseconds"),
		metric.WithUnit("ms"),
	)
	RequestBodySize, _ = meter.Int64Histogram(
		"proxy.request.body_size_bytes",
		metric.WithDescription("Request body size in bytes"),
		metric.WithUnit("By"),
	)

	UpstreamErrors = newCounter("proxy.upstream.errors", "Upstream error count")
	TokensInput = newCounter("proxy.tokens.input", "Input tokens consumed")
	TokensOutput = newCounter("proxy.tokens.output", "Output tokens consumed")
	CredentialCooldowns = newCounter("proxy.credential.cooldowns", "Credential cooldown activations")

	ActiveCredentials, _ = meter.Int64UpDownCounter(
		"proxy.credential.active",
		metric.WithDescription("Currently active (non-cooldown) credentials"),
	)

	StreamRequests = newCounter("proxy.stream.requests", "Streaming request count")
}
+107
View File
@@ -0,0 +1,107 @@
package telemetry
import (
"context"
"io"
"github.com/fujin/anthropic-proxy/internal/config"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otellog "go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
// Setup initializes OpenTelemetry providers. It always creates a MeterProvider
// and calls InitMetrics, so the package-level instruments are usable whether
// or not anything is exported.
//
// When cfg.ExportEnabled() is false only that MeterProvider (without a reader)
// is created; the returned logWriter is nil.
//
// When cfg.ExportEnabled() is true, OTLP gRPC exporters for traces, metrics
// and logs are created against cfg.Endpoint. cfg.Insecure disables transport
// security and cfg.Headers, when non-empty, is sent by all three exporters.
// The providers are registered globally only after every exporter has been
// created successfully. If a later exporter fails, the providers built so far
// are shut down and nothing is registered.
//
// Returns:
//   - shutdown: shuts down every provider that was created; all of them are
//     attempted and the first error encountered is returned.
//   - logWriter: a *LogBridge that forwards written log lines to the OTLP
//     logger provider, or nil when export is disabled.
//   - err: the error from building the resource or any exporter.
func Setup(ctx context.Context, cfg config.TelemetryConfig) (shutdown func(context.Context) error, logWriter io.Writer, err error) {
	res, err := resource.New(ctx,
		resource.WithAttributes(
			semconv.ServiceName(cfg.ServiceName),
		),
	)
	if err != nil {
		return nil, nil, err
	}

	if !cfg.ExportEnabled() {
		// No export: a reader-less meter provider keeps the metric
		// instruments valid (they just don't export anywhere).
		mp := sdkmetric.NewMeterProvider(sdkmetric.WithResource(res))
		otel.SetMeterProvider(mp)
		InitMetrics(mp.Meter(cfg.ServiceName))
		return func(ctx context.Context) error { return mp.Shutdown(ctx) }, nil, nil
	}

	// Build exporter options.
	traceOpts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(cfg.Endpoint)}
	metricOpts := []otlpmetricgrpc.Option{
		otlpmetricgrpc.WithEndpoint(cfg.Endpoint),
		otlpmetricgrpc.WithTemporalitySelector(sdkmetric.CumulativeTemporalitySelector),
	}
	logOpts := []otlploggrpc.Option{otlploggrpc.WithEndpoint(cfg.Endpoint)}
	if cfg.Insecure {
		traceOpts = append(traceOpts, otlptracegrpc.WithInsecure())
		metricOpts = append(metricOpts, otlpmetricgrpc.WithInsecure())
		logOpts = append(logOpts, otlploggrpc.WithInsecure())
	}
	if len(cfg.Headers) > 0 {
		// Configured headers (e.g. collector authentication) must reach
		// every signal's exporter.
		traceOpts = append(traceOpts, otlptracegrpc.WithHeaders(cfg.Headers))
		metricOpts = append(metricOpts, otlpmetricgrpc.WithHeaders(cfg.Headers))
		logOpts = append(logOpts, otlploggrpc.WithHeaders(cfg.Headers))
	}

	// Trace exporter.
	traceExp, err := otlptracegrpc.New(ctx, traceOpts...)
	if err != nil {
		return nil, nil, err
	}
	tp := trace.NewTracerProvider(
		trace.WithBatcher(traceExp),
		trace.WithResource(res),
	)

	// Metric exporter.
	metricExp, err := otlpmetricgrpc.New(ctx, metricOpts...)
	if err != nil {
		// Best-effort cleanup; the exporter error is the one worth reporting.
		_ = tp.Shutdown(ctx)
		return nil, nil, err
	}
	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExp)),
		sdkmetric.WithResource(res),
	)

	// Log exporter.
	logExp, err := otlploggrpc.New(ctx, logOpts...)
	if err != nil {
		// Best-effort cleanup; the exporter error is the one worth reporting.
		_ = mp.Shutdown(ctx)
		_ = tp.Shutdown(ctx)
		return nil, nil, err
	}
	lp := log.NewLoggerProvider(
		log.WithProcessor(log.NewBatchProcessor(logExp)),
		log.WithResource(res),
	)

	// Everything was built: publish the providers globally and create the
	// metric instruments.
	otel.SetTracerProvider(tp)
	otel.SetMeterProvider(mp)
	InitMetrics(mp.Meter(cfg.ServiceName))
	otellog.SetLoggerProvider(lp)

	bridge := &LogBridge{provider: lp}

	shutdownFn := func(ctx context.Context) error {
		// Attempt every shutdown even if an earlier one fails; report
		// the first failure.
		var firstErr error
		if e := tp.Shutdown(ctx); e != nil && firstErr == nil {
			firstErr = e
		}
		if e := mp.Shutdown(ctx); e != nil && firstErr == nil {
			firstErr = e
		}
		if e := lp.Shutdown(ctx); e != nil && firstErr == nil {
			firstErr = e
		}
		return firstErr
	}

	return shutdownFn, bridge, nil
}