Files

261 lines
6.4 KiB
Go

package internal
import (
"context"
"encoding/json"
"sync"
"github.com/rs/zerolog/log"
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/eventbus"
)
type EventPublisher interface {
PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error
PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error
PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error
SetAlbumID(albumID string)
SetWorkflowRunID(id string)
}
type dbEventPublisher struct {
mu sync.Mutex
workflowRunID string
albumID string
quality string
events *database.AlbumEventRepository
bus *eventbus.EventBus
topic string
}
func newDBEventPublisher(albumID, quality string, events *database.AlbumEventRepository, bus *eventbus.EventBus, topic string) *dbEventPublisher {
return &dbEventPublisher{
albumID: albumID,
quality: quality,
events: events,
bus: bus,
topic: topic,
}
}
func (p *dbEventPublisher) SetAlbumID(albumID string) {
p.mu.Lock()
defer p.mu.Unlock()
p.albumID = albumID
}
func (p *dbEventPublisher) getAlbumID() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.albumID
}
func (p *dbEventPublisher) SetWorkflowRunID(id string) {
p.mu.Lock()
defer p.mu.Unlock()
p.workflowRunID = id
}
func (p *dbEventPublisher) getWorkflowRunID() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.workflowRunID
}
func (p *dbEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error {
var dataJSON []byte
if data != nil {
var err error
dataJSON, err = json.Marshal(data)
if err != nil {
log.Warn().Err(err).Msg("failed to marshal status data to JSON")
dataJSON = nil
}
}
albumID := p.getAlbumID()
workflowRunID := p.getWorkflowRunID()
var seq int64
if albumID != "" {
event := &database.AlbumEvent{
WorkflowRunID: workflowRunID,
AlbumID: albumID,
EventType: "status",
Step: step.String(),
Message: msg,
DataJSON: dataJSON,
}
if err := p.events.Create(ctx, event); err != nil {
log.Error().Err(err).Msg("failed to persist status event")
} else {
seq = event.Seq
}
}
p.bus.Publish(p.topic, &eventbus.Event{
Seq: seq,
WorkflowRunID: workflowRunID,
AlbumID: albumID,
Quality: p.quality,
EventType: "status",
Step: step.String(),
Message: msg,
Data: data,
})
return nil
}
func (p *dbEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error {
albumID := p.getAlbumID()
workflowRunID := p.getWorkflowRunID()
var seq int64
if albumID != "" {
event := &database.AlbumEvent{
WorkflowRunID: workflowRunID,
AlbumID: albumID,
EventType: "error",
Step: step.String(),
Message: err.Error(),
}
if dbErr := p.events.Create(ctx, event); dbErr != nil {
log.Error().Err(dbErr).Msg("failed to persist error event")
} else {
seq = event.Seq
}
}
p.bus.Publish(p.topic, &eventbus.Event{
Seq: seq,
WorkflowRunID: workflowRunID,
AlbumID: albumID,
Quality: p.quality,
EventType: "error",
Step: step.String(),
Message: err.Error(),
Data: map[string]bool{"recoverable": recoverable},
})
return nil
}
func (p *dbEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error {
var dataJSON []byte
if result != nil {
var err error
dataJSON, err = json.Marshal(result)
if err != nil {
log.Warn().Err(err).Msg("failed to marshal result to JSON")
dataJSON = nil
}
}
albumID := p.getAlbumID()
workflowRunID := p.getWorkflowRunID()
var seq int64
if albumID != "" {
event := &database.AlbumEvent{
WorkflowRunID: workflowRunID,
AlbumID: albumID,
EventType: "result",
Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(),
Message: "workflow completed",
DataJSON: dataJSON,
}
if err := p.events.Create(ctx, event); err != nil {
log.Error().Err(err).Msg("failed to persist result event")
} else {
seq = event.Seq
}
}
p.bus.Publish(p.topic, &eventbus.Event{
Seq: seq,
WorkflowRunID: workflowRunID,
AlbumID: albumID,
Quality: p.quality,
EventType: "result",
Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(),
Message: "workflow completed",
Data: result,
})
return nil
}
type streamEventPublisher struct {
*dbEventPublisher
stream pb.MusicAgregatorService_MonitorAlbumStreamServer
}
func newStreamEventPublisher(db *dbEventPublisher, stream pb.MusicAgregatorService_MonitorAlbumStreamServer) *streamEventPublisher {
return &streamEventPublisher{
dbEventPublisher: db,
stream: stream,
}
}
func (p *streamEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error {
if err := p.dbEventPublisher.PublishStatus(ctx, step, msg, data); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
status := &pb.StatusUpdate{
Step: step,
Message: msg,
}
switch v := data.(type) {
case *pb.StreamAlbumInfo:
status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v}
case *pb.TorrentList:
status.Data = &pb.StatusUpdate_Torrents{Torrents: v}
case *pb.ReleaseInfo:
status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v}
}
return p.stream.Send(&pb.MonitorAlbumStreamResponse{
Message: &pb.MonitorAlbumStreamResponse_Status{Status: status},
})
}
func (p *streamEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error {
if dbErr := p.dbEventPublisher.PublishError(ctx, step, err, recoverable); dbErr != nil {
return dbErr
}
return p.stream.Send(&pb.MonitorAlbumStreamResponse{
Message: &pb.MonitorAlbumStreamResponse_Error{
Error: &pb.ErrorUpdate{
FailedStep: step,
Message: err.Error(),
Recoverable: recoverable,
},
},
})
}
func (p *streamEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error {
if err := p.dbEventPublisher.PublishResult(ctx, result); err != nil {
return err
}
return p.stream.Send(&pb.MonitorAlbumStreamResponse{
Message: &pb.MonitorAlbumStreamResponse_Result{Result: result},
})
}