Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander 60c94935b2 Persist metadata to DB, poll download worker, metadata service layer 2026-05-08 11:00:04 +02:00
Alexander 66264e1314 Add database schema, ERD, and repository layer 2026-05-08 10:03:28 +02:00
18 changed files with 1238 additions and 266 deletions
+22
View File
@@ -0,0 +1,22 @@
meta {
name: Get Album
type: grpc
seq: 4
}
grpc {
url: localhost:3000
method: /metadata.v1.MetadataService/GetAlbum
body: grpc
auth: inherit
methodType: unary
}
body:grpc {
name: message 1
content: '''
{
"id": "a0b7b436-94db-4df6-8c5f-bc0e5932a90e"
}
'''
}
+38 -20
View File
@@ -11,7 +11,6 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/riverqueue/river"
@@ -28,10 +27,12 @@ import (
"homelab.lan/music-agregator/internal"
"homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/hello"
"homelab.lan/music-agregator/internal/indexer"
"homelab.lan/music-agregator/internal/metadata"
"homelab.lan/music-agregator/internal/torrent"
"homelab.lan/music-agregator/internal/workers"
)
func main() {
@@ -68,36 +69,54 @@ func interceptorLogger(l zerolog.Logger) logging.Logger {
})
}
func setupRiver(ctx context.Context, cfg config.Config) (*river.Client[pgx.Tx], *pgxpool.Pool) {
if !cfg.Indexer.Cache.Enabled {
return nil, nil
}
dbPool, err := pgxpool.New(ctx, cfg.Database.URL)
func setupDatabase(ctx context.Context, cfg config.Config) *database.DB {
db, err := database.New(ctx, cfg.Database.URL)
if err != nil {
log.Fatal().Err(err).Msg("failed to connect to database for River")
log.Fatal().Err(err).Msg("failed to connect to database")
}
return db
}
type riverSetup struct {
client *river.Client[pgx.Tx]
cacheRefreshWorker *indexer.CacheRefreshWorker
}
func setupRiver(ctx context.Context, cfg config.Config, db *database.DB) *riverSetup {
cacheWorker := &indexer.CacheRefreshWorker{}
pollWorker := &workers.PollDownloadWorker{
Downloads: database.NewDownloadRepository(db.Pool),
DownloadFiles: database.NewDownloadFileRepository(db.Pool),
TorrentClient: torrent.MustNewTorrentClient(cfg),
}
workers := river.NewWorkers()
river.AddWorker(workers, &indexer.CacheRefreshWorker{})
riverWorkers := river.NewWorkers()
river.AddWorker(riverWorkers, cacheWorker)
river.AddWorker(riverWorkers, pollWorker)
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
riverClient, err := river.NewClient(riverpgxv5.New(db.Pool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 2},
river.QueueDefault: {MaxWorkers: 4},
},
Workers: workers,
Workers: riverWorkers,
})
if err != nil {
log.Fatal().Err(err).Msg("failed to create River client")
}
cacheWorker.RiverClient = riverClient
pollWorker.RiverClient = riverClient
if err := riverClient.Start(ctx); err != nil {
log.Fatal().Err(err).Msg("failed to start River client")
}
log.Info().Msg("River queue started")
return riverClient, dbPool
return &riverSetup{
client: riverClient,
cacheRefreshWorker: cacheWorker,
}
}
func serveGrpc(config config.Config) {
@@ -132,16 +151,15 @@ func serveGrpc(config config.Config) {
)
ctx := context.Background()
riverClient, dbPool := setupRiver(ctx, config)
if dbPool != nil {
defer dbPool.Close()
}
db := setupDatabase(ctx, config)
defer db.Close()
rs := setupRiver(ctx, config, db)
musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, riverClient)
musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, rs.client, db)
if err != nil {
log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer")
}
indexerServer, err := indexer.NewIndexerServer(config, riverClient)
indexerServer, err := indexer.NewIndexerServer(config, rs.client, rs.cacheRefreshWorker)
if err != nil {
log.Fatal().Err(err).Msg("failed to create IndexerServer")
}
+114 -208
View File
@@ -16,259 +16,165 @@ skinparam package {
title Music Aggregator - Database Structure
' ══════════════════════════════════════════════════════════════
' CORE MUSIC ENTITIES
' ══════════════════════════════════════════════════════════════
package "Core Music Entities" #E3F2FD {
entity "artist_metadata" {
package "Music Metadata" #E3F2FD {
entity "artists" {
* id : UUID <<PK>>
--
foreign_artist_id : TEXT <<UNIQUE>>
name : TEXT
sort_name : TEXT
disambiguation : TEXT
artist_type : TEXT
status : TEXT
overview : TEXT
images : JSONB
links : JSONB
genres : JSONB
external_id : VARCHAR(255) <<UNIQUE>>
name : VARCHAR(500)
artist_type : VARCHAR(50)
country : VARCHAR(10)
genres : TEXT[]
image_url : TEXT
--
created_at : TIMESTAMPTZ
updated_at : TIMESTAMPTZ
}
entity "artists" {
* id : UUID <<PK>>
--
metadata_id : UUID <<FK>>
quality_profile_id : UUID <<FK>>
metadata_profile_id : UUID <<FK>>
root_folder_id : UUID <<FK>>
--
path : TEXT
monitored : BOOLEAN
monitor_new_items : TEXT
--
last_info_sync : TIMESTAMPTZ
added_at : TIMESTAMPTZ
}
entity "albums" {
* id : UUID <<PK>>
--
artist_metadata_id : UUID <<FK>>
external_id : VARCHAR(255) <<UNIQUE>>
artist_id : UUID <<FK>>
--
foreign_album_id : TEXT <<UNIQUE>>
title : TEXT
clean_title : TEXT
disambiguation : TEXT
overview : TEXT
album_type : TEXT
title : VARCHAR(500)
album_type : VARCHAR(50)
release_date : DATE
images : JSONB
genres : JSONB
total_tracks : INT
total_discs : INT
label : VARCHAR(255)
genres : TEXT[]
cover_url : TEXT
is_monitored : BOOLEAN
--
monitored : BOOLEAN
added_at : TIMESTAMPTZ
}
entity "album_releases" {
* id : UUID <<PK>>
--
album_id : UUID <<FK>>
--
foreign_release_id : TEXT <<UNIQUE>>
title : TEXT
status : TEXT
duration_ms : INT
release_date : DATE
country : TEXT[]
label : TEXT[]
format : TEXT
track_count : INT
--
monitored : BOOLEAN
created_at : TIMESTAMPTZ
updated_at : TIMESTAMPTZ
}
entity "tracks" {
* id : UUID <<PK>>
--
album_release_id : UUID <<FK>>
artist_metadata_id : UUID <<FK>>
track_file_id : UUID <<FK NULL>>
external_id : VARCHAR(255) <<UNIQUE>>
album_id : UUID <<FK>>
--
foreign_track_id : TEXT <<UNIQUE>>
title : TEXT
track_number : INT
disc_number : INT
title : VARCHAR(500)
duration_ms : INT
explicit : BOOLEAN
}
entity "track_files" {
* id : UUID <<PK>>
isrc : VARCHAR(20)
disc_number : INT
track_number : INT
--
album_id : UUID <<FK>>
--
path : TEXT
relative_path : TEXT
size : BIGINT
--
file_hash : TEXT
audio_hash : TEXT
--
quality : JSONB
media_info : JSONB
--
scene_name : TEXT
release_group : TEXT
--
date_added : TIMESTAMPTZ
created_at : TIMESTAMPTZ
}
}
' ══════════════════════════════════════════════════════════════
' CONFIGURATION
' ══════════════════════════════════════════════════════════════
package "Configuration" #FFF3E0 {
entity "quality_profiles" {
package "Torrent Catalog" #FFF3E0 {
entity "torrents" {
* id : UUID <<PK>>
--
name : TEXT <<UNIQUE>>
cutoff : INT
items : JSONB
upgrade_allowed : BOOLEAN
}
entity "metadata_profiles" {
* id : UUID <<PK>>
--
name : TEXT <<UNIQUE>>
primary_album_types : JSONB
secondary_album_types : JSONB
release_statuses : JSONB
}
entity "root_folders" {
* id : UUID <<PK>>
--
name : TEXT
path : TEXT <<UNIQUE>>
default_quality_profile_id : UUID <<FK>>
default_metadata_profile_id : UUID <<FK>>
}
entity "indexers" {
* id : UUID <<PK>>
--
name : TEXT
implementation : TEXT
settings : JSONB
enable_rss : BOOLEAN
enable_search : BOOLEAN
priority : INT
}
entity "download_clients" {
* id : UUID <<PK>>
--
name : TEXT
implementation : TEXT
settings : JSONB
protocol : TEXT
priority : INT
enabled : BOOLEAN
}
}
' ══════════════════════════════════════════════════════════════
' DOWNLOAD TRACKING
' ══════════════════════════════════════════════════════════════
package "Download Tracking" #E8F5E9 {
entity "wanted_albums" {
* id : UUID <<PK>>
--
album_id : UUID <<FK>> <<UNIQUE>>
--
priority : INT
search_count : INT
last_searched_at : TIMESTAMPTZ
added_at : TIMESTAMPTZ
}
entity "download_queue" {
* id : UUID <<PK>>
--
artist_id : UUID <<FK>>
album_id : UUID <<FK>>
info_hash : VARCHAR(40) <<UNIQUE>>
--
download_id : TEXT
tracker : VARCHAR(100)
title : TEXT
format : VARCHAR(20)
quality : VARCHAR(20)
source : VARCHAR(20)
bit_depth : INT
sample_rate : INT
seeders : INT
peers : INT
size : BIGINT
size_left : BIGINT
track_count : INT
has_cover_art : BOOLEAN
has_cue_sheet : BOOLEAN
has_rip_log : BOOLEAN
download_link : TEXT
torrent_file : BYTEA
--
status : TEXT
progress : REAL
created_at : TIMESTAMPTZ
updated_at : TIMESTAMPTZ
}
}
package "Download Management" #E8F5E9 {
entity "downloads" {
* id : UUID <<PK>>
--
torrent_id : UUID <<FK>>
album_id : UUID
format : VARCHAR(20)
quality : VARCHAR(20)
--
state : download_state
qbit_hash : VARCHAR(64)
save_path : TEXT
error_message : TEXT
--
protocol : TEXT
indexer : TEXT
download_client : TEXT
torrent_hash : TEXT
output_path : TEXT
--
added_at : TIMESTAMPTZ
queued_at : TIMESTAMPTZ
started_at : TIMESTAMPTZ
completed_at : TIMESTAMPTZ
created_at : TIMESTAMPTZ
updated_at : TIMESTAMPTZ
}
entity "blocklist" {
entity "download_files" {
* id : UUID <<PK>>
--
artist_id : UUID <<FK>>
album_id : UUID <<FK>>
download_id : UUID <<FK>>
track_id : UUID <<FK NULL>>
--
source_title : TEXT
quality : JSONB
size : BIGINT
protocol : TEXT
indexer : TEXT
message : TEXT
torrent_hash : TEXT
file_path : TEXT
file_size : BIGINT
file_type : VARCHAR(20)
sha256_hash : VARCHAR(64)
--
date : TIMESTAMPTZ
verified_at : TIMESTAMPTZ
created_at : TIMESTAMPTZ
}
}
' ══════════════════════════════════════════════════════════════
' RELATIONSHIPS
' ══════════════════════════════════════════════════════════════
package "Caching & Queue (River)" #F3E5F5 {
entity "river_job" {
* id : BIGSERIAL <<PK>>
--
kind : TEXT
state : river_job_state
queue : TEXT
args : JSONB
metadata : JSONB
--
attempt : SMALLINT
max_attempts : SMALLINT
priority : SMALLINT
--
scheduled_at : TIMESTAMPTZ
attempted_at : TIMESTAMPTZ
created_at : TIMESTAMPTZ
finalized_at : TIMESTAMPTZ
}
' Core music relationships
artist_metadata ||--|| artists : "has config"
artist_metadata ||--o{ albums : "released"
albums ||--o{ album_releases : "has releases"
album_releases ||--o{ tracks : "contains"
tracks }o--o| track_files : "stored in"
track_files }o--|| albums : "belongs to"
entity "river_queue" {
* name : TEXT <<PK>>
--
metadata : JSONB
paused_at : TIMESTAMPTZ
created_at : TIMESTAMPTZ
updated_at : TIMESTAMPTZ
}
}
' Artist config relationships
artists }o--|| quality_profiles : "uses"
artists }o--o| metadata_profiles : "uses"
artists }o--o| root_folders : "stored in"
note right of river_job
Cache refresh jobs:
kind = "indexer_cache_refresh"
args = {key, url, ttl_expires, refresh_interval}
scheduled_at = next refresh time
end note
' Root folder defaults
root_folders }o--o| quality_profiles : "default"
root_folders }o--o| metadata_profiles : "default"
' Download tracking relationships
wanted_albums ||--|| albums : "targets"
download_queue }o--o| artists : "for"
download_queue }o--o| albums : "for"
blocklist }o--|| artists : "for"
blocklist }o--o| albums : "for"
artists ||--o{ albums : "released"
albums ||--o{ tracks : "contains"
albums ||--o{ torrents : "available on"
torrents ||--o| downloads : "downloaded as"
downloads ||--o{ download_files : "consists of"
tracks ||--o| download_files : "matched to"
@enduml
+132
View File
@@ -0,0 +1,132 @@
package database
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Album struct {
ID string
ExternalID string
ArtistID string
Title string
AlbumType string
ReleaseDate *time.Time
TotalTracks int
TotalDiscs int
Label string
Genres []string
CoverURL string
IsMonitored bool
CreatedAt time.Time
UpdatedAt time.Time
}
type AlbumRepository struct {
pool *pgxpool.Pool
}
func NewAlbumRepository(pool *pgxpool.Pool) *AlbumRepository {
return &AlbumRepository{pool: pool}
}
func (r *AlbumRepository) Create(ctx context.Context, a *Album) error {
_, err := r.pool.Exec(ctx,
`INSERT INTO albums (external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (external_id) DO UPDATE SET
title = EXCLUDED.title,
album_type = EXCLUDED.album_type,
release_date = EXCLUDED.release_date,
total_tracks = EXCLUDED.total_tracks,
total_discs = EXCLUDED.total_discs,
label = EXCLUDED.label,
genres = EXCLUDED.genres,
cover_url = EXCLUDED.cover_url,
updated_at = NOW()`,
a.ExternalID, a.ArtistID, a.Title, a.AlbumType, a.ReleaseDate, a.TotalTracks, a.TotalDiscs, a.Label, a.Genres, a.CoverURL, a.IsMonitored,
)
if err != nil {
return fmt.Errorf("creating album: %w", err)
}
return nil
}
func (r *AlbumRepository) GetByExternalID(ctx context.Context, externalID string) (*Album, error) {
a := &Album{}
err := r.pool.QueryRow(ctx,
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
FROM albums WHERE external_id = $1`, externalID,
).Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("getting album: %w", err)
}
return a, nil
}
func (r *AlbumRepository) GetByID(ctx context.Context, id string) (*Album, error) {
a := &Album{}
err := r.pool.QueryRow(ctx,
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
FROM albums WHERE id = $1`, id,
).Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("getting album: %w", err)
}
return a, nil
}
func (r *AlbumRepository) GetByArtistID(ctx context.Context, artistID string) ([]*Album, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
FROM albums WHERE artist_id = $1 ORDER BY release_date DESC`, artistID,
)
if err != nil {
return nil, fmt.Errorf("listing albums: %w", err)
}
defer rows.Close()
var albums []*Album
for rows.Next() {
a := &Album{}
if err := rows.Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt); err != nil {
return nil, fmt.Errorf("scanning album: %w", err)
}
albums = append(albums, a)
}
return albums, nil
}
func (r *AlbumRepository) GetMonitored(ctx context.Context) ([]*Album, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
FROM albums WHERE is_monitored = TRUE ORDER BY release_date DESC`,
)
if err != nil {
return nil, fmt.Errorf("listing monitored albums: %w", err)
}
defer rows.Close()
var albums []*Album
for rows.Next() {
a := &Album{}
if err := rows.Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt); err != nil {
return nil, fmt.Errorf("scanning album: %w", err)
}
albums = append(albums, a)
}
return albums, nil
}
func (r *AlbumRepository) SetMonitored(ctx context.Context, id string, monitored bool) error {
_, err := r.pool.Exec(ctx,
`UPDATE albums SET is_monitored = $1, updated_at = NOW() WHERE id = $2`, monitored, id,
)
if err != nil {
return fmt.Errorf("updating monitored state: %w", err)
}
return nil
}
+73
View File
@@ -0,0 +1,73 @@
package database
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Artist struct {
ID string
ExternalID string
Name string
ArtistType string
Country string
Genres []string
ImageURL string
CreatedAt time.Time
UpdatedAt time.Time
}
type ArtistRepository struct {
pool *pgxpool.Pool
}
func NewArtistRepository(pool *pgxpool.Pool) *ArtistRepository {
return &ArtistRepository{pool: pool}
}
func (r *ArtistRepository) Create(ctx context.Context, a *Artist) error {
_, err := r.pool.Exec(ctx,
`INSERT INTO artists (external_id, name, artist_type, country, genres, image_url)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (external_id) DO UPDATE SET
name = EXCLUDED.name,
artist_type = EXCLUDED.artist_type,
country = EXCLUDED.country,
genres = EXCLUDED.genres,
image_url = EXCLUDED.image_url,
updated_at = NOW()
RETURNING id, created_at, updated_at`,
a.ExternalID, a.Name, a.ArtistType, a.Country, a.Genres, a.ImageURL,
)
if err != nil {
return fmt.Errorf("creating artist: %w", err)
}
return nil
}
func (r *ArtistRepository) GetByExternalID(ctx context.Context, externalID string) (*Artist, error) {
a := &Artist{}
err := r.pool.QueryRow(ctx,
`SELECT id, external_id, name, artist_type, country, genres, image_url, created_at, updated_at
FROM artists WHERE external_id = $1`, externalID,
).Scan(&a.ID, &a.ExternalID, &a.Name, &a.ArtistType, &a.Country, &a.Genres, &a.ImageURL, &a.CreatedAt, &a.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("getting artist: %w", err)
}
return a, nil
}
func (r *ArtistRepository) GetByID(ctx context.Context, id string) (*Artist, error) {
a := &Artist{}
err := r.pool.QueryRow(ctx,
`SELECT id, external_id, name, artist_type, country, genres, image_url, created_at, updated_at
FROM artists WHERE id = $1`, id,
).Scan(&a.ID, &a.ExternalID, &a.Name, &a.ArtistType, &a.Country, &a.Genres, &a.ImageURL, &a.CreatedAt, &a.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("getting artist: %w", err)
}
return a, nil
}
+32
View File
@@ -0,0 +1,32 @@
package database
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog/log"
)
type DB struct {
Pool *pgxpool.Pool
}
func New(ctx context.Context, databaseURL string) (*DB, error) {
pool, err := pgxpool.New(ctx, databaseURL)
if err != nil {
return nil, fmt.Errorf("connecting to database: %w", err)
}
if err := pool.Ping(ctx); err != nil {
return nil, fmt.Errorf("pinging database: %w", err)
}
log.Info().Str("url", databaseURL).Msg("database connected")
return &DB{Pool: pool}, nil
}
func (db *DB) Close() {
db.Pool.Close()
}
@@ -0,0 +1,104 @@
package database
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type DownloadFile struct {
ID string
DownloadID string
TrackID *string
FilePath string
FileSize int64
FileType string
SHA256Hash string
VerifiedAt *time.Time
CreatedAt time.Time
}
type DownloadFileRepository struct {
pool *pgxpool.Pool
}
func NewDownloadFileRepository(pool *pgxpool.Pool) *DownloadFileRepository {
return &DownloadFileRepository{pool: pool}
}
func (r *DownloadFileRepository) Create(ctx context.Context, f *DownloadFile) error {
err := r.pool.QueryRow(ctx,
`INSERT INTO download_files (download_id, track_id, file_path, file_size, file_type, sha256_hash)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, created_at`,
f.DownloadID, f.TrackID, f.FilePath, f.FileSize, f.FileType, f.SHA256Hash,
).Scan(&f.ID, &f.CreatedAt)
if err != nil {
return fmt.Errorf("creating download file: %w", err)
}
return nil
}
func (r *DownloadFileRepository) CreateBatch(ctx context.Context, files []*DownloadFile) error {
for _, f := range files {
if err := r.Create(ctx, f); err != nil {
return err
}
}
return nil
}
func (r *DownloadFileRepository) GetByDownloadID(ctx context.Context, downloadID string) ([]*DownloadFile, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, download_id, track_id, file_path, file_size, file_type, sha256_hash, verified_at, created_at
FROM download_files WHERE download_id = $1 ORDER BY file_path`, downloadID,
)
if err != nil {
return nil, fmt.Errorf("listing download files: %w", err)
}
defer rows.Close()
var files []*DownloadFile
for rows.Next() {
f := &DownloadFile{}
if err := rows.Scan(&f.ID, &f.DownloadID, &f.TrackID, &f.FilePath, &f.FileSize, &f.FileType, &f.SHA256Hash, &f.VerifiedAt, &f.CreatedAt); err != nil {
return nil, fmt.Errorf("scanning download file: %w", err)
}
files = append(files, f)
}
return files, nil
}
func (r *DownloadFileRepository) SetHash(ctx context.Context, id string, hash string) error {
_, err := r.pool.Exec(ctx,
`UPDATE download_files SET sha256_hash = $1, verified_at = NOW() WHERE id = $2`, hash, id,
)
if err != nil {
return fmt.Errorf("setting file hash: %w", err)
}
return nil
}
func (r *DownloadFileRepository) GetUnverified(ctx context.Context) ([]*DownloadFile, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, download_id, track_id, file_path, file_size, file_type, sha256_hash, verified_at, created_at
FROM download_files WHERE sha256_hash IS NULL OR verified_at < NOW() - INTERVAL '30 days'
ORDER BY created_at`,
)
if err != nil {
return nil, fmt.Errorf("listing unverified files: %w", err)
}
defer rows.Close()
var files []*DownloadFile
for rows.Next() {
f := &DownloadFile{}
if err := rows.Scan(&f.ID, &f.DownloadID, &f.TrackID, &f.FilePath, &f.FileSize, &f.FileType, &f.SHA256Hash, &f.VerifiedAt, &f.CreatedAt); err != nil {
return nil, fmt.Errorf("scanning download file: %w", err)
}
files = append(files, f)
}
return files, nil
}
+122
View File
@@ -0,0 +1,122 @@
package database
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Download struct {
ID string
TorrentID string
AlbumID string
Format string
Quality string
State string
QbitHash string
SavePath string
ErrorMessage string
QueuedAt time.Time
StartedAt *time.Time
CompletedAt *time.Time
CreatedAt time.Time
UpdatedAt time.Time
}
type DownloadRepository struct {
pool *pgxpool.Pool
}
func NewDownloadRepository(pool *pgxpool.Pool) *DownloadRepository {
return &DownloadRepository{pool: pool}
}
func (r *DownloadRepository) Create(ctx context.Context, d *Download) error {
err := r.pool.QueryRow(ctx,
`INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash, save_path)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id, queued_at, created_at, updated_at`,
d.TorrentID, d.AlbumID, d.Format, d.Quality, d.State, d.QbitHash, d.SavePath,
).Scan(&d.ID, &d.QueuedAt, &d.CreatedAt, &d.UpdatedAt)
if err != nil {
return fmt.Errorf("creating download: %w", err)
}
return nil
}
func (r *DownloadRepository) UpdateState(ctx context.Context, id string, state string) error {
_, err := r.pool.Exec(ctx,
`UPDATE downloads SET state = $1, updated_at = NOW() WHERE id = $2`, state, id,
)
if err != nil {
return fmt.Errorf("updating download state: %w", err)
}
return nil
}
func (r *DownloadRepository) SetStarted(ctx context.Context, id string) error {
_, err := r.pool.Exec(ctx,
`UPDATE downloads SET state = 'downloading', started_at = NOW(), updated_at = NOW() WHERE id = $1`, id,
)
if err != nil {
return fmt.Errorf("setting download started: %w", err)
}
return nil
}
func (r *DownloadRepository) SetCompleted(ctx context.Context, id string, savePath string) error {
_, err := r.pool.Exec(ctx,
`UPDATE downloads SET state = 'completed', save_path = $1, completed_at = NOW(), updated_at = NOW() WHERE id = $2`, savePath, id,
)
if err != nil {
return fmt.Errorf("setting download completed: %w", err)
}
return nil
}
func (r *DownloadRepository) SetFailed(ctx context.Context, id string, errorMsg string) error {
_, err := r.pool.Exec(ctx,
`UPDATE downloads SET state = 'failed', error_message = $1, updated_at = NOW() WHERE id = $2`, errorMsg, id,
)
if err != nil {
return fmt.Errorf("setting download failed: %w", err)
}
return nil
}
func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Download, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at
FROM downloads WHERE album_id = $1 ORDER BY created_at DESC`, albumID,
)
if err != nil {
return nil, fmt.Errorf("listing downloads: %w", err)
}
defer rows.Close()
var downloads []*Download
for rows.Next() {
d := &Download{}
if err := rows.Scan(&d.ID, &d.TorrentID, &d.AlbumID, &d.Format, &d.Quality, &d.State, &d.QbitHash, &d.SavePath, &d.ErrorMessage, &d.QueuedAt, &d.StartedAt, &d.CompletedAt, &d.CreatedAt, &d.UpdatedAt); err != nil {
return nil, fmt.Errorf("scanning download: %w", err)
}
downloads = append(downloads, d)
}
return downloads, nil
}
func (r *DownloadRepository) HasAlbumInQuality(ctx context.Context, albumID string, format string, quality string) (bool, error) {
var exists bool
err := r.pool.QueryRow(ctx,
`SELECT EXISTS(
SELECT 1 FROM downloads
WHERE album_id = $1 AND format = $2 AND quality = $3 AND state IN ('completed', 'seeding')
)`, albumID, format, quality,
).Scan(&exists)
if err != nil {
return false, fmt.Errorf("checking album quality: %w", err)
}
return exists, nil
}
+104
View File
@@ -0,0 +1,104 @@
package database
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Torrent struct {
ID string
AlbumID string
InfoHash string
Tracker string
Title string
Format string
Quality string
Source string
BitDepth int
SampleRate int
Seeders int
Peers int
Size int64
TrackCount int
HasCoverArt bool
HasCueSheet bool
HasRipLog bool
DownloadLink string
TorrentFile []byte
CreatedAt time.Time
UpdatedAt time.Time
}
type TorrentRepository struct {
pool *pgxpool.Pool
}
func NewTorrentRepository(pool *pgxpool.Pool) *TorrentRepository {
return &TorrentRepository{pool: pool}
}
func (r *TorrentRepository) Create(ctx context.Context, t *Torrent) error {
_, err := r.pool.Exec(ctx,
`INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, bit_depth, sample_rate, seeders, peers, size, track_count, has_cover_art, has_cue_sheet, has_rip_log, download_link, torrent_file)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
ON CONFLICT (info_hash) DO UPDATE SET
seeders = EXCLUDED.seeders,
peers = EXCLUDED.peers,
updated_at = NOW()`,
t.AlbumID, t.InfoHash, t.Tracker, t.Title, t.Format, t.Quality, t.Source, t.BitDepth, t.SampleRate, t.Seeders, t.Peers, t.Size, t.TrackCount, t.HasCoverArt, t.HasCueSheet, t.HasRipLog, t.DownloadLink, t.TorrentFile,
)
if err != nil {
return fmt.Errorf("creating torrent: %w", err)
}
return nil
}
func (r *TorrentRepository) GetByInfoHash(ctx context.Context, infoHash string) (*Torrent, error) {
t := &Torrent{}
err := r.pool.QueryRow(ctx,
`SELECT id, album_id, info_hash, tracker, title, format, quality, source, bit_depth, sample_rate, seeders, peers, size, track_count, has_cover_art, has_cue_sheet, has_rip_log, download_link, torrent_file, created_at, updated_at
FROM torrents WHERE info_hash = $1`, infoHash,
).Scan(&t.ID, &t.AlbumID, &t.InfoHash, &t.Tracker, &t.Title, &t.Format, &t.Quality, &t.Source, &t.BitDepth, &t.SampleRate, &t.Seeders, &t.Peers, &t.Size, &t.TrackCount, &t.HasCoverArt, &t.HasCueSheet, &t.HasRipLog, &t.DownloadLink, &t.TorrentFile, &t.CreatedAt, &t.UpdatedAt)
if err != nil {
return nil, fmt.Errorf("getting torrent: %w", err)
}
return t, nil
}
func (r *TorrentRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Torrent, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, album_id, info_hash, tracker, title, format, quality, source, bit_depth, sample_rate, seeders, peers, size, track_count, has_cover_art, has_cue_sheet, has_rip_log, download_link, torrent_file, created_at, updated_at
FROM torrents WHERE album_id = $1 ORDER BY seeders DESC`, albumID,
)
if err != nil {
return nil, fmt.Errorf("listing torrents: %w", err)
}
defer rows.Close()
var torrents []*Torrent
for rows.Next() {
t := &Torrent{}
if err := rows.Scan(&t.ID, &t.AlbumID, &t.InfoHash, &t.Tracker, &t.Title, &t.Format, &t.Quality, &t.Source, &t.BitDepth, &t.SampleRate, &t.Seeders, &t.Peers, &t.Size, &t.TrackCount, &t.HasCoverArt, &t.HasCueSheet, &t.HasRipLog, &t.DownloadLink, &t.TorrentFile, &t.CreatedAt, &t.UpdatedAt); err != nil {
return nil, fmt.Errorf("scanning torrent: %w", err)
}
torrents = append(torrents, t)
}
return torrents, nil
}
func (r *TorrentRepository) HasAlbumInFormat(ctx context.Context, albumID string, format string) (bool, error) {
var exists bool
err := r.pool.QueryRow(ctx,
`SELECT EXISTS(
SELECT 1 FROM downloads
WHERE album_id = $1 AND format = $2 AND state IN ('completed', 'seeding')
)`, albumID, format,
).Scan(&exists)
if err != nil {
return false, fmt.Errorf("checking album format: %w", err)
}
return exists, nil
}
+68
View File
@@ -0,0 +1,68 @@
package database
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Track struct {
ID string
ExternalID string
AlbumID string
Title string
DurationMS int
ISRC string
DiscNumber int
TrackNumber int
CreatedAt time.Time
}
type TrackRepository struct {
pool *pgxpool.Pool
}
func NewTrackRepository(pool *pgxpool.Pool) *TrackRepository {
return &TrackRepository{pool: pool}
}
func (r *TrackRepository) Create(ctx context.Context, t *Track) error {
_, err := r.pool.Exec(ctx,
`INSERT INTO tracks (external_id, album_id, title, duration_ms, isrc, disc_number, track_number)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (external_id) DO UPDATE SET
title = EXCLUDED.title,
duration_ms = EXCLUDED.duration_ms,
isrc = EXCLUDED.isrc,
disc_number = EXCLUDED.disc_number,
track_number = EXCLUDED.track_number`,
t.ExternalID, t.AlbumID, t.Title, t.DurationMS, t.ISRC, t.DiscNumber, t.TrackNumber,
)
if err != nil {
return fmt.Errorf("creating track: %w", err)
}
return nil
}
func (r *TrackRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Track, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, external_id, album_id, title, duration_ms, isrc, disc_number, track_number, created_at
FROM tracks WHERE album_id = $1 ORDER BY disc_number, track_number`, albumID,
)
if err != nil {
return nil, fmt.Errorf("listing tracks: %w", err)
}
defer rows.Close()
var tracks []*Track
for rows.Next() {
t := &Track{}
if err := rows.Scan(&t.ID, &t.ExternalID, &t.AlbumID, &t.Title, &t.DurationMS, &t.ISRC, &t.DiscNumber, &t.TrackNumber, &t.CreatedAt); err != nil {
return nil, fmt.Errorf("scanning track: %w", err)
}
tracks = append(tracks, t)
}
return tracks, nil
}
+6
View File
@@ -27,6 +27,12 @@ type CacheRefreshWorker struct {
func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error {
args := job.Args
if w.Cache == nil || w.Indexer == nil {
log.Trace().Str("key", args.Key).Msg("cache disabled, discarding refresh job")
return nil
}
log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started")
if time.Now().After(args.TTLExpires) {
+2 -2
View File
@@ -17,8 +17,8 @@ type IndexerServer struct {
pb.UnimplementedIndexerServiceServer
}
func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerServer, error) {
service, err := NewIndexerService(cfg, riverClient)
func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerServer, error) {
service, err := NewIndexerService(cfg, riverClient, cacheWorker)
if err != nil {
log.Err(err).Msg("failed to initialize IndexerService")
return nil, err
+7 -1
View File
@@ -15,7 +15,7 @@ type IndexerService struct {
indexer Indexer
}
func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerService, error) {
func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerService, error) {
var idx Indexer
switch cfg.Indexer.Type {
@@ -28,6 +28,12 @@ func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*I
if cfg.Indexer.Cache.Enabled && riverClient != nil {
cache := NewIndexerCache()
idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache)
if cacheWorker != nil {
cacheWorker.Cache = cache
cacheWorker.Indexer = idx
}
log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled")
}
+116
View File
@@ -0,0 +1,116 @@
package metadata
import (
"context"
"fmt"
"github.com/rs/zerolog/log"
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
"homelab.lan/music-agregator/internal/database"
)
type MetadataService struct {
client metadataPb.MetadataServiceClient
artists *database.ArtistRepository
albums *database.AlbumRepository
}
func NewMetadataService(client metadataPb.MetadataServiceClient, db *database.DB) *MetadataService {
return &MetadataService{
client: client,
artists: database.NewArtistRepository(db.Pool),
albums: database.NewAlbumRepository(db.Pool),
}
}
func (s *MetadataService) GetAlbum(ctx context.Context, albumID string) (*metadataPb.Album, error) {
resp, err := s.client.GetAlbum(ctx, &metadataPb.GetAlbumRequest{
Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID},
})
if err != nil {
return nil, fmt.Errorf("fetching album: %w", err)
}
album := resp.GetAlbum()
if _, err := s.albums.GetByExternalID(ctx, album.GetId()); err != nil {
s.persistArtist(ctx, album)
s.persistAlbum(ctx, album)
}
return album, nil
}
func (s *MetadataService) GetArtistByExternalID(ctx context.Context, externalID string) (*database.Artist, error) {
return s.artists.GetByExternalID(ctx, externalID)
}
func (s *MetadataService) GetAlbumByExternalID(ctx context.Context, externalID string) (*database.Album, error) {
return s.albums.GetByExternalID(ctx, externalID)
}
func (s *MetadataService) persistArtist(ctx context.Context, album *metadataPb.Album) {
if len(album.GetArtists()) == 0 {
return
}
artist := album.GetArtists()[0].GetArtist()
var genres []string
for _, g := range artist.GetGenres() {
genres = append(genres, g.GetName())
}
err := s.artists.Create(ctx, &database.Artist{
ExternalID: artist.GetId(),
Name: artist.GetName(),
ArtistType: artist.GetArtistType(),
Country: artist.GetCountry(),
Genres: genres,
ImageURL: artist.GetImageUrl(),
})
if err != nil {
log.Warn().Err(err).Str("name", artist.GetName()).Msg("failed to persist artist")
}
}
func (s *MetadataService) persistAlbum(ctx context.Context, album *metadataPb.Album) {
artistID := ""
if len(album.GetArtists()) > 0 {
a, err := s.artists.GetByExternalID(ctx, album.GetArtists()[0].GetArtist().GetId())
if err == nil {
artistID = a.ID
}
}
if artistID == "" {
log.Trace().Str("album", album.GetTitle()).Msg("skipping album persist, no artist in DB")
return
}
var genres []string
for _, g := range album.GetGenres() {
genres = append(genres, g.GetName())
}
labelName := ""
if album.GetLabel() != nil {
labelName = album.GetLabel().GetName()
}
err := s.albums.Create(ctx, &database.Album{
ExternalID: album.GetId(),
ArtistID: artistID,
Title: album.GetTitle(),
AlbumType: album.GetAlbumType(),
TotalTracks: int(album.GetTotalTracks()),
TotalDiscs: int(album.GetTotalDiscs()),
Label: labelName,
Genres: genres,
CoverURL: album.GetCoverUrl(),
IsMonitored: true,
})
if err != nil {
log.Warn().Err(err).Str("title", album.GetTitle()).Msg("failed to persist album")
}
}
+3 -2
View File
@@ -10,6 +10,7 @@ import (
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
)
type MusicAgregatorServer struct {
@@ -17,8 +18,8 @@ type MusicAgregatorServer struct {
pb.UnimplementedMusicAgregatorServiceServer
}
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorServer, error) {
service, err := NewMusicAgregatorService(cfg, riverClient)
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorServer, error) {
service, err := NewMusicAgregatorService(cfg, riverClient, db)
if err != nil {
log.Err(err).Msg("failed to create MusicAgregatorService")
return nil, err
+111 -33
View File
@@ -11,17 +11,18 @@ import (
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/indexer"
"homelab.lan/music-agregator/internal/metadata"
"homelab.lan/music-agregator/internal/release"
"homelab.lan/music-agregator/internal/torrent"
torrentParser "homelab.lan/music-agregator/internal/tracker"
"homelab.lan/music-agregator/internal/workers"
)
type parsedItem struct {
@@ -32,21 +33,23 @@ type parsedItem struct {
type MusicAgregatorService struct {
config config.Config
metadataClient metadataPb.MetadataServiceClient
metadataConn *grpc.ClientConn
metadata *metadata.MetadataService
indexer *indexer.IndexerService
torrentClient torrent.TorrentClient
magnetResolver *torrentParser.MagnetResolver
riverClient *river.Client[pgx.Tx]
torrents *database.TorrentRepository
downloads *database.DownloadRepository
}
func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) {
indexer, err := indexer.NewIndexerService(cfg, riverClient)
func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorService, error) {
idx, err := indexer.NewIndexerService(cfg, riverClient, nil)
if err != nil {
log.Err(err).Msg("failed to create IndexerService")
return nil, err
}
metadataClient, conn, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint)
metadataClient, _, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint)
if err != nil {
log.Err(err).Msg("failed to create metadata client")
return nil, err
@@ -66,29 +69,39 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T
return &MusicAgregatorService{
config: cfg,
metadataClient: metadataClient,
metadataConn: conn,
indexer: indexer,
metadata: metadata.NewMetadataService(metadataClient, db),
indexer: idx,
torrentClient: torrentClient,
magnetResolver: magnetResolver,
riverClient: riverClient,
torrents: database.NewTorrentRepository(db.Pool),
downloads: database.NewDownloadRepository(db.Pool),
}, nil
}
func (s *MusicAgregatorService) Close() {
if s.metadataConn != nil {
s.metadataConn.Close()
}
if s.magnetResolver != nil {
s.magnetResolver.Close()
}
}
func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
album, err := service.fetchAlbumMetadata(ctx, req.GetAlbumId())
album, err := service.metadata.GetAlbum(ctx, req.GetAlbumId())
if err != nil {
log.Error().Err(err).Str("album_id", req.GetAlbumId()).Msg("failed to get album")
return nil, err
}
dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, req.GetAlbumId())
if dbAlbum != nil {
qualityStr := normalizeQuality(req.GetQuality(), 0, 0)
owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr)
if err == nil && owned {
log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality")
return &pb.MonitorAlbumResponse{}, nil
}
}
searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker())
if err != nil {
return nil, err
@@ -108,31 +121,17 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
return nil, err
}
if dbAlbum != nil {
service.saveTorrentAndDownload(ctx, dbAlbum.ID, best)
} else {
log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence")
}
return &pb.MonitorAlbumResponse{
Release: buildMonitoredRelease(best),
}, nil
}
func (service *MusicAgregatorService) fetchAlbumMetadata(ctx context.Context, albumID string) (*metadataPb.Album, error) {
resp, err := service.metadataClient.GetAlbum(ctx, &metadataPb.GetAlbumRequest{
Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID},
})
if err != nil {
log.Error().Err(err).Str("album_id", albumID).Msg("metadata GetAlbum failed")
return nil, err
}
album := resp.GetAlbum()
artistName := ""
if len(album.GetArtists()) > 0 {
artistName = album.GetArtists()[0].GetArtist().GetName()
}
log.Debug().Str("album_id", albumID).Str("title", album.GetTitle()).Str("artist", artistName).Msg("album metadata fetched")
return album, nil
}
func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) {
artistName := ""
if len(album.GetArtists()) > 0 {
@@ -277,6 +276,85 @@ func (service *MusicAgregatorService) addToTorrentClient(best parsedItem) error
return nil
}
func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context, dbAlbumID string, best parsedItem) {
quality := normalizeQuality(pb.QualityType_QUALITY_UNSPECIFIED, best.rel.BitDepth, best.rel.SampleRate)
dbTorrent := &database.Torrent{
AlbumID: dbAlbumID,
InfoHash: best.rel.InfoHash,
Tracker: best.item.Tracker,
Title: best.item.Title,
Format: best.rel.Format.String(),
Quality: quality,
Source: best.rel.Source.String(),
BitDepth: best.rel.BitDepth,
SampleRate: best.rel.SampleRate,
Seeders: best.item.Seeders,
Peers: best.item.Peers,
Size: best.rel.TotalAudioSize,
TrackCount: best.rel.TrackCount,
HasCoverArt: best.rel.HasCoverArt,
HasCueSheet: best.rel.HasCueSheet,
HasRipLog: best.rel.HasRipLog,
DownloadLink: best.item.DownloadLink,
TorrentFile: best.torrentData,
}
if err := service.torrents.Create(ctx, dbTorrent); err != nil {
log.Error().Err(err).Str("hash", best.rel.InfoHash).Msg("failed to save torrent to DB")
return
}
savedTorrent, err := service.torrents.GetByInfoHash(ctx, best.rel.InfoHash)
if err != nil {
log.Error().Err(err).Msg("failed to retrieve saved torrent")
return
}
download := &database.Download{
TorrentID: savedTorrent.ID,
AlbumID: dbAlbumID,
Format: best.rel.Format.String(),
Quality: quality,
State: "downloading",
QbitHash: best.rel.InfoHash,
}
if err := service.downloads.Create(ctx, download); err != nil {
log.Error().Err(err).Msg("failed to save download to DB")
return
}
if service.riverClient != nil {
_, err := service.riverClient.Insert(ctx, workers.PollDownloadArgs{
DownloadID: download.ID,
TorrentHash: best.rel.InfoHash,
CheckInterval: 30 * time.Second,
}, &river.InsertOpts{
ScheduledAt: time.Now().Add(30 * time.Second),
})
if err != nil {
log.Error().Err(err).Msg("failed to schedule download poll job")
} else {
log.Debug().Str("download_id", download.ID).Str("hash", best.rel.InfoHash).Msg("download poll job scheduled")
}
}
log.Info().Str("hash", best.rel.InfoHash).Str("download_id", download.ID).Msg("torrent and download saved to DB")
}
func normalizeQuality(quality pb.QualityType, bitDepth int, sampleRate int) string {
if bitDepth > 0 && sampleRate > 0 {
return fmt.Sprintf("%d-%d", bitDepth, sampleRate/1000)
}
switch quality {
case pb.QualityType_QUALITY_LOSSLESS:
return "16-44"
case pb.QualityType_QUALITY_LOSSY:
return "320"
default:
return ""
}
}
func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease {
return &pb.MonitoredRelease{
InfoHash: p.rel.InfoHash,
+8
View File
@@ -8,6 +8,14 @@ import (
"homelab.lan/music-agregator/internal/config"
)
func MustNewTorrentClient(cfg config.Config) TorrentClient {
client, err := NewTorrentClient(cfg)
if err != nil {
panic(fmt.Sprintf("failed to create torrent client: %v", err))
}
return client
}
func NewTorrentClient(cfg config.Config) (TorrentClient, error) {
var client TorrentClient
+176
View File
@@ -0,0 +1,176 @@
package workers
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/torrent"
)
type PollDownloadArgs struct {
DownloadID string `json:"download_id"`
TorrentHash string `json:"torrent_hash"`
CheckInterval time.Duration `json:"check_interval"`
}
func (PollDownloadArgs) Kind() string { return "poll_download" }
type PollDownloadWorker struct {
river.WorkerDefaults[PollDownloadArgs]
TorrentClient torrent.TorrentClient
Downloads *database.DownloadRepository
DownloadFiles *database.DownloadFileRepository
RiverClient *river.Client[pgx.Tx]
}
func (w *PollDownloadWorker) Work(ctx context.Context, job *river.Job[PollDownloadArgs]) error {
args := job.Args
log.Trace().Str("download_id", args.DownloadID).Str("hash", args.TorrentHash).Msg("polling download status")
results, err := w.TorrentClient.Find(torrent.FindOptions{Hash: args.TorrentHash})
if err != nil {
log.Error().Err(err).Str("hash", args.TorrentHash).Msg("failed to query torrent client")
return w.reschedule(ctx, args)
}
if len(results) == 0 {
log.Warn().Str("hash", args.TorrentHash).Msg("torrent not found in client, marking failed")
w.Downloads.SetFailed(ctx, args.DownloadID, "torrent not found in client")
return nil
}
t := results[0]
switch {
case t.Progress >= 1.0:
return w.onCompleted(ctx, args, t)
case t.State == "error":
log.Warn().Str("hash", args.TorrentHash).Str("state", t.State).Msg("torrent in error state")
w.Downloads.SetFailed(ctx, args.DownloadID, "torrent error state")
return nil
default:
log.Trace().
Str("hash", args.TorrentHash).
Str("state", t.State).
Float64("progress", t.Progress*100).
Int64("dlspeed", t.DlSpeed).
Msg("download in progress")
return w.reschedule(ctx, args)
}
}
func (w *PollDownloadWorker) onCompleted(ctx context.Context, args PollDownloadArgs, t torrent.TorrentInfo) error {
log.Info().Str("hash", args.TorrentHash).Str("path", t.ContentPath).Msg("download completed")
if err := w.Downloads.SetCompleted(ctx, args.DownloadID, t.SavePath); err != nil {
log.Error().Err(err).Msg("failed to update download as completed")
return err
}
files, err := scanAndHashFiles(t.ContentPath)
if err != nil {
log.Error().Err(err).Str("path", t.ContentPath).Msg("failed to scan downloaded files")
return nil
}
for _, f := range files {
f.DownloadID = args.DownloadID
}
if err := w.DownloadFiles.CreateBatch(ctx, files); err != nil {
log.Error().Err(err).Msg("failed to save download files")
return nil
}
log.Info().
Str("download_id", args.DownloadID).
Int("files", len(files)).
Msg("download files scanned and hashed")
return nil
}
func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadArgs) error {
_, err := w.RiverClient.Insert(ctx, args, &river.InsertOpts{
ScheduledAt: time.Now().Add(args.CheckInterval),
})
if err != nil {
log.Error().Err(err).Msg("failed to reschedule poll_download")
}
return nil
}
var audioExtensions = map[string]bool{
".flac": true, ".mp3": true, ".aac": true, ".m4a": true,
".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true,
}
func scanAndHashFiles(rootPath string) ([]*database.DownloadFile, error) {
var files []*database.DownloadFile
err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return err
}
ext := strings.ToLower(filepath.Ext(path))
relPath, _ := filepath.Rel(rootPath, path)
fileType := strings.TrimPrefix(ext, ".")
if fileType == "" {
return nil
}
df := &database.DownloadFile{
FilePath: relPath,
FileSize: info.Size(),
FileType: fileType,
}
if audioExtensions[ext] || ext == ".cue" || ext == ".log" {
hash, err := hashFile(path)
if err != nil {
log.Warn().Err(err).Str("path", path).Msg("failed to hash file")
} else {
df.SHA256Hash = hash
now := time.Now()
df.VerifiedAt = &now
}
}
files = append(files, df)
return nil
})
return files, err
}
func hashFile(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", fmt.Errorf("opening file: %w", err)
}
defer f.Close()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return "", fmt.Errorf("hashing file: %w", err)
}
return hex.EncodeToString(h.Sum(nil)), nil
}