diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index 1f01e3a..146d92e 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -149,6 +149,7 @@ func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, p telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) if tracker != nil { + tracker.RecordTokens(inputTokens, outputTokens) tracker.UpdateFromHeaders(headers) } @@ -304,6 +305,7 @@ func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool telemetry.TokensInput.Add(ctx, inputTokens, tokenAttrs) telemetry.TokensOutput.Add(ctx, outputTokens, tokenAttrs) if tracker != nil { + tracker.RecordTokens(inputTokens, outputTokens) tracker.UpdateFromHeaders(resp.Header) } } diff --git a/internal/ratelimit/tracker.go b/internal/ratelimit/tracker.go index 7ae154a..42b5fdc 100644 --- a/internal/ratelimit/tracker.go +++ b/internal/ratelimit/tracker.go @@ -5,6 +5,7 @@ import ( "net/http" "strconv" "sync" + "sync/atomic" "time" "github.com/rs/zerolog/log" @@ -14,12 +15,16 @@ import ( type Window struct { Utilization float64 // 0-100 from API ResetsAt time.Time // when window resets + TokensIn atomic.Int64 + TokensOut atomic.Int64 } // Snapshot is a read-only copy of a Window's state. type Snapshot struct { Utilization float64 ResetsAt time.Time + TokensIn int64 + TokensOut int64 } // Tracker polls /api/oauth/usage and tracks per-window token usage. @@ -66,7 +71,7 @@ func (t *Tracker) UpdateFromHeaders(h http.Header) { } if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" { if ts, err := strconv.ParseInt(v, 10, 64); err == nil { - t.fiveHour.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute) + t.setResetTime(&t.fiveHour, time.Unix(ts, 0).UTC().Truncate(time.Minute), "5h") } } if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" { @@ -76,11 +81,19 @@ func (t *Tracker) UpdateFromHeaders(h http.Header) { } if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" { if ts, err := strconv.ParseInt(v, 10, 64); err == nil { - t.sevenDay.ResetsAt = time.Unix(ts, 0).UTC().Truncate(time.Minute) + t.setResetTime(&t.sevenDay, time.Unix(ts, 0).UTC().Truncate(time.Minute), "7d") } } } +// RecordTokens adds tokens to all active windows. +func (t *Tracker) RecordTokens(inputTokens, outputTokens int64) { + t.fiveHour.TokensIn.Add(inputTokens) + t.fiveHour.TokensOut.Add(outputTokens) + t.sevenDay.TokensIn.Add(inputTokens) + t.sevenDay.TokensOut.Add(outputTokens) +} + // FiveHour returns a snapshot of the 5-hour window. func (t *Tracker) FiveHour() Snapshot { t.mu.RLock() @@ -88,6 +101,8 @@ func (t *Tracker) FiveHour() Snapshot { return Snapshot{ Utilization: t.fiveHour.Utilization, ResetsAt: t.fiveHour.ResetsAt, + TokensIn: t.fiveHour.TokensIn.Load(), + TokensOut: t.fiveHour.TokensOut.Load(), } } @@ -98,6 +113,8 @@ func (t *Tracker) SevenDay() Snapshot { return Snapshot{ Utilization: t.sevenDay.Utilization, ResetsAt: t.sevenDay.ResetsAt, + TokensIn: t.sevenDay.TokensIn.Load(), + TokensOut: t.sevenDay.TokensOut.Load(), } } @@ -134,13 +151,13 @@ func (t *Tracker) poll(ctx context.Context) { defer t.mu.Unlock() if usage.FiveHour != nil { - t.updateWindow(&t.fiveHour, usage.FiveHour) + t.updateWindow(&t.fiveHour, usage.FiveHour, "5h") } if usage.SevenDay != nil { - t.updateWindow(&t.sevenDay, usage.SevenDay) + t.updateWindow(&t.sevenDay, usage.SevenDay, "7d") } if usage.SevenDaySonnet != nil { - t.updateWindow(&t.sonnet, usage.SevenDaySonnet) + t.updateWindow(&t.sonnet, usage.SevenDaySonnet, "7d_sonnet") } if usage.ExtraUsage != nil { t.extra = *usage.ExtraUsage @@ -149,8 +166,12 @@ func (t *Tracker) poll(ctx context.Context) { log.Debug(). Float64("5h_util", t.fiveHour.Utilization). Time("5h_resets", t.fiveHour.ResetsAt). + Int64("5h_tokens_in", t.fiveHour.TokensIn.Load()). + Int64("5h_tokens_out", t.fiveHour.TokensOut.Load()). Float64("7d_util", t.sevenDay.Utilization). Time("7d_resets", t.sevenDay.ResetsAt). + Int64("7d_tokens_in", t.sevenDay.TokensIn.Load()). + Int64("7d_tokens_out", t.sevenDay.TokensOut.Load()). Msg("usage poll") // Warn on high utilization @@ -162,7 +183,7 @@ func (t *Tracker) poll(ctx context.Context) { } } -func (t *Tracker) updateWindow(w *Window, rl *RateLimit) { +func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) { if rl.Utilization != nil { w.Utilization = *rl.Utilization } @@ -172,7 +193,22 @@ func (t *Tracker) updateWindow(w *Window, rl *RateLimit) { parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt) } if err == nil { - w.ResetsAt = parsed.UTC().Truncate(time.Minute) + t.setResetTime(w, parsed.UTC().Truncate(time.Minute), name) } } } + +func (t *Tracker) setResetTime(w *Window, newReset time.Time, name string) { + if !w.ResetsAt.IsZero() && newReset != w.ResetsAt { + log.Info(). + Str("window", name). + Int64("prev_tokens_in", w.TokensIn.Load()). + Int64("prev_tokens_out", w.TokensOut.Load()). + Time("old_reset", w.ResetsAt). + Time("new_reset", newReset). + Msg("window reset detected") + w.TokensIn.Store(0) + w.TokensOut.Store(0) + } + w.ResetsAt = newReset +} diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 795b95a..a5f0662 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -82,4 +82,21 @@ func InitMetrics(meter metric.Meter, tracker *ratelimit.Tracker) { }), ) + meter.Int64ObservableGauge("proxy.usage.window_tokens.input", + metric.WithDescription("Proxy input tokens in current window"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(tracker.FiveHour().TokensIn, metric.WithAttributes(attr5h)) + o.Observe(tracker.SevenDay().TokensIn, metric.WithAttributes(attr7d)) + return nil + }), + ) + + meter.Int64ObservableGauge("proxy.usage.window_tokens.output", + metric.WithDescription("Proxy output tokens in current window"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(tracker.FiveHour().TokensOut, metric.WithAttributes(attr5h)) + o.Observe(tracker.SevenDay().TokensOut, metric.WithAttributes(attr7d)) + return nil + }), + ) }