Implement MonitorAlbum: search, parse, filter by quality, add best to qbittorrent
This commit is contained in:
+208
-6
@@ -2,6 +2,11 @@ package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/riverqueue/river"
|
||||
@@ -14,13 +19,24 @@ import (
|
||||
"homelab.lan/music-agregator/internal/config"
|
||||
"homelab.lan/music-agregator/internal/indexer"
|
||||
"homelab.lan/music-agregator/internal/metadata"
|
||||
"homelab.lan/music-agregator/internal/release"
|
||||
"homelab.lan/music-agregator/internal/torrent"
|
||||
torrentParser "homelab.lan/music-agregator/internal/tracker"
|
||||
)
|
||||
|
||||
type parsedItem struct {
|
||||
item *indexer.SearchItemResult
|
||||
rel *release.Release
|
||||
torrentData []byte
|
||||
}
|
||||
|
||||
type MusicAgregatorService struct {
|
||||
config config.Config
|
||||
metadataClient metadataPb.MetadataServiceClient
|
||||
metadataConn *grpc.ClientConn
|
||||
indexer *indexer.IndexerService
|
||||
torrentClient torrent.TorrentClient
|
||||
magnetResolver *torrentParser.MagnetResolver
|
||||
}
|
||||
|
||||
func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) {
|
||||
@@ -36,11 +52,25 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T
|
||||
return nil, err
|
||||
}
|
||||
|
||||
magnetResolver, err := torrentParser.NewMagnetResolver(30 * time.Second)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("failed to create magnet resolver")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
torrentClient, err := torrent.NewTorrentClient(cfg)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("failed to create torrent client")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &MusicAgregatorService{
|
||||
config: cfg,
|
||||
metadataClient: metadataClient,
|
||||
metadataConn: conn,
|
||||
indexer: indexer,
|
||||
torrentClient: torrentClient,
|
||||
magnetResolver: magnetResolver,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -48,6 +78,9 @@ func (s *MusicAgregatorService) Close() {
|
||||
if s.metadataConn != nil {
|
||||
s.metadataConn.Close()
|
||||
}
|
||||
if s.magnetResolver != nil {
|
||||
s.magnetResolver.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
|
||||
@@ -75,17 +108,186 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
|
||||
query = artistName + " " + query
|
||||
}
|
||||
|
||||
tracker := req.GetIndexerOptions().GetTracker()
|
||||
if tracker == "" {
|
||||
tracker = "all"
|
||||
trackerName := req.GetIndexerOptions().GetTracker()
|
||||
if trackerName == "" {
|
||||
trackerName = "all"
|
||||
}
|
||||
|
||||
searchResult, err := service.indexer.Search(query, -1, tracker)
|
||||
searchResult, err := service.indexer.Search(query, -1, trackerName)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("query", query).Msg("indexer search album failed")
|
||||
return nil, err
|
||||
}
|
||||
log.Debug().Int("results", len(searchResult.GetResult())).Str("query", query).Msg("indexer search completed")
|
||||
log.Debug().Int("results", len(searchResult.Items)).Str("query", query).Msg("indexer search completed")
|
||||
|
||||
return &pb.MonitorAlbumResponse{}, nil
|
||||
parser := torrentParser.NewGenericParser()
|
||||
var parsed []parsedItem
|
||||
|
||||
for _, item := range searchResult.Items {
|
||||
if item.DownloadLink == "" {
|
||||
log.Trace().Str("title", item.Title).Msg("skipping item without download link")
|
||||
continue
|
||||
}
|
||||
|
||||
if item.Seeders == 0 {
|
||||
log.Warn().Str("title", item.Title).Str("tracker", item.Tracker).Msg("skipping torrent with no seeders")
|
||||
continue
|
||||
}
|
||||
|
||||
var r *release.Release
|
||||
var torrentData []byte
|
||||
|
||||
if strings.HasPrefix(item.DownloadLink, "magnet:") {
|
||||
log.Trace().Str("title", item.Title).Int("reported_seeders", item.Seeders).Msg("resolving magnet")
|
||||
torrentData, err = service.magnetResolver.Resolve(item.DownloadLink)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("title", item.Title).Int("reported_seeders", item.Seeders).Msg("magnet resolve failed, falling back to title parse")
|
||||
r = parser.Parse(item.Title)
|
||||
} else {
|
||||
r = parser.ParseTorrent(torrentData, album)
|
||||
}
|
||||
} else {
|
||||
torrentData, err = downloadTorrentData(item.DownloadLink)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("title", item.Title).Msg("failed to download torrent, falling back to title parse")
|
||||
r = parser.Parse(item.Title)
|
||||
} else {
|
||||
r = parser.ParseTorrent(torrentData, album)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Str("title", item.Title).
|
||||
Str("format", r.Format.String()).
|
||||
Int("tracks", r.TrackCount).
|
||||
Bool("lossless", r.Format.IsLossless()).
|
||||
Int("seeders", item.Seeders).
|
||||
Str("tracker", item.Tracker).
|
||||
Msg("release parsed")
|
||||
|
||||
parsed = append(parsed, parsedItem{item: item, rel: r, torrentData: torrentData})
|
||||
}
|
||||
|
||||
log.Debug().Int("total", len(searchResult.Items)).Int("parsed", len(parsed)).Msg("parsing complete")
|
||||
|
||||
qualityType := req.GetQuality()
|
||||
var filtered []parsedItem
|
||||
for _, p := range parsed {
|
||||
match := qualityType == pb.QualityType_QUALITY_UNSPECIFIED ||
|
||||
(qualityType == pb.QualityType_QUALITY_LOSSLESS && p.rel.Format.IsLossless()) ||
|
||||
(qualityType == pb.QualityType_QUALITY_LOSSY && !p.rel.Format.IsLossless())
|
||||
|
||||
if !match {
|
||||
log.Debug().Str("title", p.item.Title).Str("format", p.rel.Format.String()).Str("wanted", qualityType.String()).Msg("filtered out by quality")
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, p)
|
||||
}
|
||||
|
||||
if len(filtered) == 0 {
|
||||
log.Warn().Str("query", query).Str("quality", qualityType.String()).Msg("no releases match quality filter")
|
||||
return &pb.MonitorAlbumResponse{}, nil
|
||||
}
|
||||
|
||||
best := filtered[0]
|
||||
for _, p := range filtered[1:] {
|
||||
if p.item.Seeders > best.item.Seeders {
|
||||
best = p
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("title", best.item.Title).
|
||||
Str("format", best.rel.Format.String()).
|
||||
Int("seeders", best.item.Seeders).
|
||||
Str("tracker", best.item.Tracker).
|
||||
Str("hash", best.rel.InfoHash).
|
||||
Msg("best release selected, adding to torrent client")
|
||||
|
||||
if best.rel.InfoHash != "" {
|
||||
existing, err := service.torrentClient.Find(torrent.FindOptions{Hash: best.rel.InfoHash})
|
||||
if err == nil && len(existing) > 0 {
|
||||
log.Info().
|
||||
Str("hash", best.rel.InfoHash).
|
||||
Str("state", existing[0].State).
|
||||
Str("artist", best.rel.Artist).
|
||||
Str("album", best.rel.Album).
|
||||
Str("format", best.rel.Format.String()).
|
||||
Int("tracks", best.rel.TrackCount).
|
||||
Msg("torrent already exists in client")
|
||||
resp := buildMonitoredRelease(best)
|
||||
log.Trace().Interface("response", resp).Msg("returning existing torrent response")
|
||||
return &pb.MonitorAlbumResponse{
|
||||
Release: resp,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
if strings.HasPrefix(best.item.DownloadLink, "magnet:") {
|
||||
if err := service.torrentClient.AddMagnet(best.item.DownloadLink); err != nil {
|
||||
log.Error().Err(err).Str("title", best.item.Title).Msg("failed to add magnet to client")
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if len(best.torrentData) == 0 {
|
||||
log.Error().Str("title", best.item.Title).Msg("no torrent data available")
|
||||
return nil, fmt.Errorf("no torrent data available for best release")
|
||||
}
|
||||
if err := service.torrentClient.AddTorrent(torrent.TorrentFile{
|
||||
Filename: best.rel.Album + ".torrent",
|
||||
Data: best.torrentData,
|
||||
}); err != nil {
|
||||
log.Error().Err(err).Str("title", best.item.Title).Msg("failed to add torrent to client")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Str("title", best.item.Title).Str("hash", best.rel.InfoHash).Msg("torrent added to client")
|
||||
|
||||
return &pb.MonitorAlbumResponse{
|
||||
Release: buildMonitoredRelease(best),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease {
|
||||
return &pb.MonitoredRelease{
|
||||
InfoHash: p.rel.InfoHash,
|
||||
Artist: p.rel.Artist,
|
||||
Album: p.rel.Album,
|
||||
Year: int32(p.rel.Year),
|
||||
Format: p.rel.Format.String(),
|
||||
Lossless: p.rel.Format.IsLossless(),
|
||||
BitDepth: int32(p.rel.BitDepth),
|
||||
SampleRate: int32(p.rel.SampleRate),
|
||||
Source: p.rel.Source.String(),
|
||||
TrackCount: int32(p.rel.TrackCount),
|
||||
TrackNames: p.rel.TrackNames,
|
||||
HasCoverArt: p.rel.HasCoverArt,
|
||||
HasCueSheet: p.rel.HasCueSheet,
|
||||
HasRipLog: p.rel.HasRipLog,
|
||||
TotalAudioSize: p.rel.TotalAudioSize,
|
||||
DownloadLink: p.item.DownloadLink,
|
||||
Seeders: int32(p.item.Seeders),
|
||||
Tracker: p.item.Tracker,
|
||||
}
|
||||
}
|
||||
|
||||
func downloadTorrentData(url string) ([]byte, error) {
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("downloading torrent: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("torrent download returned HTTP %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading torrent data: %w", err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user