package internal import ( "context" "encoding/json" "sync" "github.com/rs/zerolog/log" pb "homelab.lan/music-agregator/gen/music_agregator/v1" "homelab.lan/music-agregator/internal/database" "homelab.lan/music-agregator/internal/eventbus" ) type EventPublisher interface { PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error SetAlbumID(albumID string) SetWorkflowRunID(id string) } type dbEventPublisher struct { mu sync.Mutex workflowRunID string albumID string quality string events *database.AlbumEventRepository bus *eventbus.EventBus topic string } func newDBEventPublisher(albumID, quality string, events *database.AlbumEventRepository, bus *eventbus.EventBus, topic string) *dbEventPublisher { return &dbEventPublisher{ albumID: albumID, quality: quality, events: events, bus: bus, topic: topic, } } func (p *dbEventPublisher) SetAlbumID(albumID string) { p.mu.Lock() defer p.mu.Unlock() p.albumID = albumID } func (p *dbEventPublisher) getAlbumID() string { p.mu.Lock() defer p.mu.Unlock() return p.albumID } func (p *dbEventPublisher) SetWorkflowRunID(id string) { p.mu.Lock() defer p.mu.Unlock() p.workflowRunID = id } func (p *dbEventPublisher) getWorkflowRunID() string { p.mu.Lock() defer p.mu.Unlock() return p.workflowRunID } func (p *dbEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error { var dataJSON []byte if data != nil { var err error dataJSON, err = json.Marshal(data) if err != nil { log.Warn().Err(err).Msg("failed to marshal status data to JSON") dataJSON = nil } } albumID := p.getAlbumID() workflowRunID := p.getWorkflowRunID() var seq int64 if albumID != "" { event := &database.AlbumEvent{ WorkflowRunID: workflowRunID, AlbumID: albumID, EventType: "status", Step: step.String(), Message: msg, DataJSON: dataJSON, } if err := p.events.Create(ctx, event); err != nil { log.Error().Err(err).Msg("failed to persist status event") } else { seq = event.Seq } } p.bus.Publish(p.topic, &eventbus.Event{ Seq: seq, WorkflowRunID: workflowRunID, AlbumID: albumID, Quality: p.quality, EventType: "status", Step: step.String(), Message: msg, Data: data, }) return nil } func (p *dbEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error { albumID := p.getAlbumID() workflowRunID := p.getWorkflowRunID() var seq int64 if albumID != "" { event := &database.AlbumEvent{ WorkflowRunID: workflowRunID, AlbumID: albumID, EventType: "error", Step: step.String(), Message: err.Error(), } if dbErr := p.events.Create(ctx, event); dbErr != nil { log.Error().Err(dbErr).Msg("failed to persist error event") } else { seq = event.Seq } } p.bus.Publish(p.topic, &eventbus.Event{ Seq: seq, WorkflowRunID: workflowRunID, AlbumID: albumID, Quality: p.quality, EventType: "error", Step: step.String(), Message: err.Error(), Data: map[string]bool{"recoverable": recoverable}, }) return nil } func (p *dbEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error { var dataJSON []byte if result != nil { var err error dataJSON, err = json.Marshal(result) if err != nil { log.Warn().Err(err).Msg("failed to marshal result to JSON") dataJSON = nil } } albumID := p.getAlbumID() workflowRunID := p.getWorkflowRunID() var seq int64 if albumID != "" { event := &database.AlbumEvent{ WorkflowRunID: workflowRunID, AlbumID: albumID, EventType: "result", Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(), Message: "workflow completed", DataJSON: dataJSON, } if err := p.events.Create(ctx, event); err != nil { log.Error().Err(err).Msg("failed to persist result event") } else { seq = event.Seq } } p.bus.Publish(p.topic, &eventbus.Event{ Seq: seq, WorkflowRunID: workflowRunID, AlbumID: albumID, Quality: p.quality, EventType: "result", Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(), Message: "workflow completed", Data: result, }) return nil } type streamEventPublisher struct { *dbEventPublisher stream pb.MusicAgregatorService_MonitorAlbumStreamServer } func newStreamEventPublisher(db *dbEventPublisher, stream pb.MusicAgregatorService_MonitorAlbumStreamServer) *streamEventPublisher { return &streamEventPublisher{ dbEventPublisher: db, stream: stream, } } func (p *streamEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error { if err := p.dbEventPublisher.PublishStatus(ctx, step, msg, data); err != nil { return err } select { case <-ctx.Done(): return ctx.Err() default: } status := &pb.StatusUpdate{ Step: step, Message: msg, } switch v := data.(type) { case *pb.StreamAlbumInfo: status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v} case *pb.TorrentList: status.Data = &pb.StatusUpdate_Torrents{Torrents: v} case *pb.ReleaseInfo: status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v} } return p.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Status{Status: status}, }) } func (p *streamEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error { if dbErr := p.dbEventPublisher.PublishError(ctx, step, err, recoverable); dbErr != nil { return dbErr } return p.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Error{ Error: &pb.ErrorUpdate{ FailedStep: step, Message: err.Error(), Recoverable: recoverable, }, }, }) } func (p *streamEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error { if err := p.dbEventPublisher.PublishResult(ctx, result); err != nil { return err } return p.stream.Send(&pb.MonitorAlbumStreamResponse{ Message: &pb.MonitorAlbumStreamResponse_Result{Result: result}, }) }