Compare commits
2 Commits
84a6fe8ec7
...
60c94935b2
| Author | SHA1 | Date | |
|---|---|---|---|
| 60c94935b2 | |||
| 66264e1314 |
@@ -0,0 +1,22 @@
|
|||||||
|
meta {
|
||||||
|
name: Get Album
|
||||||
|
type: grpc
|
||||||
|
seq: 4
|
||||||
|
}
|
||||||
|
|
||||||
|
grpc {
|
||||||
|
url: localhost:3000
|
||||||
|
method: /metadata.v1.MetadataService/GetAlbum
|
||||||
|
body: grpc
|
||||||
|
auth: inherit
|
||||||
|
methodType: unary
|
||||||
|
}
|
||||||
|
|
||||||
|
body:grpc {
|
||||||
|
name: message 1
|
||||||
|
content: '''
|
||||||
|
{
|
||||||
|
"id": "a0b7b436-94db-4df6-8c5f-bc0e5932a90e"
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
}
|
||||||
+38
-20
@@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"github.com/riverqueue/river"
|
"github.com/riverqueue/river"
|
||||||
@@ -28,10 +27,12 @@ import (
|
|||||||
|
|
||||||
"homelab.lan/music-agregator/internal"
|
"homelab.lan/music-agregator/internal"
|
||||||
"homelab.lan/music-agregator/internal/config"
|
"homelab.lan/music-agregator/internal/config"
|
||||||
|
"homelab.lan/music-agregator/internal/database"
|
||||||
"homelab.lan/music-agregator/internal/hello"
|
"homelab.lan/music-agregator/internal/hello"
|
||||||
"homelab.lan/music-agregator/internal/indexer"
|
"homelab.lan/music-agregator/internal/indexer"
|
||||||
"homelab.lan/music-agregator/internal/metadata"
|
"homelab.lan/music-agregator/internal/metadata"
|
||||||
"homelab.lan/music-agregator/internal/torrent"
|
"homelab.lan/music-agregator/internal/torrent"
|
||||||
|
"homelab.lan/music-agregator/internal/workers"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -68,36 +69,54 @@ func interceptorLogger(l zerolog.Logger) logging.Logger {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupRiver(ctx context.Context, cfg config.Config) (*river.Client[pgx.Tx], *pgxpool.Pool) {
|
func setupDatabase(ctx context.Context, cfg config.Config) *database.DB {
|
||||||
if !cfg.Indexer.Cache.Enabled {
|
db, err := database.New(ctx, cfg.Database.URL)
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
dbPool, err := pgxpool.New(ctx, cfg.Database.URL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal().Err(err).Msg("failed to connect to database for River")
|
log.Fatal().Err(err).Msg("failed to connect to database")
|
||||||
|
}
|
||||||
|
return db
|
||||||
|
}
|
||||||
|
|
||||||
|
type riverSetup struct {
|
||||||
|
client *river.Client[pgx.Tx]
|
||||||
|
cacheRefreshWorker *indexer.CacheRefreshWorker
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupRiver(ctx context.Context, cfg config.Config, db *database.DB) *riverSetup {
|
||||||
|
cacheWorker := &indexer.CacheRefreshWorker{}
|
||||||
|
pollWorker := &workers.PollDownloadWorker{
|
||||||
|
Downloads: database.NewDownloadRepository(db.Pool),
|
||||||
|
DownloadFiles: database.NewDownloadFileRepository(db.Pool),
|
||||||
|
TorrentClient: torrent.MustNewTorrentClient(cfg),
|
||||||
}
|
}
|
||||||
|
|
||||||
workers := river.NewWorkers()
|
riverWorkers := river.NewWorkers()
|
||||||
river.AddWorker(workers, &indexer.CacheRefreshWorker{})
|
river.AddWorker(riverWorkers, cacheWorker)
|
||||||
|
river.AddWorker(riverWorkers, pollWorker)
|
||||||
|
|
||||||
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
|
riverClient, err := river.NewClient(riverpgxv5.New(db.Pool), &river.Config{
|
||||||
Queues: map[string]river.QueueConfig{
|
Queues: map[string]river.QueueConfig{
|
||||||
river.QueueDefault: {MaxWorkers: 2},
|
river.QueueDefault: {MaxWorkers: 4},
|
||||||
},
|
},
|
||||||
Workers: workers,
|
Workers: riverWorkers,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal().Err(err).Msg("failed to create River client")
|
log.Fatal().Err(err).Msg("failed to create River client")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cacheWorker.RiverClient = riverClient
|
||||||
|
pollWorker.RiverClient = riverClient
|
||||||
|
|
||||||
if err := riverClient.Start(ctx); err != nil {
|
if err := riverClient.Start(ctx); err != nil {
|
||||||
log.Fatal().Err(err).Msg("failed to start River client")
|
log.Fatal().Err(err).Msg("failed to start River client")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info().Msg("River queue started")
|
log.Info().Msg("River queue started")
|
||||||
|
|
||||||
return riverClient, dbPool
|
return &riverSetup{
|
||||||
|
client: riverClient,
|
||||||
|
cacheRefreshWorker: cacheWorker,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func serveGrpc(config config.Config) {
|
func serveGrpc(config config.Config) {
|
||||||
@@ -132,16 +151,15 @@ func serveGrpc(config config.Config) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
riverClient, dbPool := setupRiver(ctx, config)
|
db := setupDatabase(ctx, config)
|
||||||
if dbPool != nil {
|
defer db.Close()
|
||||||
defer dbPool.Close()
|
rs := setupRiver(ctx, config, db)
|
||||||
}
|
|
||||||
|
|
||||||
musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, riverClient)
|
musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, rs.client, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer")
|
log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer")
|
||||||
}
|
}
|
||||||
indexerServer, err := indexer.NewIndexerServer(config, riverClient)
|
indexerServer, err := indexer.NewIndexerServer(config, rs.client, rs.cacheRefreshWorker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal().Err(err).Msg("failed to create IndexerServer")
|
log.Fatal().Err(err).Msg("failed to create IndexerServer")
|
||||||
}
|
}
|
||||||
|
|||||||
+114
-208
@@ -16,259 +16,165 @@ skinparam package {
|
|||||||
|
|
||||||
title Music Aggregator - Database Structure
|
title Music Aggregator - Database Structure
|
||||||
|
|
||||||
' ══════════════════════════════════════════════════════════════
|
package "Music Metadata" #E3F2FD {
|
||||||
' CORE MUSIC ENTITIES
|
entity "artists" {
|
||||||
' ══════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
package "Core Music Entities" #E3F2FD {
|
|
||||||
entity "artist_metadata" {
|
|
||||||
* id : UUID <<PK>>
|
* id : UUID <<PK>>
|
||||||
--
|
--
|
||||||
foreign_artist_id : TEXT <<UNIQUE>>
|
external_id : VARCHAR(255) <<UNIQUE>>
|
||||||
name : TEXT
|
name : VARCHAR(500)
|
||||||
sort_name : TEXT
|
artist_type : VARCHAR(50)
|
||||||
disambiguation : TEXT
|
country : VARCHAR(10)
|
||||||
artist_type : TEXT
|
genres : TEXT[]
|
||||||
status : TEXT
|
image_url : TEXT
|
||||||
overview : TEXT
|
|
||||||
images : JSONB
|
|
||||||
links : JSONB
|
|
||||||
genres : JSONB
|
|
||||||
--
|
--
|
||||||
created_at : TIMESTAMPTZ
|
created_at : TIMESTAMPTZ
|
||||||
updated_at : TIMESTAMPTZ
|
updated_at : TIMESTAMPTZ
|
||||||
}
|
}
|
||||||
|
|
||||||
entity "artists" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
metadata_id : UUID <<FK>>
|
|
||||||
quality_profile_id : UUID <<FK>>
|
|
||||||
metadata_profile_id : UUID <<FK>>
|
|
||||||
root_folder_id : UUID <<FK>>
|
|
||||||
--
|
|
||||||
path : TEXT
|
|
||||||
monitored : BOOLEAN
|
|
||||||
monitor_new_items : TEXT
|
|
||||||
--
|
|
||||||
last_info_sync : TIMESTAMPTZ
|
|
||||||
added_at : TIMESTAMPTZ
|
|
||||||
}
|
|
||||||
|
|
||||||
entity "albums" {
|
entity "albums" {
|
||||||
* id : UUID <<PK>>
|
* id : UUID <<PK>>
|
||||||
--
|
--
|
||||||
artist_metadata_id : UUID <<FK>>
|
external_id : VARCHAR(255) <<UNIQUE>>
|
||||||
|
artist_id : UUID <<FK>>
|
||||||
--
|
--
|
||||||
foreign_album_id : TEXT <<UNIQUE>>
|
title : VARCHAR(500)
|
||||||
title : TEXT
|
album_type : VARCHAR(50)
|
||||||
clean_title : TEXT
|
|
||||||
disambiguation : TEXT
|
|
||||||
overview : TEXT
|
|
||||||
album_type : TEXT
|
|
||||||
release_date : DATE
|
release_date : DATE
|
||||||
images : JSONB
|
total_tracks : INT
|
||||||
genres : JSONB
|
total_discs : INT
|
||||||
|
label : VARCHAR(255)
|
||||||
|
genres : TEXT[]
|
||||||
|
cover_url : TEXT
|
||||||
|
is_monitored : BOOLEAN
|
||||||
--
|
--
|
||||||
monitored : BOOLEAN
|
created_at : TIMESTAMPTZ
|
||||||
added_at : TIMESTAMPTZ
|
updated_at : TIMESTAMPTZ
|
||||||
}
|
|
||||||
|
|
||||||
entity "album_releases" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
album_id : UUID <<FK>>
|
|
||||||
--
|
|
||||||
foreign_release_id : TEXT <<UNIQUE>>
|
|
||||||
title : TEXT
|
|
||||||
status : TEXT
|
|
||||||
duration_ms : INT
|
|
||||||
release_date : DATE
|
|
||||||
country : TEXT[]
|
|
||||||
label : TEXT[]
|
|
||||||
format : TEXT
|
|
||||||
track_count : INT
|
|
||||||
--
|
|
||||||
monitored : BOOLEAN
|
|
||||||
}
|
}
|
||||||
|
|
||||||
entity "tracks" {
|
entity "tracks" {
|
||||||
* id : UUID <<PK>>
|
* id : UUID <<PK>>
|
||||||
--
|
--
|
||||||
album_release_id : UUID <<FK>>
|
external_id : VARCHAR(255) <<UNIQUE>>
|
||||||
artist_metadata_id : UUID <<FK>>
|
album_id : UUID <<FK>>
|
||||||
track_file_id : UUID <<FK NULL>>
|
|
||||||
--
|
--
|
||||||
foreign_track_id : TEXT <<UNIQUE>>
|
title : VARCHAR(500)
|
||||||
title : TEXT
|
|
||||||
track_number : INT
|
|
||||||
disc_number : INT
|
|
||||||
duration_ms : INT
|
duration_ms : INT
|
||||||
explicit : BOOLEAN
|
isrc : VARCHAR(20)
|
||||||
}
|
disc_number : INT
|
||||||
|
track_number : INT
|
||||||
entity "track_files" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
--
|
||||||
album_id : UUID <<FK>>
|
created_at : TIMESTAMPTZ
|
||||||
--
|
|
||||||
path : TEXT
|
|
||||||
relative_path : TEXT
|
|
||||||
size : BIGINT
|
|
||||||
--
|
|
||||||
file_hash : TEXT
|
|
||||||
audio_hash : TEXT
|
|
||||||
--
|
|
||||||
quality : JSONB
|
|
||||||
media_info : JSONB
|
|
||||||
--
|
|
||||||
scene_name : TEXT
|
|
||||||
release_group : TEXT
|
|
||||||
--
|
|
||||||
date_added : TIMESTAMPTZ
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
' ══════════════════════════════════════════════════════════════
|
package "Torrent Catalog" #FFF3E0 {
|
||||||
' CONFIGURATION
|
entity "torrents" {
|
||||||
' ══════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
package "Configuration" #FFF3E0 {
|
|
||||||
entity "quality_profiles" {
|
|
||||||
* id : UUID <<PK>>
|
* id : UUID <<PK>>
|
||||||
--
|
--
|
||||||
name : TEXT <<UNIQUE>>
|
|
||||||
cutoff : INT
|
|
||||||
items : JSONB
|
|
||||||
upgrade_allowed : BOOLEAN
|
|
||||||
}
|
|
||||||
|
|
||||||
entity "metadata_profiles" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
name : TEXT <<UNIQUE>>
|
|
||||||
primary_album_types : JSONB
|
|
||||||
secondary_album_types : JSONB
|
|
||||||
release_statuses : JSONB
|
|
||||||
}
|
|
||||||
|
|
||||||
entity "root_folders" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
name : TEXT
|
|
||||||
path : TEXT <<UNIQUE>>
|
|
||||||
default_quality_profile_id : UUID <<FK>>
|
|
||||||
default_metadata_profile_id : UUID <<FK>>
|
|
||||||
}
|
|
||||||
|
|
||||||
entity "indexers" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
name : TEXT
|
|
||||||
implementation : TEXT
|
|
||||||
settings : JSONB
|
|
||||||
enable_rss : BOOLEAN
|
|
||||||
enable_search : BOOLEAN
|
|
||||||
priority : INT
|
|
||||||
}
|
|
||||||
|
|
||||||
entity "download_clients" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
name : TEXT
|
|
||||||
implementation : TEXT
|
|
||||||
settings : JSONB
|
|
||||||
protocol : TEXT
|
|
||||||
priority : INT
|
|
||||||
enabled : BOOLEAN
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
' ══════════════════════════════════════════════════════════════
|
|
||||||
' DOWNLOAD TRACKING
|
|
||||||
' ══════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
package "Download Tracking" #E8F5E9 {
|
|
||||||
entity "wanted_albums" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
album_id : UUID <<FK>> <<UNIQUE>>
|
|
||||||
--
|
|
||||||
priority : INT
|
|
||||||
search_count : INT
|
|
||||||
last_searched_at : TIMESTAMPTZ
|
|
||||||
added_at : TIMESTAMPTZ
|
|
||||||
}
|
|
||||||
|
|
||||||
entity "download_queue" {
|
|
||||||
* id : UUID <<PK>>
|
|
||||||
--
|
|
||||||
artist_id : UUID <<FK>>
|
|
||||||
album_id : UUID <<FK>>
|
album_id : UUID <<FK>>
|
||||||
|
info_hash : VARCHAR(40) <<UNIQUE>>
|
||||||
--
|
--
|
||||||
download_id : TEXT
|
tracker : VARCHAR(100)
|
||||||
title : TEXT
|
title : TEXT
|
||||||
|
format : VARCHAR(20)
|
||||||
|
quality : VARCHAR(20)
|
||||||
|
source : VARCHAR(20)
|
||||||
|
bit_depth : INT
|
||||||
|
sample_rate : INT
|
||||||
|
seeders : INT
|
||||||
|
peers : INT
|
||||||
size : BIGINT
|
size : BIGINT
|
||||||
size_left : BIGINT
|
track_count : INT
|
||||||
|
has_cover_art : BOOLEAN
|
||||||
|
has_cue_sheet : BOOLEAN
|
||||||
|
has_rip_log : BOOLEAN
|
||||||
|
download_link : TEXT
|
||||||
|
torrent_file : BYTEA
|
||||||
--
|
--
|
||||||
status : TEXT
|
created_at : TIMESTAMPTZ
|
||||||
progress : REAL
|
updated_at : TIMESTAMPTZ
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
package "Download Management" #E8F5E9 {
|
||||||
|
entity "downloads" {
|
||||||
|
* id : UUID <<PK>>
|
||||||
|
--
|
||||||
|
torrent_id : UUID <<FK>>
|
||||||
|
album_id : UUID
|
||||||
|
format : VARCHAR(20)
|
||||||
|
quality : VARCHAR(20)
|
||||||
|
--
|
||||||
|
state : download_state
|
||||||
|
qbit_hash : VARCHAR(64)
|
||||||
|
save_path : TEXT
|
||||||
error_message : TEXT
|
error_message : TEXT
|
||||||
--
|
--
|
||||||
protocol : TEXT
|
queued_at : TIMESTAMPTZ
|
||||||
indexer : TEXT
|
started_at : TIMESTAMPTZ
|
||||||
download_client : TEXT
|
|
||||||
torrent_hash : TEXT
|
|
||||||
output_path : TEXT
|
|
||||||
--
|
|
||||||
added_at : TIMESTAMPTZ
|
|
||||||
completed_at : TIMESTAMPTZ
|
completed_at : TIMESTAMPTZ
|
||||||
|
created_at : TIMESTAMPTZ
|
||||||
|
updated_at : TIMESTAMPTZ
|
||||||
}
|
}
|
||||||
|
|
||||||
entity "blocklist" {
|
entity "download_files" {
|
||||||
* id : UUID <<PK>>
|
* id : UUID <<PK>>
|
||||||
--
|
--
|
||||||
artist_id : UUID <<FK>>
|
download_id : UUID <<FK>>
|
||||||
album_id : UUID <<FK>>
|
track_id : UUID <<FK NULL>>
|
||||||
--
|
--
|
||||||
source_title : TEXT
|
file_path : TEXT
|
||||||
quality : JSONB
|
file_size : BIGINT
|
||||||
size : BIGINT
|
file_type : VARCHAR(20)
|
||||||
protocol : TEXT
|
sha256_hash : VARCHAR(64)
|
||||||
indexer : TEXT
|
|
||||||
message : TEXT
|
|
||||||
torrent_hash : TEXT
|
|
||||||
--
|
--
|
||||||
date : TIMESTAMPTZ
|
verified_at : TIMESTAMPTZ
|
||||||
|
created_at : TIMESTAMPTZ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
' ══════════════════════════════════════════════════════════════
|
package "Caching & Queue (River)" #F3E5F5 {
|
||||||
' RELATIONSHIPS
|
entity "river_job" {
|
||||||
' ══════════════════════════════════════════════════════════════
|
* id : BIGSERIAL <<PK>>
|
||||||
|
--
|
||||||
|
kind : TEXT
|
||||||
|
state : river_job_state
|
||||||
|
queue : TEXT
|
||||||
|
args : JSONB
|
||||||
|
metadata : JSONB
|
||||||
|
--
|
||||||
|
attempt : SMALLINT
|
||||||
|
max_attempts : SMALLINT
|
||||||
|
priority : SMALLINT
|
||||||
|
--
|
||||||
|
scheduled_at : TIMESTAMPTZ
|
||||||
|
attempted_at : TIMESTAMPTZ
|
||||||
|
created_at : TIMESTAMPTZ
|
||||||
|
finalized_at : TIMESTAMPTZ
|
||||||
|
}
|
||||||
|
|
||||||
' Core music relationships
|
entity "river_queue" {
|
||||||
artist_metadata ||--|| artists : "has config"
|
* name : TEXT <<PK>>
|
||||||
artist_metadata ||--o{ albums : "released"
|
--
|
||||||
albums ||--o{ album_releases : "has releases"
|
metadata : JSONB
|
||||||
album_releases ||--o{ tracks : "contains"
|
paused_at : TIMESTAMPTZ
|
||||||
tracks }o--o| track_files : "stored in"
|
created_at : TIMESTAMPTZ
|
||||||
track_files }o--|| albums : "belongs to"
|
updated_at : TIMESTAMPTZ
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
' Artist config relationships
|
note right of river_job
|
||||||
artists }o--|| quality_profiles : "uses"
|
Cache refresh jobs:
|
||||||
artists }o--o| metadata_profiles : "uses"
|
kind = "indexer_cache_refresh"
|
||||||
artists }o--o| root_folders : "stored in"
|
args = {key, url, ttl_expires, refresh_interval}
|
||||||
|
scheduled_at = next refresh time
|
||||||
|
end note
|
||||||
|
|
||||||
' Root folder defaults
|
artists ||--o{ albums : "released"
|
||||||
root_folders }o--o| quality_profiles : "default"
|
albums ||--o{ tracks : "contains"
|
||||||
root_folders }o--o| metadata_profiles : "default"
|
albums ||--o{ torrents : "available on"
|
||||||
|
torrents ||--o| downloads : "downloaded as"
|
||||||
' Download tracking relationships
|
downloads ||--o{ download_files : "consists of"
|
||||||
wanted_albums ||--|| albums : "targets"
|
tracks ||--o| download_files : "matched to"
|
||||||
download_queue }o--o| artists : "for"
|
|
||||||
download_queue }o--o| albums : "for"
|
|
||||||
blocklist }o--|| artists : "for"
|
|
||||||
blocklist }o--o| albums : "for"
|
|
||||||
|
|
||||||
@enduml
|
@enduml
|
||||||
|
|||||||
@@ -0,0 +1,132 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Album struct {
|
||||||
|
ID string
|
||||||
|
ExternalID string
|
||||||
|
ArtistID string
|
||||||
|
Title string
|
||||||
|
AlbumType string
|
||||||
|
ReleaseDate *time.Time
|
||||||
|
TotalTracks int
|
||||||
|
TotalDiscs int
|
||||||
|
Label string
|
||||||
|
Genres []string
|
||||||
|
CoverURL string
|
||||||
|
IsMonitored bool
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type AlbumRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAlbumRepository(pool *pgxpool.Pool) *AlbumRepository {
|
||||||
|
return &AlbumRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumRepository) Create(ctx context.Context, a *Album) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`INSERT INTO albums (external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||||
|
ON CONFLICT (external_id) DO UPDATE SET
|
||||||
|
title = EXCLUDED.title,
|
||||||
|
album_type = EXCLUDED.album_type,
|
||||||
|
release_date = EXCLUDED.release_date,
|
||||||
|
total_tracks = EXCLUDED.total_tracks,
|
||||||
|
total_discs = EXCLUDED.total_discs,
|
||||||
|
label = EXCLUDED.label,
|
||||||
|
genres = EXCLUDED.genres,
|
||||||
|
cover_url = EXCLUDED.cover_url,
|
||||||
|
updated_at = NOW()`,
|
||||||
|
a.ExternalID, a.ArtistID, a.Title, a.AlbumType, a.ReleaseDate, a.TotalTracks, a.TotalDiscs, a.Label, a.Genres, a.CoverURL, a.IsMonitored,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating album: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumRepository) GetByExternalID(ctx context.Context, externalID string) (*Album, error) {
|
||||||
|
a := &Album{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
|
||||||
|
FROM albums WHERE external_id = $1`, externalID,
|
||||||
|
).Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting album: %w", err)
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumRepository) GetByID(ctx context.Context, id string) (*Album, error) {
|
||||||
|
a := &Album{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
|
||||||
|
FROM albums WHERE id = $1`, id,
|
||||||
|
).Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting album: %w", err)
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumRepository) GetByArtistID(ctx context.Context, artistID string) ([]*Album, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
|
||||||
|
FROM albums WHERE artist_id = $1 ORDER BY release_date DESC`, artistID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing albums: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var albums []*Album
|
||||||
|
for rows.Next() {
|
||||||
|
a := &Album{}
|
||||||
|
if err := rows.Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning album: %w", err)
|
||||||
|
}
|
||||||
|
albums = append(albums, a)
|
||||||
|
}
|
||||||
|
return albums, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumRepository) GetMonitored(ctx context.Context) ([]*Album, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, external_id, artist_id, title, album_type, release_date, total_tracks, total_discs, label, genres, cover_url, is_monitored, created_at, updated_at
|
||||||
|
FROM albums WHERE is_monitored = TRUE ORDER BY release_date DESC`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing monitored albums: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var albums []*Album
|
||||||
|
for rows.Next() {
|
||||||
|
a := &Album{}
|
||||||
|
if err := rows.Scan(&a.ID, &a.ExternalID, &a.ArtistID, &a.Title, &a.AlbumType, &a.ReleaseDate, &a.TotalTracks, &a.TotalDiscs, &a.Label, &a.Genres, &a.CoverURL, &a.IsMonitored, &a.CreatedAt, &a.UpdatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning album: %w", err)
|
||||||
|
}
|
||||||
|
albums = append(albums, a)
|
||||||
|
}
|
||||||
|
return albums, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumRepository) SetMonitored(ctx context.Context, id string, monitored bool) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE albums SET is_monitored = $1, updated_at = NOW() WHERE id = $2`, monitored, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("updating monitored state: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,73 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Artist struct {
|
||||||
|
ID string
|
||||||
|
ExternalID string
|
||||||
|
Name string
|
||||||
|
ArtistType string
|
||||||
|
Country string
|
||||||
|
Genres []string
|
||||||
|
ImageURL string
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type ArtistRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewArtistRepository(pool *pgxpool.Pool) *ArtistRepository {
|
||||||
|
return &ArtistRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ArtistRepository) Create(ctx context.Context, a *Artist) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`INSERT INTO artists (external_id, name, artist_type, country, genres, image_url)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
|
ON CONFLICT (external_id) DO UPDATE SET
|
||||||
|
name = EXCLUDED.name,
|
||||||
|
artist_type = EXCLUDED.artist_type,
|
||||||
|
country = EXCLUDED.country,
|
||||||
|
genres = EXCLUDED.genres,
|
||||||
|
image_url = EXCLUDED.image_url,
|
||||||
|
updated_at = NOW()
|
||||||
|
RETURNING id, created_at, updated_at`,
|
||||||
|
a.ExternalID, a.Name, a.ArtistType, a.Country, a.Genres, a.ImageURL,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating artist: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ArtistRepository) GetByExternalID(ctx context.Context, externalID string) (*Artist, error) {
|
||||||
|
a := &Artist{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, external_id, name, artist_type, country, genres, image_url, created_at, updated_at
|
||||||
|
FROM artists WHERE external_id = $1`, externalID,
|
||||||
|
).Scan(&a.ID, &a.ExternalID, &a.Name, &a.ArtistType, &a.Country, &a.Genres, &a.ImageURL, &a.CreatedAt, &a.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting artist: %w", err)
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ArtistRepository) GetByID(ctx context.Context, id string) (*Artist, error) {
|
||||||
|
a := &Artist{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, external_id, name, artist_type, country, genres, image_url, created_at, updated_at
|
||||||
|
FROM artists WHERE id = $1`, id,
|
||||||
|
).Scan(&a.ID, &a.ExternalID, &a.Name, &a.ArtistType, &a.Country, &a.Genres, &a.ImageURL, &a.CreatedAt, &a.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting artist: %w", err)
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DB struct {
|
||||||
|
Pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(ctx context.Context, databaseURL string) (*DB, error) {
|
||||||
|
pool, err := pgxpool.New(ctx, databaseURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("connecting to database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pool.Ping(ctx); err != nil {
|
||||||
|
return nil, fmt.Errorf("pinging database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Str("url", databaseURL).Msg("database connected")
|
||||||
|
|
||||||
|
return &DB{Pool: pool}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) Close() {
|
||||||
|
db.Pool.Close()
|
||||||
|
}
|
||||||
@@ -0,0 +1,104 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DownloadFile struct {
|
||||||
|
ID string
|
||||||
|
DownloadID string
|
||||||
|
TrackID *string
|
||||||
|
FilePath string
|
||||||
|
FileSize int64
|
||||||
|
FileType string
|
||||||
|
SHA256Hash string
|
||||||
|
VerifiedAt *time.Time
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type DownloadFileRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDownloadFileRepository(pool *pgxpool.Pool) *DownloadFileRepository {
|
||||||
|
return &DownloadFileRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadFileRepository) Create(ctx context.Context, f *DownloadFile) error {
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO download_files (download_id, track_id, file_path, file_size, file_type, sha256_hash)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
|
RETURNING id, created_at`,
|
||||||
|
f.DownloadID, f.TrackID, f.FilePath, f.FileSize, f.FileType, f.SHA256Hash,
|
||||||
|
).Scan(&f.ID, &f.CreatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating download file: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadFileRepository) CreateBatch(ctx context.Context, files []*DownloadFile) error {
|
||||||
|
for _, f := range files {
|
||||||
|
if err := r.Create(ctx, f); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadFileRepository) GetByDownloadID(ctx context.Context, downloadID string) ([]*DownloadFile, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, download_id, track_id, file_path, file_size, file_type, sha256_hash, verified_at, created_at
|
||||||
|
FROM download_files WHERE download_id = $1 ORDER BY file_path`, downloadID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing download files: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var files []*DownloadFile
|
||||||
|
for rows.Next() {
|
||||||
|
f := &DownloadFile{}
|
||||||
|
if err := rows.Scan(&f.ID, &f.DownloadID, &f.TrackID, &f.FilePath, &f.FileSize, &f.FileType, &f.SHA256Hash, &f.VerifiedAt, &f.CreatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning download file: %w", err)
|
||||||
|
}
|
||||||
|
files = append(files, f)
|
||||||
|
}
|
||||||
|
return files, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadFileRepository) SetHash(ctx context.Context, id string, hash string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE download_files SET sha256_hash = $1, verified_at = NOW() WHERE id = $2`, hash, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting file hash: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadFileRepository) GetUnverified(ctx context.Context) ([]*DownloadFile, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, download_id, track_id, file_path, file_size, file_type, sha256_hash, verified_at, created_at
|
||||||
|
FROM download_files WHERE sha256_hash IS NULL OR verified_at < NOW() - INTERVAL '30 days'
|
||||||
|
ORDER BY created_at`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing unverified files: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var files []*DownloadFile
|
||||||
|
for rows.Next() {
|
||||||
|
f := &DownloadFile{}
|
||||||
|
if err := rows.Scan(&f.ID, &f.DownloadID, &f.TrackID, &f.FilePath, &f.FileSize, &f.FileType, &f.SHA256Hash, &f.VerifiedAt, &f.CreatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning download file: %w", err)
|
||||||
|
}
|
||||||
|
files = append(files, f)
|
||||||
|
}
|
||||||
|
return files, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,122 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Download struct {
|
||||||
|
ID string
|
||||||
|
TorrentID string
|
||||||
|
AlbumID string
|
||||||
|
Format string
|
||||||
|
Quality string
|
||||||
|
State string
|
||||||
|
QbitHash string
|
||||||
|
SavePath string
|
||||||
|
ErrorMessage string
|
||||||
|
QueuedAt time.Time
|
||||||
|
StartedAt *time.Time
|
||||||
|
CompletedAt *time.Time
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type DownloadRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDownloadRepository(pool *pgxpool.Pool) *DownloadRepository {
|
||||||
|
return &DownloadRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) Create(ctx context.Context, d *Download) error {
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash, save_path)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
RETURNING id, queued_at, created_at, updated_at`,
|
||||||
|
d.TorrentID, d.AlbumID, d.Format, d.Quality, d.State, d.QbitHash, d.SavePath,
|
||||||
|
).Scan(&d.ID, &d.QueuedAt, &d.CreatedAt, &d.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating download: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) UpdateState(ctx context.Context, id string, state string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE downloads SET state = $1, updated_at = NOW() WHERE id = $2`, state, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("updating download state: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) SetStarted(ctx context.Context, id string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE downloads SET state = 'downloading', started_at = NOW(), updated_at = NOW() WHERE id = $1`, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting download started: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) SetCompleted(ctx context.Context, id string, savePath string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE downloads SET state = 'completed', save_path = $1, completed_at = NOW(), updated_at = NOW() WHERE id = $2`, savePath, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting download completed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) SetFailed(ctx context.Context, id string, errorMsg string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE downloads SET state = 'failed', error_message = $1, updated_at = NOW() WHERE id = $2`, errorMsg, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting download failed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Download, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at
|
||||||
|
FROM downloads WHERE album_id = $1 ORDER BY created_at DESC`, albumID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing downloads: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var downloads []*Download
|
||||||
|
for rows.Next() {
|
||||||
|
d := &Download{}
|
||||||
|
if err := rows.Scan(&d.ID, &d.TorrentID, &d.AlbumID, &d.Format, &d.Quality, &d.State, &d.QbitHash, &d.SavePath, &d.ErrorMessage, &d.QueuedAt, &d.StartedAt, &d.CompletedAt, &d.CreatedAt, &d.UpdatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning download: %w", err)
|
||||||
|
}
|
||||||
|
downloads = append(downloads, d)
|
||||||
|
}
|
||||||
|
return downloads, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) HasAlbumInQuality(ctx context.Context, albumID string, format string, quality string) (bool, error) {
|
||||||
|
var exists bool
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT EXISTS(
|
||||||
|
SELECT 1 FROM downloads
|
||||||
|
WHERE album_id = $1 AND format = $2 AND quality = $3 AND state IN ('completed', 'seeding')
|
||||||
|
)`, albumID, format, quality,
|
||||||
|
).Scan(&exists)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("checking album quality: %w", err)
|
||||||
|
}
|
||||||
|
return exists, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,104 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Torrent struct {
|
||||||
|
ID string
|
||||||
|
AlbumID string
|
||||||
|
InfoHash string
|
||||||
|
Tracker string
|
||||||
|
Title string
|
||||||
|
Format string
|
||||||
|
Quality string
|
||||||
|
Source string
|
||||||
|
BitDepth int
|
||||||
|
SampleRate int
|
||||||
|
Seeders int
|
||||||
|
Peers int
|
||||||
|
Size int64
|
||||||
|
TrackCount int
|
||||||
|
HasCoverArt bool
|
||||||
|
HasCueSheet bool
|
||||||
|
HasRipLog bool
|
||||||
|
DownloadLink string
|
||||||
|
TorrentFile []byte
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type TorrentRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTorrentRepository(pool *pgxpool.Pool) *TorrentRepository {
|
||||||
|
return &TorrentRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TorrentRepository) Create(ctx context.Context, t *Torrent) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, bit_depth, sample_rate, seeders, peers, size, track_count, has_cover_art, has_cue_sheet, has_rip_log, download_link, torrent_file)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
|
||||||
|
ON CONFLICT (info_hash) DO UPDATE SET
|
||||||
|
seeders = EXCLUDED.seeders,
|
||||||
|
peers = EXCLUDED.peers,
|
||||||
|
updated_at = NOW()`,
|
||||||
|
t.AlbumID, t.InfoHash, t.Tracker, t.Title, t.Format, t.Quality, t.Source, t.BitDepth, t.SampleRate, t.Seeders, t.Peers, t.Size, t.TrackCount, t.HasCoverArt, t.HasCueSheet, t.HasRipLog, t.DownloadLink, t.TorrentFile,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating torrent: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TorrentRepository) GetByInfoHash(ctx context.Context, infoHash string) (*Torrent, error) {
|
||||||
|
t := &Torrent{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, album_id, info_hash, tracker, title, format, quality, source, bit_depth, sample_rate, seeders, peers, size, track_count, has_cover_art, has_cue_sheet, has_rip_log, download_link, torrent_file, created_at, updated_at
|
||||||
|
FROM torrents WHERE info_hash = $1`, infoHash,
|
||||||
|
).Scan(&t.ID, &t.AlbumID, &t.InfoHash, &t.Tracker, &t.Title, &t.Format, &t.Quality, &t.Source, &t.BitDepth, &t.SampleRate, &t.Seeders, &t.Peers, &t.Size, &t.TrackCount, &t.HasCoverArt, &t.HasCueSheet, &t.HasRipLog, &t.DownloadLink, &t.TorrentFile, &t.CreatedAt, &t.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting torrent: %w", err)
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TorrentRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Torrent, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, album_id, info_hash, tracker, title, format, quality, source, bit_depth, sample_rate, seeders, peers, size, track_count, has_cover_art, has_cue_sheet, has_rip_log, download_link, torrent_file, created_at, updated_at
|
||||||
|
FROM torrents WHERE album_id = $1 ORDER BY seeders DESC`, albumID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing torrents: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var torrents []*Torrent
|
||||||
|
for rows.Next() {
|
||||||
|
t := &Torrent{}
|
||||||
|
if err := rows.Scan(&t.ID, &t.AlbumID, &t.InfoHash, &t.Tracker, &t.Title, &t.Format, &t.Quality, &t.Source, &t.BitDepth, &t.SampleRate, &t.Seeders, &t.Peers, &t.Size, &t.TrackCount, &t.HasCoverArt, &t.HasCueSheet, &t.HasRipLog, &t.DownloadLink, &t.TorrentFile, &t.CreatedAt, &t.UpdatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning torrent: %w", err)
|
||||||
|
}
|
||||||
|
torrents = append(torrents, t)
|
||||||
|
}
|
||||||
|
return torrents, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TorrentRepository) HasAlbumInFormat(ctx context.Context, albumID string, format string) (bool, error) {
|
||||||
|
var exists bool
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT EXISTS(
|
||||||
|
SELECT 1 FROM downloads
|
||||||
|
WHERE album_id = $1 AND format = $2 AND state IN ('completed', 'seeding')
|
||||||
|
)`, albumID, format,
|
||||||
|
).Scan(&exists)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("checking album format: %w", err)
|
||||||
|
}
|
||||||
|
return exists, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,68 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Track struct {
|
||||||
|
ID string
|
||||||
|
ExternalID string
|
||||||
|
AlbumID string
|
||||||
|
Title string
|
||||||
|
DurationMS int
|
||||||
|
ISRC string
|
||||||
|
DiscNumber int
|
||||||
|
TrackNumber int
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type TrackRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTrackRepository(pool *pgxpool.Pool) *TrackRepository {
|
||||||
|
return &TrackRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TrackRepository) Create(ctx context.Context, t *Track) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`INSERT INTO tracks (external_id, album_id, title, duration_ms, isrc, disc_number, track_number)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
ON CONFLICT (external_id) DO UPDATE SET
|
||||||
|
title = EXCLUDED.title,
|
||||||
|
duration_ms = EXCLUDED.duration_ms,
|
||||||
|
isrc = EXCLUDED.isrc,
|
||||||
|
disc_number = EXCLUDED.disc_number,
|
||||||
|
track_number = EXCLUDED.track_number`,
|
||||||
|
t.ExternalID, t.AlbumID, t.Title, t.DurationMS, t.ISRC, t.DiscNumber, t.TrackNumber,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating track: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *TrackRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Track, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, external_id, album_id, title, duration_ms, isrc, disc_number, track_number, created_at
|
||||||
|
FROM tracks WHERE album_id = $1 ORDER BY disc_number, track_number`, albumID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing tracks: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var tracks []*Track
|
||||||
|
for rows.Next() {
|
||||||
|
t := &Track{}
|
||||||
|
if err := rows.Scan(&t.ID, &t.ExternalID, &t.AlbumID, &t.Title, &t.DurationMS, &t.ISRC, &t.DiscNumber, &t.TrackNumber, &t.CreatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning track: %w", err)
|
||||||
|
}
|
||||||
|
tracks = append(tracks, t)
|
||||||
|
}
|
||||||
|
return tracks, nil
|
||||||
|
}
|
||||||
@@ -27,6 +27,12 @@ type CacheRefreshWorker struct {
|
|||||||
|
|
||||||
func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error {
|
func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error {
|
||||||
args := job.Args
|
args := job.Args
|
||||||
|
|
||||||
|
if w.Cache == nil || w.Indexer == nil {
|
||||||
|
log.Trace().Str("key", args.Key).Msg("cache disabled, discarding refresh job")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started")
|
log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started")
|
||||||
|
|
||||||
if time.Now().After(args.TTLExpires) {
|
if time.Now().After(args.TTLExpires) {
|
||||||
|
|||||||
@@ -17,8 +17,8 @@ type IndexerServer struct {
|
|||||||
pb.UnimplementedIndexerServiceServer
|
pb.UnimplementedIndexerServiceServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerServer, error) {
|
func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerServer, error) {
|
||||||
service, err := NewIndexerService(cfg, riverClient)
|
service, err := NewIndexerService(cfg, riverClient, cacheWorker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Err(err).Msg("failed to initialize IndexerService")
|
log.Err(err).Msg("failed to initialize IndexerService")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ type IndexerService struct {
|
|||||||
indexer Indexer
|
indexer Indexer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerService, error) {
|
func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx], cacheWorker *CacheRefreshWorker) (*IndexerService, error) {
|
||||||
var idx Indexer
|
var idx Indexer
|
||||||
|
|
||||||
switch cfg.Indexer.Type {
|
switch cfg.Indexer.Type {
|
||||||
@@ -28,6 +28,12 @@ func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*I
|
|||||||
if cfg.Indexer.Cache.Enabled && riverClient != nil {
|
if cfg.Indexer.Cache.Enabled && riverClient != nil {
|
||||||
cache := NewIndexerCache()
|
cache := NewIndexerCache()
|
||||||
idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache)
|
idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache)
|
||||||
|
|
||||||
|
if cacheWorker != nil {
|
||||||
|
cacheWorker.Cache = cache
|
||||||
|
cacheWorker.Indexer = idx
|
||||||
|
}
|
||||||
|
|
||||||
log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled")
|
log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,116 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
|
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
|
||||||
|
"homelab.lan/music-agregator/internal/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MetadataService struct {
|
||||||
|
client metadataPb.MetadataServiceClient
|
||||||
|
artists *database.ArtistRepository
|
||||||
|
albums *database.AlbumRepository
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetadataService(client metadataPb.MetadataServiceClient, db *database.DB) *MetadataService {
|
||||||
|
return &MetadataService{
|
||||||
|
client: client,
|
||||||
|
artists: database.NewArtistRepository(db.Pool),
|
||||||
|
albums: database.NewAlbumRepository(db.Pool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetadataService) GetAlbum(ctx context.Context, albumID string) (*metadataPb.Album, error) {
|
||||||
|
resp, err := s.client.GetAlbum(ctx, &metadataPb.GetAlbumRequest{
|
||||||
|
Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fetching album: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
album := resp.GetAlbum()
|
||||||
|
|
||||||
|
if _, err := s.albums.GetByExternalID(ctx, album.GetId()); err != nil {
|
||||||
|
s.persistArtist(ctx, album)
|
||||||
|
s.persistAlbum(ctx, album)
|
||||||
|
}
|
||||||
|
|
||||||
|
return album, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetadataService) GetArtistByExternalID(ctx context.Context, externalID string) (*database.Artist, error) {
|
||||||
|
return s.artists.GetByExternalID(ctx, externalID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetadataService) GetAlbumByExternalID(ctx context.Context, externalID string) (*database.Album, error) {
|
||||||
|
return s.albums.GetByExternalID(ctx, externalID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetadataService) persistArtist(ctx context.Context, album *metadataPb.Album) {
|
||||||
|
if len(album.GetArtists()) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
artist := album.GetArtists()[0].GetArtist()
|
||||||
|
var genres []string
|
||||||
|
for _, g := range artist.GetGenres() {
|
||||||
|
genres = append(genres, g.GetName())
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.artists.Create(ctx, &database.Artist{
|
||||||
|
ExternalID: artist.GetId(),
|
||||||
|
Name: artist.GetName(),
|
||||||
|
ArtistType: artist.GetArtistType(),
|
||||||
|
Country: artist.GetCountry(),
|
||||||
|
Genres: genres,
|
||||||
|
ImageURL: artist.GetImageUrl(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Warn().Err(err).Str("name", artist.GetName()).Msg("failed to persist artist")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetadataService) persistAlbum(ctx context.Context, album *metadataPb.Album) {
|
||||||
|
artistID := ""
|
||||||
|
if len(album.GetArtists()) > 0 {
|
||||||
|
a, err := s.artists.GetByExternalID(ctx, album.GetArtists()[0].GetArtist().GetId())
|
||||||
|
if err == nil {
|
||||||
|
artistID = a.ID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if artistID == "" {
|
||||||
|
log.Trace().Str("album", album.GetTitle()).Msg("skipping album persist, no artist in DB")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var genres []string
|
||||||
|
for _, g := range album.GetGenres() {
|
||||||
|
genres = append(genres, g.GetName())
|
||||||
|
}
|
||||||
|
|
||||||
|
labelName := ""
|
||||||
|
if album.GetLabel() != nil {
|
||||||
|
labelName = album.GetLabel().GetName()
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.albums.Create(ctx, &database.Album{
|
||||||
|
ExternalID: album.GetId(),
|
||||||
|
ArtistID: artistID,
|
||||||
|
Title: album.GetTitle(),
|
||||||
|
AlbumType: album.GetAlbumType(),
|
||||||
|
TotalTracks: int(album.GetTotalTracks()),
|
||||||
|
TotalDiscs: int(album.GetTotalDiscs()),
|
||||||
|
Label: labelName,
|
||||||
|
Genres: genres,
|
||||||
|
CoverURL: album.GetCoverUrl(),
|
||||||
|
IsMonitored: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Warn().Err(err).Str("title", album.GetTitle()).Msg("failed to persist album")
|
||||||
|
}
|
||||||
|
}
|
||||||
+3
-2
@@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
|
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
|
||||||
"homelab.lan/music-agregator/internal/config"
|
"homelab.lan/music-agregator/internal/config"
|
||||||
|
"homelab.lan/music-agregator/internal/database"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MusicAgregatorServer struct {
|
type MusicAgregatorServer struct {
|
||||||
@@ -17,8 +18,8 @@ type MusicAgregatorServer struct {
|
|||||||
pb.UnimplementedMusicAgregatorServiceServer
|
pb.UnimplementedMusicAgregatorServiceServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorServer, error) {
|
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorServer, error) {
|
||||||
service, err := NewMusicAgregatorService(cfg, riverClient)
|
service, err := NewMusicAgregatorService(cfg, riverClient, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Err(err).Msg("failed to create MusicAgregatorService")
|
log.Err(err).Msg("failed to create MusicAgregatorService")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
+111
-33
@@ -11,17 +11,18 @@ import (
|
|||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/riverqueue/river"
|
"github.com/riverqueue/river"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
|
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
|
||||||
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
|
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
|
||||||
|
|
||||||
"homelab.lan/music-agregator/internal/config"
|
"homelab.lan/music-agregator/internal/config"
|
||||||
|
"homelab.lan/music-agregator/internal/database"
|
||||||
"homelab.lan/music-agregator/internal/indexer"
|
"homelab.lan/music-agregator/internal/indexer"
|
||||||
"homelab.lan/music-agregator/internal/metadata"
|
"homelab.lan/music-agregator/internal/metadata"
|
||||||
"homelab.lan/music-agregator/internal/release"
|
"homelab.lan/music-agregator/internal/release"
|
||||||
"homelab.lan/music-agregator/internal/torrent"
|
"homelab.lan/music-agregator/internal/torrent"
|
||||||
torrentParser "homelab.lan/music-agregator/internal/tracker"
|
torrentParser "homelab.lan/music-agregator/internal/tracker"
|
||||||
|
"homelab.lan/music-agregator/internal/workers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type parsedItem struct {
|
type parsedItem struct {
|
||||||
@@ -32,21 +33,23 @@ type parsedItem struct {
|
|||||||
|
|
||||||
type MusicAgregatorService struct {
|
type MusicAgregatorService struct {
|
||||||
config config.Config
|
config config.Config
|
||||||
metadataClient metadataPb.MetadataServiceClient
|
metadata *metadata.MetadataService
|
||||||
metadataConn *grpc.ClientConn
|
|
||||||
indexer *indexer.IndexerService
|
indexer *indexer.IndexerService
|
||||||
torrentClient torrent.TorrentClient
|
torrentClient torrent.TorrentClient
|
||||||
magnetResolver *torrentParser.MagnetResolver
|
magnetResolver *torrentParser.MagnetResolver
|
||||||
|
riverClient *river.Client[pgx.Tx]
|
||||||
|
torrents *database.TorrentRepository
|
||||||
|
downloads *database.DownloadRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) {
|
func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx], db *database.DB) (*MusicAgregatorService, error) {
|
||||||
indexer, err := indexer.NewIndexerService(cfg, riverClient)
|
idx, err := indexer.NewIndexerService(cfg, riverClient, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Err(err).Msg("failed to create IndexerService")
|
log.Err(err).Msg("failed to create IndexerService")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
metadataClient, conn, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint)
|
metadataClient, _, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Err(err).Msg("failed to create metadata client")
|
log.Err(err).Msg("failed to create metadata client")
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -66,29 +69,39 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T
|
|||||||
|
|
||||||
return &MusicAgregatorService{
|
return &MusicAgregatorService{
|
||||||
config: cfg,
|
config: cfg,
|
||||||
metadataClient: metadataClient,
|
metadata: metadata.NewMetadataService(metadataClient, db),
|
||||||
metadataConn: conn,
|
indexer: idx,
|
||||||
indexer: indexer,
|
|
||||||
torrentClient: torrentClient,
|
torrentClient: torrentClient,
|
||||||
magnetResolver: magnetResolver,
|
magnetResolver: magnetResolver,
|
||||||
|
riverClient: riverClient,
|
||||||
|
torrents: database.NewTorrentRepository(db.Pool),
|
||||||
|
downloads: database.NewDownloadRepository(db.Pool),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MusicAgregatorService) Close() {
|
func (s *MusicAgregatorService) Close() {
|
||||||
if s.metadataConn != nil {
|
|
||||||
s.metadataConn.Close()
|
|
||||||
}
|
|
||||||
if s.magnetResolver != nil {
|
if s.magnetResolver != nil {
|
||||||
s.magnetResolver.Close()
|
s.magnetResolver.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
|
func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
|
||||||
album, err := service.fetchAlbumMetadata(ctx, req.GetAlbumId())
|
album, err := service.metadata.GetAlbum(ctx, req.GetAlbumId())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("album_id", req.GetAlbumId()).Msg("failed to get album")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dbAlbum, _ := service.metadata.GetAlbumByExternalID(ctx, req.GetAlbumId())
|
||||||
|
if dbAlbum != nil {
|
||||||
|
qualityStr := normalizeQuality(req.GetQuality(), 0, 0)
|
||||||
|
owned, err := service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, req.GetQuality().String(), qualityStr)
|
||||||
|
if err == nil && owned {
|
||||||
|
log.Info().Str("album", dbAlbum.Title).Str("quality", qualityStr).Msg("album already owned in requested quality")
|
||||||
|
return &pb.MonitorAlbumResponse{}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker())
|
searchResult, err := service.searchIndexer(album, req.GetIndexerOptions().GetTracker())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -108,31 +121,17 @@ func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if dbAlbum != nil {
|
||||||
|
service.saveTorrentAndDownload(ctx, dbAlbum.ID, best)
|
||||||
|
} else {
|
||||||
|
log.Warn().Str("album_id", req.GetAlbumId()).Msg("album not in DB, skipping torrent/download persistence")
|
||||||
|
}
|
||||||
|
|
||||||
return &pb.MonitorAlbumResponse{
|
return &pb.MonitorAlbumResponse{
|
||||||
Release: buildMonitoredRelease(best),
|
Release: buildMonitoredRelease(best),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *MusicAgregatorService) fetchAlbumMetadata(ctx context.Context, albumID string) (*metadataPb.Album, error) {
|
|
||||||
resp, err := service.metadataClient.GetAlbum(ctx, &metadataPb.GetAlbumRequest{
|
|
||||||
Identifier: &metadataPb.GetAlbumRequest_Id{Id: albumID},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Str("album_id", albumID).Msg("metadata GetAlbum failed")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
album := resp.GetAlbum()
|
|
||||||
artistName := ""
|
|
||||||
if len(album.GetArtists()) > 0 {
|
|
||||||
artistName = album.GetArtists()[0].GetArtist().GetName()
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug().Str("album_id", albumID).Str("title", album.GetTitle()).Str("artist", artistName).Msg("album metadata fetched")
|
|
||||||
|
|
||||||
return album, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) {
|
func (service *MusicAgregatorService) searchIndexer(album *metadataPb.Album, tracker string) (*indexer.SearchResponse, error) {
|
||||||
artistName := ""
|
artistName := ""
|
||||||
if len(album.GetArtists()) > 0 {
|
if len(album.GetArtists()) > 0 {
|
||||||
@@ -277,6 +276,85 @@ func (service *MusicAgregatorService) addToTorrentClient(best parsedItem) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (service *MusicAgregatorService) saveTorrentAndDownload(ctx context.Context, dbAlbumID string, best parsedItem) {
|
||||||
|
quality := normalizeQuality(pb.QualityType_QUALITY_UNSPECIFIED, best.rel.BitDepth, best.rel.SampleRate)
|
||||||
|
|
||||||
|
dbTorrent := &database.Torrent{
|
||||||
|
AlbumID: dbAlbumID,
|
||||||
|
InfoHash: best.rel.InfoHash,
|
||||||
|
Tracker: best.item.Tracker,
|
||||||
|
Title: best.item.Title,
|
||||||
|
Format: best.rel.Format.String(),
|
||||||
|
Quality: quality,
|
||||||
|
Source: best.rel.Source.String(),
|
||||||
|
BitDepth: best.rel.BitDepth,
|
||||||
|
SampleRate: best.rel.SampleRate,
|
||||||
|
Seeders: best.item.Seeders,
|
||||||
|
Peers: best.item.Peers,
|
||||||
|
Size: best.rel.TotalAudioSize,
|
||||||
|
TrackCount: best.rel.TrackCount,
|
||||||
|
HasCoverArt: best.rel.HasCoverArt,
|
||||||
|
HasCueSheet: best.rel.HasCueSheet,
|
||||||
|
HasRipLog: best.rel.HasRipLog,
|
||||||
|
DownloadLink: best.item.DownloadLink,
|
||||||
|
TorrentFile: best.torrentData,
|
||||||
|
}
|
||||||
|
if err := service.torrents.Create(ctx, dbTorrent); err != nil {
|
||||||
|
log.Error().Err(err).Str("hash", best.rel.InfoHash).Msg("failed to save torrent to DB")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
savedTorrent, err := service.torrents.GetByInfoHash(ctx, best.rel.InfoHash)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to retrieve saved torrent")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
download := &database.Download{
|
||||||
|
TorrentID: savedTorrent.ID,
|
||||||
|
AlbumID: dbAlbumID,
|
||||||
|
Format: best.rel.Format.String(),
|
||||||
|
Quality: quality,
|
||||||
|
State: "downloading",
|
||||||
|
QbitHash: best.rel.InfoHash,
|
||||||
|
}
|
||||||
|
if err := service.downloads.Create(ctx, download); err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to save download to DB")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if service.riverClient != nil {
|
||||||
|
_, err := service.riverClient.Insert(ctx, workers.PollDownloadArgs{
|
||||||
|
DownloadID: download.ID,
|
||||||
|
TorrentHash: best.rel.InfoHash,
|
||||||
|
CheckInterval: 30 * time.Second,
|
||||||
|
}, &river.InsertOpts{
|
||||||
|
ScheduledAt: time.Now().Add(30 * time.Second),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to schedule download poll job")
|
||||||
|
} else {
|
||||||
|
log.Debug().Str("download_id", download.ID).Str("hash", best.rel.InfoHash).Msg("download poll job scheduled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Str("hash", best.rel.InfoHash).Str("download_id", download.ID).Msg("torrent and download saved to DB")
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeQuality(quality pb.QualityType, bitDepth int, sampleRate int) string {
|
||||||
|
if bitDepth > 0 && sampleRate > 0 {
|
||||||
|
return fmt.Sprintf("%d-%d", bitDepth, sampleRate/1000)
|
||||||
|
}
|
||||||
|
switch quality {
|
||||||
|
case pb.QualityType_QUALITY_LOSSLESS:
|
||||||
|
return "16-44"
|
||||||
|
case pb.QualityType_QUALITY_LOSSY:
|
||||||
|
return "320"
|
||||||
|
default:
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease {
|
func buildMonitoredRelease(p parsedItem) *pb.MonitoredRelease {
|
||||||
return &pb.MonitoredRelease{
|
return &pb.MonitoredRelease{
|
||||||
InfoHash: p.rel.InfoHash,
|
InfoHash: p.rel.InfoHash,
|
||||||
|
|||||||
@@ -8,6 +8,14 @@ import (
|
|||||||
"homelab.lan/music-agregator/internal/config"
|
"homelab.lan/music-agregator/internal/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func MustNewTorrentClient(cfg config.Config) TorrentClient {
|
||||||
|
client, err := NewTorrentClient(cfg)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("failed to create torrent client: %v", err))
|
||||||
|
}
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
func NewTorrentClient(cfg config.Config) (TorrentClient, error) {
|
func NewTorrentClient(cfg config.Config) (TorrentClient, error) {
|
||||||
var client TorrentClient
|
var client TorrentClient
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,176 @@
|
|||||||
|
package workers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/riverqueue/river"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
|
"homelab.lan/music-agregator/internal/database"
|
||||||
|
"homelab.lan/music-agregator/internal/torrent"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PollDownloadArgs struct {
|
||||||
|
DownloadID string `json:"download_id"`
|
||||||
|
TorrentHash string `json:"torrent_hash"`
|
||||||
|
CheckInterval time.Duration `json:"check_interval"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (PollDownloadArgs) Kind() string { return "poll_download" }
|
||||||
|
|
||||||
|
type PollDownloadWorker struct {
|
||||||
|
river.WorkerDefaults[PollDownloadArgs]
|
||||||
|
TorrentClient torrent.TorrentClient
|
||||||
|
Downloads *database.DownloadRepository
|
||||||
|
DownloadFiles *database.DownloadFileRepository
|
||||||
|
RiverClient *river.Client[pgx.Tx]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PollDownloadWorker) Work(ctx context.Context, job *river.Job[PollDownloadArgs]) error {
|
||||||
|
args := job.Args
|
||||||
|
|
||||||
|
log.Trace().Str("download_id", args.DownloadID).Str("hash", args.TorrentHash).Msg("polling download status")
|
||||||
|
|
||||||
|
results, err := w.TorrentClient.Find(torrent.FindOptions{Hash: args.TorrentHash})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("hash", args.TorrentHash).Msg("failed to query torrent client")
|
||||||
|
return w.reschedule(ctx, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(results) == 0 {
|
||||||
|
log.Warn().Str("hash", args.TorrentHash).Msg("torrent not found in client, marking failed")
|
||||||
|
w.Downloads.SetFailed(ctx, args.DownloadID, "torrent not found in client")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
t := results[0]
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case t.Progress >= 1.0:
|
||||||
|
return w.onCompleted(ctx, args, t)
|
||||||
|
|
||||||
|
case t.State == "error":
|
||||||
|
log.Warn().Str("hash", args.TorrentHash).Str("state", t.State).Msg("torrent in error state")
|
||||||
|
w.Downloads.SetFailed(ctx, args.DownloadID, "torrent error state")
|
||||||
|
return nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Trace().
|
||||||
|
Str("hash", args.TorrentHash).
|
||||||
|
Str("state", t.State).
|
||||||
|
Float64("progress", t.Progress*100).
|
||||||
|
Int64("dlspeed", t.DlSpeed).
|
||||||
|
Msg("download in progress")
|
||||||
|
return w.reschedule(ctx, args)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PollDownloadWorker) onCompleted(ctx context.Context, args PollDownloadArgs, t torrent.TorrentInfo) error {
|
||||||
|
log.Info().Str("hash", args.TorrentHash).Str("path", t.ContentPath).Msg("download completed")
|
||||||
|
|
||||||
|
if err := w.Downloads.SetCompleted(ctx, args.DownloadID, t.SavePath); err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to update download as completed")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := scanAndHashFiles(t.ContentPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("path", t.ContentPath).Msg("failed to scan downloaded files")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range files {
|
||||||
|
f.DownloadID = args.DownloadID
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.DownloadFiles.CreateBatch(ctx, files); err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to save download files")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().
|
||||||
|
Str("download_id", args.DownloadID).
|
||||||
|
Int("files", len(files)).
|
||||||
|
Msg("download files scanned and hashed")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *PollDownloadWorker) reschedule(ctx context.Context, args PollDownloadArgs) error {
|
||||||
|
_, err := w.RiverClient.Insert(ctx, args, &river.InsertOpts{
|
||||||
|
ScheduledAt: time.Now().Add(args.CheckInterval),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("failed to reschedule poll_download")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var audioExtensions = map[string]bool{
|
||||||
|
".flac": true, ".mp3": true, ".aac": true, ".m4a": true,
|
||||||
|
".ape": true, ".wv": true, ".ogg": true, ".wav": true, ".alac": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
func scanAndHashFiles(rootPath string) ([]*database.DownloadFile, error) {
|
||||||
|
var files []*database.DownloadFile
|
||||||
|
|
||||||
|
err := filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil || info.IsDir() {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ext := strings.ToLower(filepath.Ext(path))
|
||||||
|
relPath, _ := filepath.Rel(rootPath, path)
|
||||||
|
|
||||||
|
fileType := strings.TrimPrefix(ext, ".")
|
||||||
|
if fileType == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
df := &database.DownloadFile{
|
||||||
|
FilePath: relPath,
|
||||||
|
FileSize: info.Size(),
|
||||||
|
FileType: fileType,
|
||||||
|
}
|
||||||
|
|
||||||
|
if audioExtensions[ext] || ext == ".cue" || ext == ".log" {
|
||||||
|
hash, err := hashFile(path)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn().Err(err).Str("path", path).Msg("failed to hash file")
|
||||||
|
} else {
|
||||||
|
df.SHA256Hash = hash
|
||||||
|
now := time.Now()
|
||||||
|
df.VerifiedAt = &now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
files = append(files, df)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return files, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashFile(path string) (string, error) {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("opening file: %w", err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
h := sha256.New()
|
||||||
|
if _, err := io.Copy(h, f); err != nil {
|
||||||
|
return "", fmt.Errorf("hashing file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return hex.EncodeToString(h.Sum(nil)), nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user