package internal import ( "context" "fmt" "sync" "time" "homelab.lan/music-agregator/internal/eventbus" ) type WorkflowEntry struct { ID string AlbumID string Quality string Ctx context.Context Cancel context.CancelFunc Topic string Ready chan struct{} } type WorkflowRegistry struct { mu sync.Mutex workflows map[string]*WorkflowEntry bus *eventbus.EventBus wg sync.WaitGroup } func NewWorkflowRegistry(bus *eventbus.EventBus) *WorkflowRegistry { return &WorkflowRegistry{ workflows: make(map[string]*WorkflowEntry), bus: bus, } } func workflowKey(albumID, quality string) string { return fmt.Sprintf("%s:%s", albumID, quality) } func (r *WorkflowRegistry) GetOrCreate(ctx context.Context, albumID, quality string) (*WorkflowEntry, bool) { r.mu.Lock() defer r.mu.Unlock() key := workflowKey(albumID, quality) if entry, ok := r.workflows[key]; ok { return entry, false } wfCtx, cancel := context.WithCancel(ctx) entry := &WorkflowEntry{ AlbumID: albumID, Quality: quality, Ctx: wfCtx, Cancel: cancel, Topic: key, Ready: make(chan struct{}), } r.workflows[key] = entry return entry, true } func (r *WorkflowRegistry) Remove(albumID, quality string) { r.mu.Lock() defer r.mu.Unlock() key := workflowKey(albumID, quality) delete(r.workflows, key) } func (r *WorkflowRegistry) Get(albumID, quality string) (*WorkflowEntry, bool) { r.mu.Lock() defer r.mu.Unlock() key := workflowKey(albumID, quality) entry, ok := r.workflows[key] return entry, ok } func (r *WorkflowRegistry) WaitGroup() *sync.WaitGroup { return &r.wg } func (r *WorkflowRegistry) Shutdown(timeout time.Duration) { r.mu.Lock() for _, entry := range r.workflows { entry.Cancel() } r.mu.Unlock() done := make(chan struct{}) go func() { r.wg.Wait() close(done) }() select { case <-done: case <-time.After(timeout): } }