Files

327 lines
9.0 KiB
Go

package internal
import (
"context"
"encoding/json"
"time"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/config"
"homelab.lan/music-agregator/internal/database"
"homelab.lan/music-agregator/internal/eventbus"
"homelab.lan/music-agregator/internal/torrent"
)
type MusicAgregatorServer struct {
service *MusicAgregatorService
bus *eventbus.EventBus
registry *WorkflowRegistry
pb.UnimplementedMusicAgregatorServiceServer
}
func NewMusicAgregatorServer(cfg config.Config, riverClient *river.Client[pgx.Tx], torrentClient torrent.TorrentClient, pathMapper *torrent.PathMapper, db *database.DB) (*MusicAgregatorServer, error) {
service, err := NewMusicAgregatorService(cfg, riverClient, torrentClient, pathMapper, db)
if err != nil {
log.Err(err).Msg("failed to create MusicAgregatorService")
return nil, err
}
bus := eventbus.New()
return &MusicAgregatorServer{
service: service,
bus: bus,
registry: NewWorkflowRegistry(bus),
}, nil
}
func NewMusicAgregatorServerWithService(service *MusicAgregatorService) *MusicAgregatorServer {
bus := eventbus.New()
return &MusicAgregatorServer{
service: service,
bus: bus,
registry: NewWorkflowRegistry(bus),
}
}
func NewMusicAgregatorServerWithDeps(service *MusicAgregatorService, bus *eventbus.EventBus, registry *WorkflowRegistry) *MusicAgregatorServer {
return &MusicAgregatorServer{
service: service,
bus: bus,
registry: registry,
}
}
func (s *MusicAgregatorServer) GetArtists(ctx context.Context, req *pb.GetArtistsRequest) (*pb.GetArtistsResponse, error) {
return s.service.GetArtists(ctx, req)
}
func (s *MusicAgregatorServer) GetAlbum(ctx context.Context, req *pb.GetAlbumRequest) (*pb.GetAlbumResponse, error) {
return s.service.GetAlbum(ctx, req)
}
func (s *MusicAgregatorServer) MonitorAlbum(ctx context.Context, req *pb.MonitorAlbumRequest) (*pb.MonitorAlbumResponse, error) {
return s.service.MonitorAlbum(ctx, req)
}
func (s *MusicAgregatorServer) MonitorAlbumStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer) error {
msg, err := stream.Recv()
if err != nil {
return err
}
startReq := msg.GetStart()
if startReq == nil {
return status.Error(codes.InvalidArgument, "first message must be StartMonitorRequest")
}
if startReq.Mode == pb.InteractionMode_INTERACTION_MODE_MANUAL {
return s.runManualWorkflow(stream, startReq)
}
return s.runAutomaticWorkflow(stream, startReq)
}
func (s *MusicAgregatorServer) runManualWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
albumKey := startReq.AlbumId
quality := startReq.Quality.String()
topic := albumKey + ":" + quality
dbPublisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, topic)
publisher := newStreamEventPublisher(dbPublisher, stream)
workflow := &monitorWorkflow{
mode: startReq.Mode,
req: startReq,
service: s.service,
publisher: publisher,
stream: stream,
decisions: make(chan *pb.UserDecision, 1),
cancel: cancel,
}
go workflow.receiveDecisions(ctx)
err := workflow.run(ctx)
if ctx.Err() != nil {
cleanupCtx := context.Background()
workflow.cleanup(cleanupCtx)
if workflow.workflowRunID != "" {
s.service.workflowRuns.SetCancelled(cleanupCtx, workflow.workflowRunID)
}
} else if workflow.workflowRunID != "" {
if err != nil {
s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, err.Error())
} else {
s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID)
}
}
return err
}
func (s *MusicAgregatorServer) runAutomaticWorkflow(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, startReq *pb.StartMonitorRequest) error {
albumKey := startReq.AlbumId
quality := startReq.Quality.String()
entry, created := s.registry.GetOrCreate(context.Background(), albumKey, quality)
sub, cleanup := s.bus.Subscribe(entry.Topic)
defer cleanup()
if created {
s.registry.WaitGroup().Add(1)
go func() {
defer s.registry.WaitGroup().Done()
defer s.registry.Remove(albumKey, quality)
publisher := newDBEventPublisher("", quality, s.service.albumEvents, s.bus, entry.Topic)
workflow := &monitorWorkflow{
mode: startReq.Mode,
req: startReq,
service: s.service,
publisher: publisher,
}
err := workflow.run(entry.Ctx)
if workflow.workflowRunID != "" {
if err != nil {
if entry.Ctx.Err() == context.Canceled {
s.service.workflowRuns.SetCancelled(context.Background(), workflow.workflowRunID)
} else {
s.service.workflowRuns.SetFailed(context.Background(), workflow.workflowRunID, err.Error())
}
} else {
s.service.workflowRuns.SetCompleted(context.Background(), workflow.workflowRunID)
}
}
}()
}
for {
select {
case <-sub.C:
for {
event, ok := sub.Ring.Pop()
if !ok {
break
}
if err := s.sendEventToStream(stream, event); err != nil {
return nil
}
if event.EventType == "result" || event.EventType == "error" {
return nil
}
}
case <-stream.Context().Done():
return nil
}
}
}
func (s *MusicAgregatorServer) sendEventToStream(stream pb.MusicAgregatorService_MonitorAlbumStreamServer, event *eventbus.Event) error {
resp := &pb.MonitorAlbumStreamResponse{}
step := pb.MonitorStep(pb.MonitorStep_value[event.Step])
switch event.EventType {
case "status":
status := &pb.StatusUpdate{Step: step, Message: event.Message}
switch v := event.Data.(type) {
case *pb.StreamAlbumInfo:
status.Data = &pb.StatusUpdate_AlbumInfo{AlbumInfo: v}
case *pb.TorrentList:
status.Data = &pb.StatusUpdate_Torrents{Torrents: v}
case *pb.ReleaseInfo:
status.Data = &pb.StatusUpdate_ReleaseInfo{ReleaseInfo: v}
}
resp.Message = &pb.MonitorAlbumStreamResponse_Status{Status: status}
case "error":
recoverable := false
if data, ok := event.Data.(map[string]bool); ok {
recoverable = data["recoverable"]
}
resp.Message = &pb.MonitorAlbumStreamResponse_Error{
Error: &pb.ErrorUpdate{FailedStep: step, Message: event.Message, Recoverable: recoverable},
}
case "result":
if result, ok := event.Data.(*pb.MonitorAlbumResponse); ok {
resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: result}
} else if event.Data != nil {
if jsonBytes, ok := event.Data.(json.RawMessage); ok {
var result pb.MonitorAlbumResponse
if err := json.Unmarshal(jsonBytes, &result); err == nil {
resp.Message = &pb.MonitorAlbumStreamResponse_Result{Result: &result}
}
}
}
}
return stream.Send(resp)
}
func (s *MusicAgregatorServer) AnalyzeAlbumRelease(ctx context.Context, req *pb.AnalyzeAlbumReleaseRequest) (*pb.AnalyzeAlbumReleaseResponse, error) {
return s.service.AnalyzeAlbumRelease(ctx, req)
}
func (s *MusicAgregatorServer) SearchArtists(ctx context.Context, req *pb.SearchArtistsRequest) (*pb.SearchArtistsResponse, error) {
return s.service.SearchArtists(ctx, req)
}
func (s *MusicAgregatorServer) GetArtistAlbums(ctx context.Context, req *pb.GetArtistAlbumsRequest) (*pb.GetArtistAlbumsResponse, error) {
return s.service.GetArtistAlbums(ctx, req)
}
func (s *MusicAgregatorServer) Register(server *grpc.Server) {
pb.RegisterMusicAgregatorServiceServer(server, s)
}
func (s *MusicAgregatorServer) SubscribeEvents(req *pb.SubscribeEventsRequest, stream pb.MusicAgregatorService_SubscribeEventsServer) error {
ctx := stream.Context()
sub, cleanup := s.bus.SubscribeGlobal()
defer cleanup()
if req.SinceSeq > 0 {
events, err := s.service.albumEvents.GetAfterSeq(ctx, req.SinceSeq)
if err == nil {
for _, e := range events {
if err := stream.Send(albumEventToProto(e)); err != nil {
return err
}
}
}
}
var lastSentSeq int64
if req.SinceSeq > 0 {
lastSentSeq = req.SinceSeq
}
for {
select {
case <-sub.C:
for {
event, ok := sub.Ring.Pop()
if !ok {
break
}
if event.Seq > lastSentSeq {
pbEvent := busEventToAlbumEvent(event)
if err := stream.Send(pbEvent); err != nil {
return nil
}
lastSentSeq = event.Seq
}
}
case <-ctx.Done():
return nil
}
}
}
func albumEventToProto(e *database.AlbumEvent) *pb.AlbumEvent {
return &pb.AlbumEvent{
Seq: e.Seq,
WorkflowRunId: e.WorkflowRunID,
AlbumId: e.AlbumID,
EventType: e.EventType,
Step: e.Step,
Message: e.Message,
DataJson: e.DataJSON,
TimestampMs: e.CreatedAt.UnixMilli(),
}
}
func busEventToAlbumEvent(e *eventbus.Event) *pb.AlbumEvent {
var dataJSON []byte
if e.Data != nil {
dataJSON, _ = json.Marshal(e.Data)
}
return &pb.AlbumEvent{
Seq: e.Seq,
WorkflowRunId: e.WorkflowRunID,
AlbumId: e.AlbumID,
Quality: e.Quality,
EventType: e.EventType,
Step: e.Step,
Message: e.Message,
DataJson: dataJSON,
TimestampMs: time.Now().UnixMilli(),
}
}