5a5660bf21
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
261 lines
6.4 KiB
Go
261 lines
6.4 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
|
|
"homelab.lan/music-agregator/internal/database"
|
|
"homelab.lan/music-agregator/internal/eventbus"
|
|
)
|
|
|
|
type EventPublisher interface {
|
|
PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error
|
|
PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error
|
|
PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error
|
|
SetAlbumID(albumID string)
|
|
SetWorkflowRunID(id string)
|
|
}
|
|
|
|
type dbEventPublisher struct {
|
|
mu sync.Mutex
|
|
workflowRunID string
|
|
albumID string
|
|
quality string
|
|
events *database.AlbumEventRepository
|
|
bus *eventbus.EventBus
|
|
topic string
|
|
}
|
|
|
|
func newDBEventPublisher(albumID, quality string, events *database.AlbumEventRepository, bus *eventbus.EventBus, topic string) *dbEventPublisher {
|
|
return &dbEventPublisher{
|
|
albumID: albumID,
|
|
quality: quality,
|
|
events: events,
|
|
bus: bus,
|
|
topic: topic,
|
|
}
|
|
}
|
|
|
|
func (p *dbEventPublisher) SetAlbumID(albumID string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.albumID = albumID
|
|
}
|
|
|
|
func (p *dbEventPublisher) getAlbumID() string {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.albumID
|
|
}
|
|
|
|
func (p *dbEventPublisher) SetWorkflowRunID(id string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.workflowRunID = id
|
|
}
|
|
|
|
func (p *dbEventPublisher) getWorkflowRunID() string {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.workflowRunID
|
|
}
|
|
|
|
func (p *dbEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error {
|
|
var dataJSON []byte
|
|
if data != nil {
|
|
var err error
|
|
dataJSON, err = json.Marshal(data)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("failed to marshal status data to JSON")
|
|
dataJSON = nil
|
|
}
|
|
}
|
|
|
|
albumID := p.getAlbumID()
|
|
workflowRunID := p.getWorkflowRunID()
|
|
|
|
var seq int64
|
|
if albumID != "" {
|
|
event := &database.AlbumEvent{
|
|
WorkflowRunID: workflowRunID,
|
|
AlbumID: albumID,
|
|
EventType: "status",
|
|
Step: step.String(),
|
|
Message: msg,
|
|
DataJSON: dataJSON,
|
|
}
|
|
|
|
if err := p.events.Create(ctx, event); err != nil {
|
|
log.Error().Err(err).Msg("failed to persist status event")
|
|
} else {
|
|
seq = event.Seq
|
|
}
|
|
}
|
|
|
|
p.bus.Publish(p.topic, &eventbus.Event{
|
|
Seq: seq,
|
|
WorkflowRunID: workflowRunID,
|
|
AlbumID: albumID,
|
|
Quality: p.quality,
|
|
EventType: "status",
|
|
Step: step.String(),
|
|
Message: msg,
|
|
Data: data,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *dbEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error {
|
|
albumID := p.getAlbumID()
|
|
workflowRunID := p.getWorkflowRunID()
|
|
|
|
var seq int64
|
|
if albumID != "" {
|
|
event := &database.AlbumEvent{
|
|
WorkflowRunID: workflowRunID,
|
|
AlbumID: albumID,
|
|
EventType: "error",
|
|
Step: step.String(),
|
|
Message: err.Error(),
|
|
}
|
|
|
|
if dbErr := p.events.Create(ctx, event); dbErr != nil {
|
|
log.Error().Err(dbErr).Msg("failed to persist error event")
|
|
} else {
|
|
seq = event.Seq
|
|
}
|
|
}
|
|
|
|
p.bus.Publish(p.topic, &eventbus.Event{
|
|
Seq: seq,
|
|
WorkflowRunID: workflowRunID,
|
|
AlbumID: albumID,
|
|
Quality: p.quality,
|
|
EventType: "error",
|
|
Step: step.String(),
|
|
Message: err.Error(),
|
|
Data: map[string]bool{"recoverable": recoverable},
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *dbEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error {
|
|
var dataJSON []byte
|
|
if result != nil {
|
|
var err error
|
|
dataJSON, err = json.Marshal(result)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msg("failed to marshal result to JSON")
|
|
dataJSON = nil
|
|
}
|
|
}
|
|
|
|
albumID := p.getAlbumID()
|
|
workflowRunID := p.getWorkflowRunID()
|
|
|
|
var seq int64
|
|
if albumID != "" {
|
|
event := &database.AlbumEvent{
|
|
WorkflowRunID: workflowRunID,
|
|
AlbumID: albumID,
|
|
EventType: "result",
|
|
Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(),
|
|
Message: "workflow completed",
|
|
DataJSON: dataJSON,
|
|
}
|
|
|
|
if err := p.events.Create(ctx, event); err != nil {
|
|
log.Error().Err(err).Msg("failed to persist result event")
|
|
} else {
|
|
seq = event.Seq
|
|
}
|
|
}
|
|
|
|
p.bus.Publish(p.topic, &eventbus.Event{
|
|
Seq: seq,
|
|
WorkflowRunID: workflowRunID,
|
|
AlbumID: albumID,
|
|
Quality: p.quality,
|
|
EventType: "result",
|
|
Step: pb.MonitorStep_MONITOR_STEP_COMPLETE.String(),
|
|
Message: "workflow completed",
|
|
Data: result,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
type streamEventPublisher struct {
|
|
*dbEventPublisher
|
|
stream pb.MusicAgregatorService_MonitorAlbumStreamServer
|
|
}
|
|
|
|
func newStreamEventPublisher(db *dbEventPublisher, stream pb.MusicAgregatorService_MonitorAlbumStreamServer) *streamEventPublisher {
|
|
return &streamEventPublisher{
|
|
dbEventPublisher: db,
|
|
stream: stream,
|
|
}
|
|
}
|
|
|
|
func (p *streamEventPublisher) PublishStatus(ctx context.Context, step pb.MonitorStep, msg string, data interface{}) error {
|
|
if err := p.dbEventPublisher.PublishStatus(ctx, step, msg, data); err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
status := &pb.StatusUpdate{
|
|
Step: step,
|
|
Message: msg,
|
|
}
|
|
|
|
switch v := data.(type) {
|
|
case *pb.StreamAlbumInfo:
|
|
status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v}
|
|
case *pb.TorrentList:
|
|
status.Data = &pb.StatusUpdate_Torrents{Torrents: v}
|
|
case *pb.ReleaseInfo:
|
|
status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v}
|
|
}
|
|
|
|
return p.stream.Send(&pb.MonitorAlbumStreamResponse{
|
|
Message: &pb.MonitorAlbumStreamResponse_Status{Status: status},
|
|
})
|
|
}
|
|
|
|
func (p *streamEventPublisher) PublishError(ctx context.Context, step pb.MonitorStep, err error, recoverable bool) error {
|
|
if dbErr := p.dbEventPublisher.PublishError(ctx, step, err, recoverable); dbErr != nil {
|
|
return dbErr
|
|
}
|
|
|
|
return p.stream.Send(&pb.MonitorAlbumStreamResponse{
|
|
Message: &pb.MonitorAlbumStreamResponse_Error{
|
|
Error: &pb.ErrorUpdate{
|
|
FailedStep: step,
|
|
Message: err.Error(),
|
|
Recoverable: recoverable,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
func (p *streamEventPublisher) PublishResult(ctx context.Context, result *pb.MonitorAlbumResponse) error {
|
|
if err := p.dbEventPublisher.PublishResult(ctx, result); err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.stream.Send(&pb.MonitorAlbumStreamResponse{
|
|
Message: &pb.MonitorAlbumStreamResponse_Result{Result: result},
|
|
})
|
|
}
|