744abc1d24
UpdateFromHeaders was silently updating ResetsAt without clearing token counters. When a window rolled over, the poll method would see ResetsAt already updated and skip the reset. Extract setResetTime helper used by both code paths.
215 lines
5.6 KiB
Go
215 lines
5.6 KiB
Go
package ratelimit
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// Window holds per-window usage state.
//
// Utilization and ResetsAt are written while holding Tracker.mu. The token
// counters are atomics so that RecordTokens can bump them without that lock.
// A Window contains atomics and must not be copied after first use; take a
// Snapshot when a copy is needed.
type Window struct {
	Utilization float64   // 0-100 from API
	ResetsAt    time.Time // when window resets

	// Tokens recorded in this window; cleared when a later reset time is
	// observed for the window.
	TokensIn, TokensOut atomic.Int64
}
|
|
|
|
// Snapshot is a plain-value copy of a Window's state, safe to pass around
// and compare without any locking.
type Snapshot struct {
	Utilization float64   // 0-100, as stored on the Window
	ResetsAt    time.Time // zero until a reset time has been observed

	TokensIn, TokensOut int64
}
|
|
|
|
// Tracker polls /api/oauth/usage and tracks per-window token usage.
|
|
type Tracker struct {
|
|
tokenFn func() string // returns current OAuth access token
|
|
mu sync.RWMutex
|
|
fiveHour Window
|
|
sevenDay Window
|
|
sonnet Window
|
|
extra ExtraUsage
|
|
}
|
|
|
|
// NewTracker creates a tracker. tokenFn should return the current access token.
|
|
func NewTracker(tokenFn func() string) *Tracker {
|
|
return &Tracker{tokenFn: tokenFn}
|
|
}
|
|
|
|
// Start begins the background poll loop.
|
|
func (t *Tracker) Start(ctx context.Context) {
|
|
go func() {
|
|
// Poll immediately on start
|
|
t.poll(ctx)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(5 * time.Minute):
|
|
t.poll(ctx)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// UpdateFromHeaders extracts rate limit data from /v1/messages response headers.
|
|
// This provides real-time utilization updates without polling the usage API.
|
|
func (t *Tracker) UpdateFromHeaders(h http.Header) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Utilization"); v != "" {
|
|
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
|
t.fiveHour.Utilization = f * 100 // header is 0-1, we store 0-100
|
|
}
|
|
}
|
|
if v := h.Get("Anthropic-Ratelimit-Unified-5h-Reset"); v != "" {
|
|
if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
|
|
t.setResetTime(&t.fiveHour, time.Unix(ts, 0).UTC().Truncate(time.Minute), "5h")
|
|
}
|
|
}
|
|
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Utilization"); v != "" {
|
|
if f, err := strconv.ParseFloat(v, 64); err == nil {
|
|
t.sevenDay.Utilization = f * 100
|
|
}
|
|
}
|
|
if v := h.Get("Anthropic-Ratelimit-Unified-7d-Reset"); v != "" {
|
|
if ts, err := strconv.ParseInt(v, 10, 64); err == nil {
|
|
t.setResetTime(&t.sevenDay, time.Unix(ts, 0).UTC().Truncate(time.Minute), "7d")
|
|
}
|
|
}
|
|
}
|
|
|
|
// RecordTokens adds tokens to all active windows.
|
|
func (t *Tracker) RecordTokens(inputTokens, outputTokens int64) {
|
|
t.fiveHour.TokensIn.Add(inputTokens)
|
|
t.fiveHour.TokensOut.Add(outputTokens)
|
|
t.sevenDay.TokensIn.Add(inputTokens)
|
|
t.sevenDay.TokensOut.Add(outputTokens)
|
|
}
|
|
|
|
// FiveHour returns a snapshot of the 5-hour window.
|
|
func (t *Tracker) FiveHour() Snapshot {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return Snapshot{
|
|
Utilization: t.fiveHour.Utilization,
|
|
ResetsAt: t.fiveHour.ResetsAt,
|
|
TokensIn: t.fiveHour.TokensIn.Load(),
|
|
TokensOut: t.fiveHour.TokensOut.Load(),
|
|
}
|
|
}
|
|
|
|
// SevenDay returns a snapshot of the 7-day window.
|
|
func (t *Tracker) SevenDay() Snapshot {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return Snapshot{
|
|
Utilization: t.sevenDay.Utilization,
|
|
ResetsAt: t.sevenDay.ResetsAt,
|
|
TokensIn: t.sevenDay.TokensIn.Load(),
|
|
TokensOut: t.sevenDay.TokensOut.Load(),
|
|
}
|
|
}
|
|
|
|
// Sonnet returns a snapshot of the 7-day sonnet window.
|
|
func (t *Tracker) Sonnet() Snapshot {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return Snapshot{
|
|
Utilization: t.sonnet.Utilization,
|
|
ResetsAt: t.sonnet.ResetsAt,
|
|
}
|
|
}
|
|
|
|
// Extra returns the current extra usage state.
|
|
func (t *Tracker) Extra() ExtraUsage {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return t.extra
|
|
}
|
|
|
|
func (t *Tracker) poll(ctx context.Context) {
|
|
token := t.tokenFn()
|
|
if token == "" {
|
|
return
|
|
}
|
|
|
|
usage, err := fetchUsage(ctx, token)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("usage poll failed")
|
|
return
|
|
}
|
|
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if usage.FiveHour != nil {
|
|
t.updateWindow(&t.fiveHour, usage.FiveHour, "5h")
|
|
}
|
|
if usage.SevenDay != nil {
|
|
t.updateWindow(&t.sevenDay, usage.SevenDay, "7d")
|
|
}
|
|
if usage.SevenDaySonnet != nil {
|
|
t.updateWindow(&t.sonnet, usage.SevenDaySonnet, "7d_sonnet")
|
|
}
|
|
if usage.ExtraUsage != nil {
|
|
t.extra = *usage.ExtraUsage
|
|
}
|
|
|
|
log.Debug().
|
|
Float64("5h_util", t.fiveHour.Utilization).
|
|
Time("5h_resets", t.fiveHour.ResetsAt).
|
|
Int64("5h_tokens_in", t.fiveHour.TokensIn.Load()).
|
|
Int64("5h_tokens_out", t.fiveHour.TokensOut.Load()).
|
|
Float64("7d_util", t.sevenDay.Utilization).
|
|
Time("7d_resets", t.sevenDay.ResetsAt).
|
|
Int64("7d_tokens_in", t.sevenDay.TokensIn.Load()).
|
|
Int64("7d_tokens_out", t.sevenDay.TokensOut.Load()).
|
|
Msg("usage poll")
|
|
|
|
// Warn on high utilization
|
|
if t.fiveHour.Utilization > 80 {
|
|
log.Warn().Float64("utilization", t.fiveHour.Utilization).Time("resets_at", t.fiveHour.ResetsAt).Msg("5h window utilization high")
|
|
}
|
|
if t.sevenDay.Utilization > 80 {
|
|
log.Warn().Float64("utilization", t.sevenDay.Utilization).Time("resets_at", t.sevenDay.ResetsAt).Msg("7d window utilization high")
|
|
}
|
|
}
|
|
|
|
func (t *Tracker) updateWindow(w *Window, rl *RateLimit, name string) {
|
|
if rl.Utilization != nil {
|
|
w.Utilization = *rl.Utilization
|
|
}
|
|
if rl.ResetsAt != nil {
|
|
parsed, err := time.Parse(time.RFC3339Nano, *rl.ResetsAt)
|
|
if err != nil {
|
|
parsed, err = time.Parse(time.RFC3339, *rl.ResetsAt)
|
|
}
|
|
if err == nil {
|
|
t.setResetTime(w, parsed.UTC().Truncate(time.Minute), name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *Tracker) setResetTime(w *Window, newReset time.Time, name string) {
|
|
if !w.ResetsAt.IsZero() && newReset != w.ResetsAt {
|
|
log.Info().
|
|
Str("window", name).
|
|
Int64("prev_tokens_in", w.TokensIn.Load()).
|
|
Int64("prev_tokens_out", w.TokensOut.Load()).
|
|
Time("old_reset", w.ResetsAt).
|
|
Time("new_reset", newReset).
|
|
Msg("window reset detected")
|
|
w.TokensIn.Store(0)
|
|
w.TokensOut.Store(0)
|
|
}
|
|
w.ResetsAt = newReset
|
|
}
|