From f5e2f764b543a45b750f92f749cea2a0923d4b12 Mon Sep 17 00:00:00 2001
From: Alexander
Date: Sun, 10 May 2026 00:06:58 +0200
Subject: [PATCH] Optimize GetArtists: parallel artist processing, batch album upserts, batch download lookups, retry on metadata calls

---
 internal/database/album_repository.go    | 44 ++++++++++++
 internal/database/download_repository.go | 31 ++++++++
 internal/metadata/service.go             | 40 +++++++++++
 internal/service.go                      | 90 +++++++++++++++++-------
 4 files changed, 178 insertions(+), 27 deletions(-)

diff --git a/internal/database/album_repository.go b/internal/database/album_repository.go
index be14df3..78f18ae 100644
--- a/internal/database/album_repository.go
+++ b/internal/database/album_repository.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
 )
 
@@ -60,6 +61,49 @@ func (r *AlbumRepository) Create(ctx context.Context, a *Album) error {
 	return nil
 }
 
+// CreateBatch upserts albums in one pgx batch. On an external_id conflict the
+// descriptive columns are refreshed, but an existing 'excluded' or 'monitored'
+// monitor_state is kept. An empty slice is a no-op.
+func (r *AlbumRepository) CreateBatch(ctx context.Context, albums []*Album) error {
+	if len(albums) == 0 {
+		return nil
+	}
+
+	batch := &pgx.Batch{}
+	for _, a := range albums {
+		batch.Queue(
+			`INSERT INTO albums (external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, monitor_state)
+			 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
+			 ON CONFLICT (external_id) DO UPDATE SET
+			   title = EXCLUDED.title,
+			   album_type = EXCLUDED.album_type,
+			   release_date = EXCLUDED.release_date,
+			   total_tracks = EXCLUDED.total_tracks,
+			   total_discs = EXCLUDED.total_discs,
+			   label = EXCLUDED.label,
+			   genres = EXCLUDED.genres,
+			   cover_url = EXCLUDED.cover_url,
+			   monitor_state = CASE
+			     WHEN albums.monitor_state = 'excluded' THEN albums.monitor_state
+			     WHEN albums.monitor_state = 'monitored' THEN albums.monitor_state
+			     ELSE EXCLUDED.monitor_state
+			   END,
+			   updated_at = NOW()`,
+			a.ExternalID, a.ArtistID, a.Title, a.AlbumType, a.ReleaseDate, a.TotalTracks, a.TotalDiscs, a.Label, a.Genres, a.CoverURL, a.MonitorState,
+		)
+	}
+
+	results := r.pool.SendBatch(ctx, batch)
+	defer results.Close()
+
+	for range albums {
+		if _, err := results.Exec(); err != nil {
+			return fmt.Errorf("batch creating album: %w", err)
+		}
+	}
+	return nil
+}
+
 func (r *AlbumRepository) GetByExternalID(ctx context.Context, externalID string) (*Album, error) {
 	a := &Album{}
 	err := r.pool.QueryRow(ctx,
diff --git a/internal/database/download_repository.go b/internal/database/download_repository.go
index c4d0d6f..528e5ed 100644
--- a/internal/database/download_repository.go
+++ b/internal/database/download_repository.go
@@ -153,6 +153,37 @@ func (r *DownloadRepository) GetByID(ctx context.Context, id string) (*Download,
 	return d, nil
 }
 
+// GetLatestByAlbumIDs returns, for each given album ID that has downloads, the
+// download with the most recent created_at, keyed by album ID. It returns a
+// nil map when albumIDs is empty.
+func (r *DownloadRepository) GetLatestByAlbumIDs(ctx context.Context, albumIDs []string) (map[string]*Download, error) {
+	if len(albumIDs) == 0 {
+		return nil, nil
+	}
+
+	rows, err := r.pool.Query(ctx,
+		`SELECT DISTINCT ON (album_id) id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at
+		 FROM downloads WHERE album_id = ANY($1) ORDER BY album_id, created_at DESC`, albumIDs,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("batch listing downloads: %w", err)
+	}
+	defer rows.Close()
+
+	result := make(map[string]*Download, len(albumIDs))
+	for rows.Next() {
+		d := &Download{}
+		if err := rows.Scan(&d.ID, &d.TorrentID, &d.AlbumID, &d.Format, &d.Quality, &d.State, &d.QbitHash, &d.SavePath, &d.ErrorMessage, &d.QueuedAt, &d.StartedAt, &d.CompletedAt, &d.CreatedAt, &d.UpdatedAt); err != nil {
+			return nil, fmt.Errorf("scanning download: %w", err)
+		}
+		result[d.AlbumID] = d
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterating downloads: %w", err)
+	}
+	return result, nil
+}
+
 func (r *DownloadRepository) HasAlbumInQuality(ctx context.Context, albumID string, format string, quality string) (bool, error) {
 	var exists bool
 	err := r.pool.QueryRow(ctx,
diff --git a/internal/metadata/service.go b/internal/metadata/service.go
index a2c2b2e..bda0381 100644
--- a/internal/metadata/service.go
+++ b/internal/metadata/service.go
@@ -117,6 +117,46 @@ func (s *MetadataService) PersistAlbum(ctx context.Context, album *metadataPb.Al
 	s.PersistAlbumForArtist(ctx, album, "", state)
 }
 
+// PersistAlbumsForArtist converts the metadata albums to database rows owned
+// by artistDBID and upserts them in one batch. It is best effort: a failure is
+// logged, not returned. Nothing happens for an empty list or empty artist ID.
+func (s *MetadataService) PersistAlbumsForArtist(ctx context.Context, metadataAlbums []*metadataPb.Album, artistDBID string, state database.MonitorState) {
+	if len(metadataAlbums) == 0 || artistDBID == "" {
+		return
+	}
+
+	dbAlbums := make([]*database.Album, 0, len(metadataAlbums))
+	for _, album := range metadataAlbums {
+		var genres []string
+		for _, g := range album.GetGenres() {
+			genres = append(genres, g.GetName())
+		}
+		labelName := ""
+		if album.GetLabel() != nil {
+			labelName = album.GetLabel().GetName()
+		}
+		// NOTE(review): ReleaseDate is not set here, yet CreateBatch writes
+		// release_date on conflict, so a stored date is replaced by the zero
+		// value. TODO: map the metadata release date once its type is confirmed.
+		dbAlbums = append(dbAlbums, &database.Album{
+			ExternalID:   album.GetId(),
+			ArtistID:     artistDBID,
+			Title:        album.GetTitle(),
+			AlbumType:    album.GetAlbumType(),
+			TotalTracks:  int(album.GetTotalTracks()),
+			TotalDiscs:   int(album.GetTotalDiscs()),
+			Label:        labelName,
+			Genres:       genres,
+			CoverURL:     album.GetCoverUrl(),
+			MonitorState: state,
+		})
+	}
+
+	if err := s.albums.CreateBatch(ctx, dbAlbums); err != nil {
+		log.Warn().Err(err).Int("count", len(dbAlbums)).Msg("failed to batch persist albums")
+	}
+}
+
 func (s *MetadataService) PersistAlbumForArtist(ctx context.Context, album *metadataPb.Album, artistDBID string, state database.MonitorState) {
 	if artistDBID == "" {
 		if len(album.GetArtists()) > 0 {
diff --git a/internal/service.go b/internal/service.go
index 98eab89..71ba522 100644
--- a/internal/service.go
+++ b/internal/service.go
@@ -6,11 +6,13 @@ import (
 	"io"
 	"net/http"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/jackc/pgx/v5"
 	"github.com/riverqueue/river"
 	"github.com/rs/zerolog/log"
+	"golang.org/x/sync/errgroup"
 
 	metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
 	pb "homelab.lan/music-agregator/gen/music_agregator/v1"
@@ -125,38 +127,66 @@ func (service *MusicAgregatorService) GetArtists(ctx context.Context, _ *pb.GetA
 		return nil, fmt.Errorf("listing artists: %w", err)
 	}
 
-	artists := make([]*pb.ArtistSummary, 0, len(dbArtists))
-	for _, a := range dbArtists {
-		albums, err := service.buildAlbumsForArtist(ctx, a)
-		if err != nil {
-			log.Warn().Err(err).Str("artist", a.Name).Msg("failed to build album details, returning artist without albums")
-		}
+	artists := make([]*pb.ArtistSummary, len(dbArtists))
+	var mu sync.Mutex
+	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(5)
 
-		artists = append(artists, &pb.ArtistSummary{
-			Id:           a.ID,
-			ExternalId:   a.ExternalID,
-			Name:         a.Name,
-			ArtistType:   a.ArtistType,
-			Country:      a.Country,
-			Genres:       a.Genres,
-			ImageUrl:     a.ImageURL,
-			MonitorState: toProtoMonitorState(a.MonitorState),
-			Albums:       albums,
+	for i, a := range dbArtists {
+		i, a := i, a
+		g.Go(func() error {
+			albums, err := service.buildAlbumsForArtist(gCtx, a)
+			if err != nil {
+				log.Warn().Err(err).Str("artist", a.Name).Msg("failed to build album details, returning artist without albums")
+			}
+
+			summary := &pb.ArtistSummary{
+				Id:           a.ID,
+				ExternalId:   a.ExternalID,
+				Name:         a.Name,
+				ArtistType:   a.ArtistType,
+				Country:      a.Country,
+				Genres:       a.Genres,
+				ImageUrl:     a.ImageURL,
+				MonitorState: toProtoMonitorState(a.MonitorState),
+				Albums:       albums,
+			}
+
+			mu.Lock()
+			artists[i] = summary
+			mu.Unlock()
+			return nil
 		})
 	}
 
+	_ = g.Wait()
+
 	return &pb.GetArtistsResponse{Artists: artists}, nil
 }
 
 func (service *MusicAgregatorService) buildAlbumsForArtist(ctx context.Context, artist *database.Artist) ([]*pb.AlbumDetail, error) {
-	metadataAlbums, err := service.metadata.GetArtistAlbums(ctx, artist.ExternalID)
+	var metadataAlbums []*metadataPb.Album
+	var err error
+	const maxAttempts = 3
+	for attempt := 1; attempt <= maxAttempts; attempt++ {
+		metadataAlbums, err = service.metadata.GetArtistAlbums(ctx, artist.ExternalID)
+		if err == nil || attempt == maxAttempts {
+			break
+		}
+		log.Warn().Err(err).Int("attempt", attempt).Str("artist", artist.Name).Msg("metadata GetArtistAlbums failed, retrying")
+		// Linear backoff that aborts as soon as the caller's context is done,
+		// so a cancelled request does not keep a worker slot asleep.
+		select {
+		case <-ctx.Done():
+			return nil, fmt.Errorf("fetching metadata albums: %w", ctx.Err())
+		case <-time.After(time.Duration(attempt) * 200 * time.Millisecond):
+		}
+	}
 	if err != nil {
 		return nil, fmt.Errorf("fetching metadata albums: %w", err)
 	}
 
-	for _, ma := range metadataAlbums {
-		service.metadata.PersistAlbumForArtist(ctx, ma, artist.ID, database.Unmonitored)
-	}
+	service.metadata.PersistAlbumsForArtist(ctx, metadataAlbums, artist.ID, database.Unmonitored)
 
 	dbAlbums, err := service.metadata.GetAlbumsByArtistID(ctx, artist.ID)
 	if err != nil {
@@ -165,10 +195,18 @@ func (service *MusicAgregatorService) buildAlbumsForArtist(ctx context.Context,
 	}
 
 	dbAlbumsByExternalID := make(map[string]*database.Album, len(dbAlbums))
+	albumIDs := make([]string, 0, len(dbAlbums))
 	for _, a := range dbAlbums {
 		dbAlbumsByExternalID[a.ExternalID] = a
+		albumIDs = append(albumIDs, a.ID)
 	}
 
+	downloadsByAlbumID, dlErr := service.downloads.GetLatestByAlbumIDs(ctx, albumIDs)
+	if dlErr != nil {
+		// Best effort: albums are still returned, just without download info.
+		log.Warn().Err(dlErr).Str("artist", artist.Name).Msg("failed to load downloads for albums")
+	}
+
 	albums := make([]*pb.AlbumDetail, 0, len(metadataAlbums))
 	for _, ma := range metadataAlbums {
 		detail := &pb.AlbumDetail{
@@ -192,14 +230,12 @@ func (service *MusicAgregatorService) buildAlbumsForArtist(ctx context.Context,
 			detail.Id = dbAlbum.ID
 			detail.MonitorState = toProtoMonitorState(dbAlbum.MonitorState)
 
-			downloads, err := service.downloads.GetByAlbumID(ctx, dbAlbum.ID)
-			if err == nil && len(downloads) > 0 {
-				best := downloads[0]
+			if d, ok := downloadsByAlbumID[dbAlbum.ID]; ok {
 				detail.Download = &pb.DownloadInfo{
-					State:    best.State,
-					Format:   best.Format,
-					Quality:  best.Quality,
-					SavePath: derefStr(best.SavePath),
+					State:    d.State,
+					Format:   d.Format,
+					Quality:  d.Quality,
+					SavePath: derefStr(d.SavePath),
 				}
 			}
 		} else {