Add in-process event bus with ring buffer for workflow event broadcasting
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -0,0 +1,116 @@
|
|||||||
|
package eventbus
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
Seq int64
|
||||||
|
WorkflowRunID string
|
||||||
|
AlbumID string
|
||||||
|
Quality string
|
||||||
|
EventType string
|
||||||
|
Step string
|
||||||
|
Message string
|
||||||
|
Data interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Subscription struct {
|
||||||
|
Ring *RingBuffer[*Event]
|
||||||
|
C chan struct{}
|
||||||
|
done chan struct{}
|
||||||
|
once sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventBus struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
topics map[string]map[*Subscription]struct{}
|
||||||
|
global map[*Subscription]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func New() *EventBus {
|
||||||
|
return &EventBus{
|
||||||
|
topics: make(map[string]map[*Subscription]struct{}),
|
||||||
|
global: make(map[*Subscription]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *EventBus) Publish(topic string, event *Event) {
|
||||||
|
b.mu.RLock()
|
||||||
|
defer b.mu.RUnlock()
|
||||||
|
|
||||||
|
if subs, ok := b.topics[topic]; ok {
|
||||||
|
for sub := range subs {
|
||||||
|
sub.Ring.Push(event)
|
||||||
|
select {
|
||||||
|
case sub.C <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for sub := range b.global {
|
||||||
|
sub.Ring.Push(event)
|
||||||
|
select {
|
||||||
|
case sub.C <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *EventBus) Subscribe(topic string) (*Subscription, func()) {
|
||||||
|
sub := &Subscription{
|
||||||
|
Ring: NewRingBuffer[*Event](256),
|
||||||
|
C: make(chan struct{}, 1),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
b.mu.Lock()
|
||||||
|
if b.topics[topic] == nil {
|
||||||
|
b.topics[topic] = make(map[*Subscription]struct{})
|
||||||
|
}
|
||||||
|
b.topics[topic][sub] = struct{}{}
|
||||||
|
b.mu.Unlock()
|
||||||
|
|
||||||
|
cleanup := func() {
|
||||||
|
sub.once.Do(func() {
|
||||||
|
b.mu.Lock()
|
||||||
|
delete(b.topics[topic], sub)
|
||||||
|
if len(b.topics[topic]) == 0 {
|
||||||
|
delete(b.topics, topic)
|
||||||
|
}
|
||||||
|
b.mu.Unlock()
|
||||||
|
close(sub.done)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return sub, cleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *EventBus) SubscribeGlobal() (*Subscription, func()) {
|
||||||
|
sub := &Subscription{
|
||||||
|
Ring: NewRingBuffer[*Event](256),
|
||||||
|
C: make(chan struct{}, 1),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
b.mu.Lock()
|
||||||
|
b.global[sub] = struct{}{}
|
||||||
|
b.mu.Unlock()
|
||||||
|
|
||||||
|
cleanup := func() {
|
||||||
|
sub.once.Do(func() {
|
||||||
|
b.mu.Lock()
|
||||||
|
delete(b.global, sub)
|
||||||
|
b.mu.Unlock()
|
||||||
|
close(sub.done)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return sub, cleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *EventBus) HasTopic(topic string) bool {
|
||||||
|
b.mu.RLock()
|
||||||
|
defer b.mu.RUnlock()
|
||||||
|
_, ok := b.topics[topic]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
@@ -0,0 +1,168 @@
|
|||||||
|
package eventbus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEventBus_PublishSubscribe(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
sub, cleanup := bus.Subscribe("test-topic")
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
event := &Event{Seq: 1, EventType: "status", Message: "hello"}
|
||||||
|
bus.Publish("test-topic", event)
|
||||||
|
|
||||||
|
got, ok := sub.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, event, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_MultipleSubscribers(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
sub1, cleanup1 := bus.Subscribe("topic")
|
||||||
|
defer cleanup1()
|
||||||
|
sub2, cleanup2 := bus.Subscribe("topic")
|
||||||
|
defer cleanup2()
|
||||||
|
sub3, cleanup3 := bus.Subscribe("topic")
|
||||||
|
defer cleanup3()
|
||||||
|
|
||||||
|
event := &Event{Seq: 1, EventType: "status"}
|
||||||
|
bus.Publish("topic", event)
|
||||||
|
|
||||||
|
got1, ok := sub1.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, event, got1)
|
||||||
|
|
||||||
|
got2, ok := sub2.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, event, got2)
|
||||||
|
|
||||||
|
got3, ok := sub3.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, event, got3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_GlobalSubscriber(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
sub, cleanup := bus.SubscribeGlobal()
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
bus.Publish("topic-a", &Event{Seq: 1})
|
||||||
|
bus.Publish("topic-b", &Event{Seq: 2})
|
||||||
|
bus.Publish("topic-c", &Event{Seq: 3})
|
||||||
|
|
||||||
|
got, ok := sub.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, int64(1), got.Seq)
|
||||||
|
|
||||||
|
got, ok = sub.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, int64(2), got.Seq)
|
||||||
|
|
||||||
|
got, ok = sub.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, int64(3), got.Seq)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_TopicIsolation(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
subA, cleanupA := bus.Subscribe("topic-a")
|
||||||
|
defer cleanupA()
|
||||||
|
|
||||||
|
bus.Publish("topic-b", &Event{Seq: 1})
|
||||||
|
|
||||||
|
_, ok := subA.Ring.Pop()
|
||||||
|
assert.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_Notification(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
sub, cleanup := bus.Subscribe("topic")
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
bus.Publish("topic", &Event{Seq: 1})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-sub.C:
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("expected notification on channel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_Unsubscribe(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
sub, cleanup := bus.Subscribe("topic")
|
||||||
|
|
||||||
|
bus.Publish("topic", &Event{Seq: 1})
|
||||||
|
_, ok := sub.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
cleanup()
|
||||||
|
|
||||||
|
bus.Publish("topic", &Event{Seq: 2})
|
||||||
|
_, ok = sub.Ring.Pop()
|
||||||
|
assert.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_SlowSubscriber(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
sub, cleanup := bus.Subscribe("topic")
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
for i := 0; i < 500; i++ {
|
||||||
|
bus.Publish("topic", &Event{Seq: int64(i)})
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, 256, sub.Ring.Len())
|
||||||
|
|
||||||
|
first, ok := sub.Ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, int64(244), first.Seq)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_HasTopic(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
|
||||||
|
assert.False(t, bus.HasTopic("topic"))
|
||||||
|
|
||||||
|
sub, cleanup := bus.Subscribe("topic")
|
||||||
|
_ = sub
|
||||||
|
assert.True(t, bus.HasTopic("topic"))
|
||||||
|
|
||||||
|
cleanup()
|
||||||
|
assert.False(t, bus.HasTopic("topic"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_ConcurrentPublishSubscribe(t *testing.T) {
|
||||||
|
bus := New()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id int) {
|
||||||
|
defer wg.Done()
|
||||||
|
sub, cleanup := bus.Subscribe("topic")
|
||||||
|
defer cleanup()
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
sub.Ring.Pop()
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
bus.Publish("topic", &Event{Seq: int64(id*100 + j)})
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package eventbus
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type RingBuffer[T any] struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
buf []T
|
||||||
|
head int
|
||||||
|
tail int
|
||||||
|
count int
|
||||||
|
cap int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRingBuffer[T any](capacity int) *RingBuffer[T] {
|
||||||
|
return &RingBuffer[T]{
|
||||||
|
buf: make([]T, capacity),
|
||||||
|
cap: capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RingBuffer[T]) Push(item T) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
r.buf[r.head] = item
|
||||||
|
r.head = (r.head + 1) % r.cap
|
||||||
|
|
||||||
|
if r.count == r.cap {
|
||||||
|
r.tail = (r.tail + 1) % r.cap
|
||||||
|
} else {
|
||||||
|
r.count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RingBuffer[T]) Pop() (T, bool) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
var zero T
|
||||||
|
if r.count == 0 {
|
||||||
|
return zero, false
|
||||||
|
}
|
||||||
|
|
||||||
|
item := r.buf[r.tail]
|
||||||
|
r.buf[r.tail] = zero
|
||||||
|
r.tail = (r.tail + 1) % r.cap
|
||||||
|
r.count--
|
||||||
|
|
||||||
|
return item, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RingBuffer[T]) Len() int {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
return r.count
|
||||||
|
}
|
||||||
@@ -0,0 +1,109 @@
|
|||||||
|
package eventbus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRingBuffer_PushPop(t *testing.T) {
|
||||||
|
ring := NewRingBuffer[int](5)
|
||||||
|
|
||||||
|
ring.Push(1)
|
||||||
|
ring.Push(2)
|
||||||
|
ring.Push(3)
|
||||||
|
|
||||||
|
v, ok := ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, 1, v)
|
||||||
|
|
||||||
|
v, ok = ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, 2, v)
|
||||||
|
|
||||||
|
v, ok = ring.Pop()
|
||||||
|
require.True(t, ok)
|
||||||
|
assert.Equal(t, 3, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRingBuffer_Empty(t *testing.T) {
|
||||||
|
ring := NewRingBuffer[int](5)
|
||||||
|
|
||||||
|
v, ok := ring.Pop()
|
||||||
|
assert.False(t, ok)
|
||||||
|
assert.Equal(t, 0, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRingBuffer_OverwriteOldest(t *testing.T) {
|
||||||
|
ring := NewRingBuffer[int](4)
|
||||||
|
|
||||||
|
ring.Push(1)
|
||||||
|
ring.Push(2)
|
||||||
|
ring.Push(3)
|
||||||
|
ring.Push(4)
|
||||||
|
ring.Push(5)
|
||||||
|
ring.Push(6)
|
||||||
|
|
||||||
|
var values []int
|
||||||
|
for {
|
||||||
|
v, ok := ring.Pop()
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
values = append(values, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, []int{3, 4, 5, 6}, values)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRingBuffer_Len(t *testing.T) {
|
||||||
|
ring := NewRingBuffer[int](5)
|
||||||
|
|
||||||
|
assert.Equal(t, 0, ring.Len())
|
||||||
|
|
||||||
|
ring.Push(1)
|
||||||
|
assert.Equal(t, 1, ring.Len())
|
||||||
|
|
||||||
|
ring.Push(2)
|
||||||
|
ring.Push(3)
|
||||||
|
assert.Equal(t, 3, ring.Len())
|
||||||
|
|
||||||
|
ring.Pop()
|
||||||
|
assert.Equal(t, 2, ring.Len())
|
||||||
|
|
||||||
|
ring.Push(4)
|
||||||
|
ring.Push(5)
|
||||||
|
ring.Push(6)
|
||||||
|
ring.Push(7)
|
||||||
|
assert.Equal(t, 5, ring.Len())
|
||||||
|
|
||||||
|
ring.Push(8)
|
||||||
|
assert.Equal(t, 5, ring.Len())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRingBuffer_Concurrent(t *testing.T) {
|
||||||
|
ring := NewRingBuffer[int](100)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
ring.Push(id*100 + j)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < 500; i++ {
|
||||||
|
ring.Pop()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user