72 lines
2.2 KiB
Go
72 lines
2.2 KiB
Go
package indexer
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/riverqueue/river"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type CacheRefreshArgs struct {
|
|
Key string `json:"key"`
|
|
URL string `json:"url"`
|
|
TTLExpires time.Time `json:"ttl_expires"`
|
|
RefreshInterval time.Duration `json:"refresh_interval"`
|
|
}
|
|
|
|
func (CacheRefreshArgs) Kind() string { return "indexer_cache_refresh" }
|
|
|
|
type CacheRefreshWorker struct {
|
|
river.WorkerDefaults[CacheRefreshArgs]
|
|
Cache *IndexerCache
|
|
Indexer Indexer
|
|
RiverClient *river.Client[pgx.Tx]
|
|
}
|
|
|
|
func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error {
|
|
args := job.Args
|
|
|
|
if w.Cache == nil || w.Indexer == nil {
|
|
log.Trace().Str("key", args.Key).Msg("cache disabled, discarding refresh job")
|
|
return nil
|
|
}
|
|
|
|
log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started")
|
|
|
|
if time.Now().After(args.TTLExpires) {
|
|
w.Cache.Remove(args.Key)
|
|
log.Debug().Str("key", args.Key).Msg("cache entry TTL expired, removed")
|
|
return nil
|
|
}
|
|
|
|
log.Trace().Str("key", args.Key).Str("url", args.URL).Msg("fetching fresh data from indexer")
|
|
start := time.Now()
|
|
result, err := w.Indexer.FetchURL(args.URL)
|
|
if err != nil {
|
|
retryAt := time.Now().Add(5 * time.Minute)
|
|
log.Error().Err(err).Str("key", args.Key).Time("retry_at", retryAt).Msg("cache refresh failed, scheduling retry")
|
|
w.RiverClient.Insert(ctx, args, &river.InsertOpts{
|
|
ScheduledAt: retryAt,
|
|
})
|
|
return nil
|
|
}
|
|
log.Trace().Str("key", args.Key).Int("items", len(result.Items)).Dur("duration", time.Since(start)).Msg("fresh data fetched")
|
|
|
|
w.Cache.Update(args.Key, result)
|
|
|
|
nextRefresh := time.Now().Add(args.RefreshInterval)
|
|
_, err = w.RiverClient.Insert(ctx, args, &river.InsertOpts{
|
|
ScheduledAt: nextRefresh,
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Str("key", args.Key).Msg("failed to schedule next cache refresh")
|
|
} else {
|
|
log.Trace().Str("key", args.Key).Time("next_refresh", nextRefresh).Msg("next refresh scheduled")
|
|
}
|
|
|
|
log.Debug().Str("key", args.Key).Int("items", len(result.Items)).Msg("cache refreshed")
|
|
return nil
|
|
}
|