refactor(ratelimit): remove in-memory per-window token tracking

Token counts per rate limit window are now derived in Grafana via
increase(counter[5h/168h]) on the existing cumulative OTel counters.
Removes TokensIn/Out from Window, RecordTokens, setResetTime, and
the window_tokens observable gauges.
This commit is contained in:
Alexander
2026-04-14 13:49:05 +02:00
parent 744abc1d24
commit eda66ff7d4
3 changed files with 7 additions and 62 deletions
-2
View File
@@ -149,7 +149,6 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
if tracker != nil { if tracker != nil {
tracker.RecordTokens(inputTokens, outputTokens)
tracker.UpdateFromHeaders(headers) tracker.UpdateFromHeaders(headers)
} }
@@ -305,7 +304,6 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
if tracker != nil { if tracker != nil {
tracker.RecordTokens(inputTokens, outputTokens)
tracker.UpdateFromHeaders(resp.Header) tracker.UpdateFromHeaders(resp.Header)
} }
} }
+7 -43
View File
@@ -5,7 +5,6 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@@ -15,16 +14,12 @@ import (
type Window struct { type Window struct {
Utilization float64 // 0-100 from API Utilization float64 // 0-100 from API
ResetsAt time.Time // when window resets ResetsAt time.Time // when window resets
TokensIn atomic.Int64
TokensOut atomic.Int64
} }
// Snapshot is a read-only copy of a Window's state. // Snapshot is a read-only copy of a Window's state.
type Snapshot struct { type Snapshot struct {
Utilization float64 Utilization float64
ResetsAt time.Time ResetsAt time.Time
TokensIn int64
TokensOut int64
} }
// Tracker polls /api/oauth/usage and tracks per-window token usage. // Tracker polls /api/oauth/usage and tracks per-window token usage.
@@ -71,7 +66,7 @@ func (t *Tracker) UpdateFromHeaders(h http.Header) {
} }
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" { if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" {
if ts, err := strconv.ParseInt(v, 10, 64); err == nil { if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
t.setResetTime(&t.fiveHour, time.Unix(ts, 0).UTC().Truncate(time.Minute), "5h") t.fiveHour.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute)
} }
} }
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" { if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" {
@@ -81,19 +76,11 @@ func (t *Tracker) UpdateFromHeaders(h http.Header) {
} }
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" { if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" {
if ts, err := strconv.ParseInt(v, 10, 64); err == nil { if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
t.setResetTime(&t.sevenDay, time.Unix(ts, 0).UTC().Truncate(time.Minute), "7d") t.sevenDay.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute)
} }
} }
} }
// RecordTokens adds tokens to all active windows.
func (t *Tracker) RecordTokens(inputTokens, outputTokens int64) {
t.fiveHour.TokensIn.Add(inputTokens)
t.fiveHour.TokensOut.Add(outputTokens)
t.sevenDay.TokensIn.Add(inputTokens)
t.sevenDay.TokensOut.Add(outputTokens)
}
// FiveHour returns a snapshot of the 5-hour window. // FiveHour returns a snapshot of the 5-hour window.
func (t *Tracker) FiveHour() Snapshot { func (t *Tracker) FiveHour() Snapshot {
t.mu.RLock() t.mu.RLock()
@@ -101,8 +88,6 @@ func (t *Tracker) FiveHour() Snapshot {
return Snapshot{ return Snapshot{
Utilization: t.fiveHour.Utilization, Utilization: t.fiveHour.Utilization,
ResetsAt: t.fiveHour.ResetsAt, ResetsAt: t.fiveHour.ResetsAt,
TokensIn: t.fiveHour.TokensIn.Load(),
TokensOut: t.fiveHour.TokensOut.Load(),
} }
} }
@@ -113,8 +98,6 @@ func (t *Tracker) SevenDay() Snapshot {
return Snapshot{ return Snapshot{
Utilization: t.sevenDay.Utilization, Utilization: t.sevenDay.Utilization,
ResetsAt: t.sevenDay.ResetsAt, ResetsAt: t.sevenDay.ResetsAt,
TokensIn: t.sevenDay.TokensIn.Load(),
TokensOut: t.sevenDay.TokensOut.Load(),
} }
} }
@@ -151,13 +134,13 @@ func (t *Tracker) poll(ctx context.Context) {
defer t.mu.Unlock() defer t.mu.Unlock()
if usage.FiveHour != nil { if usage.FiveHour != nil {
t.updateWindow(&t.fiveHour, usage.FiveHour, "5h") t.updateWindow(&t.fiveHour, usage.FiveHour)
} }
if usage.SevenDay != nil { if usage.SevenDay != nil {
t.updateWindow(&t.sevenDay, usage.SevenDay, "7d") t.updateWindow(&t.sevenDay, usage.SevenDay)
} }
if usage.SevenDaySonnet != nil { if usage.SevenDaySonnet != nil {
t.updateWindow(&t.sonnet, usage.SevenDaySonnet, "7d_sonnet") t.updateWindow(&t.sonnet, usage.SevenDaySonnet)
} }
if usage.ExtraUsage != nil { if usage.ExtraUsage != nil {
t.extra = *usage.ExtraUsage t.extra = *usage.ExtraUsage
@@ -166,12 +149,8 @@ func (t *Tracker) poll(ctx context.Context) {
log.Debug(). log.Debug().
Float64("5h_util", t.fiveHour.Utilization). Float64("5h_util", t.fiveHour.Utilization).
Time("5h_resets", t.fiveHour.ResetsAt). Time("5h_resets", t.fiveHour.ResetsAt).
Int64("5h_tokens_in", t.fiveHour.TokensIn.Load()).
Int64("5h_tokens_out", t.fiveHour.TokensOut.Load()).
Float64("7d_util", t.sevenDay.Utilization). Float64("7d_util", t.sevenDay.Utilization).
Time("7d_resets", t.sevenDay.ResetsAt). Time("7d_resets", t.sevenDay.ResetsAt).
Int64("7d_tokens_in", t.sevenDay.TokensIn.Load()).
Int64("7d_tokens_out", t.sevenDay.TokensOut.Load()).
Msg("usage poll") Msg("usage poll")
// Warn on high utilization // Warn on high utilization
@@ -183,7 +162,7 @@ func (t *Tracker) poll(ctx context.Context) {
} }
} }
func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) { func (t *Tracker) updateWindow(w *Window, rl *RateLimit) {
if rl.Utilization != nil { if rl.Utilization != nil {
w.Utilization = *rl.Utilization w.Utilization = *rl.Utilization
} }
@@ -193,22 +172,7 @@ func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) {
parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt) parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt)
} }
if err == nil { if err == nil {
t.setResetTime(w, parsed.UTC().Truncate(time.Minute), name) w.ResetsAt = parsed.UTC().Truncate(time.Minute)
} }
} }
} }
func (t *Tracker) setResetTime(w *Window, newReset time.Time, name string) {
if !w.ResetsAt.IsZero() && newReset != w.ResetsAt {
log.Info().
Str("window", name).
Int64("prev_tokens_in", w.TokensIn.Load()).
Int64("prev_tokens_out", w.TokensOut.Load()).
Time("old_reset", w.ResetsAt).
Time("new_reset", newReset).
Msg("window reset detected")
w.TokensIn.Store(0)
w.TokensOut.Store(0)
}
w.ResetsAt = newReset
}
-17
View File
@@ -82,21 +82,4 @@ func InitMetrics(meter metric.Meter, tracker *ratelimit.Tracker) {
}), }),
) )
meter.Int64ObservableGauge("proxy.usage.window_tokens.input",
metric.WithDescription("Proxy input tokens in current window"),
metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
o.Observe(tracker.FiveHour().TokensIn, metric.WithAttributes(attr5h))
o.Observe(tracker.SevenDay().TokensIn, metric.WithAttributes(attr7d))
return nil
}),
)
meter.Int64ObservableGauge("proxy.usage.window_tokens.output",
metric.WithDescription("Proxy output tokens in current window"),
metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
o.Observe(tracker.FiveHour().TokensOut, metric.WithAttributes(attr5h))
o.Observe(tracker.SevenDay().TokensOut, metric.WithAttributes(attr7d))
return nil
}),
)
} }