diff --git a/api/Metadata Agregator/Get Artist Albums.bru b/api/Metadata Agregator/Get Artist Albums.bru new file mode 100644 index 0000000..7c9674b --- /dev/null +++ b/api/Metadata Agregator/Get Artist Albums.bru @@ -0,0 +1,22 @@ +meta { + name: Get Artist Albums + type: grpc + seq: 3 +} + +grpc { + url: localhost:3000 + method: /metadata.v1.MetadataService/GetArtistAlbums + body: grpc + auth: inherit + methodType: unary +} + +body:grpc { + name: message 1 + content: ''' + { + "artist_id": "65f4f0c5-ef9e-490c-aee3-909e7ae6b2ab" + } + ''' +} diff --git a/api/Metadata Agregator/Get Artist.bru b/api/Metadata Agregator/Get Artist.bru new file mode 100644 index 0000000..add9880 --- /dev/null +++ b/api/Metadata Agregator/Get Artist.bru @@ -0,0 +1,22 @@ +meta { + name: Get Artist + type: grpc + seq: 2 +} + +grpc { + url: localhost:3000 + method: /metadata.v1.MetadataService/GetArtist + body: grpc + auth: inherit + methodType: unary +} + +body:grpc { + name: message 1 + content: ''' + { + "id": "65f4f0c5-ef9e-490c-aee3-909e7ae6b2ab" + } + ''' +} diff --git a/api/Metadata Agregator/Search Artists.bru b/api/Metadata Agregator/Search Artists.bru new file mode 100644 index 0000000..26cf468 --- /dev/null +++ b/api/Metadata Agregator/Search Artists.bru @@ -0,0 +1,22 @@ +meta { + name: Search Artists + type: grpc + seq: 1 +} + +grpc { + url: localhost:3000 + method: /metadata.v1.MetadataService/SearchArtists + body: grpc + auth: inherit + methodType: unary +} + +body:grpc { + name: message 1 + content: ''' + { + "query": "Metallica" + } + ''' +} diff --git a/api/Metadata Agregator/folder.bru b/api/Metadata Agregator/folder.bru new file mode 100644 index 0000000..746a658 --- /dev/null +++ b/api/Metadata Agregator/folder.bru @@ -0,0 +1,8 @@ +meta { + name: Metadata Agregator + seq: 6 +} + +auth { + mode: inherit +} diff --git a/api/Monitor Album.bru b/api/Monitor Album.bru new file mode 100644 index 0000000..6729626 --- /dev/null +++ b/api/Monitor Album.bru @@ -0,0 +1,25 @@ +meta { + name: Monitor Album + type: grpc + seq: 5 +} + +grpc { + url: localhost:3000 + method: /music_agregator.v1.MusicAgregatorService/MonitorAlbum + body: grpc + auth: inherit + methodType: unary +} + +body:grpc { + name: message 1 + content: ''' + { + "album_id": "a0b7b436-94db-4df6-8c5f-bc0e5932a90e", + "indexer_options": { + "tracker": "rutracker" + } + } + ''' +} diff --git a/buf.gen.yaml b/buf.gen.yaml index 1453b6c..588644e 100644 --- a/buf.gen.yaml +++ b/buf.gen.yaml @@ -3,9 +3,9 @@ inputs: - directory: proto - directory: ../metadata-agregator/proto plugins: - - remote: buf.build/protocolbuffers/go + - local: protoc-gen-go out: gen opt: paths=source_relative - - remote: buf.build/grpc/go + - local: protoc-gen-go-grpc out: gen opt: paths=source_relative diff --git a/cmd/music-agregator/main.go b/cmd/music-agregator/main.go index aa31dea..3bc40d1 100644 --- a/cmd/music-agregator/main.go +++ b/cmd/music-agregator/main.go @@ -10,12 +10,17 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" "gopkg.in/yaml.v2" @@ -63,6 +68,38 @@ func interceptorLogger(l zerolog.Logger) logging.Logger { }) } +func setupRiver(ctx context.Context, cfg config.Config) (*river.Client[pgx.Tx], *pgxpool.Pool) { + if !cfg.Indexer.Cache.Enabled { + return nil, nil + } + + dbPool, err := pgxpool.New(ctx, cfg.Database.URL) + if err != nil { + log.Fatal().Err(err).Msg("failed to connect to database for River") + } + + workers := river.NewWorkers() + river.AddWorker(workers, &indexer.CacheRefreshWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 2}, + }, + Workers: workers, + }) + if err != nil { + log.Fatal().Err(err).Msg("failed to create River client") + } + + if err := riverClient.Start(ctx); err != nil { + log.Fatal().Err(err).Msg("failed to start River client") + } + + log.Info().Msg("River queue started") + + return riverClient, dbPool +} + func serveGrpc(config config.Config) { srvMetrics := grpcprom.NewServerMetrics( grpcprom.WithServerHandlingTimeHistogram( @@ -94,7 +131,17 @@ func serveGrpc(config config.Config) { ), ) - indexerServer, err := indexer.NewIndexerServer(config) + ctx := context.Background() + riverClient, dbPool := setupRiver(ctx, config) + if dbPool != nil { + defer dbPool.Close() + } + + musiscAgregatorSeerver, err := internal.NewMusicAgregatorServer(config, riverClient) + if err != nil { + log.Fatal().Err(err).Msg("failed to create MusicAgregatorServer") + } + indexerServer, err := indexer.NewIndexerServer(config, riverClient) if err != nil { log.Fatal().Err(err).Msg("failed to create IndexerServer") } @@ -112,6 +159,7 @@ func serveGrpc(config config.Config) { indexerServer, torrentServer, metadataServer, + musiscAgregatorSeerver, } for _, service := range services { @@ -120,6 +168,7 @@ func serveGrpc(config config.Config) { srvMetrics.InitializeMetrics(server) prometheus.MustRegister(srvMetrics) + reflection.Register(server) go func() { mux := http.NewServeMux() diff --git a/go.mod b/go.mod index 8581c56..40fab03 100644 --- a/go.mod +++ b/go.mod @@ -7,26 +7,45 @@ require github.com/rs/zerolog v1.35.1 require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.9.2 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect - github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/riverqueue/river v0.35.1 // indirect + github.com/riverqueue/river/riverdriver v0.35.1 // indirect + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.35.1 // indirect + github.com/riverqueue/river/rivershared v0.35.1 // indirect + github.com/riverqueue/river/rivertype v0.35.1 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/stretchr/testify v1.11.1 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.2.0 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect + go.uber.org/goleak v1.3.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sync v0.20.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) require ( github.com/mattn/go-isatty v0.0.20 // indirect golang.org/x/net v0.51.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/text v0.34.0 // indirect + golang.org/x/text v0.36.0 // indirect google.golang.org/grpc v1.81.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 75a962e..b62874d 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,9 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -17,6 +20,14 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 h1:QGLs github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0/go.mod h1:hM2alZsMUni80N33RBe6J0e423LB+odMj7d3EMP9l20= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 h1:B+8ClL/kCQkRiU82d9xajRPKYMrB7E0MbtzWVi1K4ns= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3/go.mod h1:NbCUVmiS4foBGBHOYlCT25+YmGpJ32dZPi75pGEUpj4= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -31,6 +42,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= @@ -39,11 +52,38 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/riverqueue/river v0.35.1 h1:TK1LLGRdTWL7ARPbIUB+TqMnTYJ0GiCoy5Q/yEf5yBE= +github.com/riverqueue/river v0.35.1/go.mod h1:jDt0LimObI+5e6FVy7LyuIWfHftmV0wARmiK7W+9D64= +github.com/riverqueue/river/riverdriver v0.35.1 h1:zJx8SaQdMP7zVEfd8SDoe8KjVHCXoXoFfzt6v+SJtQg= +github.com/riverqueue/river/riverdriver v0.35.1/go.mod h1:Y+rQzz0uvh+pQI+mzJh3qgAGGNxestOWgjKa7mob87w= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.35.1 h1:GL+ztwpXgIqBin/3wNzq8h1/H8befxl61/DlLvVCAAY= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.35.1/go.mod h1:5Llh5ONCFsW67dLm5+OelSWTKhliQ989JLbVMwyuN2U= +github.com/riverqueue/river/rivershared v0.35.1 h1:XEHf7yj35p5Os5r6K08q9BVaAKsvWhP9hfxEr+MwXqg= +github.com/riverqueue/river/rivershared v0.35.1/go.mod h1:YqVk7bZoojLsx58kyQ6ZU2FHP91HP4whVj6MTCtih/c= +github.com/riverqueue/river/rivertype v0.35.1 h1:7SfjZ3Hkr7gRjItMHAUzJBAHIqx41yS/4yjVPQVtNfM= +github.com/riverqueue/river/rivertype v0.35.1/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/zerolog v1.35.1 h1:m7xQeoiLIiV0BCEY4Hs+j2NG4Gp2o2KPKmhnnLiazKI= github.com/rs/zerolog v1.35.1/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM= +github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= @@ -56,15 +96,21 @@ go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfC go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ= @@ -78,3 +124,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go index c14eb86..8c8871c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "time" ) const ( @@ -21,11 +22,16 @@ type Config struct { Port string `yaml:"port"` } `yaml:"app"` + Database struct { + URL string `yaml:"url"` + } `yaml:"database"` + Indexer struct { Url string `yaml:"url"` Port string `yaml:"port"` Type IndexerType `yaml:"type"` ApiKey string `yaml:"api_key"` + Cache CacheConfig `yaml:"cache"` } `yaml:"indexer"` Torrent struct { @@ -40,6 +46,12 @@ type Config struct { } `yaml:"metadata"` } +type CacheConfig struct { + Enabled bool `yaml:"enabled"` + RefreshInterval time.Duration `yaml:"refresh_interval"` + TTL time.Duration `yaml:"ttl"` +} + func (t *IndexerType) UnmarshalYAML(unmarshal func(any) error) error { var value string if err := unmarshal(&value); err != nil { diff --git a/internal/indexer/cache.go b/internal/indexer/cache.go new file mode 100644 index 0000000..4088b8c --- /dev/null +++ b/internal/indexer/cache.go @@ -0,0 +1,79 @@ +package indexer + +import ( + "sync" + "time" + + "github.com/rs/zerolog/log" +) + +type CacheEntry struct { + Key string + URL string + Result SearchResult + CreatedAt time.Time + TTL time.Duration + RefreshInterval time.Duration +} + +func (e *CacheEntry) IsExpired() bool { + return time.Now().After(e.CreatedAt.Add(e.TTL)) +} + +type IndexerCache struct { + entries map[string]*CacheEntry + mu sync.RWMutex +} + +func NewIndexerCache() *IndexerCache { + return &IndexerCache{ + entries: make(map[string]*CacheEntry), + } +} + +func (c *IndexerCache) Get(key string) (*CacheEntry, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + entry, ok := c.entries[key] + if !ok { + log.Trace().Str("key", key).Msg("cache miss") + return nil, false + } + + if entry.IsExpired() { + log.Trace().Str("key", key).Msg("cache expired") + return nil, false + } + + log.Trace().Str("key", key).Int("items", len(entry.Result.Items)).Msg("cache hit") + return entry, true +} + +func (c *IndexerCache) Add(entry CacheEntry) { + c.mu.Lock() + defer c.mu.Unlock() + + c.entries[entry.Key] = &entry + log.Debug().Str("key", entry.Key).Int("items", len(entry.Result.Items)).Dur("ttl", entry.TTL).Dur("refresh", entry.RefreshInterval).Msg("cache entry added") +} + +func (c *IndexerCache) Update(key string, result SearchResult) { + c.mu.Lock() + defer c.mu.Unlock() + + if entry, ok := c.entries[key]; ok { + entry.Result = result + log.Debug().Str("key", key).Int("items", len(result.Items)).Msg("cache entry updated") + } else { + log.Warn().Str("key", key).Msg("cache update for missing entry") + } +} + +func (c *IndexerCache) Remove(key string) { + c.mu.Lock() + defer c.mu.Unlock() + + delete(c.entries, key) + log.Debug().Str("key", key).Msg("cache entry removed") +} diff --git a/internal/indexer/cache_worker.go b/internal/indexer/cache_worker.go new file mode 100644 index 0000000..cfdc864 --- /dev/null +++ b/internal/indexer/cache_worker.go @@ -0,0 +1,65 @@ +package indexer + +import ( + "context" + "time" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" + "github.com/rs/zerolog/log" +) + +type CacheRefreshArgs struct { + Key string `json:"key"` + URL string `json:"url"` + TTLExpires time.Time `json:"ttl_expires"` + RefreshInterval time.Duration `json:"refresh_interval"` +} + +func (CacheRefreshArgs) Kind() string { return "indexer_cache_refresh" } + +type CacheRefreshWorker struct { + river.WorkerDefaults[CacheRefreshArgs] + Cache *IndexerCache + Indexer Indexer + RiverClient *river.Client[pgx.Tx] +} + +func (w *CacheRefreshWorker) Work(ctx context.Context, job *river.Job[CacheRefreshArgs]) error { + args := job.Args + log.Trace().Str("key", args.Key).Int64("job_id", job.ID).Time("ttl_expires", args.TTLExpires).Msg("cache refresh worker started") + + if time.Now().After(args.TTLExpires) { + w.Cache.Remove(args.Key) + log.Debug().Str("key", args.Key).Msg("cache entry TTL expired, removed") + return nil + } + + log.Trace().Str("key", args.Key).Str("url", args.URL).Msg("fetching fresh data from indexer") + start := time.Now() + result, err := w.Indexer.FetchURL(args.URL) + if err != nil { + retryAt := time.Now().Add(5 * time.Minute) + log.Error().Err(err).Str("key", args.Key).Time("retry_at", retryAt).Msg("cache refresh failed, scheduling retry") + w.RiverClient.Insert(ctx, args, &river.InsertOpts{ + ScheduledAt: retryAt, + }) + return nil + } + log.Trace().Str("key", args.Key).Int("items", len(result.Items)).Dur("duration", time.Since(start)).Msg("fresh data fetched") + + w.Cache.Update(args.Key, result) + + nextRefresh := time.Now().Add(args.RefreshInterval) + _, err = w.RiverClient.Insert(ctx, args, &river.InsertOpts{ + ScheduledAt: nextRefresh, + }) + if err != nil { + log.Error().Err(err).Str("key", args.Key).Msg("failed to schedule next cache refresh") + } else { + log.Trace().Str("key", args.Key).Time("next_refresh", nextRefresh).Msg("next refresh scheduled") + } + + log.Debug().Str("key", args.Key).Int("items", len(result.Items)).Msg("cache refreshed") + return nil +} diff --git a/internal/indexer/cached_indexer.go b/internal/indexer/cached_indexer.go new file mode 100644 index 0000000..2e1b457 --- /dev/null +++ b/internal/indexer/cached_indexer.go @@ -0,0 +1,90 @@ +package indexer + +import ( + "context" + "time" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" + "github.com/rs/zerolog/log" + + "homelab.lan/music-agregator/internal/config" +) + +type CachedIndexer struct { + inner Indexer + cache *IndexerCache + riverClient *river.Client[pgx.Tx] + cfg config.CacheConfig +} + +func NewCachedIndexer(inner Indexer, cache *IndexerCache, riverClient *river.Client[pgx.Tx], cfg config.CacheConfig) *CachedIndexer { + return &CachedIndexer{ + inner: inner, + cache: cache, + riverClient: riverClient, + cfg: cfg, + } +} + +func (c *CachedIndexer) Search(query string, limit int32, tracker string) (SearchResult, error) { + key := query + "|" + tracker + log.Trace().Str("key", key).Str("query", query).Str("tracker", tracker).Msg("cached indexer search") + + if entry, ok := c.cache.Get(key); ok { + log.Debug().Str("key", key).Int("items", len(entry.Result.Items)).Msg("returning cached result") + return entry.Result, nil + } + + log.Trace().Str("key", key).Msg("cache miss, fetching from indexer") + result, err := c.inner.Search(query, limit, tracker) + if err != nil { + log.Error().Err(err).Str("key", key).Msg("cached indexer fetch failed") + return SearchResult{}, err + } + + url := c.inner.BuildSearchURL(query, limit, tracker) + log.Trace().Str("key", key).Str("url", url).Int("items", len(result.Items)).Msg("caching result") + + c.cache.Add(CacheEntry{ + Key: key, + URL: url, + Result: result, + CreatedAt: time.Now(), + TTL: c.cfg.TTL, + RefreshInterval: c.cfg.RefreshInterval, + }) + + scheduleAt := time.Now().Add(c.cfg.RefreshInterval) + _, err = c.riverClient.Insert(context.Background(), CacheRefreshArgs{ + Key: key, + URL: url, + TTLExpires: time.Now().Add(c.cfg.TTL), + RefreshInterval: c.cfg.RefreshInterval, + }, &river.InsertOpts{ + ScheduledAt: scheduleAt, + }) + if err != nil { + log.Error().Err(err).Str("key", key).Msg("failed to schedule cache refresh job") + } else { + log.Debug().Str("key", key).Time("scheduled_at", scheduleAt).Msg("cache refresh job scheduled") + } + + log.Debug().Str("key", key).Dur("ttl", c.cfg.TTL).Dur("refresh", c.cfg.RefreshInterval).Int("items", len(result.Items)).Msg("cached indexer search complete") + + return result, nil +} + +func (c *CachedIndexer) FetchURL(url string) (SearchResult, error) { + log.Trace().Str("url", url).Msg("cached indexer fetch URL passthrough") + return c.inner.FetchURL(url) +} + +func (c *CachedIndexer) BuildSearchURL(query string, limit int32, tracker string) string { + return c.inner.BuildSearchURL(query, limit, tracker) +} + +func (c *CachedIndexer) Capabilities(indexerName string) (IndexerCapabilities, error) { + log.Trace().Str("indexer", indexerName).Msg("cached indexer capabilities passthrough") + return c.inner.Capabilities(indexerName) +} diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index d53c7cc..aab9591 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -2,5 +2,7 @@ package indexer type Indexer interface { Search(query string, limit int32, indexer string) (SearchResult, error) + FetchURL(url string) (SearchResult, error) + BuildSearchURL(query string, limit int32, tracker string) string Capabilities(indexerName string) (IndexerCapabilities, error) } diff --git a/internal/indexer/jackett.go b/internal/indexer/jackett.go index 525ba08..aea4aae 100644 --- a/internal/indexer/jackett.go +++ b/internal/indexer/jackett.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "net/url" "time" "github.com/rs/zerolog/log" @@ -20,21 +21,42 @@ func NewIndexer(cfg config.Config) Indexer { return &JacketIndexer{ cfg: cfg, client: &http.Client{ - Timeout: time.Second * 10, + Timeout: 60 * time.Second, }, } } -func (indexer *JacketIndexer) Search(query string, limit int32, tracker string) (SearchResult, error) { +func (indexer *JacketIndexer) BuildSearchURL(query string, limit int32, tracker string) string { searchTracker := "all" if len(tracker) != 0 { searchTracker = tracker } - url := indexer.cfg.Indexer.Url - uri := fmt.Sprintf("%v/api/v2.0/indexers/%v/results/torznab?apikey=%v&limit=%d&cat=3010,3040&q=%v&t=search", url, searchTracker, indexer.cfg.Indexer.ApiKey, limit, query) + uri := fmt.Sprintf("%v/api/v2.0/indexers/%v/results/torznab?apikey=%v&cat=3010,3040&q=%v&t=search", + indexer.cfg.Indexer.Url, searchTracker, indexer.cfg.Indexer.ApiKey, url.QueryEscape(query)) + if limit > 0 { + uri += fmt.Sprintf("&limit=%d", limit) + } - log.Trace().Str("tracker", searchTracker).Str("query", query).Int32("limit", limit).Msg("jackett request") + return uri +} + +func (indexer *JacketIndexer) Search(query string, limit int32, tracker string) (SearchResult, error) { + uri := indexer.BuildSearchURL(query, limit, tracker) + return indexer.FetchURL(uri) +} + +type JackettError struct { + Code string `xml:"code,attr"` + Description string `xml:"description,attr"` +} + +func (e *JackettError) Error() string { + return fmt.Sprintf("jackett error %s: %s", e.Code, e.Description) +} + +func (indexer *JacketIndexer) FetchURL(uri string) (SearchResult, error) { + log.Trace().Str("uri", uri).Msg("jackett request") req, err := http.NewRequest("GET", uri, nil) if err != nil { @@ -62,6 +84,15 @@ func (indexer *JacketIndexer) Search(query string, limit int32, tracker string) Dur("duration", time.Since(start)). Msg("jackett response") + if resp.StatusCode != http.StatusOK { + var jackettErr JackettError + if xmlErr := xml.Unmarshal(body, &jackettErr); xmlErr == nil && jackettErr.Code != "" { + log.Error().Str("code", jackettErr.Code).Str("description", jackettErr.Description).Msg("jackett returned error") + return SearchResult{}, &jackettErr + } + return SearchResult{}, fmt.Errorf("jackett returned HTTP %d", resp.StatusCode) + } + var searchResult SearchResult if err := xml.Unmarshal(body, &searchResult); err != nil { log.Error().Err(err).Msg("error parsing search XML") diff --git a/internal/indexer/server.go b/internal/indexer/server.go index 976ab46..ccd3d23 100644 --- a/internal/indexer/server.go +++ b/internal/indexer/server.go @@ -3,6 +3,8 @@ package indexer import ( "context" + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" "github.com/rs/zerolog/log" "google.golang.org/grpc" @@ -15,10 +17,10 @@ type IndexerServer struct { pb.UnimplementedIndexerServiceServer } -func NewIndexerServer(cfg config.Config) (*IndexerServer, error) { - service, err := NewIndexerService(cfg) +func NewIndexerServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerServer, error) { + service, err := NewIndexerService(cfg, riverClient) if err != nil { - log.Err(err).Msg("Failed to initialize IndexerService") + log.Err(err).Msg("failed to initialize IndexerService") return nil, err } @@ -32,7 +34,9 @@ func (server *IndexerServer) Search(ctx context.Context, req *pb.SearchRequest) Str("tracker", req.GetTracker()). Msg("search started") - resp, err := server.service.Search(req) + log.Trace().Str("query", req.GetQuery()).Msg("fetching results from indexer") + + resp, err := server.service.Search(req.GetQuery(), req.GetLimit(), req.GetTracker()) if err != nil { log.Error().Err(err).Str("query", req.GetQuery()).Msg("search failed") return nil, err diff --git a/internal/indexer/service.go b/internal/indexer/service.go index 39973e3..c69e830 100644 --- a/internal/indexer/service.go +++ b/internal/indexer/service.go @@ -3,6 +3,8 @@ package indexer import ( "fmt" + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" "github.com/rs/zerolog/log" pb "homelab.lan/music-agregator/gen/music_agregator/indexer/v1" @@ -13,20 +15,28 @@ type IndexerService struct { indexer Indexer } -func NewIndexerService(cfg config.Config) (*IndexerService, error) { +func NewIndexerService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*IndexerService, error) { + var idx Indexer + switch cfg.Indexer.Type { case config.IndexerTypeJackett: - indexer := NewIndexer(cfg) - return &IndexerService{indexer: indexer}, nil + idx = NewIndexer(cfg) default: - return nil, fmt.Errorf("Unable to create the indexer for type: %v", cfg.Indexer.Type) + return nil, fmt.Errorf("unable to create the indexer for type: %v", cfg.Indexer.Type) } + + if cfg.Indexer.Cache.Enabled && riverClient != nil { + cache := NewIndexerCache() + idx = NewCachedIndexer(idx, cache, riverClient, cfg.Indexer.Cache) + log.Info().Dur("ttl", cfg.Indexer.Cache.TTL).Dur("refresh", cfg.Indexer.Cache.RefreshInterval).Msg("indexer cache enabled") + } + + return &IndexerService{indexer: idx}, nil } -func (service *IndexerService) Search(req *pb.SearchRequest) (*pb.SearchResponse, error) { - log.Trace().Str("query", req.GetQuery()).Msg("fetching results from indexer") +func (service *IndexerService) Search(query string, limit int32, indexer string) (*pb.SearchResponse, error) { - searchResult, err := service.indexer.Search(req.GetQuery(), req.GetLimit(), req.GetTracker()) + searchResult, err := service.indexer.Search(query, limit, indexer) if err != nil { log.Error().Err(err).Msg("failed to search in indexer") return nil, err diff --git a/internal/metadata/client.go b/internal/metadata/client.go index de3b91d..930f900 100644 --- a/internal/metadata/client.go +++ b/internal/metadata/client.go @@ -10,7 +10,7 @@ import ( pb "homelab.lan/music-agregator/gen/metadata/v1" ) -func newMetadataClient(endpoint string) (pb.MetadataServiceClient, *grpc.ClientConn, error) { +func NewMetadataClient(endpoint string) (pb.MetadataServiceClient, *grpc.ClientConn, error) { log.Trace().Str("endpoint", endpoint).Msg("connecting to metadata service") conn, err := grpc.NewClient(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/internal/metadata/server.go b/internal/metadata/server.go index d44d8df..3f8ba0b 100644 --- a/internal/metadata/server.go +++ b/internal/metadata/server.go @@ -17,7 +17,7 @@ type MetadataServer struct { } func NewMetadataServer(cfg config.Config) (*MetadataServer, error) { - client, conn, err := newMetadataClient(cfg.Metadata.Endpoint) + client, conn, err := NewMetadataClient(cfg.Metadata.Endpoint) if err != nil { log.Err(err).Msg("failed to initialize MetadataServer") return nil, err @@ -36,7 +36,7 @@ func (s *MetadataServer) Register(server *grpc.Server) { pb.RegisterMetadataServiceServer(server, s) } -func (s *MetadataServer) GetArtist(ctx context.Context, req *pb.GetArtistRequest) (*pb.Artist, error) { +func (s *MetadataServer) GetArtist(ctx context.Context, req *pb.GetArtistRequest) (*pb.GetArtistResponse, error) { log.Debug().Msg("metadata GetArtist") return s.client.GetArtist(ctx, req) } @@ -46,7 +46,7 @@ func (s *MetadataServer) SearchArtists(ctx context.Context, req *pb.SearchArtist return s.client.SearchArtists(ctx, req) } -func (s *MetadataServer) GetAlbum(ctx context.Context, req *pb.GetAlbumRequest) (*pb.Album, error) { +func (s *MetadataServer) GetAlbum(ctx context.Context, req *pb.GetAlbumRequest) (*pb.GetAlbumResponse, error) { log.Debug().Msg("metadata GetAlbum") return s.client.GetAlbum(ctx, req) } @@ -56,7 +56,7 @@ func (s *MetadataServer) GetArtistAlbums(ctx context.Context, req *pb.GetArtistA return s.client.GetArtistAlbums(ctx, req) } -func (s *MetadataServer) GetTrack(ctx context.Context, req *pb.GetTrackRequest) (*pb.Track, error) { +func (s *MetadataServer) GetTrack(ctx context.Context, req *pb.GetTrackRequest) (*pb.GetTrackResponse, error) { log.Debug().Msg("metadata GetTrack") return s.client.GetTrack(ctx, req) } @@ -66,6 +66,11 @@ func (s *MetadataServer) GetAlbumTracks(ctx context.Context, req *pb.GetAlbumTra return s.client.GetAlbumTracks(ctx, req) } +func (s *MetadataServer) SearchAlbums(ctx context.Context, req *pb.SearchAlbumsRequest) (*pb.SearchAlbumsResponse, error) { + log.Debug().Str("query", req.GetQuery()).Str("artist", req.GetArtist()).Msg("metadata SearchAlbums") + return s.client.SearchAlbums(ctx, req) +} + func (s *MetadataServer) SyncArtist(ctx context.Context, req *pb.SyncArtistRequest) (*pb.SyncArtistResponse, error) { log.Debug().Msg("metadata SyncArtist") return s.client.SyncArtist(ctx, req) diff --git a/internal/server.go b/internal/server.go new file mode 100644 index 0000000..0c3cd49 --- /dev/null +++ b/internal/server.go @@ -0,0 +1,37 @@ +package internal + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + + pb "homelab.lan/music-agregator/gen/music_agregator/v1" + "homelab.lan/music-agregator/internal/config" +) + +type MusicAgregatorServer struct { + service *MusicAgregatorService + pb.UnimplementedMusicAgregatorServiceServer +} + +func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorServer, error) { + service, err := NewMusicAgregatorService(cfg, riverClient) + if err != nil { + log.Err(err).Msg("failed to create MusicAgregatorService") + return nil, err + } + return &MusicAgregatorServer{ + service: service, + }, nil +} + +func (s *MusicAgregatorServer) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) { + return s.service.MonitorAlbum(ctx, req) +} + +func (s *MusicAgregatorServer) Register(server *grpc.Server) { + pb.RegisterMusicAgregatorServiceServer(server, s) +} diff --git a/internal/service.go b/internal/service.go new file mode 100644 index 0000000..c3fd49c --- /dev/null +++ b/internal/service.go @@ -0,0 +1,91 @@ +package internal + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + + metadataPb "homelab.lan/music-agregator/gen/metadata/v1" + pb "homelab.lan/music-agregator/gen/music_agregator/v1" + + "homelab.lan/music-agregator/internal/config" + "homelab.lan/music-agregator/internal/indexer" + "homelab.lan/music-agregator/internal/metadata" +) + +type MusicAgregatorService struct { + config config.Config + metadataClient metadataPb.MetadataServiceClient + metadataConn *grpc.ClientConn + indexer *indexer.IndexerService +} + +func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx]) (*MusicAgregatorService, error) { + indexer, err := indexer.NewIndexerService(cfg, riverClient) + if err != nil { + log.Err(err).Msg("failed to create IndexerService") + return nil, err + } + + metadataClient, conn, err := metadata.NewMetadataClient(cfg.Metadata.Endpoint) + if err != nil { + log.Err(err).Msg("failed to create metadata client") + return nil, err + } + + return &MusicAgregatorService{ + config: cfg, + metadataClient: metadataClient, + metadataConn: conn, + indexer: indexer, + }, nil +} + +func (s *MusicAgregatorService) Close() { + if s.metadataConn != nil { + s.metadataConn.Close() + } +} + +func (service *MusicAgregatorService) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) { + resp, err := service.metadataClient.GetAlbum(ctx, &metadataPb.GetAlbumRequest{ + Identifier: &metadataPb.GetAlbumRequest_Id{Id: req.GetAlbumId()}, + }) + if err != nil { + log.Error().Err(err).Str("album_id", req.GetAlbumId()).Msg("metadata GetAlbum failed") + return nil, err + } + album := resp.GetAlbum() + artistName := "" + if len(album.GetArtists()) > 0 { + artistName = album.GetArtists()[0].GetArtist().GetName() + } + + log.Debug(). + Str("album_id", req.GetAlbumId()). + Str("title", album.GetTitle()). + Str("artist", artistName). + Msg("album found, monitoring") + + query := album.GetTitle() + if artistName != "" { + query = artistName + " " + query + } + + tracker := req.GetIndexerOptions().GetTracker() + if tracker == "" { + tracker = "all" + } + + searchResult, err := service.indexer.Search(query, -1, tracker) + if err != nil { + log.Error().Err(err).Str("query", query).Msg("indexer search album failed") + return nil, err + } + log.Debug().Int("results", len(searchResult.GetResult())).Str("query", query).Msg("indexer search completed") + + return &pb.MonitorAlbumResponse{}, nil +} diff --git a/proto/music_agregator/v1/music_agregator.proto b/proto/music_agregator/v1/music_agregator.proto new file mode 100644 index 0000000..088e4c4 --- /dev/null +++ b/proto/music_agregator/v1/music_agregator.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +package music_agregator.v1; +option go_package = "homelab.lan/music-agregator/gen/music_agregator/v1/"; + +service MusicAgregatorService { + rpc MonitorAlbum(MonitorAlbumRequest) returns (MonitorAlbumResponse) {} +} + +message MonitorAlbumRequest { + string album_id = 1; + IndexerOptions indexer_options = 2; + + message IndexerOptions { + string tracker = 1; + } +} + +message MonitorAlbumResponse {}