289 lines
9.5 KiB
Go
289 lines
9.5 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
"github.com/fujin/music-agregator/internal/database"
|
|
"github.com/fujin/music-agregator/internal/torrent"
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// QueueSyncResult summarizes one run of SyncDownloadQueue.
type QueueSyncResult struct {
	// Synced counts queue items that were matched to a torrent in the client.
	Synced int `json:"synced"`
	// Updated counts matched queue items whose progress/status was written
	// back to the database successfully.
	Updated int `json:"updated"`
}
|
|
|
|
func SyncDownloadQueue(ctx context.Context, db *database.DB, torrentService *TorrentService) (*QueueSyncResult, error) {
|
|
log.Info().Msg("[QUEUE_SYNC] starting queue sync")
|
|
|
|
if !torrentService.IsConfigured() {
|
|
log.Warn().Msg("[QUEUE_SYNC] torrent service not configured, skipping")
|
|
return &QueueSyncResult{}, nil
|
|
}
|
|
|
|
torrents, err := torrentService.ListTorrents(ctx)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("[QUEUE_SYNC] failed to list torrents")
|
|
return nil, err
|
|
}
|
|
|
|
log.Info().Int("torrent_count", len(torrents)).Msg("[QUEUE_SYNC] fetched torrents from client")
|
|
|
|
torrentMap := make(map[string]torrent.TorrentInfo)
|
|
torrentByName := make(map[string]torrent.TorrentInfo)
|
|
for _, t := range torrents {
|
|
torrentMap[t.Hash] = t
|
|
nameLower := strings.ToLower(t.Name)
|
|
torrentByName[nameLower] = t
|
|
log.Debug().
|
|
Str("hash", t.Hash).
|
|
Str("name", t.Name).
|
|
Str("state", string(t.State)).
|
|
Float64("progress", t.Progress).
|
|
Msg("[QUEUE_SYNC] torrent info")
|
|
}
|
|
|
|
queueItems, err := db.ListDownloadQueue(ctx, nil)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("[QUEUE_SYNC] failed to list queue items")
|
|
return nil, err
|
|
}
|
|
|
|
log.Info().Int("queue_count", len(queueItems)).Msg("[QUEUE_SYNC] fetched queue items from database")
|
|
|
|
var synced, updated int
|
|
for _, item := range queueItems {
|
|
var t torrent.TorrentInfo
|
|
var exists bool
|
|
|
|
if item.TorrentHash != nil {
|
|
t, exists = torrentMap[*item.TorrentHash]
|
|
if !exists {
|
|
log.Debug().Str("hash", *item.TorrentHash).Str("title", item.Title).Msg("[QUEUE_SYNC] torrent not found by hash")
|
|
}
|
|
}
|
|
|
|
if !exists {
|
|
titleLower := strings.ToLower(item.Title)
|
|
for name, torr := range torrentByName {
|
|
if strings.Contains(name, titleLower) || strings.Contains(titleLower, name) {
|
|
t = torr
|
|
exists = true
|
|
hash := t.Hash
|
|
if item.TorrentHash == nil {
|
|
log.Info().Str("title", item.Title).Str("matched_name", t.Name).Str("hash", hash).Msg("[QUEUE_SYNC] matched by title, updating hash")
|
|
if err := db.UpdateDownloadQueueHash(ctx, item.ID, hash); err != nil {
|
|
log.Error().Err(err).Msg("[QUEUE_SYNC] failed to update hash")
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if !exists {
|
|
log.Debug().Str("title", item.Title).Msg("[QUEUE_SYNC] no matching torrent found")
|
|
continue
|
|
}
|
|
|
|
synced++
|
|
|
|
newStatus := mapTorrentState(t.State)
|
|
sizeLeft := int64(float64(item.Size) * (1 - t.Progress))
|
|
|
|
if newStatus != item.Status || item.Progress != float32(t.Progress) {
|
|
log.Info().
|
|
Str("title", item.Title).
|
|
Str("old_status", item.Status).
|
|
Str("new_status", newStatus).
|
|
Float32("old_progress", item.Progress).
|
|
Float64("new_progress", t.Progress).
|
|
Msg("[QUEUE_SYNC] updating queue item")
|
|
|
|
if err := db.UpdateDownloadQueueProgress(ctx, item.ID, float32(t.Progress), sizeLeft, newStatus); err != nil {
|
|
log.Error().Err(err).Str("title", item.Title).Msg("[QUEUE_SYNC] failed to update queue item")
|
|
continue
|
|
}
|
|
updated++
|
|
|
|
if newStatus == "completed" && item.AlbumID != nil {
|
|
log.Info().Str("title", item.Title).Msg("[QUEUE_SYNC] download completed, removing from wanted albums")
|
|
db.RemoveFromWantedAlbums(ctx, *item.AlbumID)
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Info().Int("synced", synced).Int("updated", updated).Msg("[QUEUE_SYNC] sync completed")
|
|
return &QueueSyncResult{Synced: synced, Updated: updated}, nil
|
|
}
|
|
|
|
func mapTorrentState(state torrent.TorrentState) string {
|
|
switch state {
|
|
case torrent.StateDownloading:
|
|
return "downloading"
|
|
case torrent.StateSeeding:
|
|
return "completed"
|
|
case torrent.StatePaused:
|
|
return "paused"
|
|
case torrent.StateQueued:
|
|
return "queued"
|
|
case torrent.StateChecking:
|
|
return "checking"
|
|
case torrent.StateError:
|
|
return "failed"
|
|
default:
|
|
return "queued"
|
|
}
|
|
}
|
|
|
|
func HandleFailedDownload(ctx context.Context, db *database.DB, queueID uuid.UUID, errorMessage string) error {
|
|
log.Info().Str("queue_id", queueID.String()).Str("error", errorMessage).Msg("[FAILED_DOWNLOAD] handling failed download")
|
|
|
|
item, err := db.GetDownloadQueueItem(ctx, queueID)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("queue_id", queueID.String()).Msg("[FAILED_DOWNLOAD] failed to get queue item")
|
|
return err
|
|
}
|
|
|
|
log.Info().Str("title", item.Title).Msg("[FAILED_DOWNLOAD] marking as failed")
|
|
if err := db.UpdateDownloadQueueStatus(ctx, queueID, "failed", &errorMessage); err != nil {
|
|
log.Error().Err(err).Msg("[FAILED_DOWNLOAD] failed to update status")
|
|
return err
|
|
}
|
|
|
|
if item.ArtistID != nil && item.AlbumID != nil {
|
|
log.Info().Str("title", item.Title).Msg("[FAILED_DOWNLOAD] adding to blocklist")
|
|
if err := db.AddToBlocklist(ctx, *item.ArtistID, *item.AlbumID, item.Title, item.TorrentHash, item.Indexer); err != nil {
|
|
log.Error().Err(err).Msg("[FAILED_DOWNLOAD] failed to add to blocklist")
|
|
return err
|
|
}
|
|
}
|
|
|
|
if item.AlbumID != nil {
|
|
log.Info().Str("title", item.Title).Msg("[FAILED_DOWNLOAD] re-adding to wanted albums for retry")
|
|
if err := db.AddToWantedAlbums(ctx, *item.AlbumID); err != nil {
|
|
log.Error().Err(err).Msg("[FAILED_DOWNLOAD] failed to add to wanted albums")
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.Info().Str("title", item.Title).Msg("[FAILED_DOWNLOAD] handling complete")
|
|
return nil
|
|
}
|
|
|
|
// BlocklistResult reports what BlocklistAndRemove managed to do.
type BlocklistResult struct {
	// Blocklisted is true when the item was added to the blocklist.
	Blocklisted bool `json:"blocklisted"`
	// Removed is true when the item was deleted from the download queue.
	Removed bool `json:"removed"`
}
|
|
|
|
// JobStatus is the JSON view of a download queue item as returned by
// GetJobStatus. Pointer fields are omitted from the JSON when nil.
type JobStatus struct {
	// ID is the queue item's UUID in string form.
	ID string `json:"id"`
	// Title is the queue item's title.
	Title string `json:"title"`
	// Status is the queue status string (e.g. "downloading", "queued").
	Status string `json:"status"`
	// Progress is the download progress value of the item.
	Progress float32 `json:"progress"`
	// Size is the item's size.
	Size int64 `json:"size"`
	// SizeLeft is the amount of Size still remaining.
	SizeLeft int64 `json:"size_left"`
	// TorrentHash is the hash of the associated torrent, if known.
	TorrentHash *string `json:"torrent_hash,omitempty"`
	// Indexer is the indexer recorded on the queue item, if any.
	Indexer *string `json:"indexer,omitempty"`
	// ErrorMessage is the error recorded on the queue item, if any.
	ErrorMessage *string `json:"error_message,omitempty"`
	// CreatedAt is the formatted timestamp at which the item was added.
	CreatedAt string `json:"created_at"`
	// CompletedAt is the formatted completion timestamp, if the item has one.
	CompletedAt *string `json:"completed_at,omitempty"`
}
|
|
|
|
func GetJobStatus(ctx context.Context, db *database.DB, torrentService *TorrentService, jobID uuid.UUID) (*JobStatus, error) {
|
|
log.Info().Str("job_id", jobID.String()).Msg("[JOB_STATUS] fetching job status")
|
|
|
|
item, err := db.GetDownloadQueueItem(ctx, jobID)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("job_id", jobID.String()).Msg("[JOB_STATUS] job not found")
|
|
return nil, err
|
|
}
|
|
|
|
status := &JobStatus{
|
|
ID: item.ID.String(),
|
|
Title: item.Title,
|
|
Status: item.Status,
|
|
Progress: item.Progress,
|
|
Size: item.Size,
|
|
SizeLeft: item.SizeLeft,
|
|
TorrentHash: item.TorrentHash,
|
|
Indexer: item.Indexer,
|
|
ErrorMessage: item.ErrorMessage,
|
|
CreatedAt: item.AddedAt.Format("2006-01-02T15:04:05Z07:00"),
|
|
}
|
|
|
|
if item.CompletedAt != nil {
|
|
completedStr := item.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
|
|
status.CompletedAt = &completedStr
|
|
}
|
|
|
|
if (item.Status == "downloading" || item.Status == "queued") && item.TorrentHash != nil && torrentService.IsConfigured() {
|
|
log.Debug().Str("hash", *item.TorrentHash).Msg("[JOB_STATUS] fetching torrent progress")
|
|
torrent, err := torrentService.GetTorrent(ctx, *item.TorrentHash)
|
|
if err == nil {
|
|
status.Progress = float32(torrent.Progress)
|
|
status.SizeLeft = int64(float64(item.Size) * (1 - torrent.Progress))
|
|
status.Status = mapTorrentState(torrent.State)
|
|
log.Info().
|
|
Str("status", status.Status).
|
|
Float32("progress", status.Progress).
|
|
Msg("[JOB_STATUS] updated from torrent client")
|
|
} else {
|
|
log.Warn().Err(err).Str("hash", *item.TorrentHash).Msg("[JOB_STATUS] failed to get torrent info")
|
|
}
|
|
}
|
|
|
|
log.Info().Str("status", status.Status).Float32("progress", status.Progress).Msg("[JOB_STATUS] returning status")
|
|
return status, nil
|
|
}
|
|
|
|
func BlocklistAndRemove(ctx context.Context, db *database.DB, torrentService *TorrentService, queueID uuid.UUID) (*BlocklistResult, error) {
|
|
log.Info().Str("queue_id", queueID.String()).Msg("[BLOCKLIST] starting blocklist and remove")
|
|
|
|
item, err := db.GetDownloadQueueItem(ctx, queueID)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("queue_id", queueID.String()).Msg("[BLOCKLIST] failed to get queue item")
|
|
return nil, err
|
|
}
|
|
|
|
log.Info().Str("title", item.Title).Interface("torrent_hash", item.TorrentHash).Msg("[BLOCKLIST] processing item")
|
|
|
|
result := &BlocklistResult{}
|
|
|
|
if item.ArtistID != nil {
|
|
albumID := item.AlbumID
|
|
if albumID == nil {
|
|
albumID = &uuid.Nil
|
|
}
|
|
log.Info().Str("title", item.Title).Msg("[BLOCKLIST] adding to blocklist")
|
|
if err := db.AddToBlocklist(ctx, *item.ArtistID, *albumID, item.Title, item.TorrentHash, item.Indexer); err == nil {
|
|
result.Blocklisted = true
|
|
log.Info().Str("title", item.Title).Msg("[BLOCKLIST] added to blocklist")
|
|
} else {
|
|
log.Warn().Err(err).Str("title", item.Title).Msg("[BLOCKLIST] failed to add to blocklist")
|
|
}
|
|
}
|
|
|
|
if item.TorrentHash != nil && torrentService.IsConfigured() {
|
|
log.Info().Str("hash", *item.TorrentHash).Msg("[BLOCKLIST] removing torrent from client")
|
|
torrentService.RemoveTorrent(ctx, *item.TorrentHash, true)
|
|
}
|
|
|
|
log.Info().Str("title", item.Title).Msg("[BLOCKLIST] deleting from queue")
|
|
if err := db.DeleteDownloadQueueItem(ctx, queueID); err != nil {
|
|
log.Error().Err(err).Msg("[BLOCKLIST] failed to delete queue item")
|
|
return nil, err
|
|
}
|
|
result.Removed = true
|
|
|
|
if item.AlbumID != nil {
|
|
log.Info().Str("title", item.Title).Msg("[BLOCKLIST] re-adding album to wanted list")
|
|
db.AddToWantedAlbums(ctx, *item.AlbumID)
|
|
}
|
|
|
|
log.Info().Bool("blocklisted", result.Blocklisted).Bool("removed", result.Removed).Msg("[BLOCKLIST] completed")
|
|
return result, nil
|
|
}
|