diff --git a/cmd/music-agregator/main.go b/cmd/music-agregator/main.go index 50c9544..76ed4b5 100644 --- a/cmd/music-agregator/main.go +++ b/cmd/music-agregator/main.go @@ -113,6 +113,8 @@ func setupRiver(ctx context.Context, cfg config.Config, db *database.DB) *riverS log.Info().Msg("River queue started") + pollWorker.RecoverOrphanedDownloads(ctx) + return &riverSetup{ client: riverClient, cacheRefreshWorker: cacheWorker, diff --git a/internal/database/download_repository.go b/internal/database/download_repository.go index e51c6a7..535d982 100644 --- a/internal/database/download_repository.go +++ b/internal/database/download_repository.go @@ -107,6 +107,27 @@ func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) ( return downloads, nil } +func (r *DownloadRepository) GetActive(ctx context.Context) ([]*Download, error) { + rows, err := r.pool.Query(ctx, + `SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at + FROM downloads WHERE state IN ('pending', 'downloading') ORDER BY created_at`, + ) + if err != nil { + return nil, fmt.Errorf("listing active downloads: %w", err) + } + defer rows.Close() + + var downloads []*Download + for rows.Next() { + d := &Download{} + if err := rows.Scan(&d.ID, &d.TorrentID, &d.AlbumID, &d.Format, &d.Quality, &d.State, &d.QbitHash, &d.SavePath, &d.ErrorMessage, &d.QueuedAt, &d.StartedAt, &d.CompletedAt, &d.CreatedAt, &d.UpdatedAt); err != nil { + return nil, fmt.Errorf("scanning download: %w", err) + } + downloads = append(downloads, d) + } + return downloads, nil +} + func (r *DownloadRepository) HasAlbumInQuality(ctx context.Context, albumID string, format string, quality string) (bool, error) { var exists bool err := r.pool.QueryRow(ctx, diff --git a/internal/service.go b/internal/service.go index 2f66627..d1688a4 100644 --- a/internal/service.go +++ b/internal/service.go @@ -297,13 +297,13 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb. return nil, err } - dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, req.GetAlbumId()) + dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { qualityStr := normalizeQuality(req.GetQuality(), 0, 0) owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr) if err == nil && owned { log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality") - return &pb.MonitorAlbumResponse{}, nil + return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil), nil } } @@ -317,7 +317,7 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb. filtered := filterByQuality(parsed, req.GetQuality()) if len(filtered) == 0 { log.Warn().Str("album", album.GetTitle()).Str("quality", req.GetQuality().String()).Msg("no releases match quality filter") - return &pb.MonitorAlbumResponse{}, nil + return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil), nil } best := selectBestRelease(filtered) @@ -326,15 +326,14 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb. return nil, err } + dbAlbum, _ = service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { service.saveTorrentAndDownload(ctx, dbAlbum.ID, best) } else { log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence") } - return &pb.MonitorAlbumResponse{ - Release: buildMonitoredRelease(best), - }, nil + return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best), nil } func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) { @@ -515,6 +514,16 @@ func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context return } + existingDownloads, err := service.downloads.GetByAlbumID(ctx, dbAlbumID) + if err == nil { + for _, d := range existingDownloads { + if d.TorrentID == savedTorrent.ID && d.State != "failed" { + log.Info().Str("hash", best.rel.InfoHash).Str("state", d.State).Msg("active download already exists, skipping") + return + } + } + } + download := &database.Download{ TorrentID: savedTorrent.ID, AlbumID: dbAlbumID, @@ -583,6 +592,68 @@ func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease { } } +func (service *MusicAgregatorService) buildMonitorAlbumResponse(ctx context.Context, metadataAlbum *metadataPb.Album, dbAlbum *database.Album, best *parsedItem) *pb.MonitorAlbumResponse { + resp := &pb.MonitorAlbumResponse{} + + if best != nil { + resp.Release = buildMonitoredRelease(*best) + } + + if dbAlbum != nil { + resp.Album = service.buildAlbumDetail(ctx, dbAlbum) + } + + if len(metadataAlbum.GetArtists()) > 0 { + dbArtist, err := service.metadata.GetArtistByExternalID(ctx, metadataAlbum.GetArtists()[0].GetArtist().GetId()) + if err == nil { + resp.Artist = &pb.ArtistSummary{ + Id: dbArtist.ID, + ExternalId: dbArtist.ExternalID, + Name: dbArtist.Name, + ArtistType: dbArtist.ArtistType, + Country: dbArtist.Country, + Genres: dbArtist.Genres, + ImageUrl: dbArtist.ImageURL, + MonitorState: toProtoMonitorState(dbArtist.MonitorState), + } + } + } + + return resp +} + +func (service *MusicAgregatorService) buildAlbumDetail(ctx context.Context, dbAlbum *database.Album) *pb.AlbumDetail { + detail := &pb.AlbumDetail{ + Id: dbAlbum.ID, + ExternalId: dbAlbum.ExternalID, + Title: dbAlbum.Title, + AlbumType: dbAlbum.AlbumType, + TotalTracks: int32(dbAlbum.TotalTracks), + TotalDiscs: int32(dbAlbum.TotalDiscs), + Label: dbAlbum.Label, + Genres: dbAlbum.Genres, + CoverUrl: dbAlbum.CoverURL, + MonitorState: toProtoMonitorState(dbAlbum.MonitorState), + } + + if dbAlbum.ReleaseDate != nil { + detail.ReleaseDate = dbAlbum.ReleaseDate.Format("2006-01-02") + } + + downloads, err := service.downloads.GetByAlbumID(ctx, dbAlbum.ID) + if err == nil && len(downloads) > 0 { + best := downloads[0] + detail.Download = &pb.DownloadInfo{ + State: best.State, + Format: best.Format, + Quality: best.Quality, + SavePath: best.SavePath, + } + } + + return detail +} + func toProtoMonitorState(state database.MonitorState) pb.MonitorState { switch state { case database.Monitored: diff --git a/internal/workers/poll_download.go b/internal/workers/poll_download.go index d6f0de1..9eccc8d 100644 --- a/internal/workers/poll_download.go +++ b/internal/workers/poll_download.go @@ -115,6 +115,36 @@ func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadAr return nil } +func (w *PollDownloadWorker) RecoverOrphanedDownloads(ctx context.Context) { + active, err := w.Downloads.GetActive(ctx) + if err != nil { + log.Error().Err(err).Msg("failed to query active downloads for recovery") + return + } + + if len(active) == 0 { + return + } + + for _, d := range active { + _, err := w.RiverClient.Insert(ctx, PollDownloadArgs{ + DownloadID: d.ID, + TorrentHash: d.QbitHash, + CheckInterval: 30 * time.Second, + }, &river.InsertOpts{ + ScheduledAt: time.Now().Add(5 * time.Second), + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + }, + }) + if err != nil { + log.Error().Err(err).Str("download_id", d.ID).Msg("failed to reschedule orphaned download") + } else { + log.Info().Str("download_id", d.ID).Str("hash", d.QbitHash).Msg("recovered orphaned download poll job") + } + } +} + var audioExtensions = map[string]bool{ ".flac": true, ".mp3": true, ".aac": true, ".m4a": true, ".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true, diff --git a/proto/music_agregator/v1/music_agregator.proto b/proto/music_agregator/v1/music_agregator.proto index 3d909a8..c7b08b6 100644 --- a/proto/music_agregator/v1/music_agregator.proto +++ b/proto/music_agregator/v1/music_agregator.proto @@ -25,7 +25,9 @@ enum QualityType { } message MonitorAlbumResponse { - MonitoredRelease release = 1; + AlbumDetail album = 1; + ArtistSummary artist = 2; + MonitoredRelease release = 3; } message GetArtistsRequest {}