Persist metadata to DB, poll download worker, metadata service layer

This commit is contained in:
Alexander
2026-05-08 11:00:04 +02:00
parent 66264e1314
commit 60c94935b2
10 changed files with 489 additions and 58 deletions
+22
View File
@@ -0,0 +1,22 @@
meta {
name: Get Album
type: grpc
seq: 4
}
grpc {
url: localhost:3000
method: /metadata.v1.MetadataService/GetAlbum
body: grpc
auth: inherit
methodType: unary
}
body:grpc {
name: message 1
content: '''
{
"id": "a0b7b436-94db-4df6-8c5f-bc0e5932a90e"
}
'''
}
+38 -20
View File
@@ -11,7 +11,6 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/riverqueue/river" "github.com/riverqueue/river"
@@ -28,10 +27,12 @@ import (
"homelab.lan/music-agregator/internal" "homelab.lan/music-agregator/internal"
"homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/hello" "homelab.lan/music-agregator/internal/hello"
"homelab.lan/music-agregator/internal/indexer" "homelab.lan/music-agregator/internal/indexer"
"homelab.lan/music-agregator/internal/metadata" "homelab.lan/music-agregator/internal/metadata"
"homelab.lan/music-agregator/internal/torrent" "homelab.lan/music-agregator/internal/torrent"
"homelab.lan/music-agregator/internal/workers"
) )
func main() { func main() {
@@ -68,36 +69,54 @@ func interceptorLogger(l zerolog.Logger) logging.Logger {
}) })
} }
func setupRiver(ctx context.Context, cfg config.Config) (*river.Client[pgx.Tx], *pgxpool.Pool) { func setupDatabase(ctx context.Context, cfg config.Config) *database.DB {
if !cfg.Indexer.Cache.Enabled { db, err := database.New(ctx, cfg.Database.URL)
return nil, nil
}
dbPool, err := pgxpool.New(ctx, cfg.Database.URL)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed to connect to database for River") log.Fatal().Err(err).Msg("failed to connect to database")
}
return db
}
type riverSetup struct {
client *river.Client[pgx.Tx]
cacheRefreshWorker *indexer.CacheRefreshWorker
}
func setupRiver(ctx context.Context, cfg config.Config, db *database.DB) *riverSetup {
cacheWorker := &indexer.CacheRefreshWorker{}
pollWorker := &workers.PollDownloadWorker{
Downloads: database.NewDownloadRepository(db.Pool),
DownloadFiles: database.NewDownloadFileRepository(db.Pool),
TorrentClient: torrent.MustNewTorrentClient(cfg),
} }
workers := river.NewWorkers() riverWorkers := river.NewWorkers()
river.AddWorker(workers, &indexer.CacheRefreshWorker{}) river.AddWorker(riverWorkers, cacheWorker)
river.AddWorker(riverWorkers, pollWorker)
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ riverClient, err := river.NewClient(riverpgxv5.New(db.Pool), &river.Config{
Queues: map[string]river.QueueConfig{ Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 2}, river.QueueDefault: {MaxWorkers: 4},
}, },
Workers: workers, Workers: riverWorkers,
}) })
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed to create River client") log.Fatal().Err(err).Msg("failed to create River client")
} }
cacheWorker.RiverClient = riverClient
pollWorker.RiverClient = riverClient
if err := riverClient.Start(ctx); err != nil { if err := riverClient.Start(ctx); err != nil {
log.Fatal().Err(err).Msg("failed to start River client") log.Fatal().Err(err).Msg("failed to start River client")
} }
log.Info().Msg("River queue started") log.Info().Msg("River queue started")
return riverClient, dbPool return &riverSetup{
client: riverClient,
cacheRefreshWorker: cacheWorker,
}
} }
func serveGrpc(config config.Config) { func serveGrpc(config config.Config) {
@@ -132,16 +151,15 @@ func serveGrpc(config config.Config) {
) )
ctx := context.Background() ctx := context.Background()
riverClient, dbPool := setupRiver(ctx, config) db := setupDatabase(ctx, config)
if dbPool != nil { defer db.Close()
defer dbPool.Close() rs := setupRiver(ctx, config, db)
}
musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, riverClient) musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, rs.client, db)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer") log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer")
} }
indexerServer, err := indexer.NewIndexerServer(config, riverClient) indexerServer, err := indexer.NewIndexerServer(config, rs.client, rs.cacheRefreshWorker)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("failed to create IndexerServer") log.Fatal().Err(err).Msg("failed to create IndexerServer")
} }
+6
View File
@@ -27,6 +27,12 @@ type CacheRefreshWorker struct {
func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error { func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error {
args := job.Args args := job.Args
if w.Cache == nil || w.Indexer == nil {
log.Trace().Str("key", args.Key).Msg("cache disabled, discarding refresh job")
return nil
}
log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started") log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started")
if time.Now().After(args.TTLExpires) { if time.Now().After(args.TTLExpires) {
+2 -2
View File
@@ -17,8 +17,8 @@ type IndexerServer struct {
pb.UnimplementedIndexerServiceServer pb.UnimplementedIndexerServiceServer
} }
func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerServer, error) { func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerServer, error) {
service, err := NewIndexerService(cfg, riverClient) service, err := NewIndexerService(cfg, riverClient, cacheWorker)
if err != nil { if err != nil {
log.Err(err).Msg("failed to initialize IndexerService") log.Err(err).Msg("failed to initialize IndexerService")
return nil, err return nil, err
+7 -1
View File
@@ -15,7 +15,7 @@ type IndexerService struct {
indexer Indexer indexer Indexer
} }
func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerService, error) { func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerService, error) {
var idx Indexer var idx Indexer
switch cfg.Indexer.Type { switch cfg.Indexer.Type {
@@ -28,6 +28,12 @@ func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*I
if cfg.Indexer.Cache.Enabled && riverClient != nil { if cfg.Indexer.Cache.Enabled && riverClient != nil {
cache := NewIndexerCache() cache := NewIndexerCache()
idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache) idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache)
if cacheWorker != nil {
cacheWorker.Cache = cache
cacheWorker.Indexer = idx
}
log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled") log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled")
} }
+116
View File
@@ -0,0 +1,116 @@
package metadata
import (
"context"
"fmt"
"github.com/rs/zerolog/log"
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
"homelab.lan/music-agregator/internal/database"
)
type MetadataService struct {
client metadataPb.MetadataServiceClient
artists *database.ArtistRepository
albums *database.AlbumRepository
}
func NewMetadataService(client metadataPb.MetadataServiceClient, db *database.DB) *MetadataService {
return &MetadataService{
client: client,
artists: database.NewArtistRepository(db.Pool),
albums: database.NewAlbumRepository(db.Pool),
}
}
func (s *MetadataService) GetAlbum(ctx context.Context, albumID string) (*metadataPb.Album, error) {
resp, err := s.client.GetAlbum(ctx, &metadataPb.GetAlbumRequest{
Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID},
})
if err != nil {
return nil, fmt.Errorf("fetching album: %w", err)
}
album := resp.GetAlbum()
if _, err := s.albums.GetByExternalID(ctx, album.GetId()); err != nil {
s.persistArtist(ctx, album)
s.persistAlbum(ctx, album)
}
return album, nil
}
func (s *MetadataService) GetArtistByExternalID(ctx context.Context, externalID string) (*database.Artist, error) {
return s.artists.GetByExternalID(ctx, externalID)
}
func (s *MetadataService) GetAlbumByExternalID(ctx context.Context, externalID string) (*database.Album, error) {
return s.albums.GetByExternalID(ctx, externalID)
}
func (s *MetadataService) persistArtist(ctx context.Context, album *metadataPb.Album) {
if len(album.GetArtists()) == 0 {
return
}
artist := album.GetArtists()[0].GetArtist()
var genres []string
for _, g := range artist.GetGenres() {
genres = append(genres, g.GetName())
}
err := s.artists.Create(ctx, &database.Artist{
ExternalID: artist.GetId(),
Name: artist.GetName(),
ArtistType: artist.GetArtistType(),
Country: artist.GetCountry(),
Genres: genres,
ImageURL: artist.GetImageUrl(),
})
if err != nil {
log.Warn().Err(err).Str("name", artist.GetName()).Msg("failed to persist artist")
}
}
func (s *MetadataService) persistAlbum(ctx context.Context, album *metadataPb.Album) {
artistID := ""
if len(album.GetArtists()) > 0 {
a, err := s.artists.GetByExternalID(ctx, album.GetArtists()[0].GetArtist().GetId())
if err == nil {
artistID = a.ID
}
}
if artistID == "" {
log.Trace().Str("album", album.GetTitle()).Msg("skipping album persist, no artist in DB")
return
}
var genres []string
for _, g := range album.GetGenres() {
genres = append(genres, g.GetName())
}
labelName := ""
if album.GetLabel() != nil {
labelName = album.GetLabel().GetName()
}
err := s.albums.Create(ctx, &database.Album{
ExternalID: album.GetId(),
ArtistID: artistID,
Title: album.GetTitle(),
AlbumType: album.GetAlbumType(),
TotalTracks: int(album.GetTotalTracks()),
TotalDiscs: int(album.GetTotalDiscs()),
Label: labelName,
Genres: genres,
CoverURL: album.GetCoverUrl(),
IsMonitored: true,
})
if err != nil {
log.Warn().Err(err).Str("title", album.GetTitle()).Msg("failed to persist album")
}
}
+3 -2
View File
@@ -10,6 +10,7 @@ import (
pb "homelab.lan/music-agregator/gen/music_agregator/v1" pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
) )
type MusicAgregatorServer struct { type MusicAgregatorServer struct {
@@ -17,8 +18,8 @@ type MusicAgregatorServer struct {
pb.UnimplementedMusicAgregatorServiceServer pb.UnimplementedMusicAgregatorServiceServer
} }
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorServer, error) { func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorServer, error) {
service, err := NewMusicAgregatorService(cfg, riverClient) service, err := NewMusicAgregatorService(cfg, riverClient, db)
if err != nil { if err != nil {
log.Err(err).Msg("failed to create MusicAgregatorService") log.Err(err).Msg("failed to create MusicAgregatorService")
return nil, err return nil, err
+111 -33
View File
@@ -11,17 +11,18 @@ import (
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/riverqueue/river" "github.com/riverqueue/river"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"google.golang.org/grpc"
metadataPb "homelab.lan/music-agregator/gen/metadata/v1" metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
pb "homelab.lan/music-agregator/gen/music_agregator/v1" pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/indexer" "homelab.lan/music-agregator/internal/indexer"
"homelab.lan/music-agregator/internal/metadata" "homelab.lan/music-agregator/internal/metadata"
"homelab.lan/music-agregator/internal/release" "homelab.lan/music-agregator/internal/release"
"homelab.lan/music-agregator/internal/torrent" "homelab.lan/music-agregator/internal/torrent"
torrentParser "homelab.lan/music-agregator/internal/tracker" torrentParser "homelab.lan/music-agregator/internal/tracker"
"homelab.lan/music-agregator/internal/workers"
) )
type parsedItem struct { type parsedItem struct {
@@ -32,21 +33,23 @@ type parsedItem struct {
type MusicAgregatorService struct { type MusicAgregatorService struct {
config config.Config config config.Config
metadataClient metadataPb.MetadataServiceClient metadata *metadata.MetadataService
metadataConn *grpc.ClientConn
indexer *indexer.IndexerService indexer *indexer.IndexerService
torrentClient torrent.TorrentClient torrentClient torrent.TorrentClient
magnetResolver *torrentParser.MagnetResolver magnetResolver *torrentParser.MagnetResolver
riverClient *river.Client[pgx.Tx]
torrents *database.TorrentRepository
downloads *database.DownloadRepository
} }
func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) { func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorService, error) {
indexer, err := indexer.NewIndexerService(cfg, riverClient) idx, err := indexer.NewIndexerService(cfg, riverClient, nil)
if err != nil { if err != nil {
log.Err(err).Msg("failed to create IndexerService") log.Err(err).Msg("failed to create IndexerService")
return nil, err return nil, err
} }
metadataClient, conn, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint) metadataClient, _, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint)
if err != nil { if err != nil {
log.Err(err).Msg("failed to create metadata client") log.Err(err).Msg("failed to create metadata client")
return nil, err return nil, err
@@ -66,29 +69,39 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T
return &MusicAgregatorService{ return &MusicAgregatorService{
config: cfg, config: cfg,
metadataClient: metadataClient, metadata: metadata.NewMetadataService(metadataClient, db),
metadataConn: conn, indexer: idx,
indexer: indexer,
torrentClient: torrentClient, torrentClient: torrentClient,
magnetResolver: magnetResolver, magnetResolver: magnetResolver,
riverClient: riverClient,
torrents: database.NewTorrentRepository(db.Pool),
downloads: database.NewDownloadRepository(db.Pool),
}, nil }, nil
} }
func (s *MusicAgregatorService) Close() { func (s *MusicAgregatorService) Close() {
if s.metadataConn != nil {
s.metadataConn.Close()
}
if s.magnetResolver != nil { if s.magnetResolver != nil {
s.magnetResolver.Close() s.magnetResolver.Close()
} }
} }
func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) { func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
album, err := service.fetchAlbumMetadata(ctx, req.GetAlbumId()) album, err := service.metadata.GetAlbum(ctx, req.GetAlbumId())
if err != nil { if err != nil {
log.Error().Err(err).Str("album_id", req.GetAlbumId()).Msg("failed to get album")
return nil, err return nil, err
} }
dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, req.GetAlbumId())
if dbAlbum != nil {
qualityStr := normalizeQuality(req.GetQuality(), 0, 0)
owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr)
if err == nil && owned {
log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality")
return &pb.MonitorAlbumResponse{}, nil
}
}
searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker()) searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker())
if err != nil { if err != nil {
return nil, err return nil, err
@@ -108,31 +121,17 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
return nil, err return nil, err
} }
if dbAlbum != nil {
service.saveTorrentAndDownload(ctx, dbAlbum.ID, best)
} else {
log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence")
}
return &pb.MonitorAlbumResponse{ return &pb.MonitorAlbumResponse{
Release: buildMonitoredRelease(best), Release: buildMonitoredRelease(best),
}, nil }, nil
} }
func (service *MusicAgregatorService) fetchAlbumMetadata(ctx context.Context, albumID string) (*metadataPb.Album, error) {
resp, err := service.metadataClient.GetAlbum(ctx, &metadataPb.GetAlbumRequest{
Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID},
})
if err != nil {
log.Error().Err(err).Str("album_id", albumID).Msg("metadata GetAlbum failed")
return nil, err
}
album := resp.GetAlbum()
artistName := ""
if len(album.GetArtists()) > 0 {
artistName = album.GetArtists()[0].GetArtist().GetName()
}
log.Debug().Str("album_id", albumID).Str("title", album.GetTitle()).Str("artist", artistName).Msg("album metadata fetched")
return album, nil
}
func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) { func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) {
artistName := "" artistName := ""
if len(album.GetArtists()) > 0 { if len(album.GetArtists()) > 0 {
@@ -277,6 +276,85 @@ func (service *MusicAgregatorService) addToTorrentClient(best parsedItem) error
return nil return nil
} }
func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context, dbAlbumID string, best parsedItem) {
quality := normalizeQuality(pb.QualityType_QUALITY_UNSPECIFIED, best.rel.BitDepth, best.rel.SampleRate)
dbTorrent := &database.Torrent{
AlbumID: dbAlbumID,
InfoHash: best.rel.InfoHash,
Tracker: best.item.Tracker,
Title: best.item.Title,
Format: best.rel.Format.String(),
Quality: quality,
Source: best.rel.Source.String(),
BitDepth: best.rel.BitDepth,
SampleRate: best.rel.SampleRate,
Seeders: best.item.Seeders,
Peers: best.item.Peers,
Size: best.rel.TotalAudioSize,
TrackCount: best.rel.TrackCount,
HasCoverArt: best.rel.HasCoverArt,
HasCueSheet: best.rel.HasCueSheet,
HasRipLog: best.rel.HasRipLog,
DownloadLink: best.item.DownloadLink,
TorrentFile: best.torrentData,
}
if err := service.torrents.Create(ctx, dbTorrent); err != nil {
log.Error().Err(err).Str("hash", best.rel.InfoHash).Msg("failed to save torrent to DB")
return
}
savedTorrent, err := service.torrents.GetByInfoHash(ctx, best.rel.InfoHash)
if err != nil {
log.Error().Err(err).Msg("failed to retrieve saved torrent")
return
}
download := &database.Download{
TorrentID: savedTorrent.ID,
AlbumID: dbAlbumID,
Format: best.rel.Format.String(),
Quality: quality,
State: "downloading",
QbitHash: best.rel.InfoHash,
}
if err := service.downloads.Create(ctx, download); err != nil {
log.Error().Err(err).Msg("failed to save download to DB")
return
}
if service.riverClient != nil {
_, err := service.riverClient.Insert(ctx, workers.PollDownloadArgs{
DownloadID: download.ID,
TorrentHash: best.rel.InfoHash,
CheckInterval: 30 * time.Second,
}, &river.InsertOpts{
ScheduledAt: time.Now().Add(30 * time.Second),
})
if err != nil {
log.Error().Err(err).Msg("failed to schedule download poll job")
} else {
log.Debug().Str("download_id", download.ID).Str("hash", best.rel.InfoHash).Msg("download poll job scheduled")
}
}
log.Info().Str("hash", best.rel.InfoHash).Str("download_id", download.ID).Msg("torrent and download saved to DB")
}
func normalizeQuality(quality pb.QualityType, bitDepth int, sampleRate int) string {
if bitDepth > 0 && sampleRate > 0 {
return fmt.Sprintf("%d-%d", bitDepth, sampleRate/1000)
}
switch quality {
case pb.QualityType_QUALITY_LOSSLESS:
return "16-44"
case pb.QualityType_QUALITY_LOSSY:
return "320"
default:
return ""
}
}
func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease { func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease {
return &pb.MonitoredRelease{ return &pb.MonitoredRelease{
InfoHash: p.rel.InfoHash, InfoHash: p.rel.InfoHash,
+8
View File
@@ -8,6 +8,14 @@ import (
"homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/config"
) )
func MustNewTorrentClient(cfg config.Config) TorrentClient {
client, err := NewTorrentClient(cfg)
if err != nil {
panic(fmt.Sprintf("failed to create torrent client: %v", err))
}
return client
}
func NewTorrentClient(cfg config.Config) (TorrentClient, error) { func NewTorrentClient(cfg config.Config) (TorrentClient, error) {
var client TorrentClient var client TorrentClient
+176
View File
@@ -0,0 +1,176 @@
package workers
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/torrent"
)
type PollDownloadArgs struct {
DownloadID string `json:"download_id"`
TorrentHash string `json:"torrent_hash"`
CheckInterval time.Duration `json:"check_interval"`
}
func (PollDownloadArgs) Kind() string { return "poll_download" }
type PollDownloadWorker struct {
river.WorkerDefaults[PollDownloadArgs]
TorrentClient torrent.TorrentClient
Downloads *database.DownloadRepository
DownloadFiles *database.DownloadFileRepository
RiverClient *river.Client[pgx.Tx]
}
func (w *PollDownloadWorker) Work(ctx context.Context, job *river.Job[PollDownloadArgs]) error {
args := job.Args
log.Trace().Str("download_id", args.DownloadID).Str("hash", args.TorrentHash).Msg("polling download status")
results, err := w.TorrentClient.Find(torrent.FindOptions{Hash: args.TorrentHash})
if err != nil {
log.Error().Err(err).Str("hash", args.TorrentHash).Msg("failed to query torrent client")
return w.reschedule(ctx, args)
}
if len(results) == 0 {
log.Warn().Str("hash", args.TorrentHash).Msg("torrent not found in client, marking failed")
w.Downloads.SetFailed(ctx, args.DownloadID, "torrent not found in client")
return nil
}
t := results[0]
switch {
case t.Progress >= 1.0:
return w.onCompleted(ctx, args, t)
case t.State == "error":
log.Warn().Str("hash", args.TorrentHash).Str("state", t.State).Msg("torrent in error state")
w.Downloads.SetFailed(ctx, args.DownloadID, "torrent error state")
return nil
default:
log.Trace().
Str("hash", args.TorrentHash).
Str("state", t.State).
Float64("progress", t.Progress*100).
Int64("dlspeed", t.DlSpeed).
Msg("download in progress")
return w.reschedule(ctx, args)
}
}
func (w *PollDownloadWorker) onCompleted(ctx context.Context, args PollDownloadArgs, t torrent.TorrentInfo) error {
log.Info().Str("hash", args.TorrentHash).Str("path", t.ContentPath).Msg("download completed")
if err := w.Downloads.SetCompleted(ctx, args.DownloadID, t.SavePath); err != nil {
log.Error().Err(err).Msg("failed to update download as completed")
return err
}
files, err := scanAndHashFiles(t.ContentPath)
if err != nil {
log.Error().Err(err).Str("path", t.ContentPath).Msg("failed to scan downloaded files")
return nil
}
for _, f := range files {
f.DownloadID = args.DownloadID
}
if err := w.DownloadFiles.CreateBatch(ctx, files); err != nil {
log.Error().Err(err).Msg("failed to save download files")
return nil
}
log.Info().
Str("download_id", args.DownloadID).
Int("files", len(files)).
Msg("download files scanned and hashed")
return nil
}
func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadArgs) error {
_, err := w.RiverClient.Insert(ctx, args, &river.InsertOpts{
ScheduledAt: time.Now().Add(args.CheckInterval),
})
if err != nil {
log.Error().Err(err).Msg("failed to reschedule poll_download")
}
return nil
}
var audioExtensions = map[string]bool{
".flac": true, ".mp3": true, ".aac": true, ".m4a": true,
".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true,
}
func scanAndHashFiles(rootPath string) ([]*database.DownloadFile, error) {
var files []*database.DownloadFile
err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return err
}
ext := strings.ToLower(filepath.Ext(path))
relPath, _ := filepath.Rel(rootPath, path)
fileType := strings.TrimPrefix(ext, ".")
if fileType == "" {
return nil
}
df := &database.DownloadFile{
FilePath: relPath,
FileSize: info.Size(),
FileType: fileType,
}
if audioExtensions[ext] || ext == ".cue" || ext == ".log" {
hash, err := hashFile(path)
if err != nil {
log.Warn().Err(err).Str("path", path).Msg("failed to hash file")
} else {
df.SHA256Hash = hash
now := time.Now()
df.VerifiedAt = &now
}
}
files = append(files, df)
return nil
})
return files, err
}
func hashFile(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", fmt.Errorf("opening file: %w", err)
}
defer f.Close()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return "", fmt.Errorf("hashing file: %w", err)
}
return hex.EncodeToString(h.Sum(nil)), nil
}