diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index caaff03..146d92e 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -14,10 +14,11 @@ import ( "github.com/fujin/anthropic-proxy/internal/auth" "github.com/fujin/anthropic-proxy/internal/logging" + "github.com/fujin/anthropic-proxy/internal/ratelimit" "github.com/fujin/anthropic-proxy/internal/telemetry" ) -func HandleMessages(pool *auth.Pool, profile *SniffedProfile, getSanitizer func() *Sanitizer) gin.HandlerFunc { +func HandleMessages(pool *auth.Pool, profile *SniffedProfile, getSanitizer func() *Sanitizer, tracker *ratelimit.Tracker) gin.HandlerFunc { upstream := NewUpstreamClient(profile) return func(c *gin.Context) { @@ -49,14 +50,14 @@ func HandleMessages(pool *auth.Pool, profile *SniffedProfile, getSanitizer func( isStream := gjson.GetBytes(body, "stream").Bool() if isStream { - handleStream(c, upstream, san, pool, cred, body, originalBody) + handleStream(c, upstream, san, pool, cred, body, originalBody, tracker) } else { - handleNonStream(c, upstream, san, pool, cred, body, originalBody) + handleNonStream(c, upstream, san, pool, cred, body, originalBody, tracker) } } } -func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte) { +func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte, tracker *ratelimit.Tracker) { startTime := time.Now() model := gjson.GetBytes(body, "model").String() ctx := c.Request.Context() @@ -147,6 +148,10 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p ) telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) + if tracker != nil { + tracker.RecordTokens(inputTokens, outputTokens) + tracker.UpdateFromHeaders(headers) + } log.Info(). Int("status", statusCode). @@ -166,7 +171,7 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p c.Data(statusCode, headers.Get("Content-Type"), respBody) } -func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte) { +func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte, tracker *ratelimit.Tracker) { startTime := time.Now() model := gjson.GetBytes(body, "model").String() ctx := c.Request.Context() @@ -299,6 +304,10 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool ) telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) + if tracker != nil { + tracker.RecordTokens(inputTokens, outputTokens) + tracker.UpdateFromHeaders(resp.Header) + } } log.Info(). diff --git a/internal/ratelimit/tracker.go b/internal/ratelimit/tracker.go new file mode 100644 index 0000000..5b25634 --- /dev/null +++ b/internal/ratelimit/tracker.go @@ -0,0 +1,213 @@ +package ratelimit + +import ( + "context" + "net/http" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/rs/zerolog/log" +) + +// Window holds per-window usage state. +type Window struct { + Utilization float64 // 0-100 from API + ResetsAt time.Time // when window resets + TokensIn atomic.Int64 + TokensOut atomic.Int64 +} + +// Snapshot is a read-only copy of a Window's state. +type Snapshot struct { + Utilization float64 + ResetsAt time.Time + TokensIn int64 + TokensOut int64 +} + +// Tracker polls /api/oauth/usage and tracks per-window token usage. +type Tracker struct { + tokenFn func() string // returns current OAuth access token + mu sync.RWMutex + fiveHour Window + sevenDay Window + sonnet Window + extra ExtraUsage +} + +// NewTracker creates a tracker. tokenFn should return the current access token. +func NewTracker(tokenFn func() string) *Tracker { + return &Tracker{tokenFn: tokenFn} +} + +// Start begins the background poll loop. +func (t *Tracker) Start(ctx context.Context) { + go func() { + // Poll immediately on start + t.poll(ctx) + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Minute): + t.poll(ctx) + } + } + }() +} + +// UpdateFromHeaders extracts rate limit data from /v1/messages response headers. +// This provides real-time utilization updates without polling the usage API. +func (t *Tracker) UpdateFromHeaders(h http.Header) { + t.mu.Lock() + defer t.mu.Unlock() + + if v := h.Get("Anthropic-Ratelimit-Unified-5h-Utilization"); v != "" { + if f, err := strconv.ParseFloat(v, 64); err == nil { + t.fiveHour.Utilization = f * 100 // header is 0-1, we store 0-100 + } + } + if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" { + if ts, err := strconv.ParseInt(v, 10, 64); err == nil { + t.fiveHour.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute) + } + } + if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" { + if f, err := strconv.ParseFloat(v, 64); err == nil { + t.sevenDay.Utilization = f * 100 + } + } + if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" { + if ts, err := strconv.ParseInt(v, 10, 64); err == nil { + t.sevenDay.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute) + } + } +} + +// RecordTokens adds tokens to all active windows. +func (t *Tracker) RecordTokens(inputTokens, outputTokens int64) { + t.fiveHour.TokensIn.Add(inputTokens) + t.fiveHour.TokensOut.Add(outputTokens) + t.sevenDay.TokensIn.Add(inputTokens) + t.sevenDay.TokensOut.Add(outputTokens) +} + +// FiveHour returns a snapshot of the 5-hour window. +func (t *Tracker) FiveHour() Snapshot { + t.mu.RLock() + defer t.mu.RUnlock() + return Snapshot{ + Utilization: t.fiveHour.Utilization, + ResetsAt: t.fiveHour.ResetsAt, + TokensIn: t.fiveHour.TokensIn.Load(), + TokensOut: t.fiveHour.TokensOut.Load(), + } +} + +// SevenDay returns a snapshot of the 7-day window. +func (t *Tracker) SevenDay() Snapshot { + t.mu.RLock() + defer t.mu.RUnlock() + return Snapshot{ + Utilization: t.sevenDay.Utilization, + ResetsAt: t.sevenDay.ResetsAt, + TokensIn: t.sevenDay.TokensIn.Load(), + TokensOut: t.sevenDay.TokensOut.Load(), + } +} + +// Sonnet returns a snapshot of the 7-day sonnet window. +func (t *Tracker) Sonnet() Snapshot { + t.mu.RLock() + defer t.mu.RUnlock() + return Snapshot{ + Utilization: t.sonnet.Utilization, + ResetsAt: t.sonnet.ResetsAt, + } +} + +// Extra returns the current extra usage state. +func (t *Tracker) Extra() ExtraUsage { + t.mu.RLock() + defer t.mu.RUnlock() + return t.extra +} + +func (t *Tracker) poll(ctx context.Context) { + token := t.tokenFn() + if token == "" { + return + } + + usage, err := fetchUsage(ctx, token) + if err != nil { + log.Warn().Err(err).Msg("usage poll failed") + return + } + + t.mu.Lock() + defer t.mu.Unlock() + + if usage.FiveHour != nil { + t.updateWindow(&t.fiveHour, usage.FiveHour, "5h") + } + if usage.SevenDay != nil { + t.updateWindow(&t.sevenDay, usage.SevenDay, "7d") + } + if usage.SevenDaySonnet != nil { + t.updateWindow(&t.sonnet, usage.SevenDaySonnet, "7d_sonnet") + } + if usage.ExtraUsage != nil { + t.extra = *usage.ExtraUsage + } + + log.Debug(). + Float64("5h_util", t.fiveHour.Utilization). + Time("5h_resets", t.fiveHour.ResetsAt). + Int64("5h_tokens_in", t.fiveHour.TokensIn.Load()). + Int64("5h_tokens_out", t.fiveHour.TokensOut.Load()). + Float64("7d_util", t.sevenDay.Utilization). + Time("7d_resets", t.sevenDay.ResetsAt). + Int64("7d_tokens_in", t.sevenDay.TokensIn.Load()). + Int64("7d_tokens_out", t.sevenDay.TokensOut.Load()). + Msg("usage poll") + + // Warn on high utilization + if t.fiveHour.Utilization > 80 { + log.Warn().Float64("utilization", t.fiveHour.Utilization).Time("resets_at", t.fiveHour.ResetsAt).Msg("5h window utilization high") + } + if t.sevenDay.Utilization > 80 { + log.Warn().Float64("utilization", t.sevenDay.Utilization).Time("resets_at", t.sevenDay.ResetsAt).Msg("7d window utilization high") + } +} + +func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) { + if rl.Utilization != nil { + w.Utilization = *rl.Utilization + } + if rl.ResetsAt != nil { + parsed, err := time.Parse(time.RFC3339Nano, *rl.ResetsAt) + if err != nil { + // Fallback to RFC3339 without fractional seconds + parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt) + } + parsed = parsed.UTC().Truncate(time.Minute) + if err == nil && parsed != w.ResetsAt && !w.ResetsAt.IsZero() { + // Window reset detected — zero token counters + log.Info(). + Str("window", name). + Int64("prev_tokens_in", w.TokensIn.Load()). + Int64("prev_tokens_out", w.TokensOut.Load()). + Time("old_reset", w.ResetsAt). + Time("new_reset", parsed). + Msg("window reset detected") + w.TokensIn.Store(0) + w.TokensOut.Store(0) + } + if err == nil { + w.ResetsAt = parsed + } + } +} diff --git a/internal/ratelimit/usage.go b/internal/ratelimit/usage.go new file mode 100644 index 0000000..5aa8586 --- /dev/null +++ b/internal/ratelimit/usage.go @@ -0,0 +1,62 @@ +package ratelimit + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const usageURL = "https://api.anthropic.com/api/oauth/usage" + +type RateLimit struct { + Utilization *float64 `json:"utilization"` // 0-100 + ResetsAt *string `json:"resets_at"` // ISO 8601 +} + +type ExtraUsage struct { + IsEnabled bool `json:"is_enabled"` + MonthlyLimit *float64 `json:"monthly_limit"` + UsedCredits *float64 `json:"used_credits"` + Utilization *float64 `json:"utilization"` +} + +type UsageResponse struct { + FiveHour *RateLimit `json:"five_hour"` + SevenDay *RateLimit `json:"seven_day"` + SevenDaySonnet *RateLimit `json:"seven_day_sonnet"` + ExtraUsage *ExtraUsage `json:"extra_usage"` +} + +func fetchUsage(ctx context.Context, token string) (*UsageResponse, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, usageURL, nil) + if err != nil { + return nil, fmt.Errorf("build request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("anthropic-beta", "oauth-2025-04-20") + req.Header.Set("User-Agent", "claude-cli/2.1.92") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("usage returned %d: %s", resp.StatusCode, string(body)) + } + + var usage UsageResponse + if err := json.Unmarshal(body, &usage); err != nil { + return nil, fmt.Errorf("decode: %w", err) + } + return &usage, nil +} diff --git a/internal/server/server.go b/internal/server/server.go index 64c097c..28d7492 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -14,6 +14,7 @@ import ( "github.com/fujin/anthropic-proxy/internal/config" "github.com/fujin/anthropic-proxy/internal/logging" "github.com/fujin/anthropic-proxy/internal/proxy" + "github.com/fujin/anthropic-proxy/internal/ratelimit" "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" ) @@ -25,7 +26,7 @@ type Server struct { apiKeys atomic.Pointer[map[string]struct{}] } -func New(cfg *config.Config, pool *auth.Pool, profile *proxy.SniffedProfile) *Server { +func New(cfg *config.Config, pool *auth.Pool, profile *proxy.SniffedProfile, tracker *ratelimit.Tracker) *Server { s := &Server{configPath: "config.yaml"} san := proxy.NewSanitizer(cfg.Sanitize) @@ -46,7 +47,7 @@ func New(cfg *config.Config, pool *auth.Pool, profile *proxy.SniffedProfile) *Se handler := proxy.HandleMessages(pool, profile, func() *proxy.Sanitizer { return s.sanitizer.Load() - }) + }, tracker) engine.POST("/v1/messages", handler) engine.POST("/messages", handler) diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 1f7abd6..a5f0662 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -1,7 +1,12 @@ package telemetry import ( + "context" + + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + + "github.com/fujin/anthropic-proxy/internal/ratelimit" ) var ( @@ -17,7 +22,8 @@ var ( ) // InitMetrics creates all metric instruments from the given meter. -func InitMetrics(meter metric.Meter) { +// If tracker is non-nil, registers observable gauges for per-window usage. +func InitMetrics(meter metric.Meter, tracker *ratelimit.Tracker) { RequestCounter, _ = meter.Int64Counter("proxy.request.count", metric.WithDescription("Total proxy requests"), ) @@ -47,4 +53,50 @@ func InitMetrics(meter metric.Meter) { StreamRequests, _ = meter.Int64Counter("proxy.stream.requests", metric.WithDescription("Streaming request count"), ) + + if tracker == nil { + return + } + + attr5h := attribute.String("window", "5h") + attr7d := attribute.String("window", "7d") + attrSonnet := attribute.String("window", "7d_sonnet") + + meter.Float64ObservableGauge("proxy.usage.utilization", + metric.WithDescription("Current utilization % from API"), + metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error { + o.Observe(tracker.FiveHour().Utilization, metric.WithAttributes(attr5h)) + o.Observe(tracker.SevenDay().Utilization, metric.WithAttributes(attr7d)) + o.Observe(tracker.Sonnet().Utilization, metric.WithAttributes(attrSonnet)) + return nil + }), + ) + + meter.Int64ObservableGauge("proxy.usage.resets_at", + metric.WithDescription("Unix seconds when window resets"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(tracker.FiveHour().ResetsAt.Unix(), metric.WithAttributes(attr5h)) + o.Observe(tracker.SevenDay().ResetsAt.Unix(), metric.WithAttributes(attr7d)) + o.Observe(tracker.Sonnet().ResetsAt.Unix(), metric.WithAttributes(attrSonnet)) + return nil + }), + ) + + meter.Int64ObservableGauge("proxy.usage.window_tokens.input", + metric.WithDescription("Proxy input tokens in current window"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(tracker.FiveHour().TokensIn, metric.WithAttributes(attr5h)) + o.Observe(tracker.SevenDay().TokensIn, metric.WithAttributes(attr7d)) + return nil + }), + ) + + meter.Int64ObservableGauge("proxy.usage.window_tokens.output", + metric.WithDescription("Proxy output tokens in current window"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(tracker.FiveHour().TokensOut, metric.WithAttributes(attr5h)) + o.Observe(tracker.SevenDay().TokensOut, metric.WithAttributes(attr7d)) + return nil + }), + ) } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 0e5f7c9..d836e93 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -5,6 +5,7 @@ import ( "io" "github.com/fujin/anthropic-proxy/internal/config" + "github.com/fujin/anthropic-proxy/internal/ratelimit" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -21,7 +22,7 @@ import ( // so metrics can be recorded in-process. When cfg.ExportEnabled(), OTLP gRPC // exporters are additionally configured to push to the LGTM stack. // Returns a shutdown function and an optional io.Writer for the log bridge. -func Setup(ctx context.Context, cfg config.TelemetryConfig) (shutdown func(context.Context) error, logWriter io.Writer, err error) { +func Setup(ctx context.Context, cfg config.TelemetryConfig, tracker *ratelimit.Tracker) (shutdown func(context.Context) error, logWriter io.Writer, err error) { res, err := resource.New(ctx, resource.WithAttributes( semconv.ServiceName(cfg.ServiceName), @@ -36,7 +37,7 @@ func Setup(ctx context.Context, cfg config.TelemetryConfig) (shutdown func(conte // instruments are valid (they just don't export anywhere). mp := sdkmetric.NewMeterProvider(sdkmetric.WithResource(res)) otel.SetMeterProvider(mp) - InitMetrics(mp.Meter(cfg.ServiceName)) + InitMetrics(mp.Meter(cfg.ServiceName), tracker) return func(ctx context.Context) error { return mp.Shutdown(ctx) }, nil, nil } @@ -74,7 +75,7 @@ func Setup(ctx context.Context, cfg config.TelemetryConfig) (shutdown func(conte sdkmetric.WithResource(res), ) otel.SetMeterProvider(mp) - InitMetrics(mp.Meter(cfg.ServiceName)) + InitMetrics(mp.Meter(cfg.ServiceName), tracker) // Log exporter logExp, err := otlploggrpc.New(ctx, logOpts...) diff --git a/main.go b/main.go index eaffa31..4974ce9 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "github.com/fujin/anthropic-proxy/internal/config" "github.com/fujin/anthropic-proxy/internal/logging" "github.com/fujin/anthropic-proxy/internal/proxy" + "github.com/fujin/anthropic-proxy/internal/ratelimit" "github.com/fujin/anthropic-proxy/internal/server" "github.com/fujin/anthropic-proxy/internal/telemetry" "github.com/rs/zerolog/log" @@ -25,8 +26,17 @@ func run() error { return fmt.Errorf("load config: %w", err) } + // Create usage tracker (started later once credential is loaded) + var credForTracker *auth.Credential + tracker := ratelimit.NewTracker(func() string { + if credForTracker == nil { + return "" + } + return credForTracker.Token() + }) + // Initialize telemetry (metrics always active; OTLP export when endpoint set) - telemetryShutdown, logBridge, err := telemetry.Setup(context.Background(), cfg.Telemetry) + telemetryShutdown, logBridge, err := telemetry.Setup(context.Background(), cfg.Telemetry, tracker) if err != nil { return fmt.Errorf("telemetry setup: %w", err) } @@ -85,6 +95,8 @@ func run() error { log.Info().Str("credential", cred.Email).Msg("credential loaded") + credForTracker = cred + pool := auth.NewPool([]*auth.Credential{cred}) ctx, cancel := context.WithCancel(context.Background()) @@ -92,6 +104,7 @@ func run() error { pool.RefreshExpiring(context.Background()) auth.StartBackgroundRefresh(ctx, pool) + tracker.Start(ctx) var profile *proxy.SniffedProfile if cfg.ClaudeBinary != "" { @@ -103,7 +116,7 @@ func run() error { } log.Info().Int("port", cfg.Port).Msg("starting server") - srv := server.New(cfg, pool, profile) + srv := server.New(cfg, pool, profile, tracker) quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)