Add indexer cache with River queue for scheduled refresh

This commit is contained in:
Alexander
2026-05-07 21:41:17 +02:00
parent 2041c154cf
commit 79f3f145de
22 changed files with 686 additions and 26 deletions
+79
View File
@@ -0,0 +1,79 @@
package indexer
import (
"sync"
"time"
"github.com/rs/zerolog/log"
)
// CacheEntry is one cached search result together with the metadata that
// determines how long it remains valid.
type CacheEntry struct {
	// Key is the identifier the entry is stored under; URL is the indexer
	// request URL recorded alongside the result.
	Key, URL string
	// Result is the cached search response.
	Result SearchResult
	// CreatedAt is the reference instant from which TTL is measured.
	CreatedAt time.Time
	// TTL is the lifetime of the entry; RefreshInterval is the configured
	// spacing between background refreshes.
	TTL, RefreshInterval time.Duration
}
// IsExpired reports whether the current time is past CreatedAt + TTL.
func (e *CacheEntry) IsExpired() bool {
	deadline := e.CreatedAt.Add(e.TTL)
	return time.Now().After(deadline)
}
// IndexerCache is an in-memory store of search results keyed by string.
type IndexerCache struct {
	// mu guards entries; every method takes it before touching the map.
	mu      sync.RWMutex
	entries map[string]*CacheEntry
}
// NewIndexerCache returns an empty cache whose entry map is already
// allocated, so it can be written to immediately.
func NewIndexerCache() *IndexerCache {
	cache := new(IndexerCache)
	cache.entries = make(map[string]*CacheEntry)
	return cache
}
// Get looks up key and returns a snapshot of the matching entry.
//
// The second result is false when no entry exists for key or when the entry
// has outlived its TTL; in both cases the first result is nil. Expired
// entries are only hidden here, not deleted.
//
// The returned pointer refers to a private copy, not to the stored entry, so
// callers may keep reading it after Get returns without holding any lock.
func (c *IndexerCache) Get(key string) (*CacheEntry, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	entry, ok := c.entries[key]
	if !ok {
		log.Trace().Str("key", key).Msg("cache miss")
		return nil, false
	}
	if entry.IsExpired() {
		log.Trace().Str("key", key).Msg("cache expired")
		return nil, false
	}
	log.Trace().Str("key", key).Int("items", len(entry.Result.Items)).Msg("cache hit")

	// Copy while the read lock is still held: Update overwrites Result on the
	// stored entry in place, so handing out the stored pointer would let a
	// caller read Result concurrently with that write.
	snapshot := *entry
	return &snapshot, true
}
// Add stores entry under entry.Key, replacing any entry already present for
// that key. The cache keeps its own copy of the value passed in.
func (c *IndexerCache) Add(entry CacheEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()

	stored := entry
	c.entries[stored.Key] = &stored

	log.Debug().
		Str("key", stored.Key).
		Int("items", len(stored.Result.Items)).
		Dur("ttl", stored.TTL).
		Dur("refresh", stored.RefreshInterval).
		Msg("cache entry added")
}
// Update replaces the Result of the entry stored under key. CreatedAt and TTL
// are left untouched, so an update does not extend the entry's lifetime.
// When no entry exists for key, nothing is stored and a warning is logged.
func (c *IndexerCache) Update(key string, result SearchResult) {
	c.mu.Lock()
	defer c.mu.Unlock()

	entry, found := c.entries[key]
	if !found {
		log.Warn().Str("key", key).Msg("cache update for missing entry")
		return
	}

	entry.Result = result
	log.Debug().
		Str("key", key).
		Int("items", len(result.Items)).
		Msg("cache entry updated")
}
// Remove deletes the entry stored under key. Removing a key that is not
// present is a no-op apart from the debug log line.
func (c *IndexerCache) Remove(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.entries, key)

	event := log.Debug().Str("key", key)
	event.Msg("cache entry removed")
}
+65
View File
@@ -0,0 +1,65 @@
package indexer
import (
"context"
"time"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
)
// CacheRefreshArgs is the job payload for one scheduled cache refresh.
// Every field carries a JSON tag naming it in the serialised payload.
type CacheRefreshArgs struct {
// Key is the cache key the worker updates or removes.
Key string `json:"key"`
// URL is the address the worker fetches through the indexer.
URL string `json:"url"`
// TTLExpires is the instant after which the worker removes the entry
// instead of refreshing it.
TTLExpires time.Time `json:"ttl_expires"`
// RefreshInterval is the delay before the next refresh job once a refresh
// has succeeded.
RefreshInterval time.Duration `json:"refresh_interval"`
}
// Kind returns the fixed string that identifies cache refresh jobs.
func (CacheRefreshArgs) Kind() string {
	const kind = "indexer_cache_refresh"
	return kind
}
// CacheRefreshWorker processes CacheRefreshArgs jobs: it re-fetches a cached
// search and schedules the follow-up job.
type CacheRefreshWorker struct {
// WorkerDefaults is embedded; this type itself only defines Work.
river.WorkerDefaults[CacheRefreshArgs]
// Cache is the store whose entries are updated or removed.
Cache *IndexerCache
// Indexer performs the actual fetch of the stored URL.
Indexer Indexer
// RiverClient is used to enqueue the next refresh (or retry) job.
RiverClient *river.Client[pgx.Tx]
}
// Work refreshes one cached search result and schedules its successor job.
//
// Flow:
//   - If args.TTLExpires has passed, the cache entry is removed and no
//     further job is scheduled.
//   - Otherwise args.URL is fetched through the Indexer. On failure the same
//     args are re-enqueued five minutes from now. On success the cache entry
//     is updated and the same args are re-enqueued RefreshInterval from now.
//
// Work always returns nil: fetch and scheduling failures are logged, and
// retrying is done by explicitly enqueuing a new job rather than by
// reporting an error for this one.
func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error {
	args := job.Args
	log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started")

	if time.Now().After(args.TTLExpires) {
		w.Cache.Remove(args.Key)
		log.Debug().Str("key", args.Key).Msg("cache entry TTL expired, removed")
		return nil
	}

	log.Trace().Str("key", args.Key).Str("url", args.URL).Msg("fetching fresh data from indexer")
	start := time.Now()
	result, err := w.Indexer.FetchURL(args.URL)
	if err != nil {
		retryAt := time.Now().Add(5 * time.Minute)
		log.Error().Err(err).Str("key", args.Key).Time("retry_at", retryAt).Msg("cache refresh failed, scheduling retry")

		// If this insert fails there is no remaining job for the key, so the
		// refresh chain ends here. Surface that instead of discarding it.
		if _, insertErr := w.RiverClient.Insert(ctx, args, &river.InsertOpts{
			ScheduledAt: retryAt,
		}); insertErr != nil {
			log.Error().Err(insertErr).Str("key", args.Key).Msg("failed to schedule cache refresh retry")
		}
		return nil
	}
	log.Trace().Str("key", args.Key).Int("items", len(result.Items)).Dur("duration", time.Since(start)).Msg("fresh data fetched")

	w.Cache.Update(args.Key, result)

	nextRefresh := time.Now().Add(args.RefreshInterval)
	_, err = w.RiverClient.Insert(ctx, args, &river.InsertOpts{
		ScheduledAt: nextRefresh,
	})
	if err != nil {
		log.Error().Err(err).Str("key", args.Key).Msg("failed to schedule next cache refresh")
	} else {
		log.Trace().Str("key", args.Key).Time("next_refresh", nextRefresh).Msg("next refresh scheduled")
	}

	log.Debug().Str("key", args.Key).Int("items", len(result.Items)).Msg("cache refreshed")
	return nil
}
+90
View File
@@ -0,0 +1,90 @@
package indexer
import (
	"context"
	"strconv"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
	"github.com/rs/zerolog/log"

	"homelab.lan/music-agregator/internal/config"
)
// CachedIndexer wraps another Indexer and serves Search from an in-memory
// cache, scheduling background refresh jobs for the results it stores.
type CachedIndexer struct {
// inner is the wrapped indexer that performs the real requests.
inner Indexer
// cache holds the search results keyed per search.
cache *IndexerCache
// riverClient enqueues the refresh jobs.
riverClient *river.Client[pgx.Tx]
// cfg supplies the TTL and RefreshInterval applied to new entries.
cfg config.CacheConfig
}
// NewCachedIndexer builds a CachedIndexer around inner. The cache, River
// client and cache configuration are stored as given; nothing is validated
// or started here.
func NewCachedIndexer(inner Indexer, cache *IndexerCache, riverClient *river.Client[pgx.Tx], cfg config.CacheConfig) *CachedIndexer {
	wrapped := new(CachedIndexer)
	wrapped.inner = inner
	wrapped.cache = cache
	wrapped.riverClient = riverClient
	wrapped.cfg = cfg
	return wrapped
}
// Search returns the cached result for (query, tracker, limit) when a live
// entry exists. Otherwise it asks the wrapped indexer, stores the result and
// schedules a refresh job RefreshInterval from now.
//
// An error from the wrapped indexer is returned as-is and nothing is cached.
// A failure to schedule the refresh job is only logged: the freshly fetched
// result is still cached and returned.
func (c *CachedIndexer) Search(query string, limit int32, tracker string) (SearchResult, error) {
	// limit is part of the key because it is part of the request: without it
	// a search with a larger limit would be served a result that was fetched
	// (and is refreshed) with a smaller one.
	key := query + "|" + tracker + "|" + strconv.FormatInt(int64(limit), 10)
	log.Trace().Str("key", key).Str("query", query).Str("tracker", tracker).Msg("cached indexer search")

	if entry, ok := c.cache.Get(key); ok {
		log.Debug().Str("key", key).Int("items", len(entry.Result.Items)).Msg("returning cached result")
		return entry.Result, nil
	}

	log.Trace().Str("key", key).Msg("cache miss, fetching from indexer")
	result, err := c.inner.Search(query, limit, tracker)
	if err != nil {
		log.Error().Err(err).Str("key", key).Msg("cached indexer fetch failed")
		return SearchResult{}, err
	}

	// NOTE(review): url comes from the wrapped indexer and may embed
	// credentials such as an API key; it is logged below and stored in the
	// job arguments. Confirm that is acceptable.
	url := c.inner.BuildSearchURL(query, limit, tracker)
	log.Trace().Str("key", key).Str("url", url).Int("items", len(result.Items)).Msg("caching result")

	// One timestamp for everything, so the entry's own expiry and the
	// TTLExpires carried by the refresh job are the same instant.
	now := time.Now()
	c.cache.Add(CacheEntry{
		Key:             key,
		URL:             url,
		Result:          result,
		CreatedAt:       now,
		TTL:             c.cfg.TTL,
		RefreshInterval: c.cfg.RefreshInterval,
	})

	scheduleAt := now.Add(c.cfg.RefreshInterval)
	_, err = c.riverClient.Insert(context.Background(), CacheRefreshArgs{
		Key:             key,
		URL:             url,
		TTLExpires:      now.Add(c.cfg.TTL),
		RefreshInterval: c.cfg.RefreshInterval,
	}, &river.InsertOpts{
		ScheduledAt: scheduleAt,
	})
	if err != nil {
		log.Error().Err(err).Str("key", key).Msg("failed to schedule cache refresh job")
	} else {
		log.Debug().Str("key", key).Time("scheduled_at", scheduleAt).Msg("cache refresh job scheduled")
	}

	log.Debug().Str("key", key).Dur("ttl", c.cfg.TTL).Dur("refresh", c.cfg.RefreshInterval).Int("items", len(result.Items)).Msg("cached indexer search complete")
	return result, nil
}
// FetchURL forwards the request to the wrapped indexer without consulting or
// updating the cache.
func (c *CachedIndexer) FetchURL(url string) (SearchResult, error) {
	log.Trace().
		Str("url", url).
		Msg("cached indexer fetch URL passthrough")

	result, err := c.inner.FetchURL(url)
	return result, err
}
// BuildSearchURL returns whatever URL the wrapped indexer builds for the
// given search parameters.
func (c *CachedIndexer) BuildSearchURL(query string, limit int32, tracker string) string {
	built := c.inner.BuildSearchURL(query, limit, tracker)
	return built
}
// Capabilities forwards the lookup to the wrapped indexer; the result is not
// cached.
func (c *CachedIndexer) Capabilities(indexerName string) (IndexerCapabilities, error) {
	log.Trace().
		Str("indexer", indexerName).
		Msg("cached indexer capabilities passthrough")

	caps, err := c.inner.Capabilities(indexerName)
	return caps, err
}
+2
View File
@@ -2,5 +2,7 @@ package indexer
// Indexer is the abstraction over a search backend.
type Indexer interface {
	// Search runs a search for query, with limit and tracker passed on to
	// the backend.
	Search(query string, limit int32, tracker string) (SearchResult, error)
	// FetchURL requests the given URL and returns the parsed search result.
	FetchURL(url string) (SearchResult, error)
	// BuildSearchURL returns the request URL for the given search
	// parameters without performing the request.
	BuildSearchURL(query string, limit int32, tracker string) string
	// Capabilities returns the capabilities of the named indexer.
	Capabilities(indexerName string) (IndexerCapabilities, error)
}
+36 -5
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"time"
"github.com/rs/zerolog/log"
@@ -20,21 +21,42 @@ func NewIndexer(cfg config.Config) Indexer {
return &JacketIndexer{
cfg: cfg,
client: &http.Client{
Timeout: time.Second * 10,
Timeout: 60 * time.Second,
},
}
}
func (indexer *JacketIndexer) Search(query string, limit int32, tracker string) (SearchResult, error) {
func (indexer *JacketIndexer) BuildSearchURL(query string, limit int32, tracker string) string {
searchTracker := "all"
if len(tracker) != 0 {
searchTracker = tracker
}
url := indexer.cfg.Indexer.Url
uri := fmt.Sprintf("%v/api/v2.0/indexers/%v/results/torznab?apikey=%v&limit=%d&cat=3010,3040&q=%v&t=search", url, searchTracker, indexer.cfg.Indexer.ApiKey, limit, query)
uri := fmt.Sprintf("%v/api/v2.0/indexers/%v/results/torznab?apikey=%v&cat=3010,3040&q=%v&t=search",
indexer.cfg.Indexer.Url, searchTracker, indexer.cfg.Indexer.ApiKey, url.QueryEscape(query))
if limit > 0 {
uri += fmt.Sprintf("&limit=%d", limit)
}
log.Trace().Str("tracker", searchTracker).Str("query", query).Int32("limit", limit).Msg("jackett request")
return uri
}
// Search builds the request URL for the given parameters and fetches it.
func (indexer *JacketIndexer) Search(query string, limit int32, tracker string) (SearchResult, error) {
	return indexer.FetchURL(indexer.BuildSearchURL(query, limit, tracker))
}
// JackettError is an error payload decoded from XML: both fields are read
// from attributes of the decoded element.
type JackettError struct {
	// Code is taken from the "code" attribute.
	Code string `xml:"code,attr"`
	// Description is taken from the "description" attribute.
	Description string `xml:"description,attr"`
}

// Error implements the error interface, rendering the code followed by the
// description.
func (e *JackettError) Error() string {
	code, description := e.Code, e.Description
	return fmt.Sprintf("jackett error %s: %s", code, description)
}
func (indexer *JacketIndexer) FetchURL(uri string) (SearchResult, error) {
log.Trace().Str("uri", uri).Msg("jackett request")
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
@@ -62,6 +84,15 @@ func (indexer *JacketIndexer) Search(query string, limit int32, tracker string)
Dur("duration", time.Since(start)).
Msg("jackett response")
if resp.StatusCode != http.StatusOK {
var jackettErr JackettError
if xmlErr := xml.Unmarshal(body, &jackettErr); xmlErr == nil && jackettErr.Code != "" {
log.Error().Str("code", jackettErr.Code).Str("description", jackettErr.Description).Msg("jackett returned error")
return SearchResult{}, &jackettErr
}
return SearchResult{}, fmt.Errorf("jackett returned HTTP %d", resp.StatusCode)
}
var searchResult SearchResult
if err := xml.Unmarshal(body, &searchResult); err != nil {
log.Error().Err(err).Msg("error parsing search XML")
+8 -4
View File
@@ -3,6 +3,8 @@ package indexer
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
@@ -15,10 +17,10 @@ type IndexerServer struct {
pb.UnimplementedIndexerServiceServer
}
func NewIndexerServer(cfg config.Config) (*IndexerServer, error) {
service, err := NewIndexerService(cfg)
func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerServer, error) {
service, err := NewIndexerService(cfg, riverClient)
if err != nil {
log.Err(err).Msg("Failed to initialize IndexerService")
log.Err(err).Msg("failed to initialize IndexerService")
return nil, err
}
@@ -32,7 +34,9 @@ func (server *IndexerServer) Search(ctx context.Context, req *pb.SearchRequest)
Str("tracker", req.GetTracker()).
Msg("search started")
resp, err := server.service.Search(req)
log.Trace().Str("query", req.GetQuery()).Msg("fetching results from indexer")
resp, err := server.service.Search(req.GetQuery(), req.GetLimit(), req.GetTracker())
if err != nil {
log.Error().Err(err).Str("query", req.GetQuery()).Msg("search failed")
return nil, err
+17 -7
View File
@@ -3,6 +3,8 @@ package indexer
import (
"fmt"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
pb "homelab.lan/music-agregator/gen/music_agregator/indexer/v1"
@@ -13,20 +15,28 @@ type IndexerService struct {
indexer Indexer
}
func NewIndexerService(cfg config.Config) (*IndexerService, error) {
func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerService, error) {
var idx Indexer
switch cfg.Indexer.Type {
case config.IndexerTypeJackett:
indexer := NewIndexer(cfg)
return &IndexerService{indexer: indexer}, nil
idx = NewIndexer(cfg)
default:
return nil, fmt.Errorf("Unable to create the indexer for type: %v", cfg.Indexer.Type)
return nil, fmt.Errorf("unable to create the indexer for type: %v", cfg.Indexer.Type)
}
if cfg.Indexer.Cache.Enabled && riverClient != nil {
cache := NewIndexerCache()
idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache)
log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled")
}
return &IndexerService{indexer: idx}, nil
}
func (service *IndexerService) Search(req *pb.SearchRequest) (*pb.SearchResponse, error) {
log.Trace().Str("query", req.GetQuery()).Msg("fetching results from indexer")
func (service *IndexerService) Search(query string, limit int32, indexer string) (*pb.SearchResponse, error) {
searchResult, err := service.indexer.Search(req.GetQuery(), req.GetLimit(), req.GetTracker())
searchResult, err := service.indexer.Search(query, limit, indexer)
if err != nil {
log.Error().Err(err).Msg("failed to search in indexer")
return nil, err