From 75822790776e2161feacc2dd40b4891ceb1e32b3 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 11 May 2026 15:53:43 +0200 Subject: [PATCH] Add in-process event bus with ring buffer for workflow event broadcasting Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus --- internal/eventbus/bus.go | 116 +++++++++++++++++++++++ internal/eventbus/bus_test.go | 168 +++++++++++++++++++++++++++++++++ internal/eventbus/ring.go | 56 +++++++++++ internal/eventbus/ring_test.go | 109 +++++++++++++++++++++ 4 files changed, 449 insertions(+) create mode 100644 internal/eventbus/bus.go create mode 100644 internal/eventbus/bus_test.go create mode 100644 internal/eventbus/ring.go create mode 100644 internal/eventbus/ring_test.go diff --git a/internal/eventbus/bus.go b/internal/eventbus/bus.go new file mode 100644 index 0000000..7b3bfaf --- /dev/null +++ b/internal/eventbus/bus.go @@ -0,0 +1,116 @@ +package eventbus + +import "sync" + +type Event struct { + Seq int64 + WorkflowRunID string + AlbumID string + Quality string + EventType string + Step string + Message string + Data interface{} +} + +type Subscription struct { + Ring *RingBuffer[*Event] + C chan struct{} + done chan struct{} + once sync.Once +} + +type EventBus struct { + mu sync.RWMutex + topics map[string]map[*Subscription]struct{} + global map[*Subscription]struct{} +} + +func New() *EventBus { + return &EventBus{ + topics: make(map[string]map[*Subscription]struct{}), + global: make(map[*Subscription]struct{}), + } +} + +func (b *EventBus) Publish(topic string, event *Event) { + b.mu.RLock() + defer b.mu.RUnlock() + + if subs, ok := b.topics[topic]; ok { + for sub := range subs { + sub.Ring.Push(event) + select { + case sub.C <- struct{}{}: + default: + } + } + } + + for sub := range b.global { + sub.Ring.Push(event) + select { + case sub.C <- struct{}{}: + default: + } + } +} + +func (b *EventBus) Subscribe(topic string) (*Subscription, func()) { + sub := &Subscription{ + Ring: NewRingBuffer[*Event](256), + C: make(chan struct{}, 1), + done: make(chan struct{}), + } + + b.mu.Lock() + if b.topics[topic] == nil { + b.topics[topic] = make(map[*Subscription]struct{}) + } + b.topics[topic][sub] = struct{}{} + b.mu.Unlock() + + cleanup := func() { + sub.once.Do(func() { + b.mu.Lock() + delete(b.topics[topic], sub) + if len(b.topics[topic]) == 0 { + delete(b.topics, topic) + } + b.mu.Unlock() + close(sub.done) + }) + } + + return sub, cleanup +} + +func (b *EventBus) SubscribeGlobal() (*Subscription, func()) { + sub := &Subscription{ + Ring: NewRingBuffer[*Event](256), + C: make(chan struct{}, 1), + done: make(chan struct{}), + } + + b.mu.Lock() + b.global[sub] = struct{}{} + b.mu.Unlock() + + cleanup := func() { + sub.once.Do(func() { + b.mu.Lock() + delete(b.global, sub) + b.mu.Unlock() + close(sub.done) + }) + } + + return sub, cleanup +} + +func (b *EventBus) HasTopic(topic string) bool { + b.mu.RLock() + defer b.mu.RUnlock() + _, ok := b.topics[topic] + return ok +} diff --git a/internal/eventbus/bus_test.go b/internal/eventbus/bus_test.go new file mode 100644 index 0000000..fd71a67 --- /dev/null +++ b/internal/eventbus/bus_test.go @@ -0,0 +1,168 @@ +package eventbus + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventBus_PublishSubscribe(t *testing.T) { + bus := New() + sub, cleanup := bus.Subscribe("test-topic") + defer cleanup() + + event := &Event{Seq: 1, EventType: "status", Message: "hello"} + bus.Publish("test-topic", event) + + got, ok := sub.Ring.Pop() + require.True(t, ok) + assert.Equal(t, event, got) +} + +func TestEventBus_MultipleSubscribers(t *testing.T) { + bus := New() + sub1, cleanup1 := bus.Subscribe("topic") + defer cleanup1() + sub2, cleanup2 := bus.Subscribe("topic") + defer cleanup2() + sub3, cleanup3 := bus.Subscribe("topic") + defer cleanup3() + + event := &Event{Seq: 1, EventType: "status"} + bus.Publish("topic", event) + + got1, ok := sub1.Ring.Pop() + require.True(t, ok) + assert.Equal(t, event, got1) + + got2, ok := sub2.Ring.Pop() + require.True(t, ok) + assert.Equal(t, event, got2) + + got3, ok := sub3.Ring.Pop() + require.True(t, ok) + assert.Equal(t, event, got3) +} + +func TestEventBus_GlobalSubscriber(t *testing.T) { + bus := New() + sub, cleanup := bus.SubscribeGlobal() + defer cleanup() + + bus.Publish("topic-a", &Event{Seq: 1}) + bus.Publish("topic-b", &Event{Seq: 2}) + bus.Publish("topic-c", &Event{Seq: 3}) + + got, ok := sub.Ring.Pop() + require.True(t, ok) + assert.Equal(t, int64(1), got.Seq) + + got, ok = sub.Ring.Pop() + require.True(t, ok) + assert.Equal(t, int64(2), got.Seq) + + got, ok = sub.Ring.Pop() + require.True(t, ok) + assert.Equal(t, int64(3), got.Seq) +} + +func TestEventBus_TopicIsolation(t *testing.T) { + bus := New() + subA, cleanupA := bus.Subscribe("topic-a") + defer cleanupA() + + bus.Publish("topic-b", &Event{Seq: 1}) + + _, ok := subA.Ring.Pop() + assert.False(t, ok) +} + +func TestEventBus_Notification(t *testing.T) { + bus := New() + sub, cleanup := bus.Subscribe("topic") + defer cleanup() + + bus.Publish("topic", &Event{Seq: 1}) + + select { + case <-sub.C: + case <-time.After(100 * time.Millisecond): + t.Fatal("expected notification on channel") + } +} + +func TestEventBus_Unsubscribe(t *testing.T) { + bus := New() + sub, cleanup := bus.Subscribe("topic") + + bus.Publish("topic", &Event{Seq: 1}) + _, ok := sub.Ring.Pop() + require.True(t, ok) + + cleanup() + + bus.Publish("topic", &Event{Seq: 2}) + _, ok = sub.Ring.Pop() + assert.False(t, ok) +} + +func TestEventBus_SlowSubscriber(t *testing.T) { + bus := New() + sub, cleanup := bus.Subscribe("topic") + defer cleanup() + + for i := 0; i < 500; i++ { + bus.Publish("topic", &Event{Seq: int64(i)}) + } + + assert.Equal(t, 256, sub.Ring.Len()) + + first, ok := sub.Ring.Pop() + require.True(t, ok) + assert.Equal(t, int64(244), first.Seq) +} + +func TestEventBus_HasTopic(t *testing.T) { + bus := New() + + assert.False(t, bus.HasTopic("topic")) + + sub, cleanup := bus.Subscribe("topic") + _ = sub + assert.True(t, bus.HasTopic("topic")) + + cleanup() + assert.False(t, bus.HasTopic("topic")) +} + +func TestEventBus_ConcurrentPublishSubscribe(t *testing.T) { + bus := New() + var wg sync.WaitGroup + + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + sub, cleanup := bus.Subscribe("topic") + defer cleanup() + for j := 0; j < 100; j++ { + sub.Ring.Pop() + } + }(i) + } + + for i := 0; i < 5; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < 100; j++ { + bus.Publish("topic", &Event{Seq: int64(id*100 + j)}) + } + }(i) + } + + wg.Wait() +} diff --git a/internal/eventbus/ring.go b/internal/eventbus/ring.go new file mode 100644 index 0000000..688a125 --- /dev/null +++ b/internal/eventbus/ring.go @@ -0,0 +1,56 @@ +package eventbus + +import "sync" + +type RingBuffer[T any] struct { + mu sync.Mutex + buf []T + head int + tail int + count int + cap int +} + +func NewRingBuffer[T any](capacity int) *RingBuffer[T] { + return &RingBuffer[T]{ + buf: make([]T, capacity), + cap: capacity, + } +} + +func (r *RingBuffer[T]) Push(item T) { + r.mu.Lock() + defer r.mu.Unlock() + + r.buf[r.head] = item + r.head = (r.head + 1) % r.cap + + if r.count == r.cap { + r.tail = (r.tail + 1) % r.cap + } else { + r.count++ + } +} + +func (r *RingBuffer[T]) Pop() (T, bool) { + r.mu.Lock() + defer r.mu.Unlock() + + var zero T + if r.count == 0 { + return zero, false + } + + item := r.buf[r.tail] + r.buf[r.tail] = zero + r.tail = (r.tail + 1) % r.cap + r.count-- + + return item, true +} + +func (r *RingBuffer[T]) Len() int { + r.mu.Lock() + defer r.mu.Unlock() + return r.count +} diff --git a/internal/eventbus/ring_test.go b/internal/eventbus/ring_test.go new file mode 100644 index 0000000..5801afb --- /dev/null +++ b/internal/eventbus/ring_test.go @@ -0,0 +1,109 @@ +package eventbus + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRingBuffer_PushPop(t *testing.T) { + ring := NewRingBuffer[int](5) + + ring.Push(1) + ring.Push(2) + ring.Push(3) + + v, ok := ring.Pop() + require.True(t, ok) + assert.Equal(t, 1, v) + + v, ok = ring.Pop() + require.True(t, ok) + assert.Equal(t, 2, v) + + v, ok = ring.Pop() + require.True(t, ok) + assert.Equal(t, 3, v) +} + +func TestRingBuffer_Empty(t *testing.T) { + ring := NewRingBuffer[int](5) + + v, ok := ring.Pop() + assert.False(t, ok) + assert.Equal(t, 0, v) +} + +func TestRingBuffer_OverwriteOldest(t *testing.T) { + ring := NewRingBuffer[int](4) + + ring.Push(1) + ring.Push(2) + ring.Push(3) + ring.Push(4) + ring.Push(5) + ring.Push(6) + + var values []int + for { + v, ok := ring.Pop() + if !ok { + break + } + values = append(values, v) + } + + assert.Equal(t, []int{3, 4, 5, 6}, values) +} + +func TestRingBuffer_Len(t *testing.T) { + ring := NewRingBuffer[int](5) + + assert.Equal(t, 0, ring.Len()) + + ring.Push(1) + assert.Equal(t, 1, ring.Len()) + + ring.Push(2) + ring.Push(3) + assert.Equal(t, 3, ring.Len()) + + ring.Pop() + assert.Equal(t, 2, ring.Len()) + + ring.Push(4) + ring.Push(5) + ring.Push(6) + ring.Push(7) + assert.Equal(t, 5, ring.Len()) + + ring.Push(8) + assert.Equal(t, 5, ring.Len()) +} + +func TestRingBuffer_Concurrent(t *testing.T) { + ring := NewRingBuffer[int](100) + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < 100; j++ { + ring.Push(id*100 + j) + } + }(i) + } + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 500; i++ { + ring.Pop() + } + }() + + wg.Wait() +}