Files
music-agregator/internal/workers/poll_download.go
T

166 lines
4.6 KiB
Go

package workers
import (
"context"
"time"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
"homelab.lan/music-agregator/internal/analysis"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/torrent"
)
// PollDownloadArgs is the job payload for one poll of a torrent download.
// The JSON tags define how the payload is serialized.
type PollDownloadArgs struct {
	// DownloadID identifies the download record whose status is updated.
	DownloadID string `json:"download_id"`
	// TorrentHash is the hash used to look the torrent up in the torrent client.
	TorrentHash string `json:"torrent_hash"`
	// CheckInterval is the delay applied before the next poll is scheduled
	// when the download has not finished yet.
	CheckInterval time.Duration `json:"check_interval"`
}
// Kind returns the constant job kind name used for poll jobs.
func (PollDownloadArgs) Kind() string {
	const kind = "poll_download"
	return kind
}
// PollDownloadWorker processes PollDownloadArgs jobs: it checks a torrent's
// status in the torrent client and updates the matching download record.
type PollDownloadWorker struct {
	river.WorkerDefaults[PollDownloadArgs]

	// TorrentClient is queried by hash for the current torrent status.
	TorrentClient torrent.TorrentClient

	// Downloads stores download state (completed / failed / active).
	Downloads *database.DownloadRepository
	// DownloadFiles receives the scanned file records of a finished download.
	DownloadFiles *database.DownloadFileRepository
	// AlbumReleases and TrackReleases are not referenced by the methods
	// visible in this part of the file.
	AlbumReleases *database.AlbumReleaseRepository
	TrackReleases *database.TrackReleaseRepository

	// RiverClient inserts follow-up poll jobs.
	RiverClient *river.Client[pgx.Tx]
	// PathMapper, when non-nil, converts torrent-client paths via ToHost
	// before they are stored or scanned.
	PathMapper *torrent.PathMapper
	// Analyzer, when non-nil, analyzes a release after its files are saved.
	Analyzer *analysis.ReleaseAnalyzer
}
// Work performs a single poll of the torrent identified by job.Args.
//
// Outcomes:
//   - the torrent client query fails: another poll is scheduled;
//   - the torrent is unknown to the client: the download is marked failed;
//   - progress has reached 1.0: the completion handling runs;
//   - the torrent reports the "error" state: the download is marked failed;
//   - otherwise: another poll is scheduled.
//
// A non-nil error is returned when the failed state cannot be persisted, so
// the failure is not silently lost, and when onCompleted or reschedule
// return one.
func (w *PollDownloadWorker) Work(ctx context.Context, job *river.Job[PollDownloadArgs]) error {
	args := job.Args
	log.Trace().Str("download_id", args.DownloadID).Str("hash", args.TorrentHash).Msg("polling download status")

	results, err := w.TorrentClient.Find(torrent.FindOptions{Hash: args.TorrentHash})
	if err != nil {
		// A failed query is not treated as a download failure; try again later.
		log.Error().Err(err).Str("hash", args.TorrentHash).Msg("failed to query torrent client")
		return w.reschedule(ctx, args)
	}

	if len(results) == 0 {
		log.Warn().Str("hash", args.TorrentHash).Msg("torrent not found in client, marking failed")
		if err := w.Downloads.SetFailed(ctx, args.DownloadID, "torrent not found in client"); err != nil {
			log.Error().Err(err).Str("download_id", args.DownloadID).Msg("failed to mark download as failed")
			return err
		}
		return nil
	}

	// Only the first match is considered.
	t := results[0]
	switch {
	case t.Progress >= 1.0:
		// Completion is checked before the error state, so a fully
		// downloaded torrent is always handled as completed.
		return w.onCompleted(ctx, args, t)
	case t.State == "error":
		log.Warn().Str("hash", args.TorrentHash).Str("state", t.State).Msg("torrent in error state")
		if err := w.Downloads.SetFailed(ctx, args.DownloadID, "torrent error state"); err != nil {
			log.Error().Err(err).Str("download_id", args.DownloadID).Msg("failed to mark download as failed")
			return err
		}
		return nil
	default:
		log.Trace().
			Str("hash", args.TorrentHash).
			Str("state", t.State).
			Float64("progress", t.Progress*100).
			Int64("dlspeed", t.DlSpeed).
			Msg("download in progress")
		return w.reschedule(ctx, args)
	}
}
// onCompleted finalizes a finished torrent: it marks the download completed
// with its save path, scans and hashes the downloaded files, stores them,
// and runs the release analyzer when one is configured.
//
// Only a failure to mark the download completed is returned as an error.
// Failures in the later steps (scan, file persistence, analysis) are logged
// and nil is returned; the download has already been recorded as completed
// at that point.
func (w *PollDownloadWorker) onCompleted(ctx context.Context, args PollDownloadArgs, t torrent.TorrentInfo) error {
	log.Info().Str("hash", args.TorrentHash).Str("path", t.ContentPath).Msg("download completed")

	// Paths reported by the torrent client are mapped with ToHost when a
	// mapper is configured.
	contentPath, savePath := t.ContentPath, t.SavePath
	if w.PathMapper != nil {
		contentPath = w.PathMapper.ToHost(contentPath)
		savePath = w.PathMapper.ToHost(savePath)
	}

	if err := w.Downloads.SetCompleted(ctx, args.DownloadID, savePath); err != nil {
		log.Error().Err(err).Msg("failed to update download as completed")
		return err
	}

	files, err := analysis.ScanAndHashFiles(contentPath)
	if err != nil {
		log.Error().Err(err).Str("path", contentPath).Msg("failed to scan downloaded files")
		return nil
	}

	// Assign through the index: a range value is a copy of the element, so
	// writing to it would be lost if the elements are struct values. The
	// indexed form is correct for both value and pointer elements.
	for i := range files {
		files[i].DownloadID = args.DownloadID
	}

	if err := w.DownloadFiles.CreateBatch(ctx, files); err != nil {
		log.Error().Err(err).Msg("failed to save download files")
		return nil
	}
	log.Info().
		Str("download_id", args.DownloadID).
		Int("files", len(files)).
		Msg("download files scanned and hashed")

	if w.Analyzer == nil {
		return nil
	}
	// Analysis is best effort; its results are not used here.
	if _, _, err := w.Analyzer.AnalyzeAndPersist(ctx, args.DownloadID, contentPath); err != nil {
		log.Error().Err(err).Msg("failed to analyze release")
	}
	return nil
}
// reschedule inserts a follow-up poll job with the same arguments, scheduled
// args.CheckInterval from now.
//
// Without a RiverClient nothing can be scheduled; this is logged and nil is
// returned. A non-positive CheckInterval is replaced by a 30 second default
// so a missing interval cannot turn polling into a tight loop. An insert
// failure is returned to the caller instead of being swallowed, because a
// dropped follow-up job would silently end polling for this download.
func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadArgs) error {
	if w.RiverClient == nil {
		log.Warn().Str("download_id", args.DownloadID).Msg("no river client, cannot reschedule poll_download")
		return nil
	}

	// args is a copy, so the corrected interval is also what the follow-up
	// job carries.
	const fallbackInterval = 30 * time.Second
	if args.CheckInterval <= 0 {
		args.CheckInterval = fallbackInterval
	}

	_, err := w.RiverClient.Insert(ctx, args, &river.InsertOpts{
		ScheduledAt: time.Now().Add(args.CheckInterval),
	})
	if err != nil {
		log.Error().Err(err).Str("download_id", args.DownloadID).Msg("failed to reschedule poll_download")
		return err
	}
	return nil
}
// RecoverOrphanedDownloads inserts a poll job for every download returned by
// Downloads.GetActive. Each job is scheduled 5 seconds from now with a 30
// second check interval and is inserted with by-args uniqueness.
//
// Errors are logged rather than returned; a failed insert does not stop the
// remaining downloads from being processed. Without a RiverClient the method
// logs a warning and does nothing.
func (w *PollDownloadWorker) RecoverOrphanedDownloads(ctx context.Context) {
	// Same guard as reschedule: a nil client is tolerated, not a panic.
	if w.RiverClient == nil {
		log.Warn().Msg("no river client, cannot recover orphaned downloads")
		return
	}

	active, err := w.Downloads.GetActive(ctx)
	if err != nil {
		log.Error().Err(err).Msg("failed to query active downloads for recovery")
		return
	}

	for _, d := range active {
		_, err := w.RiverClient.Insert(ctx, PollDownloadArgs{
			DownloadID:    d.ID,
			TorrentHash:   d.QbitHash,
			CheckInterval: 30 * time.Second,
		}, &river.InsertOpts{
			ScheduledAt: time.Now().Add(5 * time.Second),
			UniqueOpts: river.UniqueOpts{
				ByArgs: true,
			},
		})
		if err != nil {
			log.Error().Err(err).Str("download_id", d.ID).Msg("failed to reschedule orphaned download")
			continue
		}
		log.Info().Str("download_id", d.ID).Str("hash", d.QbitHash).Msg("recovered orphaned download poll job")
	}
}