package internal import ( "context" "fmt" "sync" "time" "github.com/rs/zerolog/log" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/database" ) var MaxPromptTimeout = 300 * time.Second type monitorWorkflow struct { stream pb.MusicAgregatorService_MonitorAlbumStreamServer mode pb.InteractionMode req *pb.StartMonitorRequest service *MusicAgregatorService decisions chan *pb.UserDecision cancel context.CancelFunc mu sync.Mutex promptID int } func newMonitorWorkflow( stream pb.MusicAgregatorService_MonitorAlbumStreamServer, req *pb.StartMonitorRequest, service *MusicAgregatorService, cancel context.CancelFunc, ) *monitorWorkflow { return &monitorWorkflow{ stream: stream, mode: req.Mode, req: req, service: service, decisions: make(chan *pb.UserDecision, 1), cancel: cancel, } } func (w *monitorWorkflow) sendStatus(step pb.MonitorStep, msg string) error { select { case <-w.stream.Context().Done(): return w.stream.Context().Err() default: } return w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Status{ Status: &pb.StatusUpdate{Step: step, Message: msg}, }, }) } func (w *monitorWorkflow) sendStatusWithAlbumInfo(step pb.MonitorStep, msg string, info *pb.StreamAlbumInfo) error { select { case <-w.stream.Context().Done(): return w.stream.Context().Err() default: } return w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Status{ Status: &pb.StatusUpdate{ Step: step, Message: msg, Data: &pb.StatusUpdate_AlbumInfo{AlbumInfo: info}, }, }, }) } func (w *monitorWorkflow) sendStatusWithTorrents(step pb.MonitorStep, msg string, torrents *pb.TorrentList) error { select { case <-w.stream.Context().Done(): return w.stream.Context().Err() default: } return w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Status{ Status: &pb.StatusUpdate{ Step: step, Message: msg, Data: &pb.StatusUpdate_Torrents{Torrents: torrents}, }, }, }) } func (w *monitorWorkflow) sendStatusWithRelease(step pb.MonitorStep, msg string, release *pb.ReleaseInfo) error { select { case <-w.stream.Context().Done(): return w.stream.Context().Err() default: } return w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Status{ Status: &pb.StatusUpdate{ Step: step, Message: msg, Data: &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: release}, }, }, }) } func (w *monitorWorkflow) sendError(step pb.MonitorStep, err error, recoverable bool) error { return w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Error{ Error: &pb.ErrorUpdate{ FailedStep: step, Message: err.Error(), Recoverable: recoverable, }, }, }) } func (w *monitorWorkflow) sendResult(result *pb.MonitorAlbumResponse) error { return w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Result{Result: result}, }) } func (w *monitorWorkflow) nextPromptID() string { w.mu.Lock() defer w.mu.Unlock() w.promptID++ return fmt.Sprintf("prompt-%d", w.promptID) } func (w *monitorWorkflow) promptAndWait(ctx context.Context, prompt *pb.PromptForDecision) (*pb.UserDecision, error) { if w.mode == pb.InteractionMode_INTERACTION_MODE_AUTOMATIC { return w.defaultDecision(prompt), nil } if err := w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Prompt{Prompt: prompt}, }); err != nil { return nil, err } timeout := time.Duration(prompt.TimeoutSeconds) * time.Second if timeout == 0 || timeout > MaxPromptTimeout { timeout = MaxPromptTimeout } timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() select { case decision := <-w.decisions: if decision.PromptId != prompt.PromptId { return nil, status.Error(codes.InvalidArgument, "prompt_id mismatch") } return decision, nil case <-timeoutCtx.Done(): return w.defaultDecision(prompt), nil } } func (w *monitorWorkflow) defaultDecision(prompt *pb.PromptForDecision) *pb.UserDecision { decision := &pb.UserDecision{PromptId: prompt.PromptId} switch prompt.Type { case pb.PromptType_PROMPT_TYPE_CONFIRM: decision.Decision = &pb.UserDecision_Confirm{ Confirm: prompt.GetConfirm().GetDefaultValue(), } case pb.PromptType_PROMPT_TYPE_SELECT_ONE: decision.Decision = &pb.UserDecision_SelectedId{ SelectedId: prompt.GetSelectOne().GetDefaultId(), } case pb.PromptType_PROMPT_TYPE_SELECT_MANY: decision.Decision = &pb.UserDecision_SelectedIds{ SelectedIds: &pb.SelectedIds{Ids: prompt.GetSelectMany().GetDefaultIds()}, } } return decision } func (w *monitorWorkflow) receiveDecisions(ctx context.Context) { for { select { case <-ctx.Done(): return default: } msg, err := w.stream.Recv() if err != nil { return } if msg.GetCancel() != nil { w.cancel() return } if decision := msg.GetDecision(); decision != nil { select { case w.decisions <- decision: default: } } } } func (w *monitorWorkflow) run(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } w.sendStatus(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, "Fetching album metadata...") album, err := w.service.metadata.GetAlbum(ctx, w.req.AlbumId) if err != nil { if ctx.Err() != nil { return ctx.Err() } log.Error().Err(err).Str("album_id", w.req.AlbumId).Msg("failed to get album") w.sendError(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, err, false) return err } artistName := "" if len(album.GetArtists()) > 0 { artistName = album.GetArtists()[0].GetArtist().GetName() } w.sendStatusWithAlbumInfo(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, fmt.Sprintf("Got metadata: %s - %s", artistName, album.GetTitle()), &pb.StreamAlbumInfo{ Artist: artistName, Title: album.GetTitle(), ReleaseDate: album.GetReleaseDate(), }) w.sendStatus(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, "Checking if already owned...") dbAlbum, _ := w.service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { w.service.metadata.SetAlbumMonitorState(ctx, dbAlbum.ID, database.Monitored) dbAlbum.MonitorState = database.Monitored qualityStr := normalizeQuality(w.req.Quality, 0, 0) owned, err := w.service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, w.req.Quality.String(), qualityStr) if err == nil && owned { w.sendStatus(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, fmt.Sprintf("Already owned in %s quality", qualityStr)) if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ PromptId: w.nextPromptID(), Type: pb.PromptType_PROMPT_TYPE_CONFIRM, Message: "Album already owned. Download anyway?", Options: &pb.PromptForDecision_Confirm{ Confirm: &pb.ConfirmPrompt{ ConfirmLabel: "Download anyway", CancelLabel: "Skip", DefaultValue: false, }, }, }) if err != nil { w.sendError(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, err, false) return err } if !decision.GetConfirm() { w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped - already owned") return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } } else { w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Already owned") return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } } } w.sendStatus(pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, fmt.Sprintf("Searching indexers for %s - %s...", artistName, album.GetTitle())) searchResult, err := w.service.searchIndexer(album, w.req.IndexerOptions.GetTracker()) if err != nil { w.sendError(pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, err, true) return err } parsed := w.service.parseSearchResults(searchResult, album) if len(parsed) > 0 { summaries := make([]*pb.TorrentSummary, len(parsed)) for i, p := range parsed { summaries[i] = &pb.TorrentSummary{ Id: fmt.Sprintf("torrent-%d", i), Title: p.item.Title, Tracker: p.item.Tracker, Seeders: int32(p.item.Seeders), Format: p.rel.Format.String(), Lossless: p.rel.Format.IsLossless(), } } w.sendStatusWithTorrents(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, fmt.Sprintf("Parsed %d from %d torrents", len(parsed), len(searchResult.Items)), &pb.TorrentList{Torrents: summaries}) } else { w.sendStatus(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, fmt.Sprintf("Found %d torrents, none parseable", len(searchResult.Items))) } if len(parsed) == 0 { w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "No parseable results found") return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL && len(parsed) > 1 { options := make([]*pb.SelectOption, len(parsed)) defaultIDs := make([]string, len(parsed)) for i, p := range parsed { id := fmt.Sprintf("torrent-%d", i) options[i] = &pb.SelectOption{ Id: id, Label: p.item.Title, Description: fmt.Sprintf("%s - %d seeders", p.item.Tracker, p.item.Seeders), } defaultIDs[i] = id } decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ PromptId: w.nextPromptID(), Type: pb.PromptType_PROMPT_TYPE_SELECT_MANY, Message: "Select torrents to consider", Options: &pb.PromptForDecision_SelectMany{ SelectMany: &pb.SelectManyPrompt{ Options: options, DefaultIds: defaultIDs, MinSelections: 1, MaxSelections: int32(len(parsed)), }, }, }) if err != nil { w.sendError(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, err, false) return err } selectedIDs := make(map[string]bool) if ids := decision.GetSelectedIds(); ids != nil { for _, id := range ids.GetIds() { selectedIDs[id] = true } } var selected []parsedItem for i, p := range parsed { id := fmt.Sprintf("torrent-%d", i) if selectedIDs[id] { selected = append(selected, p) } } if len(selected) > 0 { parsed = selected } } w.sendStatus(pb.MonitorStep_MONITOR_STEP_FILTERING_QUALITY, fmt.Sprintf("Filtering %d results by quality...", len(parsed))) filtered := filterByQuality(parsed, w.req.Quality) if len(filtered) == 0 { log.Warn().Str("album", album.GetTitle()).Str("quality", w.req.Quality.String()).Msg("no releases match quality filter") w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "No releases match quality filter") return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } var best parsedItem if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL && len(filtered) > 1 { options := make([]*pb.SelectOption, len(filtered)) for i, p := range filtered { options[i] = &pb.SelectOption{ Id: fmt.Sprintf("release-%d", i), Label: p.item.Title, Description: fmt.Sprintf("%s - %d seeders - %s", p.item.Tracker, p.item.Seeders, p.rel.Format.String()), } } bestIdx := 0 for i, p := range filtered { if p.item.Seeders > filtered[bestIdx].item.Seeders { bestIdx = i } } decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ PromptId: w.nextPromptID(), Type: pb.PromptType_PROMPT_TYPE_SELECT_ONE, Message: "Select release", Options: &pb.PromptForDecision_SelectOne{ SelectOne: &pb.SelectOnePrompt{ Options: options, DefaultId: fmt.Sprintf("release-%d", bestIdx), }, }, }) if err != nil { w.sendError(pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, err, false) return err } selectedIdx := 0 if id := decision.GetSelectedId(); id != "" { for i := range filtered { if fmt.Sprintf("release-%d", i) == id { selectedIdx = i break } } } best = filtered[selectedIdx] } else { best = selectBestRelease(filtered) } w.sendStatusWithRelease(pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, fmt.Sprintf("Selected: %s (%d seeders)", best.item.Title, best.item.Seeders), &pb.ReleaseInfo{ InfoHash: best.rel.InfoHash, Format: best.rel.Format.String(), BitDepth: int32(best.rel.BitDepth), SampleRate: int32(best.rel.SampleRate), Seeders: int32(best.item.Seeders), Tracker: best.item.Tracker, }) w.sendStatus(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, fmt.Sprintf("Adding torrent: %s...", best.item.Title)) if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ PromptId: w.nextPromptID(), Type: pb.PromptType_PROMPT_TYPE_CONFIRM, Message: fmt.Sprintf("Add torrent '%s' to client?", best.item.Title), Options: &pb.PromptForDecision_Confirm{ Confirm: &pb.ConfirmPrompt{ ConfirmLabel: "Add", CancelLabel: "Skip", DefaultValue: true, }, }, }) if err != nil { w.sendError(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, false) return err } if !decision.GetConfirm() { w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped by user") return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } } if err := w.service.addToTorrentClient(best); err != nil { w.sendError(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, true) return err } w.sendStatus(pb.MonitorStep_MONITOR_STEP_SAVING, "Saving to database...") dbAlbum, _ = w.service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { w.service.saveTorrentAndDownload(ctx, dbAlbum.ID, best) } else { log.Warn().Str("album_id", w.req.AlbumId).Msg("album not in DB, skipping torrent/download persistence") } w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Done!") return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best)) }