From 60c94935b2a5987c6c95dd926ffbabcd009d07df Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 8 May 2026 11:00:04 +0200 Subject: [PATCH] Persist metadata to DB, poll download worker, metadata service layer --- api/Metadata Agregator/Get Album.bru | 22 ++++ cmd/music-agregator/main.go | 58 ++++++--- internal/indexer/cache_worker.go | 6 + internal/indexer/server.go | 4 +- internal/indexer/service.go | 8 +- internal/metadata/service.go | 116 ++++++++++++++++++ internal/server.go | 5 +- internal/service.go | 144 +++++++++++++++++----- internal/torrent/factory.go | 8 ++ internal/workers/poll_download.go | 176 +++++++++++++++++++++++++++ 10 files changed, 489 insertions(+), 58 deletions(-) create mode 100644 api/Metadata Agregator/Get Album.bru create mode 100644 internal/metadata/service.go create mode 100644 internal/workers/poll_download.go diff --git a/api/Metadata Agregator/Get Album.bru b/api/Metadata Agregator/Get Album.bru new file mode 100644 index 0000000..0fb0625 --- /dev/null +++ b/api/Metadata Agregator/Get Album.bru @@ -0,0 +1,22 @@ +meta { + name: Get Album + type: grpc + seq: 4 +} + +grpc { + url: localhost:3000 + method: /metadata.v1.MetadataService/GetAlbum + body: grpc + auth: inherit + methodType: unary +} + +body:grpc { + name: message 1 + content: ''' + { + "id": "a0b7b436-94db-4df6-8c5f-bc0e5932a90e" + } + ''' +} diff --git a/cmd/music-agregator/main.go b/cmd/music-agregator/main.go index 3bc40d1..50c9544 100644 --- a/cmd/music-agregator/main.go +++ b/cmd/music-agregator/main.go @@ -11,7 +11,6 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/riverqueue/river" @@ -28,10 +27,12 @@ import ( "homelab.lan/music-agregator/internal" "homelab.lan/music-agregator/internal/config" + "homelab.lan/music-agregator/internal/database" "homelab.lan/music-agregator/internal/hello" "homelab.lan/music-agregator/internal/indexer" "homelab.lan/music-agregator/internal/metadata" "homelab.lan/music-agregator/internal/torrent" + "homelab.lan/music-agregator/internal/workers" ) func main() { @@ -68,36 +69,54 @@ func interceptorLogger(l zerolog.Logger) logging.Logger { }) } -func setupRiver(ctx context.Context, cfg config.Config) (*river.Client[pgx.Tx], *pgxpool.Pool) { - if !cfg.Indexer.Cache.Enabled { - return nil, nil - } - - dbPool, err := pgxpool.New(ctx, cfg.Database.URL) +func setupDatabase(ctx context.Context, cfg config.Config) *database.DB { + db, err := database.New(ctx, cfg.Database.URL) if err != nil { - log.Fatal().Err(err).Msg("failed to connect to database for River") + log.Fatal().Err(err).Msg("failed to connect to database") + } + return db +} + +type riverSetup struct { + client *river.Client[pgx.Tx] + cacheRefreshWorker *indexer.CacheRefreshWorker +} + +func setupRiver(ctx context.Context, cfg config.Config, db *database.DB) *riverSetup { + cacheWorker := &indexer.CacheRefreshWorker{} + pollWorker := &workers.PollDownloadWorker{ + Downloads: database.NewDownloadRepository(db.Pool), + DownloadFiles: database.NewDownloadFileRepository(db.Pool), + TorrentClient: torrent.MustNewTorrentClient(cfg), } - workers := river.NewWorkers() - river.AddWorker(workers, &indexer.CacheRefreshWorker{}) + riverWorkers := river.NewWorkers() + river.AddWorker(riverWorkers, cacheWorker) + river.AddWorker(riverWorkers, pollWorker) - riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + riverClient, err := river.NewClient(riverpgxv5.New(db.Pool), &river.Config{ Queues: map[string]river.QueueConfig{ - river.QueueDefault: {MaxWorkers: 2}, + river.QueueDefault: {MaxWorkers: 4}, }, - Workers: workers, + Workers: riverWorkers, }) if err != nil { log.Fatal().Err(err).Msg("failed to create River client") } + cacheWorker.RiverClient = riverClient + pollWorker.RiverClient = riverClient + if err := riverClient.Start(ctx); err != nil { log.Fatal().Err(err).Msg("failed to start River client") } log.Info().Msg("River queue started") - return riverClient, dbPool + return &riverSetup{ + client: riverClient, + cacheRefreshWorker: cacheWorker, + } } func serveGrpc(config config.Config) { @@ -132,16 +151,15 @@ func serveGrpc(config config.Config) { ) ctx := context.Background() - riverClient, dbPool := setupRiver(ctx, config) - if dbPool != nil { - defer dbPool.Close() - } + db := setupDatabase(ctx, config) + defer db.Close() + rs := setupRiver(ctx, config, db) - musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, riverClient) + musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, rs.client, db) if err != nil { log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer") } - indexerServer, err := indexer.NewIndexerServer(config, riverClient) + indexerServer, err := indexer.NewIndexerServer(config, rs.client, rs.cacheRefreshWorker) if err != nil { log.Fatal().Err(err).Msg("failed to create IndexerServer") } diff --git a/internal/indexer/cache_worker.go b/internal/indexer/cache_worker.go index cfdc864..fa4ce9f 100644 --- a/internal/indexer/cache_worker.go +++ b/internal/indexer/cache_worker.go @@ -27,6 +27,12 @@ type CacheRefreshWorker struct { func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error { args := job.Args + + if w.Cache == nil || w.Indexer == nil { + log.Trace().Str("key", args.Key).Msg("cache disabled, discarding refresh job") + return nil + } + log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started") if time.Now().After(args.TTLExpires) { diff --git a/internal/indexer/server.go b/internal/indexer/server.go index 52bfbca..2291933 100644 --- a/internal/indexer/server.go +++ b/internal/indexer/server.go @@ -17,8 +17,8 @@ type IndexerServer struct { pb.UnimplementedIndexerServiceServer } -func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerServer, error) { - service, err := NewIndexerService(cfg, riverClient) +func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerServer, error) { + service, err := NewIndexerService(cfg, riverClient, cacheWorker) if err != nil { log.Err(err).Msg("failed to initialize IndexerService") return nil, err diff --git a/internal/indexer/service.go b/internal/indexer/service.go index b3508ce..533b529 100644 --- a/internal/indexer/service.go +++ b/internal/indexer/service.go @@ -15,7 +15,7 @@ type IndexerService struct { indexer Indexer } -func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerService, error) { +func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerService, error) { var idx Indexer switch cfg.Indexer.Type { @@ -28,6 +28,12 @@ func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*I if cfg.Indexer.Cache.Enabled && riverClient != nil { cache := NewIndexerCache() idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache) + + if cacheWorker != nil { + cacheWorker.Cache = cache + cacheWorker.Indexer = idx + } + log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled") } diff --git a/internal/metadata/service.go b/internal/metadata/service.go new file mode 100644 index 0000000..83e77bc --- /dev/null +++ b/internal/metadata/service.go @@ -0,0 +1,116 @@ +package metadata + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + + metadataPb "homelab.lan/music-agregator/gen/metadata/v1" + "homelab.lan/music-agregator/internal/database" +) + +type MetadataService struct { + client metadataPb.MetadataServiceClient + artists *database.ArtistRepository + albums *database.AlbumRepository +} + +func NewMetadataService(client metadataPb.MetadataServiceClient, db *database.DB) *MetadataService { + return &MetadataService{ + client: client, + artists: database.NewArtistRepository(db.Pool), + albums: database.NewAlbumRepository(db.Pool), + } +} + +func (s *MetadataService) GetAlbum(ctx context.Context, albumID string) (*metadataPb.Album, error) { + resp, err := s.client.GetAlbum(ctx, &metadataPb.GetAlbumRequest{ + Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID}, + }) + if err != nil { + return nil, fmt.Errorf("fetching album: %w", err) + } + + album := resp.GetAlbum() + + if _, err := s.albums.GetByExternalID(ctx, album.GetId()); err != nil { + s.persistArtist(ctx, album) + s.persistAlbum(ctx, album) + } + + return album, nil +} + +func (s *MetadataService) GetArtistByExternalID(ctx context.Context, externalID string) (*database.Artist, error) { + return s.artists.GetByExternalID(ctx, externalID) +} + +func (s *MetadataService) GetAlbumByExternalID(ctx context.Context, externalID string) (*database.Album, error) { + return s.albums.GetByExternalID(ctx, externalID) +} + +func (s *MetadataService) persistArtist(ctx context.Context, album *metadataPb.Album) { + if len(album.GetArtists()) == 0 { + return + } + + artist := album.GetArtists()[0].GetArtist() + var genres []string + for _, g := range artist.GetGenres() { + genres = append(genres, g.GetName()) + } + + err := s.artists.Create(ctx, &database.Artist{ + ExternalID: artist.GetId(), + Name: artist.GetName(), + ArtistType: artist.GetArtistType(), + Country: artist.GetCountry(), + Genres: genres, + ImageURL: artist.GetImageUrl(), + }) + if err != nil { + log.Warn().Err(err).Str("name", artist.GetName()).Msg("failed to persist artist") + } +} + +func (s *MetadataService) persistAlbum(ctx context.Context, album *metadataPb.Album) { + artistID := "" + if len(album.GetArtists()) > 0 { + a, err := s.artists.GetByExternalID(ctx, album.GetArtists()[0].GetArtist().GetId()) + if err == nil { + artistID = a.ID + } + } + + if artistID == "" { + log.Trace().Str("album", album.GetTitle()).Msg("skipping album persist, no artist in DB") + return + } + + var genres []string + for _, g := range album.GetGenres() { + genres = append(genres, g.GetName()) + } + + labelName := "" + if album.GetLabel() != nil { + labelName = album.GetLabel().GetName() + } + + err := s.albums.Create(ctx, &database.Album{ + ExternalID: album.GetId(), + ArtistID: artistID, + Title: album.GetTitle(), + AlbumType: album.GetAlbumType(), + TotalTracks: int(album.GetTotalTracks()), + TotalDiscs: int(album.GetTotalDiscs()), + Label: labelName, + Genres: genres, + CoverURL: album.GetCoverUrl(), + IsMonitored: true, + }) + if err != nil { + log.Warn().Err(err).Str("title", album.GetTitle()).Msg("failed to persist album") + } +} diff --git a/internal/server.go b/internal/server.go index 0c3cd49..8caa3b5 100644 --- a/internal/server.go +++ b/internal/server.go @@ -10,6 +10,7 @@ import ( pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/config" + "homelab.lan/music-agregator/internal/database" ) type MusicAgregatorServer struct { @@ -17,8 +18,8 @@ type MusicAgregatorServer struct { pb.UnimplementedMusicAgregatorServiceServer } -func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorServer, error) { - service, err := NewMusicAgregatorService(cfg, riverClient) +func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorServer, error) { + service, err := NewMusicAgregatorService(cfg, riverClient, db) if err != nil { log.Err(err).Msg("failed to create MusicAgregatorService") return nil, err diff --git a/internal/service.go b/internal/service.go index 02c738d..1a56f99 100644 --- a/internal/service.go +++ b/internal/service.go @@ -11,17 +11,18 @@ import ( "github.com/jackc/pgx/v5" "github.com/riverqueue/river" "github.com/rs/zerolog/log" - "google.golang.org/grpc" metadataPb "homelab.lan/music-agregator/gen/metadata/v1" pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/config" + "homelab.lan/music-agregator/internal/database" "homelab.lan/music-agregator/internal/indexer" "homelab.lan/music-agregator/internal/metadata" "homelab.lan/music-agregator/internal/release" "homelab.lan/music-agregator/internal/torrent" torrentParser "homelab.lan/music-agregator/internal/tracker" + "homelab.lan/music-agregator/internal/workers" ) type parsedItem struct { @@ -32,21 +33,23 @@ type parsedItem struct { type MusicAgregatorService struct { config config.Config - metadataClient metadataPb.MetadataServiceClient - metadataConn *grpc.ClientConn + metadata *metadata.MetadataService indexer *indexer.IndexerService torrentClient torrent.TorrentClient magnetResolver *torrentParser.MagnetResolver + riverClient *river.Client[pgx.Tx] + torrents *database.TorrentRepository + downloads *database.DownloadRepository } -func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) { - indexer, err := indexer.NewIndexerService(cfg, riverClient) +func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorService, error) { + idx, err := indexer.NewIndexerService(cfg, riverClient, nil) if err != nil { log.Err(err).Msg("failed to create IndexerService") return nil, err } - metadataClient, conn, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint) + metadataClient, _, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint) if err != nil { log.Err(err).Msg("failed to create metadata client") return nil, err @@ -66,29 +69,39 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T return &MusicAgregatorService{ config: cfg, - metadataClient: metadataClient, - metadataConn: conn, - indexer: indexer, + metadata: metadata.NewMetadataService(metadataClient, db), + indexer: idx, torrentClient: torrentClient, magnetResolver: magnetResolver, + riverClient: riverClient, + torrents: database.NewTorrentRepository(db.Pool), + downloads: database.NewDownloadRepository(db.Pool), }, nil } func (s *MusicAgregatorService) Close() { - if s.metadataConn != nil { - s.metadataConn.Close() - } if s.magnetResolver != nil { s.magnetResolver.Close() } } func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) { - album, err := service.fetchAlbumMetadata(ctx, req.GetAlbumId()) + album, err := service.metadata.GetAlbum(ctx, req.GetAlbumId()) if err != nil { + log.Error().Err(err).Str("album_id", req.GetAlbumId()).Msg("failed to get album") return nil, err } + dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, req.GetAlbumId()) + if dbAlbum != nil { + qualityStr := normalizeQuality(req.GetQuality(), 0, 0) + owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr) + if err == nil && owned { + log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality") + return &pb.MonitorAlbumResponse{}, nil + } + } + searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker()) if err != nil { return nil, err @@ -108,31 +121,17 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb. return nil, err } + if dbAlbum != nil { + service.saveTorrentAndDownload(ctx, dbAlbum.ID, best) + } else { + log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence") + } + return &pb.MonitorAlbumResponse{ Release: buildMonitoredRelease(best), }, nil } -func (service *MusicAgregatorService) fetchAlbumMetadata(ctx context.Context, albumID string) (*metadataPb.Album, error) { - resp, err := service.metadataClient.GetAlbum(ctx, &metadataPb.GetAlbumRequest{ - Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID}, - }) - if err != nil { - log.Error().Err(err).Str("album_id", albumID).Msg("metadata GetAlbum failed") - return nil, err - } - - album := resp.GetAlbum() - artistName := "" - if len(album.GetArtists()) > 0 { - artistName = album.GetArtists()[0].GetArtist().GetName() - } - - log.Debug().Str("album_id", albumID).Str("title", album.GetTitle()).Str("artist", artistName).Msg("album metadata fetched") - - return album, nil -} - func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) { artistName := "" if len(album.GetArtists()) > 0 { @@ -277,6 +276,85 @@ func (service *MusicAgregatorService) addToTorrentClient(best parsedItem) error return nil } +func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context, dbAlbumID string, best parsedItem) { + quality := normalizeQuality(pb.QualityType_QUALITY_UNSPECIFIED, best.rel.BitDepth, best.rel.SampleRate) + + dbTorrent := &database.Torrent{ + AlbumID: dbAlbumID, + InfoHash: best.rel.InfoHash, + Tracker: best.item.Tracker, + Title: best.item.Title, + Format: best.rel.Format.String(), + Quality: quality, + Source: best.rel.Source.String(), + BitDepth: best.rel.BitDepth, + SampleRate: best.rel.SampleRate, + Seeders: best.item.Seeders, + Peers: best.item.Peers, + Size: best.rel.TotalAudioSize, + TrackCount: best.rel.TrackCount, + HasCoverArt: best.rel.HasCoverArt, + HasCueSheet: best.rel.HasCueSheet, + HasRipLog: best.rel.HasRipLog, + DownloadLink: best.item.DownloadLink, + TorrentFile: best.torrentData, + } + if err := service.torrents.Create(ctx, dbTorrent); err != nil { + log.Error().Err(err).Str("hash", best.rel.InfoHash).Msg("failed to save torrent to DB") + return + } + + savedTorrent, err := service.torrents.GetByInfoHash(ctx, best.rel.InfoHash) + if err != nil { + log.Error().Err(err).Msg("failed to retrieve saved torrent") + return + } + + download := &database.Download{ + TorrentID: savedTorrent.ID, + AlbumID: dbAlbumID, + Format: best.rel.Format.String(), + Quality: quality, + State: "downloading", + QbitHash: best.rel.InfoHash, + } + if err := service.downloads.Create(ctx, download); err != nil { + log.Error().Err(err).Msg("failed to save download to DB") + return + } + + if service.riverClient != nil { + _, err := service.riverClient.Insert(ctx, workers.PollDownloadArgs{ + DownloadID: download.ID, + TorrentHash: best.rel.InfoHash, + CheckInterval: 30 * time.Second, + }, &river.InsertOpts{ + ScheduledAt: time.Now().Add(30 * time.Second), + }) + if err != nil { + log.Error().Err(err).Msg("failed to schedule download poll job") + } else { + log.Debug().Str("download_id", download.ID).Str("hash", best.rel.InfoHash).Msg("download poll job scheduled") + } + } + + log.Info().Str("hash", best.rel.InfoHash).Str("download_id", download.ID).Msg("torrent and download saved to DB") +} + +func normalizeQuality(quality pb.QualityType, bitDepth int, sampleRate int) string { + if bitDepth > 0 && sampleRate > 0 { + return fmt.Sprintf("%d-%d", bitDepth, sampleRate/1000) + } + switch quality { + case pb.QualityType_QUALITY_LOSSLESS: + return "16-44" + case pb.QualityType_QUALITY_LOSSY: + return "320" + default: + return "" + } +} + func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease { return &pb.MonitoredRelease{ InfoHash: p.rel.InfoHash, diff --git a/internal/torrent/factory.go b/internal/torrent/factory.go index 3fa0653..d1603f7 100644 --- a/internal/torrent/factory.go +++ b/internal/torrent/factory.go @@ -8,6 +8,14 @@ import ( "homelab.lan/music-agregator/internal/config" ) +func MustNewTorrentClient(cfg config.Config) TorrentClient { + client, err := NewTorrentClient(cfg) + if err != nil { + panic(fmt.Sprintf("failed to create torrent client: %v", err)) + } + return client +} + func NewTorrentClient(cfg config.Config) (TorrentClient, error) { var client TorrentClient diff --git a/internal/workers/poll_download.go b/internal/workers/poll_download.go new file mode 100644 index 0000000..d6f0de1 --- /dev/null +++ b/internal/workers/poll_download.go @@ -0,0 +1,176 @@ +package workers + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" + "github.com/rs/zerolog/log" + + "homelab.lan/music-agregator/internal/database" + "homelab.lan/music-agregator/internal/torrent" +) + +type PollDownloadArgs struct { + DownloadID string `json:"download_id"` + TorrentHash string `json:"torrent_hash"` + CheckInterval time.Duration `json:"check_interval"` +} + +func (PollDownloadArgs) Kind() string { return "poll_download" } + +type PollDownloadWorker struct { + river.WorkerDefaults[PollDownloadArgs] + TorrentClient torrent.TorrentClient + Downloads *database.DownloadRepository + DownloadFiles *database.DownloadFileRepository + RiverClient *river.Client[pgx.Tx] +} + +func (w *PollDownloadWorker) Work(ctx context.Context, job *river.Job[PollDownloadArgs]) error { + args := job.Args + + log.Trace().Str("download_id", args.DownloadID).Str("hash", args.TorrentHash).Msg("polling download status") + + results, err := w.TorrentClient.Find(torrent.FindOptions{Hash: args.TorrentHash}) + if err != nil { + log.Error().Err(err).Str("hash", args.TorrentHash).Msg("failed to query torrent client") + return w.reschedule(ctx, args) + } + + if len(results) == 0 { + log.Warn().Str("hash", args.TorrentHash).Msg("torrent not found in client, marking failed") + w.Downloads.SetFailed(ctx, args.DownloadID, "torrent not found in client") + return nil + } + + t := results[0] + + switch { + case t.Progress >= 1.0: + return w.onCompleted(ctx, args, t) + + case t.State == "error": + log.Warn().Str("hash", args.TorrentHash).Str("state", t.State).Msg("torrent in error state") + w.Downloads.SetFailed(ctx, args.DownloadID, "torrent error state") + return nil + + default: + log.Trace(). + Str("hash", args.TorrentHash). + Str("state", t.State). + Float64("progress", t.Progress*100). + Int64("dlspeed", t.DlSpeed). + Msg("download in progress") + return w.reschedule(ctx, args) + } +} + +func (w *PollDownloadWorker) onCompleted(ctx context.Context, args PollDownloadArgs, t torrent.TorrentInfo) error { + log.Info().Str("hash", args.TorrentHash).Str("path", t.ContentPath).Msg("download completed") + + if err := w.Downloads.SetCompleted(ctx, args.DownloadID, t.SavePath); err != nil { + log.Error().Err(err).Msg("failed to update download as completed") + return err + } + + files, err := scanAndHashFiles(t.ContentPath) + if err != nil { + log.Error().Err(err).Str("path", t.ContentPath).Msg("failed to scan downloaded files") + return nil + } + + for _, f := range files { + f.DownloadID = args.DownloadID + } + + if err := w.DownloadFiles.CreateBatch(ctx, files); err != nil { + log.Error().Err(err).Msg("failed to save download files") + return nil + } + + log.Info(). + Str("download_id", args.DownloadID). + Int("files", len(files)). + Msg("download files scanned and hashed") + + return nil +} + +func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadArgs) error { + _, err := w.RiverClient.Insert(ctx, args, &river.InsertOpts{ + ScheduledAt: time.Now().Add(args.CheckInterval), + }) + if err != nil { + log.Error().Err(err).Msg("failed to reschedule poll_download") + } + return nil +} + +var audioExtensions = map[string]bool{ + ".flac": true, ".mp3": true, ".aac": true, ".m4a": true, + ".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true, +} + +func scanAndHashFiles(rootPath string) ([]*database.DownloadFile, error) { + var files []*database.DownloadFile + + err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { + if err != nil || info.IsDir() { + return err + } + + ext := strings.ToLower(filepath.Ext(path)) + relPath, _ := filepath.Rel(rootPath, path) + + fileType := strings.TrimPrefix(ext, ".") + if fileType == "" { + return nil + } + + df := &database.DownloadFile{ + FilePath: relPath, + FileSize: info.Size(), + FileType: fileType, + } + + if audioExtensions[ext] || ext == ".cue" || ext == ".log" { + hash, err := hashFile(path) + if err != nil { + log.Warn().Err(err).Str("path", path).Msg("failed to hash file") + } else { + df.SHA256Hash = hash + now := time.Now() + df.VerifiedAt = &now + } + } + + files = append(files, df) + return nil + }) + + return files, err +} + +func hashFile(path string) (string, error) { + f, err := os.Open(path) + if err != nil { + return "", fmt.Errorf("opening file: %w", err) + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return "", fmt.Errorf("hashing file: %w", err) + } + + return hex.EncodeToString(h.Sum(nil)), nil +}