package internal import ( "context" "fmt" "io" "net/http" "strings" "time" "github.com/jackc/pgx/v5" "github.com/riverqueue/river" "github.com/rs/zerolog/log" "google.golang.org/grpc" metadataPb "homelab.lan/music-agregator/gen/metadata/v1" pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/indexer" "homelab.lan/music-agregator/internal/metadata" "homelab.lan/music-agregator/internal/release" "homelab.lan/music-agregator/internal/torrent" torrentParser "homelab.lan/music-agregator/internal/tracker" ) type parsedItem struct { item *indexer.SearchItemResult rel *release.Release torrentData []byte } type MusicAgregatorService struct { config config.Config metadataClient metadataPb.MetadataServiceClient metadataConn *grpc.ClientConn indexer *indexer.IndexerService torrentClient torrent.TorrentClient magnetResolver *torrentParser.MagnetResolver } func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) { indexer, err := indexer.NewIndexerService(cfg, riverClient) if err != nil { log.Err(err).Msg("failed to create IndexerService") return nil, err } metadataClient, conn, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint) if err != nil { log.Err(err).Msg("failed to create metadata client") return nil, err } magnetResolver, err := torrentParser.NewMagnetResolver(30 * time.Second) if err != nil { log.Err(err).Msg("failed to create magnet resolver") return nil, err } torrentClient, err := torrent.NewTorrentClient(cfg) if err != nil { log.Err(err).Msg("failed to create torrent client") return nil, err } return &MusicAgregatorService{ config: cfg, metadataClient: metadataClient, metadataConn: conn, indexer: indexer, torrentClient: torrentClient, magnetResolver: magnetResolver, }, nil } func (s *MusicAgregatorService) Close() { if s.metadataConn != nil { s.metadataConn.Close() } if s.magnetResolver != nil { s.magnetResolver.Close() } } func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) { album, err := service.fetchAlbumMetadata(ctx, req.GetAlbumId()) if err != nil { return nil, err } searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker()) if err != nil { return nil, err } parsed := service.parseSearchResults(searchResult, album) filtered := filterByQuality(parsed, req.GetQuality()) if len(filtered) == 0 { log.Warn().Str("album", album.GetTitle()).Str("quality", req.GetQuality().String()).Msg("no releases match quality filter") return &pb.MonitorAlbumResponse{}, nil } best := selectBestRelease(filtered) if err := service.addToTorrentClient(best); err != nil { return nil, err } return &pb.MonitorAlbumResponse{ Release: buildMonitoredRelease(best), }, nil } func (service *MusicAgregatorService) fetchAlbumMetadata(ctx context.Context, albumID string) (*metadataPb.Album, error) { resp, err := service.metadataClient.GetAlbum(ctx, &metadataPb.GetAlbumRequest{ Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID}, }) if err != nil { log.Error().Err(err).Str("album_id", albumID).Msg("metadata GetAlbum failed") return nil, err } album := resp.GetAlbum() artistName := "" if len(album.GetArtists()) > 0 { artistName = album.GetArtists()[0].GetArtist().GetName() } log.Debug().Str("album_id", albumID).Str("title", album.GetTitle()).Str("artist", artistName).Msg("album metadata fetched") return album, nil } func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) { artistName := "" if len(album.GetArtists()) > 0 { artistName = album.GetArtists()[0].GetArtist().GetName() } query := album.GetTitle() if artistName != "" { query = artistName + " " + query } if tracker == "" { tracker = "all" } result, err := service.indexer.Search(query, -1, tracker) if err != nil { log.Error().Err(err).Str("query", query).Msg("indexer search failed") return nil, err } log.Debug().Int("results", len(result.Items)).Str("query", query).Msg("indexer search completed") return result, nil } func (service *MusicAgregatorService) parseSearchResults(searchResult *indexer.SearchResponse, album *metadataPb.Album) []parsedItem { parser := torrentParser.NewGenericParser() var parsed []parsedItem for _, item := range searchResult.Items { if item.DownloadLink == "" { log.Trace().Str("title", item.Title).Msg("skipping item without download link") continue } if item.Seeders == 0 { log.Warn().Str("title", item.Title).Str("tracker", item.Tracker).Msg("skipping torrent with no seeders") continue } r, torrentData := service.resolveRelease(parser, item, album) log.Debug(). Str("title", item.Title). Str("format", r.Format.String()). Int("tracks", r.TrackCount). Bool("lossless", r.Format.IsLossless()). Int("seeders", item.Seeders). Str("tracker", item.Tracker). Msg("release parsed") parsed = append(parsed, parsedItem{item: item, rel: r, torrentData: torrentData}) } log.Debug().Int("total", len(searchResult.Items)).Int("parsed", len(parsed)).Msg("parsing complete") return parsed } func (service *MusicAgregatorService) resolveRelease(parser *torrentParser.GenericParser, item *indexer.SearchItemResult, album *metadataPb.Album) (*release.Release, []byte) { if strings.HasPrefix(item.DownloadLink, "magnet:") { log.Trace().Str("title", item.Title).Int("reported_seeders", item.Seeders).Msg("resolving magnet") torrentData, err := service.magnetResolver.Resolve(item.DownloadLink) if err != nil { log.Warn().Err(err).Str("title", item.Title).Int("reported_seeders", item.Seeders).Msg("magnet resolve failed, falling back to title parse") return parser.Parse(item.Title), nil } return parser.ParseTorrent(torrentData, album), torrentData } torrentData, err := downloadTorrentData(item.DownloadLink) if err != nil { log.Warn().Err(err).Str("title", item.Title).Msg("failed to download torrent, falling back to title parse") return parser.Parse(item.Title), nil } return parser.ParseTorrent(torrentData, album), torrentData } func filterByQuality(items []parsedItem, quality pb.QualityType) []parsedItem { var filtered []parsedItem for _, p := range items { match := quality == pb.QualityType_QUALITY_UNSPECIFIED || (quality == pb.QualityType_QUALITY_LOSSLESS && p.rel.Format.IsLossless()) || (quality == pb.QualityType_QUALITY_LOSSY && !p.rel.Format.IsLossless()) if !match { log.Debug().Str("title", p.item.Title).Str("format", p.rel.Format.String()).Str("wanted", quality.String()).Msg("filtered out by quality") continue } filtered = append(filtered, p) } return filtered } func selectBestRelease(items []parsedItem) parsedItem { best := items[0] for _, p := range items[1:] { if p.item.Seeders > best.item.Seeders { best = p } } log.Info(). Str("title", best.item.Title). Str("format", best.rel.Format.String()). Int("seeders", best.item.Seeders). Str("tracker", best.item.Tracker). Str("hash", best.rel.InfoHash). Msg("best release selected") return best } func (service *MusicAgregatorService) addToTorrentClient(best parsedItem) error { if best.rel.InfoHash != "" { existing, err := service.torrentClient.Find(torrent.FindOptions{Hash: best.rel.InfoHash}) if err == nil && len(existing) > 0 { log.Info().Str("hash", best.rel.InfoHash).Str("state", existing[0].State).Msg("torrent already exists in client") return nil } } if strings.HasPrefix(best.item.DownloadLink, "magnet:") { if err := service.torrentClient.AddMagnet(best.item.DownloadLink); err != nil { log.Error().Err(err).Str("title", best.item.Title).Msg("failed to add magnet to client") return err } } else { if len(best.torrentData) == 0 { log.Error().Str("title", best.item.Title).Msg("no torrent data available") return fmt.Errorf("no torrent data available for best release") } if err := service.torrentClient.AddTorrent(torrent.TorrentFile{ Filename: best.rel.Album + ".torrent", Data: best.torrentData, }); err != nil { log.Error().Err(err).Str("title", best.item.Title).Msg("failed to add torrent to client") return err } } log.Info().Str("title", best.item.Title).Str("hash", best.rel.InfoHash).Msg("torrent added to client") return nil } func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease { return &pb.MonitoredRelease{ InfoHash: p.rel.InfoHash, Artist: p.rel.Artist, Album: p.rel.Album, Year: int32(p.rel.Year), Format: p.rel.Format.String(), Lossless: p.rel.Format.IsLossless(), BitDepth: int32(p.rel.BitDepth), SampleRate: int32(p.rel.SampleRate), Source: p.rel.Source.String(), TrackCount: int32(p.rel.TrackCount), TrackNames: p.rel.TrackNames, HasCoverArt: p.rel.HasCoverArt, HasCueSheet: p.rel.HasCueSheet, HasRipLog: p.rel.HasRipLog, TotalAudioSize: p.rel.TotalAudioSize, DownloadLink: p.item.DownloadLink, Seeders: int32(p.item.Seeders), Tracker: p.item.Tracker, } } func downloadTorrentData(url string) ([]byte, error) { client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Get(url) if err != nil { return nil, fmt.Errorf("downloading torrent: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("torrent download returned HTTP %d", resp.StatusCode) } data, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("reading torrent data: %w", err) } return data, nil }