refactor(ratelimit): remove per-window token tracking from proxy
Window token counts are now computed in Grafana using the @ modifier with dashboard variables derived from proxy_usage_resets_at. This eliminates in-memory state, file persistence, and restart sensitivity. Removes: TokensIn/Out, RecordTokens, setResetTime, persist.go, window_tokens observable gauges. -171 lines.
This commit is contained in:
@@ -149,7 +149,6 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p
|
|||||||
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
|
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
|
||||||
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
|
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
|
||||||
if tracker != nil {
|
if tracker != nil {
|
||||||
tracker.RecordTokens(inputTokens, outputTokens)
|
|
||||||
tracker.UpdateFromHeaders(headers)
|
tracker.UpdateFromHeaders(headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -307,7 +306,6 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool
|
|||||||
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
|
telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs)
|
||||||
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
|
telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs)
|
||||||
if tracker != nil {
|
if tracker != nil {
|
||||||
tracker.RecordTokens(inputTokens, outputTokens)
|
|
||||||
tracker.UpdateFromHeaders(resp.Header)
|
tracker.UpdateFromHeaders(resp.Header)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,89 +0,0 @@
|
|||||||
package ratelimit
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
const stateFile = ".anthropic-proxy-state.json"
|
|
||||||
|
|
||||||
type windowState struct {
|
|
||||||
ResetsAt time.Time `json:"resets_at"`
|
|
||||||
TokensIn int64 `json:"tokens_in"`
|
|
||||||
TokensOut int64 `json:"tokens_out"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type persistedState struct {
|
|
||||||
FiveHour windowState `json:"five_hour"`
|
|
||||||
SevenDay windowState `json:"seven_day"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func statePath() string {
|
|
||||||
home, err := os.UserHomeDir()
|
|
||||||
if err != nil {
|
|
||||||
return stateFile
|
|
||||||
}
|
|
||||||
return filepath.Join(home, ".claude", stateFile)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tracker) Save() {
|
|
||||||
t.mu.RLock()
|
|
||||||
state := persistedState{
|
|
||||||
FiveHour: windowState{
|
|
||||||
ResetsAt: t.fiveHour.ResetsAt,
|
|
||||||
TokensIn: t.fiveHour.TokensIn.Load(),
|
|
||||||
TokensOut: t.fiveHour.TokensOut.Load(),
|
|
||||||
},
|
|
||||||
SevenDay: windowState{
|
|
||||||
ResetsAt: t.sevenDay.ResetsAt,
|
|
||||||
TokensIn: t.sevenDay.TokensIn.Load(),
|
|
||||||
TokensOut: t.sevenDay.TokensOut.Load(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
t.mu.RUnlock()
|
|
||||||
|
|
||||||
data, err := json.Marshal(state)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn().Err(err).Msg("failed to marshal tracker state")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
path := statePath()
|
|
||||||
if err := os.WriteFile(path, data, 0644); err != nil {
|
|
||||||
log.Warn().Err(err).Str("path", path).Msg("failed to save tracker state")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tracker) Restore() {
|
|
||||||
path := statePath()
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var state persistedState
|
|
||||||
if err := json.Unmarshal(data, &state); err != nil {
|
|
||||||
log.Warn().Err(err).Msg("failed to parse tracker state")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.mu.Lock()
|
|
||||||
defer t.mu.Unlock()
|
|
||||||
|
|
||||||
if !state.FiveHour.ResetsAt.IsZero() && state.FiveHour.ResetsAt == t.fiveHour.ResetsAt {
|
|
||||||
t.fiveHour.TokensIn.Store(state.FiveHour.TokensIn)
|
|
||||||
t.fiveHour.TokensOut.Store(state.FiveHour.TokensOut)
|
|
||||||
log.Info().Int64("tokens_in", state.FiveHour.TokensIn).Int64("tokens_out", state.FiveHour.TokensOut).Msg("restored 5h window tokens")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !state.SevenDay.ResetsAt.IsZero() && state.SevenDay.ResetsAt == t.sevenDay.ResetsAt {
|
|
||||||
t.sevenDay.TokensIn.Store(state.SevenDay.TokensIn)
|
|
||||||
t.sevenDay.TokensOut.Store(state.SevenDay.TokensOut)
|
|
||||||
log.Info().Int64("tokens_in", state.SevenDay.TokensIn).Int64("tokens_out", state.SevenDay.TokensOut).Msg("restored 7d window tokens")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
@@ -15,21 +14,17 @@ import (
|
|||||||
type Window struct {
|
type Window struct {
|
||||||
Utilization float64 // 0-100 from API
|
Utilization float64 // 0-100 from API
|
||||||
ResetsAt time.Time // when window resets
|
ResetsAt time.Time // when window resets
|
||||||
TokensIn atomic.Int64
|
|
||||||
TokensOut atomic.Int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot is a read-only copy of a Window's state.
|
// Snapshot is a read-only copy of a Window's state.
|
||||||
type Snapshot struct {
|
type Snapshot struct {
|
||||||
Utilization float64
|
Utilization float64
|
||||||
ResetsAt time.Time
|
ResetsAt time.Time
|
||||||
TokensIn int64
|
|
||||||
TokensOut int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tracker polls /api/oauth/usage and tracks per-window token usage.
|
// Tracker polls /api/oauth/usage and tracks per-window utilization.
|
||||||
type Tracker struct {
|
type Tracker struct {
|
||||||
tokenFn func() string // returns current OAuth access token
|
tokenFn func() string
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
fiveHour Window
|
fiveHour Window
|
||||||
sevenDay Window
|
sevenDay Window
|
||||||
@@ -46,34 +41,30 @@ func NewTracker(tokenFn func() string) *Tracker {
|
|||||||
func (t *Tracker) Start(ctx context.Context) {
|
func (t *Tracker) Start(ctx context.Context) {
|
||||||
go func() {
|
go func() {
|
||||||
t.poll(ctx)
|
t.poll(ctx)
|
||||||
t.Restore()
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
t.Save()
|
|
||||||
return
|
return
|
||||||
case <-time.After(5 * time.Minute):
|
case <-time.After(5 * time.Minute):
|
||||||
t.poll(ctx)
|
t.poll(ctx)
|
||||||
t.Save()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateFromHeaders extracts rate limit data from /v1/messages response headers.
|
// UpdateFromHeaders extracts rate limit data from /v1/messages response headers.
|
||||||
// This provides real-time utilization updates without polling the usage API.
|
|
||||||
func (t *Tracker) UpdateFromHeaders(h http.Header) {
|
func (t *Tracker) UpdateFromHeaders(h http.Header) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Utilization"); v != "" {
|
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Utilization"); v != "" {
|
||||||
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
||||||
t.fiveHour.Utilization = f * 100 // header is 0-1, we store 0-100
|
t.fiveHour.Utilization = f * 100
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" {
|
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" {
|
||||||
if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
|
if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||||
t.setResetTime(&t.fiveHour, time.Unix(ts, 0).UTC().Truncate(time.Minute), "5h")
|
t.fiveHour.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" {
|
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" {
|
||||||
@@ -83,51 +74,30 @@ func (t *Tracker) UpdateFromHeaders(h http.Header) {
|
|||||||
}
|
}
|
||||||
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" {
|
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" {
|
||||||
if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
|
if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||||
t.setResetTime(&t.sevenDay, time.Unix(ts, 0).UTC().Truncate(time.Minute), "7d")
|
t.sevenDay.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecordTokens adds tokens to all active windows.
|
|
||||||
func (t *Tracker) RecordTokens(inputTokens, outputTokens int64) {
|
|
||||||
t.fiveHour.TokensIn.Add(inputTokens)
|
|
||||||
t.fiveHour.TokensOut.Add(outputTokens)
|
|
||||||
t.sevenDay.TokensIn.Add(inputTokens)
|
|
||||||
t.sevenDay.TokensOut.Add(outputTokens)
|
|
||||||
}
|
|
||||||
|
|
||||||
// FiveHour returns a snapshot of the 5-hour window.
|
// FiveHour returns a snapshot of the 5-hour window.
|
||||||
func (t *Tracker) FiveHour() Snapshot {
|
func (t *Tracker) FiveHour() Snapshot {
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
defer t.mu.RUnlock()
|
defer t.mu.RUnlock()
|
||||||
return Snapshot{
|
return Snapshot{Utilization: t.fiveHour.Utilization, ResetsAt: t.fiveHour.ResetsAt}
|
||||||
Utilization: t.fiveHour.Utilization,
|
|
||||||
ResetsAt: t.fiveHour.ResetsAt,
|
|
||||||
TokensIn: t.fiveHour.TokensIn.Load(),
|
|
||||||
TokensOut: t.fiveHour.TokensOut.Load(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SevenDay returns a snapshot of the 7-day window.
|
// SevenDay returns a snapshot of the 7-day window.
|
||||||
func (t *Tracker) SevenDay() Snapshot {
|
func (t *Tracker) SevenDay() Snapshot {
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
defer t.mu.RUnlock()
|
defer t.mu.RUnlock()
|
||||||
return Snapshot{
|
return Snapshot{Utilization: t.sevenDay.Utilization, ResetsAt: t.sevenDay.ResetsAt}
|
||||||
Utilization: t.sevenDay.Utilization,
|
|
||||||
ResetsAt: t.sevenDay.ResetsAt,
|
|
||||||
TokensIn: t.sevenDay.TokensIn.Load(),
|
|
||||||
TokensOut: t.sevenDay.TokensOut.Load(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sonnet returns a snapshot of the 7-day sonnet window.
|
// Sonnet returns a snapshot of the 7-day sonnet window.
|
||||||
func (t *Tracker) Sonnet() Snapshot {
|
func (t *Tracker) Sonnet() Snapshot {
|
||||||
t.mu.RLock()
|
t.mu.RLock()
|
||||||
defer t.mu.RUnlock()
|
defer t.mu.RUnlock()
|
||||||
return Snapshot{
|
return Snapshot{Utilization: t.sonnet.Utilization, ResetsAt: t.sonnet.ResetsAt}
|
||||||
Utilization: t.sonnet.Utilization,
|
|
||||||
ResetsAt: t.sonnet.ResetsAt,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extra returns the current extra usage state.
|
// Extra returns the current extra usage state.
|
||||||
@@ -153,13 +123,13 @@ func (t *Tracker) poll(ctx context.Context) {
|
|||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
if usage.FiveHour != nil {
|
if usage.FiveHour != nil {
|
||||||
t.updateWindow(&t.fiveHour, usage.FiveHour, "5h")
|
t.updateWindow(&t.fiveHour, usage.FiveHour)
|
||||||
}
|
}
|
||||||
if usage.SevenDay != nil {
|
if usage.SevenDay != nil {
|
||||||
t.updateWindow(&t.sevenDay, usage.SevenDay, "7d")
|
t.updateWindow(&t.sevenDay, usage.SevenDay)
|
||||||
}
|
}
|
||||||
if usage.SevenDaySonnet != nil {
|
if usage.SevenDaySonnet != nil {
|
||||||
t.updateWindow(&t.sonnet, usage.SevenDaySonnet, "7d_sonnet")
|
t.updateWindow(&t.sonnet, usage.SevenDaySonnet)
|
||||||
}
|
}
|
||||||
if usage.ExtraUsage != nil {
|
if usage.ExtraUsage != nil {
|
||||||
t.extra = *usage.ExtraUsage
|
t.extra = *usage.ExtraUsage
|
||||||
@@ -168,15 +138,10 @@ func (t *Tracker) poll(ctx context.Context) {
|
|||||||
log.Debug().
|
log.Debug().
|
||||||
Float64("5h_util", t.fiveHour.Utilization).
|
Float64("5h_util", t.fiveHour.Utilization).
|
||||||
Time("5h_resets", t.fiveHour.ResetsAt).
|
Time("5h_resets", t.fiveHour.ResetsAt).
|
||||||
Int64("5h_tokens_in", t.fiveHour.TokensIn.Load()).
|
|
||||||
Int64("5h_tokens_out", t.fiveHour.TokensOut.Load()).
|
|
||||||
Float64("7d_util", t.sevenDay.Utilization).
|
Float64("7d_util", t.sevenDay.Utilization).
|
||||||
Time("7d_resets", t.sevenDay.ResetsAt).
|
Time("7d_resets", t.sevenDay.ResetsAt).
|
||||||
Int64("7d_tokens_in", t.sevenDay.TokensIn.Load()).
|
|
||||||
Int64("7d_tokens_out", t.sevenDay.TokensOut.Load()).
|
|
||||||
Msg("usage poll")
|
Msg("usage poll")
|
||||||
|
|
||||||
// Warn on high utilization
|
|
||||||
if t.fiveHour.Utilization > 80 {
|
if t.fiveHour.Utilization > 80 {
|
||||||
log.Warn().Float64("utilization", t.fiveHour.Utilization).Time("resets_at", t.fiveHour.ResetsAt).Msg("5h window utilization high")
|
log.Warn().Float64("utilization", t.fiveHour.Utilization).Time("resets_at", t.fiveHour.ResetsAt).Msg("5h window utilization high")
|
||||||
}
|
}
|
||||||
@@ -185,7 +150,7 @@ func (t *Tracker) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) {
|
func (t *Tracker) updateWindow(w *Window, rl *RateLimit) {
|
||||||
if rl.Utilization != nil {
|
if rl.Utilization != nil {
|
||||||
w.Utilization = *rl.Utilization
|
w.Utilization = *rl.Utilization
|
||||||
}
|
}
|
||||||
@@ -195,22 +160,7 @@ func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) {
|
|||||||
parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt)
|
parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.setResetTime(w, parsed.UTC().Truncate(time.Minute), name)
|
w.ResetsAt = parsed.UTC().Truncate(time.Minute)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Tracker) setResetTime(w *Window, newReset time.Time, name string) {
|
|
||||||
if !w.ResetsAt.IsZero() && newReset != w.ResetsAt {
|
|
||||||
log.Info().
|
|
||||||
Str("window", name).
|
|
||||||
Int64("prev_tokens_in", w.TokensIn.Load()).
|
|
||||||
Int64("prev_tokens_out", w.TokensOut.Load()).
|
|
||||||
Time("old_reset", w.ResetsAt).
|
|
||||||
Time("new_reset", newReset).
|
|
||||||
Msg("window reset detected")
|
|
||||||
w.TokensIn.Store(0)
|
|
||||||
w.TokensOut.Store(0)
|
|
||||||
}
|
|
||||||
w.ResetsAt = newReset
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -82,21 +82,4 @@ func InitMetrics(meter metric.Meter, tracker *ratelimit.Tracker) {
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
meter.Int64ObservableGauge("proxy.usage.window_tokens.input",
|
|
||||||
metric.WithDescription("Proxy input tokens in current window"),
|
|
||||||
metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
|
|
||||||
o.Observe(tracker.FiveHour().TokensIn, metric.WithAttributes(attr5h))
|
|
||||||
o.Observe(tracker.SevenDay().TokensIn, metric.WithAttributes(attr7d))
|
|
||||||
return nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
meter.Int64ObservableGauge("proxy.usage.window_tokens.output",
|
|
||||||
metric.WithDescription("Proxy output tokens in current window"),
|
|
||||||
metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error {
|
|
||||||
o.Observe(tracker.FiveHour().TokensOut, metric.WithAttributes(attr5h))
|
|
||||||
o.Observe(tracker.SevenDay().TokensOut, metric.WithAttributes(attr7d))
|
|
||||||
return nil
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user