Enrich MonitorAlbum response, prevent duplicate downloads, recover orphaned jobs on startup
This commit is contained in:
@@ -113,6 +113,8 @@ func setupRiver(ctx context.Context, cfg config.Config, db *database.DB) *riverS
|
|||||||
|
|
||||||
log.Info().Msg("River queue started")
|
log.Info().Msg("River queue started")
|
||||||
|
|
||||||
|
pollWorker.RecoverOrphanedDownloads(ctx)
|
||||||
|
|
||||||
return &riverSetup{
|
return &riverSetup{
|
||||||
client: riverClient,
|
client: riverClient,
|
||||||
cacheRefreshWorker: cacheWorker,
|
cacheRefreshWorker: cacheWorker,
|
||||||
|
|||||||
@@ -107,6 +107,27 @@ func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) (
|
|||||||
return downloads, nil
|
return downloads, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) GetActive(ctx context.Context) ([]*Download, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at
|
||||||
|
FROM downloads WHERE state IN ('pending', 'downloading') ORDER BY created_at`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing active downloads: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var downloads []*Download
|
||||||
|
for rows.Next() {
|
||||||
|
d := &Download{}
|
||||||
|
if err := rows.Scan(&d.ID, &d.TorrentID, &d.AlbumID, &d.Format, &d.Quality, &d.State, &d.QbitHash, &d.SavePath, &d.ErrorMessage, &d.QueuedAt, &d.StartedAt, &d.CompletedAt, &d.CreatedAt, &d.UpdatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning download: %w", err)
|
||||||
|
}
|
||||||
|
downloads = append(downloads, d)
|
||||||
|
}
|
||||||
|
return downloads, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *DownloadRepository) HasAlbumInQuality(ctx context.Context, albumID string, format string, quality string) (bool, error) {
|
func (r *DownloadRepository) HasAlbumInQuality(ctx context.Context, albumID string, format string, quality string) (bool, error) {
|
||||||
var exists bool
|
var exists bool
|
||||||
err := r.pool.QueryRow(ctx,
|
err := r.pool.QueryRow(ctx,
|
||||||
|
|||||||
+77
-6
@@ -297,13 +297,13 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, req.GetAlbumId())
|
dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, album.GetId())
|
||||||
if dbAlbum != nil {
|
if dbAlbum != nil {
|
||||||
qualityStr := normalizeQuality(req.GetQuality(), 0, 0)
|
qualityStr := normalizeQuality(req.GetQuality(), 0, 0)
|
||||||
owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr)
|
owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr)
|
||||||
if err == nil && owned {
|
if err == nil && owned {
|
||||||
log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality")
|
log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality")
|
||||||
return &pb.MonitorAlbumResponse{}, nil
|
return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -317,7 +317,7 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
|
|||||||
filtered := filterByQuality(parsed, req.GetQuality())
|
filtered := filterByQuality(parsed, req.GetQuality())
|
||||||
if len(filtered) == 0 {
|
if len(filtered) == 0 {
|
||||||
log.Warn().Str("album", album.GetTitle()).Str("quality", req.GetQuality().String()).Msg("no releases match quality filter")
|
log.Warn().Str("album", album.GetTitle()).Str("quality", req.GetQuality().String()).Msg("no releases match quality filter")
|
||||||
return &pb.MonitorAlbumResponse{}, nil
|
return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
best := selectBestRelease(filtered)
|
best := selectBestRelease(filtered)
|
||||||
@@ -326,15 +326,14 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dbAlbum, _ = service.metadata.GetAlbumByExternalID(ctx, album.GetId())
|
||||||
if dbAlbum != nil {
|
if dbAlbum != nil {
|
||||||
service.saveTorrentAndDownload(ctx, dbAlbum.ID, best)
|
service.saveTorrentAndDownload(ctx, dbAlbum.ID, best)
|
||||||
} else {
|
} else {
|
||||||
log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence")
|
log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &pb.MonitorAlbumResponse{
|
return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best), nil
|
||||||
Release: buildMonitoredRelease(best),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) {
|
func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) {
|
||||||
@@ -515,6 +514,16 @@ func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
existingDownloads, err := service.downloads.GetByAlbumID(ctx, dbAlbumID)
|
||||||
|
if err == nil {
|
||||||
|
for _, d := range existingDownloads {
|
||||||
|
if d.TorrentID == savedTorrent.ID && d.State != "failed" {
|
||||||
|
log.Info().Str("hash", best.rel.InfoHash).Str("state", d.State).Msg("active download already exists, skipping")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
download := &database.Download{
|
download := &database.Download{
|
||||||
TorrentID: savedTorrent.ID,
|
TorrentID: savedTorrent.ID,
|
||||||
AlbumID: dbAlbumID,
|
AlbumID: dbAlbumID,
|
||||||
@@ -583,6 +592,68 @@ func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (service *MusicAgregatorService) buildMonitorAlbumResponse(ctx context.Context, metadataAlbum *metadataPb.Album, dbAlbum *database.Album, best *parsedItem) *pb.MonitorAlbumResponse {
|
||||||
|
resp := &pb.MonitorAlbumResponse{}
|
||||||
|
|
||||||
|
if best != nil {
|
||||||
|
resp.Release = buildMonitoredRelease(*best)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dbAlbum != nil {
|
||||||
|
resp.Album = service.buildAlbumDetail(ctx, dbAlbum)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(metadataAlbum.GetArtists()) > 0 {
|
||||||
|
dbArtist, err := service.metadata.GetArtistByExternalID(ctx, metadataAlbum.GetArtists()[0].GetArtist().GetId())
|
||||||
|
if err == nil {
|
||||||
|
resp.Artist = &pb.ArtistSummary{
|
||||||
|
Id: dbArtist.ID,
|
||||||
|
ExternalId: dbArtist.ExternalID,
|
||||||
|
Name: dbArtist.Name,
|
||||||
|
ArtistType: dbArtist.ArtistType,
|
||||||
|
Country: dbArtist.Country,
|
||||||
|
Genres: dbArtist.Genres,
|
||||||
|
ImageUrl: dbArtist.ImageURL,
|
||||||
|
MonitorState: toProtoMonitorState(dbArtist.MonitorState),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (service *MusicAgregatorService) buildAlbumDetail(ctx context.Context, dbAlbum *database.Album) *pb.AlbumDetail {
|
||||||
|
detail := &pb.AlbumDetail{
|
||||||
|
Id: dbAlbum.ID,
|
||||||
|
ExternalId: dbAlbum.ExternalID,
|
||||||
|
Title: dbAlbum.Title,
|
||||||
|
AlbumType: dbAlbum.AlbumType,
|
||||||
|
TotalTracks: int32(dbAlbum.TotalTracks),
|
||||||
|
TotalDiscs: int32(dbAlbum.TotalDiscs),
|
||||||
|
Label: dbAlbum.Label,
|
||||||
|
Genres: dbAlbum.Genres,
|
||||||
|
CoverUrl: dbAlbum.CoverURL,
|
||||||
|
MonitorState: toProtoMonitorState(dbAlbum.MonitorState),
|
||||||
|
}
|
||||||
|
|
||||||
|
if dbAlbum.ReleaseDate != nil {
|
||||||
|
detail.ReleaseDate = dbAlbum.ReleaseDate.Format("2006-01-02")
|
||||||
|
}
|
||||||
|
|
||||||
|
downloads, err := service.downloads.GetByAlbumID(ctx, dbAlbum.ID)
|
||||||
|
if err == nil && len(downloads) > 0 {
|
||||||
|
best := downloads[0]
|
||||||
|
detail.Download = &pb.DownloadInfo{
|
||||||
|
State: best.State,
|
||||||
|
Format: best.Format,
|
||||||
|
Quality: best.Quality,
|
||||||
|
SavePath: best.SavePath,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return detail
|
||||||
|
}
|
||||||
|
|
||||||
func toProtoMonitorState(state database.MonitorState) pb.MonitorState {
|
func toProtoMonitorState(state database.MonitorState) pb.MonitorState {
|
||||||
switch state {
|
switch state {
|
||||||
case database.Monitored:
|
case database.Monitored:
|
||||||
|
|||||||
@@ -115,6 +115,36 @@ func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadAr
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *PollDownloadWorker) RecoverOrphanedDownloads(ctx context.Context) {
|
||||||
|
active, err := w.Downloads.GetActive(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to query active downloads for recovery")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(active) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, d := range active {
|
||||||
|
_, err := w.RiverClient.Insert(ctx, PollDownloadArgs{
|
||||||
|
DownloadID: d.ID,
|
||||||
|
TorrentHash: d.QbitHash,
|
||||||
|
CheckInterval: 30 * time.Second,
|
||||||
|
}, &river.InsertOpts{
|
||||||
|
ScheduledAt: time.Now().Add(5 * time.Second),
|
||||||
|
UniqueOpts: river.UniqueOpts{
|
||||||
|
ByArgs: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("download_id", d.ID).Msg("failed to reschedule orphaned download")
|
||||||
|
} else {
|
||||||
|
log.Info().Str("download_id", d.ID).Str("hash", d.QbitHash).Msg("recovered orphaned download poll job")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var audioExtensions = map[string]bool{
|
var audioExtensions = map[string]bool{
|
||||||
".flac": true, ".mp3": true, ".aac": true, ".m4a": true,
|
".flac": true, ".mp3": true, ".aac": true, ".m4a": true,
|
||||||
".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true,
|
".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true,
|
||||||
|
|||||||
@@ -25,7 +25,9 @@ enum QualityType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message MonitorAlbumResponse {
|
message MonitorAlbumResponse {
|
||||||
MonitoredRelease release = 1;
|
AlbumDetail album = 1;
|
||||||
|
ArtistSummary artist = 2;
|
||||||
|
MonitoredRelease release = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message GetArtistsRequest {}
|
message GetArtistsRequest {}
|
||||||
|
|||||||
Reference in New Issue
Block a user