package workers import ( "context" "crypto/sha256" "encoding/hex" "fmt" "io" "os" "path/filepath" "strings" "time" "github.com/jackc/pgx/v5" "github.com/riverqueue/river" "github.com/rs/zerolog/log" "homelab.lan/music-agregator/internal/database" "homelab.lan/music-agregator/internal/torrent" ) type PollDownloadArgs struct { DownloadID string `json:"download_id"` TorrentHash string `json:"torrent_hash"` CheckInterval time.Duration `json:"check_interval"` } func (PollDownloadArgs) Kind() string { return "poll_download" } type PollDownloadWorker struct { river.WorkerDefaults[PollDownloadArgs] TorrentClient torrent.TorrentClient Downloads *database.DownloadRepository DownloadFiles *database.DownloadFileRepository RiverClient *river.Client[pgx.Tx] } func (w *PollDownloadWorker) Work(ctx context.Context, job *river.Job[PollDownloadArgs]) error { args := job.Args log.Trace().Str("download_id", args.DownloadID).Str("hash", args.TorrentHash).Msg("polling download status") results, err := w.TorrentClient.Find(torrent.FindOptions{Hash: args.TorrentHash}) if err != nil { log.Error().Err(err).Str("hash", args.TorrentHash).Msg("failed to query torrent client") return w.reschedule(ctx, args) } if len(results) == 0 { log.Warn().Str("hash", args.TorrentHash).Msg("torrent not found in client, marking failed") w.Downloads.SetFailed(ctx, args.DownloadID, "torrent not found in client") return nil } t := results[0] switch { case t.Progress >= 1.0: return w.onCompleted(ctx, args, t) case t.State == "error": log.Warn().Str("hash", args.TorrentHash).Str("state", t.State).Msg("torrent in error state") w.Downloads.SetFailed(ctx, args.DownloadID, "torrent error state") return nil default: log.Trace(). Str("hash", args.TorrentHash). Str("state", t.State). Float64("progress", t.Progress*100). Int64("dlspeed", t.DlSpeed). Msg("download in progress") return w.reschedule(ctx, args) } } func (w *PollDownloadWorker) onCompleted(ctx context.Context, args PollDownloadArgs, t torrent.TorrentInfo) error { log.Info().Str("hash", args.TorrentHash).Str("path", t.ContentPath).Msg("download completed") if err := w.Downloads.SetCompleted(ctx, args.DownloadID, t.SavePath); err != nil { log.Error().Err(err).Msg("failed to update download as completed") return err } files, err := scanAndHashFiles(t.ContentPath) if err != nil { log.Error().Err(err).Str("path", t.ContentPath).Msg("failed to scan downloaded files") return nil } for _, f := range files { f.DownloadID = args.DownloadID } if err := w.DownloadFiles.CreateBatch(ctx, files); err != nil { log.Error().Err(err).Msg("failed to save download files") return nil } log.Info(). Str("download_id", args.DownloadID). Int("files", len(files)). Msg("download files scanned and hashed") return nil } func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadArgs) error { if w.RiverClient == nil { log.Warn().Str("download_id", args.DownloadID).Msg("no river client, cannot reschedule poll_download") return nil } _, err := w.RiverClient.Insert(ctx, args, &river.InsertOpts{ ScheduledAt: time.Now().Add(args.CheckInterval), }) if err != nil { log.Error().Err(err).Msg("failed to reschedule poll_download") } return nil } func (w *PollDownloadWorker) RecoverOrphanedDownloads(ctx context.Context) { active, err := w.Downloads.GetActive(ctx) if err != nil { log.Error().Err(err).Msg("failed to query active downloads for recovery") return } if len(active) == 0 { return } for _, d := range active { _, err := w.RiverClient.Insert(ctx, PollDownloadArgs{ DownloadID: d.ID, TorrentHash: d.QbitHash, CheckInterval: 30 * time.Second, }, &river.InsertOpts{ ScheduledAt: time.Now().Add(5 * time.Second), UniqueOpts: river.UniqueOpts{ ByArgs: true, }, }) if err != nil { log.Error().Err(err).Str("download_id", d.ID).Msg("failed to reschedule orphaned download") } else { log.Info().Str("download_id", d.ID).Str("hash", d.QbitHash).Msg("recovered orphaned download poll job") } } } var audioExtensions = map[string]bool{ ".flac": true, ".mp3": true, ".aac": true, ".m4a": true, ".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true, } func scanAndHashFiles(rootPath string) ([]*database.DownloadFile, error) { var files []*database.DownloadFile err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { if err != nil || info.IsDir() { return err } ext := strings.ToLower(filepath.Ext(path)) relPath, _ := filepath.Rel(rootPath, path) fileType := strings.TrimPrefix(ext, ".") if fileType == "" { return nil } df := &database.DownloadFile{ FilePath: relPath, FileSize: info.Size(), FileType: fileType, } if audioExtensions[ext] || ext == ".cue" || ext == ".log" { hash, err := hashFile(path) if err != nil { log.Warn().Err(err).Str("path", path).Msg("failed to hash file") } else { df.SHA256Hash = hash now := time.Now() df.VerifiedAt = &now } } files = append(files, df) return nil }) return files, err } func hashFile(path string) (string, error) { f, err := os.Open(path) if err != nil { return "", fmt.Errorf("opening file: %w", err) } defer f.Close() h := sha256.New() if _, err := io.Copy(h, f); err != nil { return "", fmt.Errorf("hashing file: %w", err) } return hex.EncodeToString(h.Sum(nil)), nil }