From 27b647e9b48f2df6cf23c5b237c21df6c7e33240 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 14 Apr 2026 14:25:31 +0200 Subject: [PATCH] refactor(ratelimit): remove per-window token tracking from proxy Window token counts are now computed in Grafana using the @ modifier with dashboard variables derived from proxy_usage_resets_at. This eliminates in-memory state, file persistence, and restart sensitivity. Removes: TokensIn/Out, RecordTokens, setResetTime, persist.go, window_tokens observable gauges. -171 lines. --- internal/proxy/handler.go | 2 - internal/ratelimit/persist.go | 89 ----------------------------------- internal/ratelimit/tracker.go | 76 +++++------------------------- internal/telemetry/metrics.go | 17 ------- 4 files changed, 13 insertions(+), 171 deletions(-) delete mode 100644 internal/ratelimit/persist.go diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index dd3a519..04d65d7 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -149,7 +149,6 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) if tracker != nil { - tracker.RecordTokens(inputTokens, outputTokens) tracker.UpdateFromHeaders(headers) } @@ -307,7 +306,6 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) if tracker != nil { - tracker.RecordTokens(inputTokens, outputTokens) tracker.UpdateFromHeaders(resp.Header) } } diff --git a/internal/ratelimit/persist.go b/internal/ratelimit/persist.go deleted file mode 100644 index 296bc68..0000000 --- a/internal/ratelimit/persist.go +++ /dev/null @@ -1,89 +0,0 @@ -package ratelimit - -import ( - "encoding/json" - "os" - "path/filepath" - "time" - - "github.com/rs/zerolog/log" -) - -const stateFile = ".anthropic-proxy-state.json" - -type windowState struct { - ResetsAt time.Time `json:"resets_at"` - TokensIn int64 `json:"tokens_in"` - TokensOut int64 `json:"tokens_out"` -} - -type persistedState struct { - FiveHour windowState `json:"five_hour"` - SevenDay windowState `json:"seven_day"` -} - -func statePath() string { - home, err := os.UserHomeDir() - if err != nil { - return stateFile - } - return filepath.Join(home, ".claude", stateFile) -} - -func (t *Tracker) Save() { - t.mu.RLock() - state := persistedState{ - FiveHour: windowState{ - ResetsAt: t.fiveHour.ResetsAt, - TokensIn: t.fiveHour.TokensIn.Load(), - TokensOut: t.fiveHour.TokensOut.Load(), - }, - SevenDay: windowState{ - ResetsAt: t.sevenDay.ResetsAt, - TokensIn: t.sevenDay.TokensIn.Load(), - TokensOut: t.sevenDay.TokensOut.Load(), - }, - } - t.mu.RUnlock() - - data, err := json.Marshal(state) - if err != nil { - log.Warn().Err(err).Msg("failed to marshal tracker state") - return - } - - path := statePath() - if err := os.WriteFile(path, data, 0644); err != nil { - log.Warn().Err(err).Str("path", path).Msg("failed to save tracker state") - return - } -} - -func (t *Tracker) Restore() { - path := statePath() - data, err := os.ReadFile(path) - if err != nil { - return - } - - var state persistedState - if err := json.Unmarshal(data, &state); err != nil { - log.Warn().Err(err).Msg("failed to parse tracker state") - return - } - - t.mu.Lock() - defer t.mu.Unlock() - - if !state.FiveHour.ResetsAt.IsZero() && state.FiveHour.ResetsAt == t.fiveHour.ResetsAt { - t.fiveHour.TokensIn.Store(state.FiveHour.TokensIn) - t.fiveHour.TokensOut.Store(state.FiveHour.TokensOut) - log.Info().Int64("tokens_in", state.FiveHour.TokensIn).Int64("tokens_out", state.FiveHour.TokensOut).Msg("restored 5h window tokens") - } - - if !state.SevenDay.ResetsAt.IsZero() && state.SevenDay.ResetsAt == t.sevenDay.ResetsAt { - t.sevenDay.TokensIn.Store(state.SevenDay.TokensIn) - t.sevenDay.TokensOut.Store(state.SevenDay.TokensOut) - log.Info().Int64("tokens_in", state.SevenDay.TokensIn).Int64("tokens_out", state.SevenDay.TokensOut).Msg("restored 7d window tokens") - } -} diff --git a/internal/ratelimit/tracker.go b/internal/ratelimit/tracker.go index 569b8e8..65e4fcb 100644 --- a/internal/ratelimit/tracker.go +++ b/internal/ratelimit/tracker.go @@ -5,7 +5,6 @@ import ( "net/http" "strconv" "sync" - "sync/atomic" "time" "github.com/rs/zerolog/log" @@ -15,21 +14,17 @@ import ( type Window struct { Utilization float64 // 0-100 from API ResetsAt time.Time // when window resets - TokensIn atomic.Int64 - TokensOut atomic.Int64 } // Snapshot is a read-only copy of a Window's state. type Snapshot struct { Utilization float64 ResetsAt time.Time - TokensIn int64 - TokensOut int64 } -// Tracker polls /api/oauth/usage and tracks per-window token usage. +// Tracker polls /api/oauth/usage and tracks per-window utilization. type Tracker struct { - tokenFn func() string // returns current OAuth access token + tokenFn func() string mu sync.RWMutex fiveHour Window sevenDay Window @@ -46,34 +41,30 @@ func NewTracker(tokenFn func() string) *Tracker { func (t *Tracker) Start(ctx context.Context) { go func() { t.poll(ctx) - t.Restore() for { select { case <-ctx.Done(): - t.Save() return case <-time.After(5 * time.Minute): t.poll(ctx) - t.Save() } } }() } // UpdateFromHeaders extracts rate limit data from /v1/messages response headers. -// This provides real-time utilization updates without polling the usage API. func (t *Tracker) UpdateFromHeaders(h http.Header) { t.mu.Lock() defer t.mu.Unlock() if v := h.Get("Anthropic-Ratelimit-Unified-5h-Utilization"); v != "" { if f, err := strconv.ParseFloat(v, 64); err == nil { - t.fiveHour.Utilization = f * 100 // header is 0-1, we store 0-100 + t.fiveHour.Utilization = f * 100 } } if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" { if ts, err := strconv.ParseInt(v, 10, 64); err == nil { - t.setResetTime(&t.fiveHour, time.Unix(ts, 0).UTC().Truncate(time.Minute), "5h") + t.fiveHour.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute) } } if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" { @@ -83,51 +74,30 @@ func (t *Tracker) UpdateFromHeaders(h http.Header) { } if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" { if ts, err := strconv.ParseInt(v, 10, 64); err == nil { - t.setResetTime(&t.sevenDay, time.Unix(ts, 0).UTC().Truncate(time.Minute), "7d") + t.sevenDay.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute) } } } -// RecordTokens adds tokens to all active windows. -func (t *Tracker) RecordTokens(inputTokens, outputTokens int64) { - t.fiveHour.TokensIn.Add(inputTokens) - t.fiveHour.TokensOut.Add(outputTokens) - t.sevenDay.TokensIn.Add(inputTokens) - t.sevenDay.TokensOut.Add(outputTokens) -} - // FiveHour returns a snapshot of the 5-hour window. func (t *Tracker) FiveHour() Snapshot { t.mu.RLock() defer t.mu.RUnlock() - return Snapshot{ - Utilization: t.fiveHour.Utilization, - ResetsAt: t.fiveHour.ResetsAt, - TokensIn: t.fiveHour.TokensIn.Load(), - TokensOut: t.fiveHour.TokensOut.Load(), - } + return Snapshot{Utilization: t.fiveHour.Utilization, ResetsAt: t.fiveHour.ResetsAt} } // SevenDay returns a snapshot of the 7-day window. func (t *Tracker) SevenDay() Snapshot { t.mu.RLock() defer t.mu.RUnlock() - return Snapshot{ - Utilization: t.sevenDay.Utilization, - ResetsAt: t.sevenDay.ResetsAt, - TokensIn: t.sevenDay.TokensIn.Load(), - TokensOut: t.sevenDay.TokensOut.Load(), - } + return Snapshot{Utilization: t.sevenDay.Utilization, ResetsAt: t.sevenDay.ResetsAt} } // Sonnet returns a snapshot of the 7-day sonnet window. func (t *Tracker) Sonnet() Snapshot { t.mu.RLock() defer t.mu.RUnlock() - return Snapshot{ - Utilization: t.sonnet.Utilization, - ResetsAt: t.sonnet.ResetsAt, - } + return Snapshot{Utilization: t.sonnet.Utilization, ResetsAt: t.sonnet.ResetsAt} } // Extra returns the current extra usage state. @@ -153,13 +123,13 @@ func (t *Tracker) poll(ctx context.Context) { defer t.mu.Unlock() if usage.FiveHour != nil { - t.updateWindow(&t.fiveHour, usage.FiveHour, "5h") + t.updateWindow(&t.fiveHour, usage.FiveHour) } if usage.SevenDay != nil { - t.updateWindow(&t.sevenDay, usage.SevenDay, "7d") + t.updateWindow(&t.sevenDay, usage.SevenDay) } if usage.SevenDaySonnet != nil { - t.updateWindow(&t.sonnet, usage.SevenDaySonnet, "7d_sonnet") + t.updateWindow(&t.sonnet, usage.SevenDaySonnet) } if usage.ExtraUsage != nil { t.extra = *usage.ExtraUsage @@ -168,15 +138,10 @@ func (t *Tracker) poll(ctx context.Context) { log.Debug(). Float64("5h_util", t.fiveHour.Utilization). Time("5h_resets", t.fiveHour.ResetsAt). - Int64("5h_tokens_in", t.fiveHour.TokensIn.Load()). - Int64("5h_tokens_out", t.fiveHour.TokensOut.Load()). Float64("7d_util", t.sevenDay.Utilization). Time("7d_resets", t.sevenDay.ResetsAt). - Int64("7d_tokens_in", t.sevenDay.TokensIn.Load()). - Int64("7d_tokens_out", t.sevenDay.TokensOut.Load()). Msg("usage poll") - // Warn on high utilization if t.fiveHour.Utilization > 80 { log.Warn().Float64("utilization", t.fiveHour.Utilization).Time("resets_at", t.fiveHour.ResetsAt).Msg("5h window utilization high") } @@ -185,7 +150,7 @@ func (t *Tracker) poll(ctx context.Context) { } } -func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) { +func (t *Tracker) updateWindow(w *Window, rl *RateLimit) { if rl.Utilization != nil { w.Utilization = *rl.Utilization } @@ -195,22 +160,7 @@ func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) { parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt) } if err == nil { - t.setResetTime(w, parsed.UTC().Truncate(time.Minute), name) + w.ResetsAt = parsed.UTC().Truncate(time.Minute) } } } - -func (t *Tracker) setResetTime(w *Window, newReset time.Time, name string) { - if !w.ResetsAt.IsZero() && newReset != w.ResetsAt { - log.Info(). - Str("window", name). - Int64("prev_tokens_in", w.TokensIn.Load()). - Int64("prev_tokens_out", w.TokensOut.Load()). - Time("old_reset", w.ResetsAt). - Time("new_reset", newReset). - Msg("window reset detected") - w.TokensIn.Store(0) - w.TokensOut.Store(0) - } - w.ResetsAt = newReset -} diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index a5f0662..795b95a 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -82,21 +82,4 @@ func InitMetrics(meter metric.Meter, tracker *ratelimit.Tracker) { }), ) - meter.Int64ObservableGauge("proxy.usage.window_tokens.input", - metric.WithDescription("Proxy input tokens in current window"), - metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { - o.Observe(tracker.FiveHour().TokensIn, metric.WithAttributes(attr5h)) - o.Observe(tracker.SevenDay().TokensIn, metric.WithAttributes(attr7d)) - return nil - }), - ) - - meter.Int64ObservableGauge("proxy.usage.window_tokens.output", - metric.WithDescription("Proxy output tokens in current window"), - metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { - o.Observe(tracker.FiveHour().TokensOut, metric.WithAttributes(attr5h)) - o.Observe(tracker.SevenDay().TokensOut, metric.WithAttributes(attr7d)) - return nil - }), - ) }