package internal

import (
	"context"
	"encoding/json"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
	"github.com/rs/zerolog/log"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "homelab.lan/music-agregator/gen/music_agregator/v1"
	"homelab.lan/music-agregator/internal/config"
	"homelab.lan/music-agregator/internal/database"
	"homelab.lan/music-agregator/internal/eventbus"
	"homelab.lan/music-agregator/internal/torrent"
)

// MusicAgregatorServer is the gRPC front end for MusicAgregatorService. Unary
// calls are delegated straight to the service; the streaming calls additionally
// use the event bus and the workflow registry.
type MusicAgregatorServer struct {
	service  *MusicAgregatorService
	bus      *eventbus.EventBus
	registry *WorkflowRegistry
	pb.UnimplementedMusicAgregatorServiceServer
}

// NewMusicAgregatorServer builds the underlying MusicAgregatorService from the
// given dependencies and wraps it in a server with a fresh event bus and a
// workflow registry bound to that bus. The service construction error, if any,
// is logged and returned.
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], torrentClient torrent.TorrentClient, pathMapper *torrent.PathMapper, db *database.DB) (*MusicAgregatorServer, error) {
	service, err := NewMusicAgregatorService(cfg, riverClient, torrentClient, pathMapper, db)
	if err != nil {
		log.Err(err).Msg("failed to create MusicAgregatorService")
		return nil, err
	}

	bus := eventbus.New()
	return &MusicAgregatorServer{
		service:  service,
		bus:      bus,
		registry: NewWorkflowRegistry(bus),
	}, nil
}

// NewMusicAgregatorServerWithService wraps an existing service in a server with
// a fresh event bus and a workflow registry bound to that bus.
func NewMusicAgregatorServerWithService(service *MusicAgregatorService) *MusicAgregatorServer {
	bus := eventbus.New()
	return &MusicAgregatorServer{
		service:  service,
		bus:      bus,
		registry: NewWorkflowRegistry(bus),
	}
}

// NewMusicAgregatorServerWithDeps assembles a server from caller-supplied
// service, event bus and workflow registry without creating anything itself.
func NewMusicAgregatorServerWithDeps(service *MusicAgregatorService, bus *eventbus.EventBus, registry *WorkflowRegistry) *MusicAgregatorServer {
	return &MusicAgregatorServer{
		service:  service,
		bus:      bus,
		registry: registry,
	}
}

// GetArtists delegates to the underlying service.
func (s *MusicAgregatorServer) GetArtists(ctx context.Context, req *pb.GetArtistsRequest) (*pb.GetArtistsResponse, error) {
	return s.service.GetArtists(ctx, req)
}

// GetAlbum delegates to the underlying service.
func (s *MusicAgregatorServer) GetAlbum(ctx context.Context, req *pb.GetAlbumRequest) (*pb.GetAlbumResponse, error) {
	return s.service.GetAlbum(ctx, req)
}

// MonitorAlbum delegates to the underlying service.
func (s *MusicAgregatorServer) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
	return s.service.MonitorAlbum(ctx, req)
}

// MonitorAlbumStream handles the bidirectional monitoring stream. The first
// message received must carry a StartMonitorRequest; otherwise the call fails
// with codes.InvalidArgument. Manual interaction mode runs a workflow bound to
// this stream; every other mode goes through the shared workflow registry.
func (s *MusicAgregatorServer) MonitorAlbumStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer) error {
	msg, err := stream.Recv()
	if err != nil {
		return err
	}

	startReq := msg.GetStart()
	if startReq == nil {
		return status.Error(codes.InvalidArgument, "first message must be StartMonitorRequest")
	}

	if startReq.Mode == pb.InteractionMode_INTERACTION_MODE_MANUAL {
		return s.runManualWorkflow(stream, startReq)
	}
	return s.runAutomaticWorkflow(stream, startReq)
}

// runManualWorkflow runs a monitor workflow synchronously on the calling
// stream. Events are published through a DB-backed publisher wrapped by a
// stream publisher, and user decisions are read from the stream by a separate
// goroutine that shares the workflow context.
//
// After the run, the workflow run record (when one was created) is marked
// cancelled, failed or completed. The error returned by the workflow is
// returned to the caller unchanged.
func (s *MusicAgregatorServer) runManualWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error {
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	albumKey := startReq.AlbumId
	quality := startReq.Quality.String()
	topic := albumKey + ":" + quality

	dbPublisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, topic)
	publisher := newStreamEventPublisher(dbPublisher, stream)

	workflow := &monitorWorkflow{
		mode:      startReq.Mode,
		req:       startReq,
		service:   s.service,
		publisher: publisher,
		stream:    stream,
		decisions: make(chan *pb.UserDecision, 1),
		cancel:    cancel,
	}

	// The deferred cancel above cancels the context handed to this goroutine
	// when the function returns.
	go workflow.receiveDecisions(ctx)

	runErr := workflow.run(ctx)

	switch {
	case ctx.Err() != nil:
		// The workflow context is already done, so cleanup and bookkeeping
		// use a fresh background context.
		cleanupCtx := context.Background()
		workflow.cleanup(cleanupCtx)
		if workflow.workflowRunID != "" {
			s.service.workflowRuns.SetCancelled(cleanupCtx, workflow.workflowRunID)
		}
	case workflow.workflowRunID == "":
		// No run record was created; nothing to update.
	case runErr != nil:
		s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, runErr.Error())
	default:
		s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID)
	}

	return runErr
}

// runAutomaticWorkflow attaches the stream to the registry entry for the
// requested album and quality. If this call created the entry, it also starts
// the workflow in a background goroutine that runs on the entry's context (not
// the stream's) and removes the entry from the registry when it finishes.
//
// The stream then receives every event published on the entry's topic until a
// "result" or "error" event has been forwarded, a send fails, or the stream
// context is done. All of these end the call with a nil error.
func (s *MusicAgregatorServer) runAutomaticWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error {
	albumKey := startReq.AlbumId
	quality := startReq.Quality.String()

	entry, created := s.registry.GetOrCreate(context.Background(), albumKey, quality)

	// Subscribe before the workflow goroutine is started; presumably so that
	// no event published by a freshly started workflow is missed.
	sub, cleanup := s.bus.Subscribe(entry.Topic)
	defer cleanup()

	if created {
		s.registry.WaitGroup().Add(1)
		go func() {
			defer s.registry.WaitGroup().Done()
			defer s.registry.Remove(albumKey, quality)

			publisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, entry.Topic)
			workflow := &monitorWorkflow{
				mode:      startReq.Mode,
				req:       startReq,
				service:   s.service,
				publisher: publisher,
			}

			runErr := workflow.run(entry.Ctx)
			if workflow.workflowRunID == "" {
				return
			}

			switch {
			case runErr == nil:
				s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID)
			case entry.Ctx.Err() == context.Canceled:
				s.service.workflowRuns.SetCancelled(context.Background(), workflow.workflowRunID)
			default:
				s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, runErr.Error())
			}
		}()
	}

	for {
		select {
		case <-sub.C:
			// One wake-up may cover several queued events; drain the ring.
			for {
				event, ok := sub.Ring.Pop()
				if !ok {
					break
				}
				if err := s.sendEventToStream(stream, event); err != nil {
					return nil
				}
				if event.EventType == "result" || event.EventType == "error" {
					return nil
				}
			}
		case <-stream.Context().Done():
			return nil
		}
	}
}

// sendEventToStream converts a bus event into a MonitorAlbumStreamResponse and
// sends it on the stream, returning the error from stream.Send.
//
// Only the "status", "error" and "result" event types populate the response.
// For any other type, or for a "result" whose payload cannot be decoded, the
// response is sent with no message variant set.
func (s *MusicAgregatorServer) sendEventToStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, event *eventbus.Event) error {
	resp := &pb.MonitorAlbumStreamResponse{}
	// A step name missing from MonitorStep_value yields the map's zero value.
	step := pb.MonitorStep(pb.MonitorStep_value[event.Step])

	switch event.EventType {
	case "status":
		// Named "update" so that the imported grpc status package is not shadowed.
		update := &pb.StatusUpdate{Step: step, Message: event.Message}
		switch v := event.Data.(type) {
		case *pb.StreamAlbumInfo:
			update.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v}
		case *pb.TorrentList:
			update.Data = &pb.StatusUpdate_Torrents{Torrents: v}
		case *pb.ReleaseInfo:
			update.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v}
		}
		resp.Message = &pb.MonitorAlbumStreamResponse_Status{Status: update}

	case "error":
		recoverable := false
		if data, ok := event.Data.(map[string]bool); ok {
			recoverable = data["recoverable"]
		}
		resp.Message = &pb.MonitorAlbumStreamResponse_Error{
			Error: &pb.ErrorUpdate{FailedStep: step, Message: event.Message, Recoverable: recoverable},
		}

	case "result":
		switch data := event.Data.(type) {
		case *pb.MonitorAlbumResponse:
			resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: data}
		case json.RawMessage:
			// NOTE(review): this decodes a protobuf message with encoding/json;
			// confirm the producer encodes it the same way, or switch both
			// sides to protojson.
			var result pb.MonitorAlbumResponse
			if err := json.Unmarshal(data, &result); err == nil {
				resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: &result}
			}
		}
	}

	return stream.Send(resp)
}

// AnalyzeAlbumRelease delegates to the underlying service.
func (s *MusicAgregatorServer) AnalyzeAlbumRelease(ctx context.Context, req *pb.AnalyzeAlbumReleaseRequest) (*pb.AnalyzeAlbumReleaseResponse, error) {
	return s.service.AnalyzeAlbumRelease(ctx, req)
}

// SearchArtists delegates to the underlying service.
func (s *MusicAgregatorServer) SearchArtists(ctx context.Context, req *pb.SearchArtistsRequest) (*pb.SearchArtistsResponse, error) {
	return s.service.SearchArtists(ctx, req)
}

// GetArtistAlbums delegates to the underlying service.
func (s *MusicAgregatorServer) GetArtistAlbums(ctx context.Context, req *pb.GetArtistAlbumsRequest) (*pb.GetArtistAlbumsResponse, error) {
	return s.service.GetArtistAlbums(ctx, req)
}

// Register registers this server on the given gRPC server.
func (s *MusicAgregatorServer) Register(server *grpc.Server) {
	pb.RegisterMusicAgregatorServiceServer(server, s)
}

// SubscribeEvents streams album events to the client. When req.SinceSeq is
// positive, stored events after that sequence number are replayed first; live
// events from the global bus subscription follow. Only events whose sequence
// number is greater than the last one sent are forwarded.
//
// A send failure during replay is returned to the caller. A failure to load
// the replay is logged and the stream continues with live events only. A send
// failure on a live event, or the stream context finishing, ends the call with
// a nil error.
func (s *MusicAgregatorServer) SubscribeEvents(req *pb.SubscribeEventsRequest, stream pb.MusicAgregatorService_SubscribeEventsServer) error {
	ctx := stream.Context()

	// Subscribe before replaying so that events published during the replay
	// are queued on the subscription.
	sub, cleanup := s.bus.SubscribeGlobal()
	defer cleanup()

	var lastSentSeq int64
	if req.SinceSeq > 0 {
		lastSentSeq = req.SinceSeq

		events, err := s.service.albumEvents.GetAfterSeq(ctx, req.SinceSeq)
		if err != nil {
			// Best effort: keep serving live events, but leave a trace of the gap.
			log.Warn().Err(err).Int64("since_seq", req.SinceSeq).Msg("failed to load album events for replay")
		} else {
			for _, e := range events {
				if err := stream.Send(albumEventToProto(e)); err != nil {
					return err
				}
				// Advance the high-water mark so that a live event queued
				// during the replay that was also part of the replay is not
				// sent a second time. This relies on sequence numbers being
				// increasing, as the live filter below already does.
				if e.Seq > lastSentSeq {
					lastSentSeq = e.Seq
				}
			}
		}
	}

	for {
		select {
		case <-sub.C:
			// One wake-up may cover several queued events; drain the ring.
			for {
				event, ok := sub.Ring.Pop()
				if !ok {
					break
				}
				if event.Seq <= lastSentSeq {
					continue
				}
				if err := stream.Send(busEventToAlbumEvent(event)); err != nil {
					return nil
				}
				lastSentSeq = event.Seq
			}
		case <-ctx.Done():
			return nil
		}
	}
}

// albumEventToProto converts a stored album event to its protobuf form. The
// timestamp is the record's CreatedAt in Unix milliseconds. Quality is not set.
func albumEventToProto(e *database.AlbumEvent) *pb.AlbumEvent {
	return &pb.AlbumEvent{
		Seq:           e.Seq,
		WorkflowRunId: e.WorkflowRunID,
		AlbumId:       e.AlbumID,
		EventType:     e.EventType,
		Step:          e.Step,
		Message:       e.Message,
		DataJson:      e.DataJSON,
		TimestampMs:   e.CreatedAt.UnixMilli(),
	}
}

// busEventToAlbumEvent converts a live bus event to its protobuf form. A
// non-nil payload is JSON-encoded into DataJson; if encoding fails the failure
// is logged and DataJson is left empty. The timestamp is the time of
// conversion, not the time the event was produced.
func busEventToAlbumEvent(e *eventbus.Event) *pb.AlbumEvent {
	var dataJSON []byte
	if e.Data != nil {
		encoded, err := json.Marshal(e.Data)
		if err != nil {
			log.Warn().Err(err).Str("event_type", e.EventType).Msg("failed to encode event data as JSON")
		} else {
			dataJSON = encoded
		}
	}

	return &pb.AlbumEvent{
		Seq:           e.Seq,
		WorkflowRunId: e.WorkflowRunID,
		AlbumId:       e.AlbumID,
		Quality:       e.Quality,
		EventType:     e.EventType,
		Step:          e.Step,
		Message:       e.Message,
		DataJson:      dataJSON,
		TimestampMs:   time.Now().UnixMilli(),
	}
}