From 24f355c5ae3d0e8cd764aac111576367aac356b9 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 11 May 2026 10:26:37 +0200 Subject: [PATCH] Add MonitorAlbumStream bidirectional streaming RPC with automatic and manual interaction modes Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus --- internal/monitor_workflow.go | 488 ++++++ internal/server.go | 25 + .../music_agregator/v1/music_agregator.proto | 147 ++ test/component/monitor_album_stream_test.go | 1527 +++++++++++++++++ 4 files changed, 2187 insertions(+) create mode 100644 internal/monitor_workflow.go create mode 100644 test/component/monitor_album_stream_test.go diff --git a/internal/monitor_workflow.go b/internal/monitor_workflow.go new file mode 100644 index 0000000..26fcf01 --- /dev/null +++ b/internal/monitor_workflow.go @@ -0,0 +1,488 @@ +package internal + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + pb "homelab.lan/music-agregator/gen/music_agregator/v1" + "homelab.lan/music-agregator/internal/database" +) + +var MaxPromptTimeout = 300 * time.Second + +type monitorWorkflow struct { + stream pb.MusicAgregatorService_MonitorAlbumStreamServer + mode pb.InteractionMode + req *pb.StartMonitorRequest + service *MusicAgregatorService + decisions chan *pb.UserDecision + cancel context.CancelFunc + mu sync.Mutex + promptID int +} + +func newMonitorWorkflow( + stream pb.MusicAgregatorService_MonitorAlbumStreamServer, + req *pb.StartMonitorRequest, + service *MusicAgregatorService, + cancel context.CancelFunc, +) *monitorWorkflow { + return &monitorWorkflow{ + stream: stream, + mode: req.Mode, + req: req, + service: service, + decisions: make(chan *pb.UserDecision, 1), + cancel: cancel, + } +} + +func (w *monitorWorkflow) sendStatus(step pb.MonitorStep, msg string) error { + select { + case <-w.stream.Context().Done(): + return w.stream.Context().Err() + default: + } + return w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Status{ + Status: &pb.StatusUpdate{Step: step, Message: msg}, + }, + }) +} + +func (w *monitorWorkflow) sendStatusWithAlbumInfo(step pb.MonitorStep, msg string, info *pb.StreamAlbumInfo) error { + select { + case <-w.stream.Context().Done(): + return w.stream.Context().Err() + default: + } + return w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Status{ + Status: &pb.StatusUpdate{ + Step: step, + Message: msg, + Data: &pb.StatusUpdate_AlbumInfo{AlbumInfo: info}, + }, + }, + }) +} + +func (w *monitorWorkflow) sendStatusWithTorrents(step pb.MonitorStep, msg string, torrents *pb.TorrentList) error { + select { + case <-w.stream.Context().Done(): + return w.stream.Context().Err() + default: + } + return w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Status{ + Status: &pb.StatusUpdate{ + Step: step, + Message: msg, + Data: &pb.StatusUpdate_Torrents{Torrents: torrents}, + }, + }, + }) +} + +func (w *monitorWorkflow) sendStatusWithRelease(step pb.MonitorStep, msg string, release *pb.ReleaseInfo) error { + select { + case <-w.stream.Context().Done(): + return w.stream.Context().Err() + default: + } + return w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Status{ + Status: &pb.StatusUpdate{ + Step: step, + Message: msg, + Data: &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: release}, + }, + }, + }) +} + +func (w *monitorWorkflow) sendError(step pb.MonitorStep, err error, recoverable bool) error { + return w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Error{ + Error: &pb.ErrorUpdate{ + FailedStep: step, + Message: err.Error(), + Recoverable: recoverable, + }, + }, + }) +} + +func (w *monitorWorkflow) sendResult(result *pb.MonitorAlbumResponse) error { + return w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Result{Result: result}, + }) +} + +func (w *monitorWorkflow) nextPromptID() string { + w.mu.Lock() + defer w.mu.Unlock() + w.promptID++ + return fmt.Sprintf("prompt-%d", w.promptID) +} + +func (w *monitorWorkflow) promptAndWait(ctx context.Context, prompt *pb.PromptForDecision) (*pb.UserDecision, error) { + if w.mode == pb.InteractionMode_INTERACTION_MODE_AUTOMATIC { + return w.defaultDecision(prompt), nil + } + + if err := w.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Prompt{Prompt: prompt}, + }); err != nil { + return nil, err + } + + timeout := time.Duration(prompt.TimeoutSeconds) * time.Second + if timeout == 0 || timeout > MaxPromptTimeout { + timeout = MaxPromptTimeout + } + + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case decision := <-w.decisions: + if decision.PromptId != prompt.PromptId { + return nil, status.Error(codes.InvalidArgument, "prompt_id mismatch") + } + return decision, nil + case <-timeoutCtx.Done(): + return w.defaultDecision(prompt), nil + } +} + +func (w *monitorWorkflow) defaultDecision(prompt *pb.PromptForDecision) *pb.UserDecision { + decision := &pb.UserDecision{PromptId: prompt.PromptId} + + switch prompt.Type { + case pb.PromptType_PROMPT_TYPE_CONFIRM: + decision.Decision = &pb.UserDecision_Confirm{ + Confirm: prompt.GetConfirm().GetDefaultValue(), + } + case pb.PromptType_PROMPT_TYPE_SELECT_ONE: + decision.Decision = &pb.UserDecision_SelectedId{ + SelectedId: prompt.GetSelectOne().GetDefaultId(), + } + case pb.PromptType_PROMPT_TYPE_SELECT_MANY: + decision.Decision = &pb.UserDecision_SelectedIds{ + SelectedIds: &pb.SelectedIds{Ids: prompt.GetSelectMany().GetDefaultIds()}, + } + } + + return decision +} + +func (w *monitorWorkflow) receiveDecisions(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + } + + msg, err := w.stream.Recv() + if err != nil { + return + } + + if msg.GetCancel() != nil { + w.cancel() + return + } + + if decision := msg.GetDecision(); decision != nil { + select { + case w.decisions <- decision: + default: + } + } + } +} + +func (w *monitorWorkflow) run(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, "Fetching album metadata...") + + album, err := w.service.metadata.GetAlbum(ctx, w.req.AlbumId) + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + log.Error().Err(err).Str("album_id", w.req.AlbumId).Msg("failed to get album") + w.sendError(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, err, false) + return err + } + + artistName := "" + if len(album.GetArtists()) > 0 { + artistName = album.GetArtists()[0].GetArtist().GetName() + } + + w.sendStatusWithAlbumInfo(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, + fmt.Sprintf("Got metadata: %s - %s", artistName, album.GetTitle()), + &pb.StreamAlbumInfo{ + Artist: artistName, + Title: album.GetTitle(), + ReleaseDate: album.GetReleaseDate(), + }) + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, "Checking if already owned...") + + dbAlbum, _ := w.service.metadata.GetAlbumByExternalID(ctx, album.GetId()) + if dbAlbum != nil { + w.service.metadata.SetAlbumMonitorState(ctx, dbAlbum.ID, database.Monitored) + dbAlbum.MonitorState = database.Monitored + + qualityStr := normalizeQuality(w.req.Quality, 0, 0) + owned, err := w.service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, w.req.Quality.String(), qualityStr) + if err == nil && owned { + w.sendStatus(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, + fmt.Sprintf("Already owned in %s quality", qualityStr)) + + if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { + decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ + PromptId: w.nextPromptID(), + Type: pb.PromptType_PROMPT_TYPE_CONFIRM, + Message: "Album already owned. Download anyway?", + + Options: &pb.PromptForDecision_Confirm{ + Confirm: &pb.ConfirmPrompt{ + ConfirmLabel: "Download anyway", + CancelLabel: "Skip", + DefaultValue: false, + }, + }, + }) + if err != nil { + w.sendError(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, err, false) + return err + } + if !decision.GetConfirm() { + w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped - already owned") + return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + } + } else { + w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Already owned") + return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + } + } + } + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, + fmt.Sprintf("Searching indexers for %s - %s...", artistName, album.GetTitle())) + + searchResult, err := w.service.searchIndexer(album, w.req.IndexerOptions.GetTracker()) + if err != nil { + w.sendError(pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, err, true) + return err + } + + parsed := w.service.parseSearchResults(searchResult, album) + + if len(parsed) > 0 { + summaries := make([]*pb.TorrentSummary, len(parsed)) + for i, p := range parsed { + summaries[i] = &pb.TorrentSummary{ + Id: fmt.Sprintf("torrent-%d", i), + Title: p.item.Title, + Tracker: p.item.Tracker, + Seeders: int32(p.item.Seeders), + Format: p.rel.Format.String(), + Lossless: p.rel.Format.IsLossless(), + } + } + w.sendStatusWithTorrents(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, + fmt.Sprintf("Parsed %d from %d torrents", len(parsed), len(searchResult.Items)), + &pb.TorrentList{Torrents: summaries}) + } else { + w.sendStatus(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, + fmt.Sprintf("Found %d torrents, none parseable", len(searchResult.Items))) + } + + if len(parsed) == 0 { + w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "No parseable results found") + return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + } + + if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL && len(parsed) > 1 { + options := make([]*pb.SelectOption, len(parsed)) + defaultIDs := make([]string, len(parsed)) + for i, p := range parsed { + id := fmt.Sprintf("torrent-%d", i) + options[i] = &pb.SelectOption{ + Id: id, + Label: p.item.Title, + Description: fmt.Sprintf("%s - %d seeders", p.item.Tracker, p.item.Seeders), + } + defaultIDs[i] = id + } + + decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ + PromptId: w.nextPromptID(), + Type: pb.PromptType_PROMPT_TYPE_SELECT_MANY, + Message: "Select torrents to consider", + + Options: &pb.PromptForDecision_SelectMany{ + SelectMany: &pb.SelectManyPrompt{ + Options: options, + DefaultIds: defaultIDs, + MinSelections: 1, + MaxSelections: int32(len(parsed)), + }, + }, + }) + if err != nil { + w.sendError(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, err, false) + return err + } + + selectedIDs := make(map[string]bool) + if ids := decision.GetSelectedIds(); ids != nil { + for _, id := range ids.GetIds() { + selectedIDs[id] = true + } + } + + var selected []parsedItem + for i, p := range parsed { + id := fmt.Sprintf("torrent-%d", i) + if selectedIDs[id] { + selected = append(selected, p) + } + } + if len(selected) > 0 { + parsed = selected + } + } + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_FILTERING_QUALITY, + fmt.Sprintf("Filtering %d results by quality...", len(parsed))) + + filtered := filterByQuality(parsed, w.req.Quality) + if len(filtered) == 0 { + log.Warn().Str("album", album.GetTitle()).Str("quality", w.req.Quality.String()).Msg("no releases match quality filter") + w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "No releases match quality filter") + return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + } + + var best parsedItem + if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL && len(filtered) > 1 { + options := make([]*pb.SelectOption, len(filtered)) + for i, p := range filtered { + options[i] = &pb.SelectOption{ + Id: fmt.Sprintf("release-%d", i), + Label: p.item.Title, + Description: fmt.Sprintf("%s - %d seeders - %s", p.item.Tracker, p.item.Seeders, p.rel.Format.String()), + } + } + + bestIdx := 0 + for i, p := range filtered { + if p.item.Seeders > filtered[bestIdx].item.Seeders { + bestIdx = i + } + } + + decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ + PromptId: w.nextPromptID(), + Type: pb.PromptType_PROMPT_TYPE_SELECT_ONE, + Message: "Select release", + + Options: &pb.PromptForDecision_SelectOne{ + SelectOne: &pb.SelectOnePrompt{ + Options: options, + DefaultId: fmt.Sprintf("release-%d", bestIdx), + }, + }, + }) + if err != nil { + w.sendError(pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, err, false) + return err + } + + selectedIdx := 0 + if id := decision.GetSelectedId(); id != "" { + for i := range filtered { + if fmt.Sprintf("release-%d", i) == id { + selectedIdx = i + break + } + } + } + best = filtered[selectedIdx] + } else { + best = selectBestRelease(filtered) + } + + w.sendStatusWithRelease(pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, + fmt.Sprintf("Selected: %s (%d seeders)", best.item.Title, best.item.Seeders), + &pb.ReleaseInfo{ + InfoHash: best.rel.InfoHash, + Format: best.rel.Format.String(), + BitDepth: int32(best.rel.BitDepth), + SampleRate: int32(best.rel.SampleRate), + Seeders: int32(best.item.Seeders), + Tracker: best.item.Tracker, + }) + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, + fmt.Sprintf("Adding torrent: %s...", best.item.Title)) + + if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { + decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ + PromptId: w.nextPromptID(), + Type: pb.PromptType_PROMPT_TYPE_CONFIRM, + Message: fmt.Sprintf("Add torrent '%s' to client?", best.item.Title), + + Options: &pb.PromptForDecision_Confirm{ + Confirm: &pb.ConfirmPrompt{ + ConfirmLabel: "Add", + CancelLabel: "Skip", + DefaultValue: true, + }, + }, + }) + if err != nil { + w.sendError(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, false) + return err + } + if !decision.GetConfirm() { + w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped by user") + return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + } + } + + if err := w.service.addToTorrentClient(best); err != nil { + w.sendError(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, true) + return err + } + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_SAVING, "Saving to database...") + + dbAlbum, _ = w.service.metadata.GetAlbumByExternalID(ctx, album.GetId()) + if dbAlbum != nil { + w.service.saveTorrentAndDownload(ctx, dbAlbum.ID, best) + } else { + log.Warn().Str("album_id", w.req.AlbumId).Msg("album not in DB, skipping torrent/download persistence") + } + + w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Done!") + + return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best)) +} diff --git a/internal/server.go b/internal/server.go index 5aed235..c9bddd3 100644 --- a/internal/server.go +++ b/internal/server.go @@ -7,6 +7,8 @@ import ( "github.com/riverqueue/river" "github.com/rs/zerolog/log" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/config" @@ -46,6 +48,29 @@ func (s *MusicAgregatorServer) MonitorAlbum(ctx context.Context, req *pb.Monitor return s.service.MonitorAlbum(ctx, req) } +func (s *MusicAgregatorServer) MonitorAlbumStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer) error { + msg, err := stream.Recv() + if err != nil { + return err + } + + startReq := msg.GetStart() + if startReq == nil { + return status.Error(codes.InvalidArgument, "first message must be StartMonitorRequest") + } + + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + + workflow := newMonitorWorkflow(stream, startReq, s.service, cancel) + + if startReq.Mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { + go workflow.receiveDecisions(ctx) + } + + return workflow.run(ctx) +} + func (s *MusicAgregatorServer) AnalyzeAlbumRelease(ctx context.Context, req *pb.AnalyzeAlbumReleaseRequest) (*pb.AnalyzeAlbumReleaseResponse, error) { return s.service.AnalyzeAlbumRelease(ctx, req) } diff --git a/proto/music_agregator/v1/music_agregator.proto b/proto/music_agregator/v1/music_agregator.proto index 5239dff..afaca6b 100644 --- a/proto/music_agregator/v1/music_agregator.proto +++ b/proto/music_agregator/v1/music_agregator.proto @@ -4,6 +4,7 @@ option go_package = "homelab.lan/music-agregator/gen/music_agregator/v1/"; service MusicAgregatorService { rpc MonitorAlbum(MonitorAlbumRequest) returns (MonitorAlbumResponse) {} + rpc MonitorAlbumStream(stream MonitorAlbumStreamRequest) returns (stream MonitorAlbumStreamResponse) {} rpc GetArtists(GetArtistsRequest) returns (GetArtistsResponse) {} rpc GetAlbum(GetAlbumRequest) returns (GetAlbumResponse) {} rpc AnalyzeAlbumRelease(AnalyzeAlbumReleaseRequest) returns (AnalyzeAlbumReleaseResponse) {} @@ -161,3 +162,149 @@ message MonitoredRelease { int32 seeders = 17; string tracker = 18; } + +enum InteractionMode { + INTERACTION_MODE_AUTOMATIC = 0; + INTERACTION_MODE_MANUAL = 1; +} + +enum MonitorStep { + MONITOR_STEP_UNSPECIFIED = 0; + MONITOR_STEP_FETCHING_METADATA = 1; + MONITOR_STEP_CHECKING_OWNED = 2; + MONITOR_STEP_SEARCHING_INDEXER = 3; + MONITOR_STEP_PARSING_RESULTS = 4; + MONITOR_STEP_FILTERING_QUALITY = 5; + MONITOR_STEP_SELECTING_RELEASE = 6; + MONITOR_STEP_ADDING_TORRENT = 7; + MONITOR_STEP_SAVING = 8; + MONITOR_STEP_COMPLETE = 9; +} + +enum PromptType { + PROMPT_TYPE_UNSPECIFIED = 0; + PROMPT_TYPE_CONFIRM = 1; + PROMPT_TYPE_SELECT_ONE = 2; + PROMPT_TYPE_SELECT_MANY = 3; +} + +message MonitorAlbumStreamRequest { + oneof message { + StartMonitorRequest start = 1; + UserDecision decision = 2; + CancelRequest cancel = 3; + } +} + +message StartMonitorRequest { + string album_id = 1; + IndexerOptions indexer_options = 2; + QualityType quality = 3; + InteractionMode mode = 4; +} + +message UserDecision { + string prompt_id = 1; + oneof decision { + bool confirm = 2; + string selected_id = 3; + SelectedIds selected_ids = 4; + } +} + +message SelectedIds { + repeated string ids = 1; +} + +message CancelRequest {} + +message MonitorAlbumStreamResponse { + oneof message { + StatusUpdate status = 1; + PromptForDecision prompt = 2; + MonitorAlbumResponse result = 3; + ErrorUpdate error = 4; + } +} + +message StatusUpdate { + MonitorStep step = 1; + string message = 2; + oneof data { + StreamAlbumInfo album_info = 10; + TorrentList torrents = 11; + ReleaseInfo release_info = 12; + } +} + +message PromptForDecision { + string prompt_id = 1; + PromptType type = 2; + string message = 3; + int32 timeout_seconds = 4; + oneof options { + ConfirmPrompt confirm = 10; + SelectOnePrompt select_one = 11; + SelectManyPrompt select_many = 12; + } +} + +message ErrorUpdate { + MonitorStep failed_step = 1; + string message = 2; + bool recoverable = 3; +} + +message StreamAlbumInfo { + string artist = 1; + string title = 2; + string release_date = 3; + bool already_owned = 4; + string owned_quality = 5; +} + +message TorrentList { + repeated TorrentSummary torrents = 1; +} + +message TorrentSummary { + string id = 1; + string title = 2; + string tracker = 3; + int32 seeders = 4; + string format = 5; + bool lossless = 6; +} + +message ReleaseInfo { + string info_hash = 1; + string format = 2; + int32 bit_depth = 3; + int32 sample_rate = 4; + int32 seeders = 5; + string tracker = 6; +} + +message ConfirmPrompt { + string confirm_label = 1; + string cancel_label = 2; + bool default_value = 3; +} + +message SelectOnePrompt { + repeated SelectOption options = 1; + string default_id = 2; +} + +message SelectManyPrompt { + repeated SelectOption options = 1; + repeated string default_ids = 2; + int32 min_selections = 3; + int32 max_selections = 4; +} + +message SelectOption { + string id = 1; + string label = 2; + string description = 3; +} diff --git a/test/component/monitor_album_stream_test.go b/test/component/monitor_album_stream_test.go new file mode 100644 index 0000000..f749cd4 --- /dev/null +++ b/test/component/monitor_album_stream_test.go @@ -0,0 +1,1527 @@ +package component + +import ( + "context" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + metadataPb "homelab.lan/music-agregator/gen/metadata/v1" + pb "homelab.lan/music-agregator/gen/music_agregator/v1" + "homelab.lan/music-agregator/internal" + "homelab.lan/music-agregator/internal/indexer" + "homelab.lan/music-agregator/internal/torrent" +) + +const defaultStreamTimeout = 5 * time.Second + +func startMonitorStream( + t *testing.T, + client pb.MusicAgregatorServiceClient, + req *pb.StartMonitorRequest, +) grpc.BidiStreamingClient[pb.MonitorAlbumStreamRequest, pb.MonitorAlbumStreamResponse] { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + t.Cleanup(cancel) + + stream, err := client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{Start: req}, + }) + require.NoError(t, err) + + return stream +} + +func collectAllMessages( + t *testing.T, + stream grpc.BidiStreamingClient[pb.MonitorAlbumStreamRequest, pb.MonitorAlbumStreamResponse], + timeout time.Duration, +) []*pb.MonitorAlbumStreamResponse { + t.Helper() + + if timeout == 0 { + timeout = defaultStreamTimeout + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var messages []*pb.MonitorAlbumStreamResponse + done := make(chan struct{}) + + go func() { + defer close(done) + for { + msg, err := stream.Recv() + if err == io.EOF { + return + } + if err != nil { + return + } + messages = append(messages, msg) + } + }() + + select { + case <-done: + case <-ctx.Done(): + t.Fatalf("collectAllMessages timed out after %v", timeout) + } + + return messages +} + +func collectUntilPrompt( + t *testing.T, + stream grpc.BidiStreamingClient[pb.MonitorAlbumStreamRequest, pb.MonitorAlbumStreamResponse], + timeout time.Duration, +) ([]*pb.MonitorAlbumStreamResponse, *pb.PromptForDecision, error) { + t.Helper() + + if timeout == 0 { + timeout = defaultStreamTimeout + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var messages []*pb.MonitorAlbumStreamResponse + resultChan := make(chan struct { + prompt *pb.PromptForDecision + err error + }, 1) + + go func() { + for { + msg, err := stream.Recv() + if err == io.EOF { + resultChan <- struct { + prompt *pb.PromptForDecision + err error + }{nil, io.EOF} + return + } + if err != nil { + resultChan <- struct { + prompt *pb.PromptForDecision + err error + }{nil, err} + return + } + messages = append(messages, msg) + if prompt := msg.GetPrompt(); prompt != nil { + resultChan <- struct { + prompt *pb.PromptForDecision + err error + }{prompt, nil} + return + } + if errUpdate := msg.GetError(); errUpdate != nil { + resultChan <- struct { + prompt *pb.PromptForDecision + err error + }{nil, status.Error(codes.Internal, errUpdate.Message)} + return + } + } + }() + + select { + case result := <-resultChan: + return messages, result.prompt, result.err + case <-ctx.Done(): + t.Fatalf("collectUntilPrompt timed out after %v", timeout) + return nil, nil, ctx.Err() + } +} + +func sendDecision( + t *testing.T, + stream grpc.BidiStreamingClient[pb.MonitorAlbumStreamRequest, pb.MonitorAlbumStreamResponse], + promptID string, + decision *pb.UserDecision, +) { + t.Helper() + decision.PromptId = promptID + err := stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Decision{Decision: decision}, + }) + require.NoError(t, err) +} + +func assertStatusSequence(t *testing.T, messages []*pb.MonitorAlbumStreamResponse, expectedSteps []pb.MonitorStep) { + t.Helper() + + var actualSteps []pb.MonitorStep + for _, msg := range messages { + if status := msg.GetStatus(); status != nil { + actualSteps = append(actualSteps, status.Step) + } + } + + assert.Equal(t, expectedSteps, actualSteps, "status step sequence mismatch") +} + +func assertContainsStep(t *testing.T, messages []*pb.MonitorAlbumStreamResponse, step pb.MonitorStep) { + t.Helper() + + for _, msg := range messages { + if status := msg.GetStatus(); status != nil && status.Step == step { + return + } + } + t.Errorf("expected messages to contain step %v", step) +} + +func TestMonitorAlbumStream_AutomaticHappyPath(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + newSearchItem("Test Artist - Test Album [MP3 320]", 10, "magnet:?xt=urn:btih:def456&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + assertStatusSequence(t, messages, []pb.MonitorStep{ + pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, + pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, + pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, + pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, + pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, + pb.MonitorStep_MONITOR_STEP_FILTERING_QUALITY, + pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, + pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, + pb.MonitorStep_MONITOR_STEP_SAVING, + pb.MonitorStep_MONITOR_STEP_COMPLETE, + }) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Album) + assert.Equal(t, "Test Album", result.Album.Title) + require.NotNil(t, result.Release) + + ctx := context.Background() + var torrentCount int + err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM torrents").Scan(&torrentCount) + require.NoError(t, err) + assert.Equal(t, 1, torrentCount) +} + +func TestMonitorAlbumStream_AutomaticMetadataUnavailable(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return nil, status.Error(codes.Unavailable, "connection refused") + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var hasError bool + for _, msg := range messages { + if errUpdate := msg.GetError(); errUpdate != nil { + hasError = true + assert.Equal(t, pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, errUpdate.FailedStep) + break + } + } + assert.True(t, hasError, "expected error message for metadata unavailable") + + ctx := context.Background() + var count int + err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM artists").Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count) +} + +func TestMonitorAlbumStream_AutomaticMetadataNotFound(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return nil, status.Error(codes.NotFound, "album not found") + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "nonexistent-album", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var hasError bool + for _, msg := range messages { + if errUpdate := msg.GetError(); errUpdate != nil { + hasError = true + assert.Equal(t, pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, errUpdate.FailedStep) + break + } + } + assert.True(t, hasError, "expected error message for metadata not found") + + ctx := context.Background() + var count int + err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM albums").Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count) +} + +func TestMonitorAlbumStream_AutomaticArtistPersistFails(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: &metadataPb.Album{ + Id: "orphan-album-ext-id", + Title: "Orphan Album", + AlbumType: "album", + ReleaseDate: "2024-01-15", + TotalTracks: 10, + Artists: []*metadataPb.ArtistCredit{}, + }, + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Orphan Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "orphan-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Release) + assert.Nil(t, result.Album) + + ctx := context.Background() + var count int + err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM downloads").Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count) +} + +func TestMonitorAlbumStream_AutomaticAlreadyOwned(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + ctx := context.Background() + + var artistID string + err := suite.pool.QueryRow(ctx, ` + INSERT INTO artists (external_id, name, artist_type, country, genres, image_url, monitor_state) + VALUES ('artist-ext-id', 'Test Artist', 'person', 'US', ARRAY['Rock'], 'http://img.com', 'monitored') + RETURNING id + `).Scan(&artistID) + require.NoError(t, err) + + var albumID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO albums (external_id, artist_id, title, album_type, total_tracks, total_discs, label, genres, cover_url, monitor_state) + VALUES ('owned-album-ext-id', $1, 'Owned Album', 'album', 10, 1, 'Test Label', ARRAY['Rock'], 'http://cover.com', 'monitored') + RETURNING id + `, artistID).Scan(&albumID) + require.NoError(t, err) + + var torrentID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, seeders, peers, size) + VALUES ($1, 'existing-hash', 'test-tracker', 'Owned Album FLAC', 'FLAC', '16-44', 'CD', 100, 50, 500000000) + RETURNING id + `, albumID).Scan(&torrentID) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash, save_path) + VALUES ($1, $2, 'QUALITY_LOSSLESS', '16-44', 'completed', 'existing-hash', '/music/owned') + `, torrentID, albumID) + require.NoError(t, err) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("owned-album-ext-id", "Owned Album", "artist-ext-id", "Test Artist"), + }, nil + } + + var indexerCalled bool + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + indexerCalled = true + return newSearchResponse(), nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "owned-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + assertContainsStep(t, messages, pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Album) + assert.Equal(t, pb.MonitorState_MONITOR_STATE_MONITORED, result.Album.MonitorState) + assert.Nil(t, result.Release) + assert.False(t, indexerCalled) +} + +func TestMonitorAlbumStream_AutomaticIndexerDown(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return nil, assert.AnError + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var hasError bool + for _, msg := range messages { + if errUpdate := msg.GetError(); errUpdate != nil { + hasError = true + assert.Equal(t, pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, errUpdate.FailedStep) + break + } + } + assert.True(t, hasError, "expected error message for indexer down") +} + +func TestMonitorAlbumStream_AutomaticNoResults(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse(), nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + assertContainsStep(t, messages, pb.MonitorStep_MONITOR_STEP_COMPLETE) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Album) + assert.Nil(t, result.Release) + + ctx := context.Background() + var count int + err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM torrents").Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count) +} + +func TestMonitorAlbumStream_AutomaticAllSeedersZero(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Album FLAC 1", 0, "magnet:?xt=urn:btih:abc1"), + newSearchItem("Test Album FLAC 2", 0, "magnet:?xt=urn:btih:abc2"), + newSearchItem("Test Album FLAC 3", 0, "magnet:?xt=urn:btih:abc3"), + ), nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Album) + assert.Nil(t, result.Release) +} + +func TestMonitorAlbumStream_AutomaticAllMagnetsFail(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Album 1", 50, "magnet:?xt=urn:btih:abc1"), + newSearchItem("Test Album 2", 30, "magnet:?xt=urn:btih:abc2"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return nil, assert.AnError + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Album) + assert.Nil(t, result.Release) +} + +func TestMonitorAlbumStream_AutomaticNoQualityMatch(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Album MP3 320", 50, "magnet:?xt=urn:btih:abc1"), + newSearchItem("Test Album MP3 V0", 30, "magnet:?xt=urn:btih:abc2"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentDataMP3(), nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + assertContainsStep(t, messages, pb.MonitorStep_MONITOR_STEP_FILTERING_QUALITY) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Album) + assert.Nil(t, result.Release) +} + +func TestMonitorAlbumStream_AutomaticQBitDown(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return nil, assert.AnError + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return assert.AnError + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var hasError bool + for _, msg := range messages { + if errUpdate := msg.GetError(); errUpdate != nil { + hasError = true + assert.Equal(t, pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, errUpdate.FailedStep) + break + } + } + assert.True(t, hasError, "expected error message for qbit down") + + ctx := context.Background() + var downloadCount int + err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 0, downloadCount) +} + +func TestMonitorAlbumStream_AutomaticTorrentExists(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + ctx := context.Background() + + var artistID string + err := suite.pool.QueryRow(ctx, ` + INSERT INTO artists (external_id, name, artist_type, country, genres, image_url, monitor_state) + VALUES ('artist-ext-id', 'Test Artist', 'person', 'US', ARRAY['Rock'], 'http://img.com', 'monitored') + RETURNING id + `).Scan(&artistID) + require.NoError(t, err) + + var albumID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO albums (external_id, artist_id, title, album_type, total_tracks, total_discs, label, genres, cover_url, monitor_state) + VALUES ('test-album-ext-id', $1, 'Test Album', 'album', 10, 1, 'Test Label', ARRAY['Rock'], 'http://cover.com', 'unmonitored') + RETURNING id + `, artistID).Scan(&albumID) + require.NoError(t, err) + + torrentHash := "6ff7af15d0745a3e29d1b9620191cfe01ad3cc70" + + var torrentID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, seeders, peers, size) + VALUES ($1, $2, 'test-tracker', 'Test Album FLAC', 'FLAC', '16-44', 'CD', 100, 50, 500000000) + RETURNING id + `, albumID, torrentHash).Scan(&torrentID) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash) + VALUES ($1, $2, 'FLAC', '16-44', 'completed', $3) + `, torrentID, albumID, torrentHash) + require.NoError(t, err) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Album FLAC", 50, "magnet:?xt=urn:btih:"+torrentHash), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{{State: "stalledUP"}}, nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Release) + + var downloadCount int + err = suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 1, downloadCount) +} + +func TestMonitorAlbumStream_AutomaticDuplicateSkipped(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + ctx := context.Background() + + var artistID string + err := suite.pool.QueryRow(ctx, ` + INSERT INTO artists (external_id, name, artist_type, country, genres, image_url, monitor_state) + VALUES ('artist-ext-id', 'Test Artist', 'person', 'US', ARRAY['Rock'], 'http://img.com', 'monitored') + RETURNING id + `).Scan(&artistID) + require.NoError(t, err) + + var albumID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO albums (external_id, artist_id, title, album_type, total_tracks, total_discs, label, genres, cover_url, monitor_state) + VALUES ('test-album-ext-id', $1, 'Test Album', 'album', 10, 1, 'Test Label', ARRAY['Rock'], 'http://cover.com', 'unmonitored') + RETURNING id + `, artistID).Scan(&albumID) + require.NoError(t, err) + + torrentHash := "6ff7af15d0745a3e29d1b9620191cfe01ad3cc70" + + var torrentID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, seeders, peers, size) + VALUES ($1, $2, 'test-tracker', 'Test Album FLAC', 'FLAC', '16-44', 'CD', 100, 50, 500000000) + RETURNING id + `, albumID, torrentHash).Scan(&torrentID) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash) + VALUES ($1, $2, 'FLAC', '16-44', 'downloading', $3) + `, torrentID, albumID, torrentHash) + require.NoError(t, err) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Album FLAC", 50, "magnet:?xt=urn:btih:"+torrentHash), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{{State: "downloading"}}, nil + } + + stream := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range messages { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Release) + + var downloadCount int + err = suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 1, downloadCount) +} + +func TestMonitorAlbumStream_ManualSelectTorrents(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album FLAC 24/96", 50, "magnet:?xt=urn:btih:hash1"), + newSearchItem("Test Artist - Test Album FLAC 16/44", 100, "magnet:?xt=urn:btih:hash2"), + newSearchItem("Test Artist - Test Album MP3", 200, "magnet:?xt=urn:btih:hash3"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + messages, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + + assert.Equal(t, pb.PromptType_PROMPT_TYPE_SELECT_MANY, prompt.Type) + require.NotNil(t, prompt.GetSelectMany()) + assert.GreaterOrEqual(t, len(prompt.GetSelectMany().Options), 2) + + assertContainsStep(t, messages, pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_SelectedIds{ + SelectedIds: &pb.SelectedIds{ + Ids: []string{prompt.GetSelectMany().Options[0].Id, prompt.GetSelectMany().Options[1].Id}, + }, + }, + }) + + _, selectOnePrompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, selectOnePrompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_SELECT_ONE, selectOnePrompt.Type) + + sendDecision(t, stream, selectOnePrompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_SelectedId{SelectedId: selectOnePrompt.GetSelectOne().DefaultId}, + }) + + _, confirmPrompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, confirmPrompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, confirmPrompt.Type) + + sendDecision(t, stream, confirmPrompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + remaining := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range remaining { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) +} + +func TestMonitorAlbumStream_ManualSelectRelease(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album FLAC 24/96", 50, "magnet:?xt=urn:btih:hash1"), + newSearchItem("Test Artist - Test Album FLAC 16/44", 100, "magnet:?xt=urn:btih:hash2"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + messages, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_SELECT_MANY, prompt.Type) + require.NotNil(t, prompt.GetSelectMany()) + assert.Len(t, prompt.GetSelectMany().Options, 2) + + assertContainsStep(t, messages, pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_SelectedIds{SelectedIds: &pb.SelectedIds{Ids: prompt.GetSelectMany().DefaultIds}}, + }) + + _, selectOnePrompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, selectOnePrompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_SELECT_ONE, selectOnePrompt.Type) + require.NotNil(t, selectOnePrompt.GetSelectOne()) + assert.Len(t, selectOnePrompt.GetSelectOne().Options, 2) + + selectedOption := selectOnePrompt.GetSelectOne().Options[1] + sendDecision(t, stream, selectOnePrompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_SelectedId{SelectedId: selectedOption.Id}, + }) + + _, confirmPrompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, confirmPrompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, confirmPrompt.Type) + + sendDecision(t, stream, confirmPrompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + remaining := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range remaining { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Release) + assert.Equal(t, int32(100), result.Release.Seeders) +} + +func TestMonitorAlbumStream_ManualConfirmAdd(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + var addMagnetCalled bool + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + addMagnetCalled = true + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + _, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, prompt.Type) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + remaining := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range remaining { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + require.NotNil(t, result.Release) + assert.True(t, addMagnetCalled) +} + +func TestMonitorAlbumStream_ManualRejectAdd(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + var addMagnetCalled bool + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + addMagnetCalled = true + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + _, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, prompt.Type) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: false}, + }) + + remaining := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range remaining { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) + assert.Nil(t, result.Release) + assert.False(t, addMagnetCalled) + + bgCtx := context.Background() + var downloadCount int + err = suite.pool.QueryRow(bgCtx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 0, downloadCount) +} + +func TestMonitorAlbumStream_ManualAlreadyOwnedContinue(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + ctx := context.Background() + + var artistID string + err := suite.pool.QueryRow(ctx, ` + INSERT INTO artists (external_id, name, artist_type, country, genres, image_url, monitor_state) + VALUES ('artist-ext-id', 'Test Artist', 'person', 'US', ARRAY['Rock'], 'http://img.com', 'monitored') + RETURNING id + `).Scan(&artistID) + require.NoError(t, err) + + var albumID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO albums (external_id, artist_id, title, album_type, total_tracks, total_discs, label, genres, cover_url, monitor_state) + VALUES ('owned-album-ext-id', $1, 'Owned Album', 'album', 10, 1, 'Test Label', ARRAY['Rock'], 'http://cover.com', 'monitored') + RETURNING id + `, artistID).Scan(&albumID) + require.NoError(t, err) + + var torrentID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, seeders, peers, size) + VALUES ($1, 'existing-hash', 'test-tracker', 'Owned Album FLAC', 'FLAC', '16-44', 'CD', 100, 50, 500000000) + RETURNING id + `, albumID).Scan(&torrentID) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash, save_path) + VALUES ($1, $2, 'QUALITY_LOSSLESS', '16-44', 'completed', 'existing-hash', '/music/owned') + `, torrentID, albumID) + require.NoError(t, err) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("owned-album-ext-id", "Owned Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Owned Album [FLAC 24/96]", 50, "magnet:?xt=urn:btih:newhash"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + streamCtx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(streamCtx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "owned-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + messages, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, prompt.Type) + + assertContainsStep(t, messages, pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + addMessages, addPrompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, addPrompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, addPrompt.Type) + + assertContainsStep(t, addMessages, pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER) + assertContainsStep(t, addMessages, pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT) + + sendDecision(t, stream, addPrompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + remaining := collectAllMessages(t, stream, 0) + + var result *pb.MonitorAlbumResponse + for _, msg := range remaining { + if r := msg.GetResult(); r != nil { + result = r + break + } + } + require.NotNil(t, result) +} + +func TestMonitorAlbumStream_FirstMessageMustBeStart(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Decision{ + Decision: &pb.UserDecision{ + PromptId: "some-prompt-id", + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }, + }, + }) + require.NoError(t, err) + + _, err = stream.Recv() + require.Error(t, err) + + st, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.InvalidArgument, st.Code()) +} + +func TestMonitorAlbumStream_ContextCancellation(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(2 * time.Second): + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }, + }, + }) + require.NoError(t, err) + + var lastErr error + for { + _, err = stream.Recv() + if err != nil { + lastErr = err + break + } + } + require.Error(t, lastErr) + + st, ok := status.FromError(lastErr) + if ok { + assert.Contains(t, []codes.Code{codes.Canceled, codes.DeadlineExceeded}, st.Code()) + } +} + +func TestMonitorAlbumStream_PromptTimeout(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + origTimeout := internal.MaxPromptTimeout + internal.MaxPromptTimeout = 3 * time.Second + t.Cleanup(func() { internal.MaxPromptTimeout = origTimeout }) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + _, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + + remaining := collectAllMessages(t, stream, 15*time.Second) + + var hasResult bool + for _, msg := range remaining { + if r := msg.GetResult(); r != nil { + hasResult = true + break + } + } + assert.True(t, hasResult, "expected workflow to complete with default decision after timeout") +} + +func TestMonitorAlbumStream_InvalidPromptId(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultStreamTimeout) + defer cancel() + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + _, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Decision{ + Decision: &pb.UserDecision{ + PromptId: "wrong-prompt-id", + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }, + }, + }) + require.NoError(t, err) + + remaining := collectAllMessages(t, stream, 0) + + var hasError bool + for _, msg := range remaining { + if errUpdate := msg.GetError(); errUpdate != nil { + hasError = true + assert.Contains(t, errUpdate.Message, "prompt") + break + } + } + assert.True(t, hasError, "expected error for invalid prompt ID") +}