diff --git a/internal/event_publisher.go b/internal/event_publisher.go new file mode 100644 index 0000000..1299f03 --- /dev/null +++ b/internal/event_publisher.go @@ -0,0 +1,260 @@ +package internal + +import ( + "context" + "encoding/json" + "sync" + + "github.com/rs/zerolog/log" + + pb "homelab.lan/music-agregator/gen/music_agregator/v1" + "homelab.lan/music-agregator/internal/database" + "homelab.lan/music-agregator/internal/eventbus" +) + +type EventPublisher interface { + PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error + PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error + PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error + SetAlbumID(albumID string) + SetWorkflowRunID(id string) +} + +type dbEventPublisher struct { + mu sync.Mutex + workflowRunID string + albumID string + quality string + events *database.AlbumEventRepository + bus *eventbus.EventBus + topic string +} + +func newDBEventPublisher(albumID, quality string, events *database.AlbumEventRepository, bus *eventbus.EventBus, topic string) *dbEventPublisher { + return &dbEventPublisher{ + albumID: albumID, + quality: quality, + events: events, + bus: bus, + topic: topic, + } +} + +func (p *dbEventPublisher) SetAlbumID(albumID string) { + p.mu.Lock() + defer p.mu.Unlock() + p.albumID = albumID +} + +func (p *dbEventPublisher) getAlbumID() string { + p.mu.Lock() + defer p.mu.Unlock() + return p.albumID +} + +func (p *dbEventPublisher) SetWorkflowRunID(id string) { + p.mu.Lock() + defer p.mu.Unlock() + p.workflowRunID = id +} + +func (p *dbEventPublisher) getWorkflowRunID() string { + p.mu.Lock() + defer p.mu.Unlock() + return p.workflowRunID +} + +func (p *dbEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error { + var dataJSON []byte + if data != nil { + var err error + dataJSON, err = json.Marshal(data) + if err != nil { + log.Warn().Err(err).Msg("failed to marshal status data to JSON") + dataJSON = nil + } + } + + albumID := p.getAlbumID() + workflowRunID := p.getWorkflowRunID() + + var seq int64 + if albumID != "" { + event := &database.AlbumEvent{ + WorkflowRunID: workflowRunID, + AlbumID: albumID, + EventType: "status", + Step: step.String(), + Message: msg, + DataJSON: dataJSON, + } + + if err := p.events.Create(ctx, event); err != nil { + log.Error().Err(err).Msg("failed to persist status event") + } else { + seq = event.Seq + } + } + + p.bus.Publish(p.topic, &eventbus.Event{ + Seq: seq, + WorkflowRunID: workflowRunID, + AlbumID: albumID, + Quality: p.quality, + EventType: "status", + Step: step.String(), + Message: msg, + Data: data, + }) + + return nil +} + +func (p *dbEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error { + albumID := p.getAlbumID() + workflowRunID := p.getWorkflowRunID() + + var seq int64 + if albumID != "" { + event := &database.AlbumEvent{ + WorkflowRunID: workflowRunID, + AlbumID: albumID, + EventType: "error", + Step: step.String(), + Message: err.Error(), + } + + if dbErr := p.events.Create(ctx, event); dbErr != nil { + log.Error().Err(dbErr).Msg("failed to persist error event") + } else { + seq = event.Seq + } + } + + p.bus.Publish(p.topic, &eventbus.Event{ + Seq: seq, + WorkflowRunID: workflowRunID, + AlbumID: albumID, + Quality: p.quality, + EventType: "error", + Step: step.String(), + Message: err.Error(), + Data: map[string]bool{"recoverable": recoverable}, + }) + + return nil +} + +func (p *dbEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error { + var dataJSON []byte + if result != nil { + var err error + dataJSON, err = json.Marshal(result) + if err != nil { + log.Warn().Err(err).Msg("failed to marshal result to JSON") + dataJSON = nil + } + } + + albumID := p.getAlbumID() + workflowRunID := p.getWorkflowRunID() + + var seq int64 + if albumID != "" { + event := &database.AlbumEvent{ + WorkflowRunID: workflowRunID, + AlbumID: albumID, + EventType: "result", + Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(), + Message: "workflow completed", + DataJSON: dataJSON, + } + + if err := p.events.Create(ctx, event); err != nil { + log.Error().Err(err).Msg("failed to persist result event") + } else { + seq = event.Seq + } + } + + p.bus.Publish(p.topic, &eventbus.Event{ + Seq: seq, + WorkflowRunID: workflowRunID, + AlbumID: albumID, + Quality: p.quality, + EventType: "result", + Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(), + Message: "workflow completed", + Data: result, + }) + + return nil +} + +type streamEventPublisher struct { + *dbEventPublisher + stream pb.MusicAgregatorService_MonitorAlbumStreamServer +} + +func newStreamEventPublisher(db *dbEventPublisher, stream pb.MusicAgregatorService_MonitorAlbumStreamServer) *streamEventPublisher { + return &streamEventPublisher{ + dbEventPublisher: db, + stream: stream, + } +} + +func (p *streamEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error { + if err := p.dbEventPublisher.PublishStatus(ctx, step, msg, data); err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + status := &pb.StatusUpdate{ + Step: step, + Message: msg, + } + + switch v := data.(type) { + case *pb.StreamAlbumInfo: + status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v} + case *pb.TorrentList: + status.Data = &pb.StatusUpdate_Torrents{Torrents: v} + case *pb.ReleaseInfo: + status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v} + } + + return p.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Status{Status: status}, + }) +} + +func (p *streamEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error { + if dbErr := p.dbEventPublisher.PublishError(ctx, step, err, recoverable); dbErr != nil { + return dbErr + } + + return p.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Error{ + Error: &pb.ErrorUpdate{ + FailedStep: step, + Message: err.Error(), + Recoverable: recoverable, + }, + }, + }) +} + +func (p *streamEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error { + if err := p.dbEventPublisher.PublishResult(ctx, result); err != nil { + return err + } + + return p.stream.Send(&pb.MonitorAlbumStreamResponse{ + Message: &pb.MonitorAlbumStreamResponse_Result{Result: result}, + }) +} diff --git a/internal/monitor_workflow.go b/internal/monitor_workflow.go index 26fcf01..d7af730 100644 --- a/internal/monitor_workflow.go +++ b/internal/monitor_workflow.go @@ -17,112 +17,20 @@ import ( var MaxPromptTimeout = 300 * time.Second type monitorWorkflow struct { - stream pb.MusicAgregatorService_MonitorAlbumStreamServer mode pb.InteractionMode req *pb.StartMonitorRequest service *MusicAgregatorService + publisher EventPublisher + + stream pb.MusicAgregatorService_MonitorAlbumStreamServer decisions chan *pb.UserDecision cancel context.CancelFunc - mu sync.Mutex - promptID int -} -func newMonitorWorkflow( - stream pb.MusicAgregatorService_MonitorAlbumStreamServer, - req *pb.StartMonitorRequest, - service *MusicAgregatorService, - cancel context.CancelFunc, -) *monitorWorkflow { - return &monitorWorkflow{ - stream: stream, - mode: req.Mode, - req: req, - service: service, - decisions: make(chan *pb.UserDecision, 1), - cancel: cancel, - } -} + addedHash string + workflowRunID string -func (w *monitorWorkflow) sendStatus(step pb.MonitorStep, msg string) error { - select { - case <-w.stream.Context().Done(): - return w.stream.Context().Err() - default: - } - return w.stream.Send(&pb.MonitorAlbumStreamResponse{ - Message: &pb.MonitorAlbumStreamResponse_Status{ - Status: &pb.StatusUpdate{Step: step, Message: msg}, - }, - }) -} - -func (w *monitorWorkflow) sendStatusWithAlbumInfo(step pb.MonitorStep, msg string, info *pb.StreamAlbumInfo) error { - select { - case <-w.stream.Context().Done(): - return w.stream.Context().Err() - default: - } - return w.stream.Send(&pb.MonitorAlbumStreamResponse{ - Message: &pb.MonitorAlbumStreamResponse_Status{ - Status: &pb.StatusUpdate{ - Step: step, - Message: msg, - Data: &pb.StatusUpdate_AlbumInfo{AlbumInfo: info}, - }, - }, - }) -} - -func (w *monitorWorkflow) sendStatusWithTorrents(step pb.MonitorStep, msg string, torrents *pb.TorrentList) error { - select { - case <-w.stream.Context().Done(): - return w.stream.Context().Err() - default: - } - return w.stream.Send(&pb.MonitorAlbumStreamResponse{ - Message: &pb.MonitorAlbumStreamResponse_Status{ - Status: &pb.StatusUpdate{ - Step: step, - Message: msg, - Data: &pb.StatusUpdate_Torrents{Torrents: torrents}, - }, - }, - }) -} - -func (w *monitorWorkflow) sendStatusWithRelease(step pb.MonitorStep, msg string, release *pb.ReleaseInfo) error { - select { - case <-w.stream.Context().Done(): - return w.stream.Context().Err() - default: - } - return w.stream.Send(&pb.MonitorAlbumStreamResponse{ - Message: &pb.MonitorAlbumStreamResponse_Status{ - Status: &pb.StatusUpdate{ - Step: step, - Message: msg, - Data: &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: release}, - }, - }, - }) -} - -func (w *monitorWorkflow) sendError(step pb.MonitorStep, err error, recoverable bool) error { - return w.stream.Send(&pb.MonitorAlbumStreamResponse{ - Message: &pb.MonitorAlbumStreamResponse_Error{ - Error: &pb.ErrorUpdate{ - FailedStep: step, - Message: err.Error(), - Recoverable: recoverable, - }, - }, - }) -} - -func (w *monitorWorkflow) sendResult(result *pb.MonitorAlbumResponse) error { - return w.stream.Send(&pb.MonitorAlbumStreamResponse{ - Message: &pb.MonitorAlbumStreamResponse_Result{Result: result}, - }) + mu sync.Mutex + promptID int } func (w *monitorWorkflow) nextPromptID() string { @@ -137,6 +45,10 @@ func (w *monitorWorkflow) promptAndWait(ctx context.Context, prompt *pb.PromptFo return w.defaultDecision(prompt), nil } + if w.stream == nil { + return w.defaultDecision(prompt), nil + } + if err := w.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Prompt{Prompt: prompt}, }); err != nil { @@ -184,6 +96,10 @@ func (w *monitorWorkflow) defaultDecision(prompt *pb.PromptForDecision) *pb.User } func (w *monitorWorkflow) receiveDecisions(ctx context.Context) { + if w.stream == nil { + return + } + for { select { case <-ctx.Done(): @@ -197,7 +113,9 @@ func (w *monitorWorkflow) receiveDecisions(ctx context.Context) { } if msg.GetCancel() != nil { - w.cancel() + if w.cancel != nil { + w.cancel() + } return } @@ -217,7 +135,7 @@ func (w *monitorWorkflow) run(ctx context.Context) error { default: } - w.sendStatus(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, "Fetching album metadata...") + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, "Fetching album metadata...", nil) album, err := w.service.metadata.GetAlbum(ctx, w.req.AlbumId) if err != nil { @@ -225,7 +143,7 @@ func (w *monitorWorkflow) run(ctx context.Context) error { return ctx.Err() } log.Error().Err(err).Str("album_id", w.req.AlbumId).Msg("failed to get album") - w.sendError(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, err, false) + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, err, false) return err } @@ -234,7 +152,7 @@ func (w *monitorWorkflow) run(ctx context.Context) error { artistName = album.GetArtists()[0].GetArtist().GetName() } - w.sendStatusWithAlbumInfo(pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_FETCHING_METADATA, fmt.Sprintf("Got metadata: %s - %s", artistName, album.GetTitle()), &pb.StreamAlbumInfo{ Artist: artistName, @@ -242,18 +160,29 @@ func (w *monitorWorkflow) run(ctx context.Context) error { ReleaseDate: album.GetReleaseDate(), }) - w.sendStatus(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, "Checking if already owned...") + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, "Checking if already owned...", nil) dbAlbum, _ := w.service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { + w.publisher.SetAlbumID(dbAlbum.ID) w.service.metadata.SetAlbumMonitorState(ctx, dbAlbum.ID, database.Monitored) dbAlbum.MonitorState = database.Monitored + if w.workflowRunID == "" { + run := &database.WorkflowRun{AlbumID: dbAlbum.ID, Quality: w.req.Quality.String()} + if err := w.service.workflowRuns.Create(ctx, run); err != nil && err != database.ErrWorkflowAlreadyRunning { + log.Warn().Err(err).Msg("failed to create workflow run") + } else if err == nil { + w.workflowRunID = run.ID + w.publisher.SetWorkflowRunID(run.ID) + } + } + qualityStr := normalizeQuality(w.req.Quality, 0, 0) owned, err := w.service.downloads.HasAlbumInQuality(ctx, dbAlbum.ID, w.req.Quality.String(), qualityStr) if err == nil && owned { - w.sendStatus(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, - fmt.Sprintf("Already owned in %s quality", qualityStr)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, + fmt.Sprintf("Already owned in %s quality", qualityStr), nil) if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ @@ -270,26 +199,26 @@ func (w *monitorWorkflow) run(ctx context.Context) error { }, }) if err != nil { - w.sendError(pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, err, false) + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_CHECKING_OWNED, err, false) return err } if !decision.GetConfirm() { - w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped - already owned") - return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped - already owned", nil) + return w.publisher.PublishResult(ctx, w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } } else { - w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Already owned") - return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_COMPLETE, "Already owned", nil) + return w.publisher.PublishResult(ctx, w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } } } - w.sendStatus(pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, - fmt.Sprintf("Searching indexers for %s - %s...", artistName, album.GetTitle())) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, + fmt.Sprintf("Searching indexers for %s - %s...", artistName, album.GetTitle()), nil) searchResult, err := w.service.searchIndexer(album, w.req.IndexerOptions.GetTracker()) if err != nil { - w.sendError(pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, err, true) + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_SEARCHING_INDEXER, err, true) return err } @@ -307,17 +236,17 @@ func (w *monitorWorkflow) run(ctx context.Context) error { Lossless: p.rel.Format.IsLossless(), } } - w.sendStatusWithTorrents(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, fmt.Sprintf("Parsed %d from %d torrents", len(parsed), len(searchResult.Items)), &pb.TorrentList{Torrents: summaries}) } else { - w.sendStatus(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, - fmt.Sprintf("Found %d torrents, none parseable", len(searchResult.Items))) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, + fmt.Sprintf("Found %d torrents, none parseable", len(searchResult.Items)), nil) } if len(parsed) == 0 { - w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "No parseable results found") - return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_COMPLETE, "No parseable results found", nil) + return w.publisher.PublishResult(ctx, w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL && len(parsed) > 1 { @@ -348,7 +277,7 @@ func (w *monitorWorkflow) run(ctx context.Context) error { }, }) if err != nil { - w.sendError(pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, err, false) + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_PARSING_RESULTS, err, false) return err } @@ -371,14 +300,14 @@ func (w *monitorWorkflow) run(ctx context.Context) error { } } - w.sendStatus(pb.MonitorStep_MONITOR_STEP_FILTERING_QUALITY, - fmt.Sprintf("Filtering %d results by quality...", len(parsed))) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_FILTERING_QUALITY, + fmt.Sprintf("Filtering %d results by quality...", len(parsed)), nil) filtered := filterByQuality(parsed, w.req.Quality) if len(filtered) == 0 { log.Warn().Str("album", album.GetTitle()).Str("quality", w.req.Quality.String()).Msg("no releases match quality filter") - w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "No releases match quality filter") - return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_COMPLETE, "No releases match quality filter", nil) + return w.publisher.PublishResult(ctx, w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } var best parsedItem @@ -412,7 +341,7 @@ func (w *monitorWorkflow) run(ctx context.Context) error { }, }) if err != nil { - w.sendError(pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, err, false) + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, err, false) return err } @@ -430,7 +359,7 @@ func (w *monitorWorkflow) run(ctx context.Context) error { best = selectBestRelease(filtered) } - w.sendStatusWithRelease(pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_SELECTING_RELEASE, fmt.Sprintf("Selected: %s (%d seeders)", best.item.Title, best.item.Seeders), &pb.ReleaseInfo{ InfoHash: best.rel.InfoHash, @@ -441,8 +370,8 @@ func (w *monitorWorkflow) run(ctx context.Context) error { Tracker: best.item.Tracker, }) - w.sendStatus(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, - fmt.Sprintf("Adding torrent: %s...", best.item.Title)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, + fmt.Sprintf("Adding torrent: %s...", best.item.Title), nil) if w.mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { decision, err := w.promptAndWait(ctx, &pb.PromptForDecision{ @@ -459,30 +388,44 @@ func (w *monitorWorkflow) run(ctx context.Context) error { }, }) if err != nil { - w.sendError(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, false) + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, false) return err } if !decision.GetConfirm() { - w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped by user") - return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_COMPLETE, "Skipped by user", nil) + return w.publisher.PublishResult(ctx, w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, nil)) } } - if err := w.service.addToTorrentClient(best); err != nil { - w.sendError(pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, true) - return err - } - - w.sendStatus(pb.MonitorStep_MONITOR_STEP_SAVING, "Saving to database...") + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_SAVING, "Saving to database...", nil) dbAlbum, _ = w.service.metadata.GetAlbumByExternalID(ctx, album.GetId()) if dbAlbum != nil { + w.publisher.SetAlbumID(dbAlbum.ID) w.service.saveTorrentAndDownload(ctx, dbAlbum.ID, best) } else { log.Warn().Str("album_id", w.req.AlbumId).Msg("album not in DB, skipping torrent/download persistence") } - w.sendStatus(pb.MonitorStep_MONITOR_STEP_COMPLETE, "Done!") + w.addedHash = best.rel.InfoHash - return w.sendResult(w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best)) + if err := w.service.addToTorrentClient(best); err != nil { + w.publisher.PublishError(ctx, pb.MonitorStep_MONITOR_STEP_ADDING_TORRENT, err, true) + return err + } + + w.publisher.PublishStatus(ctx, pb.MonitorStep_MONITOR_STEP_COMPLETE, "Done!", nil) + + return w.publisher.PublishResult(ctx, w.service.buildMonitorAlbumResponse(ctx, album, dbAlbum, &best)) +} + +func (w *monitorWorkflow) cleanup(ctx context.Context) { + if w.addedHash != "" { + if err := w.service.torrentClient.DeleteTorrent(w.addedHash); err != nil { + log.Warn().Err(err).Str("hash", w.addedHash).Msg("failed to delete torrent during cancel cleanup") + } + if err := w.service.downloads.SetCancelledByQbitHash(ctx, w.addedHash); err != nil { + log.Warn().Err(err).Str("hash", w.addedHash).Msg("failed to cancel download during cleanup") + } + } } diff --git a/internal/server.go b/internal/server.go index aeea240..2686b8a 100644 --- a/internal/server.go +++ b/internal/server.go @@ -2,6 +2,8 @@ package internal import ( "context" + "encoding/json" + "time" "github.com/jackc/pgx/v5" "github.com/riverqueue/river" @@ -13,11 +15,14 @@ import ( pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/database" + "homelab.lan/music-agregator/internal/eventbus" "homelab.lan/music-agregator/internal/torrent" ) type MusicAgregatorServer struct { - service *MusicAgregatorService + service *MusicAgregatorService + bus *eventbus.EventBus + registry *WorkflowRegistry pb.UnimplementedMusicAgregatorServiceServer } @@ -27,13 +32,29 @@ func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx log.Err(err).Msg("failed to create MusicAgregatorService") return nil, err } + bus := eventbus.New() return &MusicAgregatorServer{ - service: service, + service: service, + bus: bus, + registry: NewWorkflowRegistry(bus), }, nil } func NewMusicAgregatorServerWithService(service *MusicAgregatorService) *MusicAgregatorServer { - return &MusicAgregatorServer{service: service} + bus := eventbus.New() + return &MusicAgregatorServer{ + service: service, + bus: bus, + registry: NewWorkflowRegistry(bus), + } +} + +func NewMusicAgregatorServerWithDeps(service *MusicAgregatorService, bus *eventbus.EventBus, registry *WorkflowRegistry) *MusicAgregatorServer { + return &MusicAgregatorServer{ + service: service, + bus: bus, + registry: registry, + } } func (s *MusicAgregatorServer) GetArtists(ctx context.Context, req *pb.GetArtistsRequest) (*pb.GetArtistsResponse, error) { @@ -59,16 +80,158 @@ func (s *MusicAgregatorServer) MonitorAlbumStream(stream pb.MusicAgregatorServic return status.Error(codes.InvalidArgument, "first message must be StartMonitorRequest") } + if startReq.Mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { + return s.runManualWorkflow(stream, startReq) + } + return s.runAutomaticWorkflow(stream, startReq) +} + +func (s *MusicAgregatorServer) runManualWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error { ctx, cancel := context.WithCancel(stream.Context()) defer cancel() - workflow := newMonitorWorkflow(stream, startReq, s.service, cancel) + albumKey := startReq.AlbumId + quality := startReq.Quality.String() + topic := albumKey + ":" + quality - if startReq.Mode == pb.InteractionMode_INTERACTION_MODE_MANUAL { - go workflow.receiveDecisions(ctx) + dbPublisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, topic) + publisher := newStreamEventPublisher(dbPublisher, stream) + + workflow := &monitorWorkflow{ + mode: startReq.Mode, + req: startReq, + service: s.service, + publisher: publisher, + stream: stream, + decisions: make(chan *pb.UserDecision, 1), + cancel: cancel, } - return workflow.run(ctx) + go workflow.receiveDecisions(ctx) + + err := workflow.run(ctx) + + if ctx.Err() != nil { + cleanupCtx := context.Background() + workflow.cleanup(cleanupCtx) + if workflow.workflowRunID != "" { + s.service.workflowRuns.SetCancelled(cleanupCtx, workflow.workflowRunID) + } + } else if workflow.workflowRunID != "" { + if err != nil { + s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, err.Error()) + } else { + s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID) + } + } + + return err +} + +func (s *MusicAgregatorServer) runAutomaticWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error { + albumKey := startReq.AlbumId + quality := startReq.Quality.String() + + entry, created := s.registry.GetOrCreate(context.Background(), albumKey, quality) + + sub, cleanup := s.bus.Subscribe(entry.Topic) + defer cleanup() + + if created { + s.registry.WaitGroup().Add(1) + go func() { + defer s.registry.WaitGroup().Done() + defer s.registry.Remove(albumKey, quality) + + publisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, entry.Topic) + + workflow := &monitorWorkflow{ + mode: startReq.Mode, + req: startReq, + service: s.service, + publisher: publisher, + } + + err := workflow.run(entry.Ctx) + + if workflow.workflowRunID != "" { + if err != nil { + if entry.Ctx.Err() == context.Canceled { + s.service.workflowRuns.SetCancelled(context.Background(), workflow.workflowRunID) + } else { + s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, err.Error()) + } + } else { + s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID) + } + } + }() + } + + for { + select { + case <-sub.C: + for { + event, ok := sub.Ring.Pop() + if !ok { + break + } + if err := s.sendEventToStream(stream, event); err != nil { + return nil + } + if event.EventType == "result" || event.EventType == "error" { + return nil + } + } + case <-stream.Context().Done(): + return nil + } + } +} + +func (s *MusicAgregatorServer) sendEventToStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, event *eventbus.Event) error { + resp := &pb.MonitorAlbumStreamResponse{} + + step := pb.MonitorStep(pb.MonitorStep_value[event.Step]) + + switch event.EventType { + case "status": + status := &pb.StatusUpdate{Step: step, Message: event.Message} + + switch v := event.Data.(type) { + case *pb.StreamAlbumInfo: + status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v} + case *pb.TorrentList: + status.Data = &pb.StatusUpdate_Torrents{Torrents: v} + case *pb.ReleaseInfo: + status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v} + } + + resp.Message = &pb.MonitorAlbumStreamResponse_Status{Status: status} + + case "error": + recoverable := false + if data, ok := event.Data.(map[string]bool); ok { + recoverable = data["recoverable"] + } + resp.Message = &pb.MonitorAlbumStreamResponse_Error{ + Error: &pb.ErrorUpdate{FailedStep: step, Message: event.Message, Recoverable: recoverable}, + } + + case "result": + if result, ok := event.Data.(*pb.MonitorAlbumResponse); ok { + resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: result} + } else if event.Data != nil { + if jsonBytes, ok := event.Data.(json.RawMessage); ok { + var result pb.MonitorAlbumResponse + if err := json.Unmarshal(jsonBytes, &result); err == nil { + resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: &result} + } + } + } + } + + return stream.Send(resp) } func (s *MusicAgregatorServer) AnalyzeAlbumRelease(ctx context.Context, req *pb.AnalyzeAlbumReleaseRequest) (*pb.AnalyzeAlbumReleaseResponse, error) { @@ -86,3 +249,78 @@ func (s *MusicAgregatorServer) GetArtistAlbums(ctx context.Context, req *pb.GetA func (s *MusicAgregatorServer) Register(server *grpc.Server) { pb.RegisterMusicAgregatorServiceServer(server, s) } + +func (s *MusicAgregatorServer) SubscribeEvents(req *pb.SubscribeEventsRequest, stream pb.MusicAgregatorService_SubscribeEventsServer) error { + ctx := stream.Context() + + sub, cleanup := s.bus.SubscribeGlobal() + defer cleanup() + + if req.SinceSeq > 0 { + events, err := s.service.albumEvents.GetAfterSeq(ctx, req.SinceSeq) + if err == nil { + for _, e := range events { + if err := stream.Send(albumEventToProto(e)); err != nil { + return err + } + } + } + } + + var lastSentSeq int64 + if req.SinceSeq > 0 { + lastSentSeq = req.SinceSeq + } + + for { + select { + case <-sub.C: + for { + event, ok := sub.Ring.Pop() + if !ok { + break + } + if event.Seq > lastSentSeq { + pbEvent := busEventToAlbumEvent(event) + if err := stream.Send(pbEvent); err != nil { + return nil + } + lastSentSeq = event.Seq + } + } + case <-ctx.Done(): + return nil + } + } +} + +func albumEventToProto(e *database.AlbumEvent) *pb.AlbumEvent { + return &pb.AlbumEvent{ + Seq: e.Seq, + WorkflowRunId: e.WorkflowRunID, + AlbumId: e.AlbumID, + EventType: e.EventType, + Step: e.Step, + Message: e.Message, + DataJson: e.DataJSON, + TimestampMs: e.CreatedAt.UnixMilli(), + } +} + +func busEventToAlbumEvent(e *eventbus.Event) *pb.AlbumEvent { + var dataJSON []byte + if e.Data != nil { + dataJSON, _ = json.Marshal(e.Data) + } + return &pb.AlbumEvent{ + Seq: e.Seq, + WorkflowRunId: e.WorkflowRunID, + AlbumId: e.AlbumID, + Quality: e.Quality, + EventType: e.EventType, + Step: e.Step, + Message: e.Message, + DataJson: dataJSON, + TimestampMs: time.Now().UnixMilli(), + } +} diff --git a/internal/service.go b/internal/service.go index 0f1ddb9..2d23d4f 100644 --- a/internal/service.go +++ b/internal/service.go @@ -49,6 +49,10 @@ type MusicAgregatorService struct { albumReleases *database.AlbumReleaseRepository trackReleases *database.TrackReleaseRepository analyzer *analysis.ReleaseAnalyzer + workflowRuns *database.WorkflowRunRepository + albumEvents *database.AlbumEventRepository + shutdownCtx context.Context + shutdownCancel context.CancelFunc } func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.Tx], torrentClient torrent.TorrentClient, pathMapper *torrent.PathMapper, db *database.DB) (*MusicAgregatorService, error) { @@ -70,6 +74,8 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T return nil, err } + ctx, cancel := context.WithCancel(context.Background()) + return &MusicAgregatorService{ config: cfg, metadata: metadata.NewMetadataService(metadataClient, db), @@ -85,6 +91,10 @@ func NewMusicAgregatorService(cfg config.Config, riverClient *river.Client[pgx.T albumReleases: database.NewAlbumReleaseRepository(db.Pool), trackReleases: database.NewTrackReleaseRepository(db.Pool), analyzer: analysis.NewReleaseAnalyzer(db), + workflowRuns: database.NewWorkflowRunRepository(db.Pool), + albumEvents: database.NewAlbumEventRepository(db.Pool), + shutdownCtx: ctx, + shutdownCancel: cancel, }, nil } @@ -97,6 +107,8 @@ func NewMusicAgregatorServiceWithDeps( pathMapper *torrent.PathMapper, db *database.DB, ) *MusicAgregatorService { + ctx, cancel := context.WithCancel(context.Background()) + return &MusicAgregatorService{ metadata: metadata, indexer: searcher, @@ -111,15 +123,59 @@ func NewMusicAgregatorServiceWithDeps( albumReleases: database.NewAlbumReleaseRepository(db.Pool), trackReleases: database.NewTrackReleaseRepository(db.Pool), analyzer: analysis.NewReleaseAnalyzer(db), + workflowRuns: database.NewWorkflowRunRepository(db.Pool), + albumEvents: database.NewAlbumEventRepository(db.Pool), + shutdownCtx: ctx, + shutdownCancel: cancel, } } func (s *MusicAgregatorService) Close() { + if s.shutdownCancel != nil { + s.shutdownCancel() + } if closer, ok := s.magnetResolver.(interface{ Close() }); ok { closer.Close() } } +func (s *MusicAgregatorService) RecoverWorkflows(ctx context.Context) { + stale, err := s.workflowRuns.GetRunning(ctx) + if err != nil { + log.Error().Err(err).Msg("failed to query stale workflow runs for recovery") + return + } + + if len(stale) == 0 { + return + } + + for _, run := range stale { + downloads, err := s.downloads.GetByAlbumID(ctx, run.AlbumID) + if err != nil { + log.Error().Err(err).Str("workflow_run_id", run.ID).Msg("failed to query downloads for recovery") + s.workflowRuns.SetFailed(ctx, run.ID, "recovery: failed to query downloads") + continue + } + + hasActive := false + for _, d := range downloads { + if d.State == "downloading" || d.State == "completed" || d.State == "seeding" { + hasActive = true + break + } + } + + if hasActive { + s.workflowRuns.SetCompleted(ctx, run.ID) + log.Info().Str("workflow_run_id", run.ID).Str("album_id", run.AlbumID).Msg("recovered stale workflow as completed") + } else { + s.workflowRuns.SetFailed(ctx, run.ID, "server restarted during workflow") + log.Warn().Str("workflow_run_id", run.ID).Str("album_id", run.AlbumID).Msg("recovered stale workflow as failed") + } + } +} + func (service *MusicAgregatorService) GetArtists(ctx context.Context, _ *pb.GetArtistsRequest) (*pb.GetArtistsResponse, error) { dbArtists, err := service.artists.GetAll(ctx) if err != nil {