package internal import ( "context" "fmt" "io" "net/http" "strings" "sync" "time" "github.com/jackc/pgx/v5" "github.com/riverqueue/river" "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" metadataPb "homelab.lan/music-agregator/gen/metadata/v1" pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/analysis" "homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/database" "homelab.lan/music-agregator/internal/indexer" "homelab.lan/music-agregator/internal/metadata" "homelab.lan/music-agregator/internal/release" "homelab.lan/music-agregator/internal/torrent" torrentParser "homelab.lan/music-agregator/internal/tracker" "homelab.lan/music-agregator/internal/workers" ) type parsedItem struct { item *indexer.SearchItemResult rel *release.Release torrentData []byte } type MusicAgregatorService struct { config config.Config metadata *metadata.MetadataService indexer indexer.Searcher torrentClient torrent.TorrentClient magnetResolver torrentParser.Resolver riverClient *river.Client[pgx.Tx] pathMapper *torrent.PathMapper torrents *database.TorrentRepository downloads *database.DownloadRepository artists *database.ArtistRepository downloadFiles *database.DownloadFileRepository albumReleases *database.AlbumReleaseRepository trackReleases *database.TrackReleaseRepository analyzer *analysis.ReleaseAnalyzer workflowRuns *database.WorkflowRunRepository albumEvents *database.AlbumEventRepository shutdownCtx context.Context shutdownCancel context.CancelFunc } func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx], torrentClient torrent.TorrentClient, pathMapper *torrent.PathMapper, db *database.DB) (*MusicAgregatorService, error) { idx, err := indexer.NewIndexerService(cfg, riverClient, nil) if err != nil { log.Err(err).Msg("failed to create IndexerService") return nil, err } metadataClient, _, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint) if err != nil { log.Err(err).Msg("failed to create metadata client") return nil, err } magnetResolver, err := torrentParser.NewMagnetResolver(30 * time.Second) if err != nil { log.Err(err).Msg("failed to create magnet resolver") return nil, err } ctx, cancel := context.WithCancel(context.Background()) return &MusicAgregatorService{ config: cfg, metadata: metadata.NewMetadataService(metadataClient, db), indexer: idx, torrentClient: torrentClient, magnetResolver: magnetResolver, riverClient: riverClient, pathMapper: pathMapper, torrents: database.NewTorrentRepository(db.Pool), downloads: database.NewDownloadRepository(db.Pool), artists: database.NewArtistRepository(db.Pool), downloadFiles: database.NewDownloadFileRepository(db.Pool), albumReleases: database.NewAlbumReleaseRepository(db.Pool), trackReleases: database.NewTrackReleaseRepository(db.Pool), analyzer: analysis.NewReleaseAnalyzer(db), workflowRuns: database.NewWorkflowRunRepository(db.Pool), albumEvents: database.NewAlbumEventRepository(db.Pool), shutdownCtx: ctx, shutdownCancel: cancel, }, nil } func NewMusicAgregatorServiceWithDeps( metadata *metadata.MetadataService, searcher indexer.Searcher, torrentClient torrent.TorrentClient, magnetResolver torrentParser.Resolver, riverClient *river.Client[pgx.Tx], pathMapper *torrent.PathMapper, db *database.DB, ) *MusicAgregatorService { ctx, cancel := context.WithCancel(context.Background()) return &MusicAgregatorService{ metadata: metadata, indexer: searcher, torrentClient: torrentClient, magnetResolver: magnetResolver, riverClient: riverClient, pathMapper: pathMapper, torrents: database.NewTorrentRepository(db.Pool), downloads: database.NewDownloadRepository(db.Pool), artists: database.NewArtistRepository(db.Pool), downloadFiles: database.NewDownloadFileRepository(db.Pool), albumReleases: database.NewAlbumReleaseRepository(db.Pool), trackReleases: database.NewTrackReleaseRepository(db.Pool), analyzer: analysis.NewReleaseAnalyzer(db), workflowRuns: database.NewWorkflowRunRepository(db.Pool), albumEvents: database.NewAlbumEventRepository(db.Pool), shutdownCtx: ctx, shutdownCancel: cancel, } } func (s *MusicAgregatorService) Close() { if s.shutdownCancel != nil { s.shutdownCancel() } if closer, ok := s.magnetResolver.(interface{ Close() }); ok { closer.Close() } } func (s *MusicAgregatorService) RecoverWorkflows(ctx context.Context) { stale, err := s.workflowRuns.GetRunning(ctx) if err != nil { log.Error().Err(err).Msg("failed to query stale workflow runs for recovery") return } if len(stale) == 0 { return } for _, run := range stale { downloads, err := s.downloads.GetByAlbumID(ctx, run.AlbumID) if err != nil { log.Error().Err(err).Str("workflow_run_id", run.ID).Msg("failed to query downloads for recovery") s.workflowRuns.SetFailed(ctx, run.ID, "recovery: failed to query downloads") continue } hasActive := false for _, d := range downloads { if d.State == "downloading" || d.State == "completed" || d.State == "seeding" { hasActive = true break } } if hasActive { s.workflowRuns.SetCompleted(ctx, run.ID) log.Info().Str("workflow_run_id", run.ID).Str("album_id", run.AlbumID).Msg("recovered stale workflow as completed") } else { s.workflowRuns.SetFailed(ctx, run.ID, "server restarted during workflow") log.Warn().Str("workflow_run_id", run.ID).Str("album_id", run.AlbumID).Msg("recovered stale workflow as failed") } } } func (service *MusicAgregatorService) GetArtists(ctx context.Context, _ *pb.GetArtistsRequest) (*pb.GetArtistsResponse, error) { dbArtists, err := service.artists.GetAll(ctx) if err != nil { log.Error().Err(err).Msg("failed to list artists") return nil, fmt.Errorf("listing artists: %w", err) } artists := make([]*pb.ArtistSummary, len(dbArtists)) var mu sync.Mutex g, gCtx := errgroup.WithContext(ctx) g.SetLimit(5) for i, a := range dbArtists { i, a := i, a g.Go(func() error { albums, err := service.buildAlbumsForArtist(gCtx, a) if err != nil { log.Warn().Err(err).Str("artist", a.Name).Msg("failed to build album details, returning artist without albums") } summary := &pb.ArtistSummary{ Id: a.ID, ExternalId: a.ExternalID, Name: a.Name, ArtistType: a.ArtistType, Country: a.Country, Genres: a.Genres, ImageUrl: a.ImageURL, MonitorState: toProtoMonitorState(a.MonitorState), Albums: albums, } mu.Lock() artists[i] = summary mu.Unlock() return nil }) } _ = g.Wait() return &pb.GetArtistsResponse{Artists: artists}, nil } func (service *MusicAgregatorService) buildAlbumsForArtist(ctx context.Context, artist *database.Artist) ([]*pb.AlbumDetail, error) { var metadataAlbums []*metadataPb.Album var err error for attempt := 0; attempt < 3; attempt++ { metadataAlbums, err = service.metadata.GetArtistAlbums(ctx, artist.ExternalID) if err == nil { break } log.Warn().Err(err).Int("attempt", attempt+1).Str("artist", artist.Name).Msg("metadata GetArtistAlbums failed, retrying") time.Sleep(time.Duration(attempt+1) * 200 * time.Millisecond) } if err != nil { return nil, fmt.Errorf("fetching metadata albums: %w", err) } service.metadata.PersistAlbumsForArtist(ctx, metadataAlbums, artist.ID, database.Unmonitored) dbAlbums, err := service.metadata.GetAlbumsByArtistID(ctx, artist.ID) if err != nil { log.Warn().Err(err).Str("artist_id", artist.ID).Msg("failed to get local albums") dbAlbums = nil } dbAlbumsByExternalID := make(map[string]*database.Album, len(dbAlbums)) albumIDs := make([]string, 0, len(dbAlbums)) for _, a := range dbAlbums { dbAlbumsByExternalID[a.ExternalID] = a albumIDs = append(albumIDs, a.ID) } downloadsByAlbumID, _ := service.downloads.GetLatestByAlbumIDs(ctx, albumIDs) albums := make([]*pb.AlbumDetail, 0, len(metadataAlbums)) for _, ma := range metadataAlbums { detail := &pb.AlbumDetail{ ExternalId: ma.GetId(), Title: ma.GetTitle(), AlbumType: ma.GetAlbumType(), ReleaseDate: ma.GetReleaseDate(), TotalTracks: ma.GetTotalTracks(), TotalDiscs: ma.GetTotalDiscs(), CoverUrl: ma.GetCoverUrl(), } if ma.GetLabel() != nil { detail.Label = ma.GetLabel().GetName() } for _, g := range ma.GetGenres() { detail.Genres = append(detail.Genres, g.GetName()) } if dbAlbum, ok := dbAlbumsByExternalID[ma.GetId()]; ok { detail.Id = dbAlbum.ID detail.MonitorState = toProtoMonitorState(dbAlbum.MonitorState) if d, ok := downloadsByAlbumID[dbAlbum.ID]; ok { detail.Download = &pb.DownloadInfo{ State: d.State, Format: d.Format, Quality: d.Quality, SavePath: derefStr(d.SavePath), } } } else { detail.MonitorState = pb.MonitorState_MONITOR_STATE_UNMONITORED } albums = append(albums, detail) } return albums, nil } func (service *MusicAgregatorService) GetAlbum(ctx context.Context, req *pb.GetAlbumRequest) (*pb.GetAlbumResponse, error) { dbAlbum, err := service.metadata.GetAlbumByID(ctx, req.GetAlbumId()) if err != nil { return nil, fmt.Errorf("album not found: %w", err) } info, err := service.buildAlbumInfo(ctx, dbAlbum) if err != nil { return nil, err } return &pb.GetAlbumResponse{Info: info}, nil } func (service *MusicAgregatorService) buildAlbumInfo(ctx context.Context, dbAlbum *database.Album) (*pb.AlbumInfo, error) { metadataAlbum, err := service.metadata.GetAlbum(ctx, dbAlbum.ExternalID) if err != nil { log.Error().Err(err).Str("album_id", dbAlbum.ExternalID).Msg("failed to get album from metadata") return nil, fmt.Errorf("fetching album: %w", err) } metadataTracks, err := service.metadata.GetAlbumTracks(ctx, dbAlbum.ExternalID) if err != nil { log.Warn().Err(err).Str("album_id", dbAlbum.ExternalID).Msg("failed to get tracks from metadata") } service.metadata.PersistTracks(ctx, dbAlbum.ID, metadataTracks) album := &pb.AlbumDetail{ Id: dbAlbum.ID, ExternalId: metadataAlbum.GetId(), Title: metadataAlbum.GetTitle(), AlbumType: metadataAlbum.GetAlbumType(), ReleaseDate: metadataAlbum.GetReleaseDate(), TotalTracks: metadataAlbum.GetTotalTracks(), TotalDiscs: metadataAlbum.GetTotalDiscs(), CoverUrl: metadataAlbum.GetCoverUrl(), MonitorState: toProtoMonitorState(dbAlbum.MonitorState), } if metadataAlbum.GetLabel() != nil { album.Label = metadataAlbum.GetLabel().GetName() } for _, g := range metadataAlbum.GetGenres() { album.Genres = append(album.Genres, g.GetName()) } downloads, err := service.downloads.GetByAlbumID(ctx, dbAlbum.ID) if err == nil && len(downloads) > 0 { best := downloads[0] album.Download = &pb.DownloadInfo{ State: best.State, Format: best.Format, Quality: best.Quality, SavePath: derefStr(best.SavePath), } } var downloadFilesByTrackID map[string]*database.DownloadFile if album.Download != nil { files, err := service.downloadFiles.GetByDownloadID(ctx, downloads[0].ID) if err == nil { downloadFilesByTrackID = make(map[string]*database.DownloadFile, len(files)) for _, f := range files { if f.TrackID != nil { downloadFilesByTrackID[*f.TrackID] = f } } } } dbTracks, _ := service.metadata.GetTracksByAlbumID(ctx, dbAlbum.ID) dbTracksByExternalID := make(map[string]*database.Track, len(dbTracks)) for _, t := range dbTracks { dbTracksByExternalID[t.ExternalID] = t } var trackReleasesByTrackID map[string]*database.TrackRelease albumReleases, err := service.albumReleases.GetByAlbumID(ctx, dbAlbum.ID) if err == nil && len(albumReleases) > 0 { ar := albumReleases[0] album.Release = &pb.AlbumReleaseDetail{ Id: ar.ID, Format: ar.Format, Channels: int32(ar.Channels), IsLossless: ar.IsLossless, TotalSize: ar.TotalSize, TotalDurationMs: int32(ar.TotalDurationMs), TrackCount: int32(ar.TrackCount), HasCoverArt: ar.HasCoverArt, HasCueSheet: ar.HasCueSheet, HasRipLog: ar.HasRipLog, Path: ar.Path, } if ar.BitDepth != nil { album.Release.BitDepth = int32(*ar.BitDepth) } if ar.SampleRate != nil { album.Release.SampleRate = int32(*ar.SampleRate) } if ar.Source != nil { album.Release.Source = *ar.Source } trs, err := service.trackReleases.GetByAlbumReleaseID(ctx, ar.ID) if err == nil { trackReleasesByTrackID = make(map[string]*database.TrackRelease, len(trs)) for _, tr := range trs { if tr.TrackID != nil { trackReleasesByTrackID[*tr.TrackID] = tr } } } } tracks := make([]*pb.TrackDetail, 0, len(metadataTracks)) for _, mt := range metadataTracks { td := &pb.TrackDetail{ ExternalId: mt.GetId(), Title: mt.GetTitle(), DurationMs: mt.GetDurationMs(), DiscNumber: mt.GetDiscNumber(), TrackNumber: mt.GetTrackNumber(), Isrc: mt.GetIsrc(), Explicit: mt.GetExplicit(), } for _, ac := range mt.GetArtists() { td.Artists = append(td.Artists, &pb.ArtistCredit{ Id: ac.GetArtist().GetId(), Name: ac.GetArtist().GetName(), }) } if dbTrack, ok := dbTracksByExternalID[mt.GetId()]; ok { td.Id = dbTrack.ID if tr, ok := trackReleasesByTrackID[dbTrack.ID]; ok { td.FilePath = tr.FilePath td.FileSize = tr.FileSize td.Format = tr.Format td.Channels = int32(tr.Channels) if tr.DurationMs != nil { td.DurationMs = int32(*tr.DurationMs) } if tr.BitDepth != nil { td.BitDepth = int32(*tr.BitDepth) } if tr.SampleRate != nil { td.SampleRate = int32(*tr.SampleRate) } if tr.BitrateKbps != nil { td.BitrateKbps = int32(*tr.BitrateKbps) } } else if df, ok := downloadFilesByTrackID[dbTrack.ID]; ok { td.FilePath = df.FilePath td.FileSize = df.FileSize td.Format = df.FileType } } tracks = append(tracks, td) } return &pb.AlbumInfo{ Album: album, Tracks: tracks, }, nil } func (service *MusicAgregatorService) AnalyzeAlbumRelease(ctx context.Context, req *pb.AnalyzeAlbumReleaseRequest) (*pb.AnalyzeAlbumReleaseResponse, error) { dbAlbum, err := service.metadata.GetAlbumByID(ctx, req.GetAlbumId()) if err != nil { return nil, fmt.Errorf("album not found: %w", err) } downloads, err := service.downloads.GetByAlbumID(ctx, dbAlbum.ID) if err != nil || len(downloads) == 0 { return nil, fmt.Errorf("no downloads found for album") } var download *database.Download for _, d := range downloads { if d.State == "completed" || d.State == "seeding" { download = d break } } if download == nil { return nil, fmt.Errorf("no completed download found for album") } contentPath := "" existingRelease, err := service.albumReleases.GetByDownloadID(ctx, download.ID) if err == nil { contentPath = existingRelease.Path } if contentPath == "" && download.QbitHash != "" { torrents, err := service.torrentClient.Find(torrent.FindOptions{Hash: download.QbitHash}) if err == nil && len(torrents) > 0 { contentPath = torrents[0].ContentPath if service.pathMapper != nil { contentPath = service.pathMapper.ToHost(contentPath) } } } if contentPath == "" { return nil, fmt.Errorf("cannot determine content path for download") } _, _, err = service.analyzer.AnalyzeAndPersist(ctx, download.ID, contentPath) if err != nil { return nil, fmt.Errorf("analyzing release: %w", err) } info, err := service.buildAlbumInfo(ctx, dbAlbum) if err != nil { return nil, err } return &pb.AnalyzeAlbumReleaseResponse{Info: info}, nil } func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) { album, err := service.metadata.GetAlbum(ctx, req.GetAlbumId()) if err != nil { log.Error().Err(err).Str("album_id", req.GetAlbumId()).Msg("failed to get album") return nil, err } dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { service.metadata.SetAlbumMonitorState(ctx, dbAlbum.ID, database.Monitored) dbAlbum.MonitorState = database.Monitored qualityStr := normalizeQuality(req.GetQuality(), 0, 0) owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr) if err == nil && owned { log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality") return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil), nil } } searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker()) if err != nil { return nil, err } parsed := service.parseSearchResults(searchResult, album) filtered := filterByQuality(parsed, req.GetQuality()) if len(filtered) == 0 { log.Warn().Str("album", album.GetTitle()).Str("quality", req.GetQuality().String()).Msg("no releases match quality filter") return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil), nil } best := selectBestRelease(filtered) if err := service.addToTorrentClient(best); err != nil { return nil, err } dbAlbum, _ = service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { service.saveTorrentAndDownload(ctx, dbAlbum.ID, best) } else { log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence") } return service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best), nil } func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) { artistName := "" if len(album.GetArtists()) > 0 { artistName = album.GetArtists()[0].GetArtist().GetName() } query := album.GetTitle() if artistName != "" { query = artistName + " " + query } if tracker == "" { tracker = "all" } result, err := service.indexer.Search(query, -1, tracker) if err != nil { log.Error().Err(err).Str("query", query).Msg("indexer search failed") return nil, err } log.Debug().Int("results", len(result.Items)).Str("query", query).Msg("indexer search completed") return result, nil } func (service *MusicAgregatorService) parseSearchResults(searchResult *indexer.SearchResponse, album *metadataPb.Album) []parsedItem { parser := torrentParser.NewGenericParser() var parsed []parsedItem for _, item := range searchResult.Items { if item.DownloadLink == "" { log.Trace().Str("title", item.Title).Msg("skipping item without download link") continue } if item.Seeders == 0 { log.Warn().Str("title", item.Title).Str("tracker", item.Tracker).Msg("skipping torrent with no seeders") continue } r, torrentData := service.resolveRelease(parser, item, album) log.Debug(). Str("title", item.Title). Str("format", r.Format.String()). Int("tracks", r.TrackCount). Bool("lossless", r.Format.IsLossless()). Int("seeders", item.Seeders). Str("tracker", item.Tracker). Msg("release parsed") parsed = append(parsed, parsedItem{item: item, rel: r, torrentData: torrentData}) } log.Debug().Int("total", len(searchResult.Items)).Int("parsed", len(parsed)).Msg("parsing complete") return parsed } func (service *MusicAgregatorService) resolveRelease(parser *torrentParser.GenericParser, item *indexer.SearchItemResult, album *metadataPb.Album) (*release.Release, []byte) { if strings.HasPrefix(item.DownloadLink, "magnet:") { log.Trace().Str("title", item.Title).Int("reported_seeders", item.Seeders).Msg("resolving magnet") torrentData, err := service.magnetResolver.Resolve(item.DownloadLink) if err != nil { log.Warn().Err(err).Str("title", item.Title).Int("reported_seeders", item.Seeders).Msg("magnet resolve failed, falling back to title parse") return parser.Parse(item.Title), nil } return parser.ParseTorrent(torrentData, album), torrentData } torrentData, err := downloadTorrentData(item.DownloadLink) if err != nil { log.Warn().Err(err).Str("title", item.Title).Msg("failed to download torrent, falling back to title parse") return parser.Parse(item.Title), nil } return parser.ParseTorrent(torrentData, album), torrentData } func filterByQuality(items []parsedItem, quality pb.QualityType) []parsedItem { var filtered []parsedItem for _, p := range items { match := quality == pb.QualityType_QUALITY_UNSPECIFIED || (quality == pb.QualityType_QUALITY_LOSSLESS && p.rel.Format.IsLossless()) || (quality == pb.QualityType_QUALITY_LOSSY && !p.rel.Format.IsLossless()) if !match { log.Debug().Str("title", p.item.Title).Str("format", p.rel.Format.String()).Str("wanted", quality.String()).Msg("filtered out by quality") continue } filtered = append(filtered, p) } return filtered } func selectBestRelease(items []parsedItem) parsedItem { best := items[0] for _, p := range items[1:] { if p.item.Seeders > best.item.Seeders { best = p } } log.Info(). Str("title", best.item.Title). Str("format", best.rel.Format.String()). Int("seeders", best.item.Seeders). Str("tracker", best.item.Tracker). Str("hash", best.rel.InfoHash). Msg("best release selected") return best } func (service *MusicAgregatorService) addToTorrentClient(best parsedItem) error { if best.rel.InfoHash != "" { existing, err := service.torrentClient.Find(torrent.FindOptions{Hash: best.rel.InfoHash}) if err == nil && len(existing) > 0 { log.Info().Str("hash", best.rel.InfoHash).Str("state", existing[0].State).Msg("torrent already exists in client") return nil } } savePath := "" if service.pathMapper != nil { savePath = service.pathMapper.ContainerDownloadPath() } if strings.HasPrefix(best.item.DownloadLink, "magnet:") { if err := service.torrentClient.AddMagnet(best.item.DownloadLink, savePath); err != nil { log.Error().Err(err).Str("title", best.item.Title).Msg("failed to add magnet to client") return err } } else { if len(best.torrentData) == 0 { log.Error().Str("title", best.item.Title).Msg("no torrent data available") return fmt.Errorf("no torrent data available for best release") } if err := service.torrentClient.AddTorrent(torrent.TorrentFile{ Filename: best.rel.Album + ".torrent", Data: best.torrentData, }, savePath); err != nil { log.Error().Err(err).Str("title", best.item.Title).Msg("failed to add torrent to client") return err } } log.Info().Str("title", best.item.Title).Str("hash", best.rel.InfoHash).Msg("torrent added to client") return nil } func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context, dbAlbumID string, best parsedItem) { quality := normalizeQuality(pb.QualityType_QUALITY_UNSPECIFIED, best.rel.BitDepth, best.rel.SampleRate) dbTorrent := &database.Torrent{ AlbumID: dbAlbumID, InfoHash: best.rel.InfoHash, Tracker: best.item.Tracker, Title: best.item.Title, Format: best.rel.Format.String(), Quality: quality, Source: best.rel.Source.String(), BitDepth: best.rel.BitDepth, SampleRate: best.rel.SampleRate, Seeders: best.item.Seeders, Peers: best.item.Peers, Size: best.rel.TotalAudioSize, TrackCount: best.rel.TrackCount, HasCoverArt: best.rel.HasCoverArt, HasCueSheet: best.rel.HasCueSheet, HasRipLog: best.rel.HasRipLog, DownloadLink: best.item.DownloadLink, TorrentFile: best.torrentData, } if err := service.torrents.Create(ctx, dbTorrent); err != nil { log.Error().Err(err).Str("hash", best.rel.InfoHash).Msg("failed to save torrent to DB") return } savedTorrent, err := service.torrents.GetByInfoHash(ctx, best.rel.InfoHash) if err != nil { log.Error().Err(err).Msg("failed to retrieve saved torrent") return } existingDownload, err := service.downloads.GetActiveByTorrentID(ctx, savedTorrent.ID) if err == nil && existingDownload != nil { log.Info().Str("hash", best.rel.InfoHash).Str("state", existingDownload.State).Msg("active download already exists, skipping") return } download := &database.Download{ TorrentID: savedTorrent.ID, AlbumID: dbAlbumID, Format: best.rel.Format.String(), Quality: quality, State: "downloading", QbitHash: best.rel.InfoHash, } if err := service.downloads.Create(ctx, download); err != nil { log.Error().Err(err).Msg("failed to save download to DB") return } if service.riverClient != nil { _, err := service.riverClient.Insert(ctx, workers.PollDownloadArgs{ DownloadID: download.ID, TorrentHash: best.rel.InfoHash, CheckInterval: 30 * time.Second, }, &river.InsertOpts{ ScheduledAt: time.Now().Add(30 * time.Second), }) if err != nil { log.Error().Err(err).Msg("failed to schedule download poll job") } else { log.Debug().Str("download_id", download.ID).Str("hash", best.rel.InfoHash).Msg("download poll job scheduled") } } log.Info().Str("hash", best.rel.InfoHash).Str("download_id", download.ID).Msg("torrent and download saved to DB") } func normalizeQuality(quality pb.QualityType, bitDepth int, sampleRate int) string { if bitDepth > 0 && sampleRate > 0 { return fmt.Sprintf("%d-%d", bitDepth, sampleRate/1000) } switch quality { case pb.QualityType_QUALITY_LOSSLESS: return "16-44" case pb.QualityType_QUALITY_LOSSY: return "320" default: return "" } } func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease { return &pb.MonitoredRelease{ InfoHash: p.rel.InfoHash, Artist: p.rel.Artist, Album: p.rel.Album, Year: int32(p.rel.Year), Format: p.rel.Format.String(), Lossless: p.rel.Format.IsLossless(), BitDepth: int32(p.rel.BitDepth), SampleRate: int32(p.rel.SampleRate), Source: p.rel.Source.String(), TrackCount: int32(p.rel.TrackCount), TrackNames: p.rel.TrackNames, HasCoverArt: p.rel.HasCoverArt, HasCueSheet: p.rel.HasCueSheet, HasRipLog: p.rel.HasRipLog, TotalAudioSize: p.rel.TotalAudioSize, DownloadLink: p.item.DownloadLink, Seeders: int32(p.item.Seeders), Tracker: p.item.Tracker, } } func (service *MusicAgregatorService) buildMonitorAlbumResponse(ctx context.Context, metadataAlbum *metadataPb.Album, dbAlbum *database.Album, best *parsedItem) *pb.MonitorAlbumResponse { resp := &pb.MonitorAlbumResponse{} if best != nil { resp.Release = buildMonitoredRelease(*best) } if dbAlbum != nil { resp.Album = service.buildAlbumDetail(ctx, dbAlbum) } if len(metadataAlbum.GetArtists()) > 0 { dbArtist, err := service.metadata.GetArtistByExternalID(ctx, metadataAlbum.GetArtists()[0].GetArtist().GetId()) if err == nil { resp.Artist = &pb.ArtistSummary{ Id: dbArtist.ID, ExternalId: dbArtist.ExternalID, Name: dbArtist.Name, ArtistType: dbArtist.ArtistType, Country: dbArtist.Country, Genres: dbArtist.Genres, ImageUrl: dbArtist.ImageURL, MonitorState: toProtoMonitorState(dbArtist.MonitorState), } } } return resp } func (service *MusicAgregatorService) buildAlbumDetail(ctx context.Context, dbAlbum *database.Album) *pb.AlbumDetail { detail := &pb.AlbumDetail{ Id: dbAlbum.ID, ExternalId: dbAlbum.ExternalID, Title: dbAlbum.Title, AlbumType: dbAlbum.AlbumType, TotalTracks: int32(dbAlbum.TotalTracks), TotalDiscs: int32(dbAlbum.TotalDiscs), Label: dbAlbum.Label, Genres: dbAlbum.Genres, CoverUrl: dbAlbum.CoverURL, MonitorState: toProtoMonitorState(dbAlbum.MonitorState), } if dbAlbum.ReleaseDate != nil { detail.ReleaseDate = dbAlbum.ReleaseDate.Format("2006-01-02") } downloads, err := service.downloads.GetByAlbumID(ctx, dbAlbum.ID) if err == nil && len(downloads) > 0 { best := downloads[0] detail.Download = &pb.DownloadInfo{ State: best.State, Format: best.Format, Quality: best.Quality, SavePath: derefStr(best.SavePath), } } return detail } func toProtoMonitorState(state database.MonitorState) pb.MonitorState { switch state { case database.Monitored: return pb.MonitorState_MONITOR_STATE_MONITORED case database.Unmonitored: return pb.MonitorState_MONITOR_STATE_UNMONITORED case database.Excluded: return pb.MonitorState_MONITOR_STATE_EXCLUDED default: return pb.MonitorState_MONITOR_STATE_UNSPECIFIED } } func derefStr(s *string) string { if s == nil { return "" } return *s } func downloadTorrentData(url string) ([]byte, error) { client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Get(url) if err != nil { return nil, fmt.Errorf("downloading torrent: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("torrent download returned HTTP %d", resp.StatusCode) } data, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("reading torrent data: %w", err) } return data, nil } func (service *MusicAgregatorService) SearchArtists(ctx context.Context, req *pb.SearchArtistsRequest) (*pb.SearchArtistsResponse, error) { resp, err := service.metadata.SearchArtists(ctx, req.GetQuery(), req.GetLimit(), req.GetOffset()) if err != nil { return nil, err } artists := make([]*pb.SearchArtistResult, len(resp.GetArtists())) for i, a := range resp.GetArtists() { genres := make([]string, len(a.GetGenres())) for j, g := range a.GetGenres() { genres[j] = g.GetName() } extIDs := make([]*pb.ExternalId, len(a.GetExternalIds())) for j, e := range a.GetExternalIds() { extIDs[j] = &pb.ExternalId{ Provider: e.GetSource(), Id: e.GetSourceId(), } } artists[i] = &pb.SearchArtistResult{ Id: a.GetId(), Name: a.GetName(), SortName: a.GetSortName(), ArtistType: a.GetArtistType(), Country: a.GetCountry(), FormedDate: a.GetFormedDate(), DisbandedDate: a.GetDisbandedDate(), Description: a.GetDescription(), ImageUrl: a.GetImageUrl(), Genres: genres, ExternalIds: extIDs, } } return &pb.SearchArtistsResponse{ Artists: artists, Total: resp.GetTotal(), }, nil } func (service *MusicAgregatorService) GetArtistAlbums(ctx context.Context, req *pb.GetArtistAlbumsRequest) (*pb.GetArtistAlbumsResponse, error) { resp, err := service.metadata.GetArtistAlbumsWithPagination(ctx, req.GetArtistId(), req.GetLimit(), req.GetOffset()) if err != nil { return nil, err } albums := make([]*pb.AlbumResult, len(resp.GetAlbums())) for i, a := range resp.GetAlbums() { genres := make([]string, len(a.GetGenres())) for j, g := range a.GetGenres() { genres[j] = g.GetName() } extIDs := make([]*pb.ExternalId, len(a.GetExternalIds())) for j, e := range a.GetExternalIds() { extIDs[j] = &pb.ExternalId{ Provider: e.GetSource(), Id: e.GetSourceId(), } } artists := make([]*pb.AlbumArtistCredit, len(a.GetArtists())) for j, ac := range a.GetArtists() { artists[j] = &pb.AlbumArtistCredit{ Id: ac.GetArtist().GetId(), Name: ac.GetArtist().GetName(), Role: ac.GetRole(), } } var label *pb.AlbumLabel if a.GetLabel() != nil { label = &pb.AlbumLabel{ Id: a.GetLabel().GetId(), Name: a.GetLabel().GetName(), Country: a.GetLabel().GetCountry(), } } albums[i] = &pb.AlbumResult{ Id: a.GetId(), Title: a.GetTitle(), AlbumType: a.GetAlbumType(), ReleaseDate: a.GetReleaseDate(), Upc: a.GetUpc(), TotalTracks: a.GetTotalTracks(), TotalDiscs: a.GetTotalDiscs(), CoverUrl: a.GetCoverUrl(), Artists: artists, Label: label, Genres: genres, ExternalIds: extIDs, } } return &pb.GetArtistAlbumsResponse{ Albums: albums, Total: resp.GetTotal(), }, nil }