5a5660bf21
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
327 lines
9.0 KiB
Go
327 lines
9.0 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/riverqueue/river"
|
|
"github.com/rs/zerolog/log"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
|
|
"homelab.lan/music-agregator/internal/config"
|
|
"homelab.lan/music-agregator/internal/database"
|
|
"homelab.lan/music-agregator/internal/eventbus"
|
|
"homelab.lan/music-agregator/internal/torrent"
|
|
)
|
|
|
|
type MusicAgregatorServer struct {
|
|
service *MusicAgregatorService
|
|
bus *eventbus.EventBus
|
|
registry *WorkflowRegistry
|
|
pb.UnimplementedMusicAgregatorServiceServer
|
|
}
|
|
|
|
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], torrentClient torrent.TorrentClient, pathMapper *torrent.PathMapper, db *database.DB) (*MusicAgregatorServer, error) {
|
|
service, err := NewMusicAgregatorService(cfg, riverClient, torrentClient, pathMapper, db)
|
|
if err != nil {
|
|
log.Err(err).Msg("failed to create MusicAgregatorService")
|
|
return nil, err
|
|
}
|
|
bus := eventbus.New()
|
|
return &MusicAgregatorServer{
|
|
service: service,
|
|
bus: bus,
|
|
registry: NewWorkflowRegistry(bus),
|
|
}, nil
|
|
}
|
|
|
|
func NewMusicAgregatorServerWithService(service *MusicAgregatorService) *MusicAgregatorServer {
|
|
bus := eventbus.New()
|
|
return &MusicAgregatorServer{
|
|
service: service,
|
|
bus: bus,
|
|
registry: NewWorkflowRegistry(bus),
|
|
}
|
|
}
|
|
|
|
func NewMusicAgregatorServerWithDeps(service *MusicAgregatorService, bus *eventbus.EventBus, registry *WorkflowRegistry) *MusicAgregatorServer {
|
|
return &MusicAgregatorServer{
|
|
service: service,
|
|
bus: bus,
|
|
registry: registry,
|
|
}
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) GetArtists(ctx context.Context, req *pb.GetArtistsRequest) (*pb.GetArtistsResponse, error) {
|
|
return s.service.GetArtists(ctx, req)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) GetAlbum(ctx context.Context, req *pb.GetAlbumRequest) (*pb.GetAlbumResponse, error) {
|
|
return s.service.GetAlbum(ctx, req)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
|
|
return s.service.MonitorAlbum(ctx, req)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) MonitorAlbumStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer) error {
|
|
msg, err := stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
startReq := msg.GetStart()
|
|
if startReq == nil {
|
|
return status.Error(codes.InvalidArgument, "first message must be StartMonitorRequest")
|
|
}
|
|
|
|
if startReq.Mode == pb.InteractionMode_INTERACTION_MODE_MANUAL {
|
|
return s.runManualWorkflow(stream, startReq)
|
|
}
|
|
return s.runAutomaticWorkflow(stream, startReq)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) runManualWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error {
|
|
ctx, cancel := context.WithCancel(stream.Context())
|
|
defer cancel()
|
|
|
|
albumKey := startReq.AlbumId
|
|
quality := startReq.Quality.String()
|
|
topic := albumKey + ":" + quality
|
|
|
|
dbPublisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, topic)
|
|
publisher := newStreamEventPublisher(dbPublisher, stream)
|
|
|
|
workflow := &monitorWorkflow{
|
|
mode: startReq.Mode,
|
|
req: startReq,
|
|
service: s.service,
|
|
publisher: publisher,
|
|
stream: stream,
|
|
decisions: make(chan *pb.UserDecision, 1),
|
|
cancel: cancel,
|
|
}
|
|
|
|
go workflow.receiveDecisions(ctx)
|
|
|
|
err := workflow.run(ctx)
|
|
|
|
if ctx.Err() != nil {
|
|
cleanupCtx := context.Background()
|
|
workflow.cleanup(cleanupCtx)
|
|
if workflow.workflowRunID != "" {
|
|
s.service.workflowRuns.SetCancelled(cleanupCtx, workflow.workflowRunID)
|
|
}
|
|
} else if workflow.workflowRunID != "" {
|
|
if err != nil {
|
|
s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, err.Error())
|
|
} else {
|
|
s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID)
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) runAutomaticWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error {
|
|
albumKey := startReq.AlbumId
|
|
quality := startReq.Quality.String()
|
|
|
|
entry, created := s.registry.GetOrCreate(context.Background(), albumKey, quality)
|
|
|
|
sub, cleanup := s.bus.Subscribe(entry.Topic)
|
|
defer cleanup()
|
|
|
|
if created {
|
|
s.registry.WaitGroup().Add(1)
|
|
go func() {
|
|
defer s.registry.WaitGroup().Done()
|
|
defer s.registry.Remove(albumKey, quality)
|
|
|
|
publisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, entry.Topic)
|
|
|
|
workflow := &monitorWorkflow{
|
|
mode: startReq.Mode,
|
|
req: startReq,
|
|
service: s.service,
|
|
publisher: publisher,
|
|
}
|
|
|
|
err := workflow.run(entry.Ctx)
|
|
|
|
if workflow.workflowRunID != "" {
|
|
if err != nil {
|
|
if entry.Ctx.Err() == context.Canceled {
|
|
s.service.workflowRuns.SetCancelled(context.Background(), workflow.workflowRunID)
|
|
} else {
|
|
s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, err.Error())
|
|
}
|
|
} else {
|
|
s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-sub.C:
|
|
for {
|
|
event, ok := sub.Ring.Pop()
|
|
if !ok {
|
|
break
|
|
}
|
|
if err := s.sendEventToStream(stream, event); err != nil {
|
|
return nil
|
|
}
|
|
if event.EventType == "result" || event.EventType == "error" {
|
|
return nil
|
|
}
|
|
}
|
|
case <-stream.Context().Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) sendEventToStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, event *eventbus.Event) error {
|
|
resp := &pb.MonitorAlbumStreamResponse{}
|
|
|
|
step := pb.MonitorStep(pb.MonitorStep_value[event.Step])
|
|
|
|
switch event.EventType {
|
|
case "status":
|
|
status := &pb.StatusUpdate{Step: step, Message: event.Message}
|
|
|
|
switch v := event.Data.(type) {
|
|
case *pb.StreamAlbumInfo:
|
|
status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v}
|
|
case *pb.TorrentList:
|
|
status.Data = &pb.StatusUpdate_Torrents{Torrents: v}
|
|
case *pb.ReleaseInfo:
|
|
status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v}
|
|
}
|
|
|
|
resp.Message = &pb.MonitorAlbumStreamResponse_Status{Status: status}
|
|
|
|
case "error":
|
|
recoverable := false
|
|
if data, ok := event.Data.(map[string]bool); ok {
|
|
recoverable = data["recoverable"]
|
|
}
|
|
resp.Message = &pb.MonitorAlbumStreamResponse_Error{
|
|
Error: &pb.ErrorUpdate{FailedStep: step, Message: event.Message, Recoverable: recoverable},
|
|
}
|
|
|
|
case "result":
|
|
if result, ok := event.Data.(*pb.MonitorAlbumResponse); ok {
|
|
resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: result}
|
|
} else if event.Data != nil {
|
|
if jsonBytes, ok := event.Data.(json.RawMessage); ok {
|
|
var result pb.MonitorAlbumResponse
|
|
if err := json.Unmarshal(jsonBytes, &result); err == nil {
|
|
resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: &result}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return stream.Send(resp)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) AnalyzeAlbumRelease(ctx context.Context, req *pb.AnalyzeAlbumReleaseRequest) (*pb.AnalyzeAlbumReleaseResponse, error) {
|
|
return s.service.AnalyzeAlbumRelease(ctx, req)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) SearchArtists(ctx context.Context, req *pb.SearchArtistsRequest) (*pb.SearchArtistsResponse, error) {
|
|
return s.service.SearchArtists(ctx, req)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) GetArtistAlbums(ctx context.Context, req *pb.GetArtistAlbumsRequest) (*pb.GetArtistAlbumsResponse, error) {
|
|
return s.service.GetArtistAlbums(ctx, req)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) Register(server *grpc.Server) {
|
|
pb.RegisterMusicAgregatorServiceServer(server, s)
|
|
}
|
|
|
|
func (s *MusicAgregatorServer) SubscribeEvents(req *pb.SubscribeEventsRequest, stream pb.MusicAgregatorService_SubscribeEventsServer) error {
|
|
ctx := stream.Context()
|
|
|
|
sub, cleanup := s.bus.SubscribeGlobal()
|
|
defer cleanup()
|
|
|
|
if req.SinceSeq > 0 {
|
|
events, err := s.service.albumEvents.GetAfterSeq(ctx, req.SinceSeq)
|
|
if err == nil {
|
|
for _, e := range events {
|
|
if err := stream.Send(albumEventToProto(e)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var lastSentSeq int64
|
|
if req.SinceSeq > 0 {
|
|
lastSentSeq = req.SinceSeq
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-sub.C:
|
|
for {
|
|
event, ok := sub.Ring.Pop()
|
|
if !ok {
|
|
break
|
|
}
|
|
if event.Seq > lastSentSeq {
|
|
pbEvent := busEventToAlbumEvent(event)
|
|
if err := stream.Send(pbEvent); err != nil {
|
|
return nil
|
|
}
|
|
lastSentSeq = event.Seq
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func albumEventToProto(e *database.AlbumEvent) *pb.AlbumEvent {
|
|
return &pb.AlbumEvent{
|
|
Seq: e.Seq,
|
|
WorkflowRunId: e.WorkflowRunID,
|
|
AlbumId: e.AlbumID,
|
|
EventType: e.EventType,
|
|
Step: e.Step,
|
|
Message: e.Message,
|
|
DataJson: e.DataJSON,
|
|
TimestampMs: e.CreatedAt.UnixMilli(),
|
|
}
|
|
}
|
|
|
|
func busEventToAlbumEvent(e *eventbus.Event) *pb.AlbumEvent {
|
|
var dataJSON []byte
|
|
if e.Data != nil {
|
|
dataJSON, _ = json.Marshal(e.Data)
|
|
}
|
|
return &pb.AlbumEvent{
|
|
Seq: e.Seq,
|
|
WorkflowRunId: e.WorkflowRunID,
|
|
AlbumId: e.AlbumID,
|
|
Quality: e.Quality,
|
|
EventType: e.EventType,
|
|
Step: e.Step,
|
|
Message: e.Message,
|
|
DataJson: dataJSON,
|
|
TimestampMs: time.Now().UnixMilli(),
|
|
}
|
|
}
|