Files
music-agregator/test/component/subscribe_events_test.go
2026-05-11 15:54:25 +02:00

363 lines
9.2 KiB
Go

package component
import (
"context"
"io"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
metadataPb "homelab.lan/music-agregator/gen/metadata/v1"
pb "homelab.lan/music-agregator/gen/music_agregator/v1"
"homelab.lan/music-agregator/internal/indexer"
"homelab.lan/music-agregator/internal/torrent"
)
// startAutomaticWorkflow opens a MonitorAlbumStream on client, sends a start
// request for albumID (lossless quality, automatic interaction mode) and then
// drains the stream until Recv returns any error. The whole exchange is
// bounded by a 10-second context timeout.
//
// Failures are reported with assert (t.Errorf) followed by an early return
// instead of require (t.FailNow): callers in this file run the helper on
// spawned goroutines, and the testing package only allows FailNow to be
// called from the goroutine running the test function.
func startAutomaticWorkflow(t *testing.T, client pb.MusicAgregatorServiceClient, albumID string) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.MonitorAlbumStream(ctx)
	if !assert.NoError(t, err) {
		return
	}

	err = stream.Send(&pb.MonitorAlbumStreamRequest{
		Message: &pb.MonitorAlbumStreamRequest_Start{
			Start: &pb.StartMonitorRequest{
				AlbumId: albumID,
				Quality: pb.QualityType_QUALITY_LOSSLESS,
				Mode:    pb.InteractionMode_INTERACTION_MODE_AUTOMATIC,
			},
		},
	})
	if !assert.NoError(t, err) {
		return
	}

	// Drain every response; the first Recv error (of any kind) ends the helper.
	for {
		if _, err := stream.Recv(); err != nil {
			return
		}
	}
}
// collectSubscribeEvents reads events from stream until the stream ends (any
// Recv error, io.EOF included) or timeout elapses, and returns the events
// received so far. A zero timeout defaults to 5 seconds.
//
// Events are handed from the reader goroutine to the caller over a channel,
// so the returned slice is only ever touched by the calling goroutine. The
// previous version appended to the slice from the reader goroutine and could
// return it on timeout while an append was still in flight (a data race).
//
// After a timeout the reader goroutine exits as soon as its pending Recv
// returns; Recv itself is only unblocked by the stream's own context, which
// is owned by the caller.
func collectSubscribeEvents(
	t *testing.T,
	stream grpc.ServerStreamingClient[pb.AlbumEvent],
	timeout time.Duration,
) []*pb.AlbumEvent {
	t.Helper()

	if timeout == 0 {
		timeout = 5 * time.Second
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	received := make(chan *pb.AlbumEvent)
	go func() {
		// Closing the channel tells the collector the stream is finished.
		defer close(received)
		for {
			event, err := stream.Recv()
			if err == io.EOF {
				// Server closed the stream normally.
				return
			}
			if err != nil {
				// Any other error (e.g. cancelled stream context) also ends collection.
				return
			}
			select {
			case received <- event:
			case <-ctx.Done():
				// Collector has already given up; do not block forever on send.
				return
			}
		}
	}()

	var events []*pb.AlbumEvent
	for {
		select {
		case event, ok := <-received:
			if !ok {
				return events
			}
			events = append(events, event)
		case <-ctx.Done():
			return events
		}
	}
}
// TestSubscribeEvents_ReceivesWorkflowEvents subscribes to the event stream,
// runs one automatic workflow against stubbed dependencies, and checks that
// the subscriber saw at least one "status" event and that positive sequence
// numbers arrive in strictly increasing order.
func TestSubscribeEvents_ReceivesWorkflowEvents(t *testing.T) {
	suite := setupSuite(t)
	cleanTables(t, suite.pool)

	// Stub every external dependency with a canned successful answer.
	suite.mocks.metadata.GetAlbumFunc = func(_ context.Context, _ *metadataPb.GetAlbumRequest, _ ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) {
		album := newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist")
		return &metadataPb.GetAlbumResponse{Album: album}, nil
	}
	suite.mocks.indexer.SearchFunc = func(_ string, _ int32, _ string) (*indexer.SearchResponse, error) {
		item := newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test")
		return newSearchResponse(item), nil
	}
	suite.mocks.magnet.ResolveFunc = func(_ string) ([]byte, error) {
		return newTorrentData(), nil
	}
	suite.mocks.torrent.FindFunc = func(_ torrent.FindOptions) ([]torrent.TorrentInfo, error) {
		return []torrent.TorrentInfo{}, nil
	}
	suite.mocks.torrent.AddMagnetFunc = func(_ string, _ string) error {
		return nil
	}

	subCtx, stopSub := context.WithTimeout(context.Background(), 10*time.Second)
	defer stopSub()

	subStream, err := suite.client.SubscribeEvents(subCtx, &pb.SubscribeEventsRequest{})
	require.NoError(t, err)

	var (
		guard    sync.Mutex
		received []*pb.AlbumEvent
	)
	readerDone := make(chan struct{})
	go func() {
		defer close(readerDone)
		// Keep reading until Recv fails, which happens once the
		// subscription context is cancelled below.
		for ev, recvErr := subStream.Recv(); recvErr == nil; ev, recvErr = subStream.Recv() {
			guard.Lock()
			received = append(received, ev)
			guard.Unlock()
		}
	}()

	// Pause before starting the workflow, then again before tearing the
	// subscription down.
	time.Sleep(100 * time.Millisecond)
	startAutomaticWorkflow(t, suite.client, "test-album-ext-id")
	time.Sleep(500 * time.Millisecond)

	stopSub()
	<-readerDone

	guard.Lock()
	defer guard.Unlock()

	require.NotEmpty(t, received, "expected to receive events from workflow")

	sawStatus := false
	for i := 0; i < len(received) && !sawStatus; i++ {
		sawStatus = received[i].EventType == "status"
	}
	assert.True(t, sawStatus, "expected at least one status event")

	// Only events carrying a positive seq take part in the ordering check.
	var lastSeq int64
	for _, ev := range received {
		if ev.Seq <= 0 {
			continue
		}
		assert.Greater(t, ev.Seq, lastSeq, "seq numbers should be monotonically increasing")
		lastSeq = ev.Seq
	}
}
// TestSubscribeEvents_MultipleWorkflows starts two automatic workflows
// concurrently while a single subscriber is listening, and checks that the
// subscriber received events carrying at least one non-empty album ID.
func TestSubscribeEvents_MultipleWorkflows(t *testing.T) {
	suite := setupSuite(t)
	cleanTables(t, suite.pool)

	// The metadata stub echoes the requested ID back, falling back to
	// "test-album" when the request carries none.
	suite.mocks.metadata.GetAlbumFunc = func(_ context.Context, req *metadataPb.GetAlbumRequest, _ ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) {
		id := "test-album"
		if requested := req.GetId(); requested != "" {
			id = requested
		}
		album := newMetadataAlbum(id, "Test Album", "artist-ext-id", "Test Artist")
		return &metadataPb.GetAlbumResponse{Album: album}, nil
	}
	suite.mocks.indexer.SearchFunc = func(_ string, _ int32, _ string) (*indexer.SearchResponse, error) {
		item := newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test")
		return newSearchResponse(item), nil
	}
	suite.mocks.magnet.ResolveFunc = func(_ string) ([]byte, error) {
		return newTorrentData(), nil
	}
	suite.mocks.torrent.FindFunc = func(_ torrent.FindOptions) ([]torrent.TorrentInfo, error) {
		return []torrent.TorrentInfo{}, nil
	}
	suite.mocks.torrent.AddMagnetFunc = func(_ string, _ string) error {
		return nil
	}

	subCtx, stopSub := context.WithTimeout(context.Background(), 15*time.Second)
	defer stopSub()

	subStream, err := suite.client.SubscribeEvents(subCtx, &pb.SubscribeEventsRequest{})
	require.NoError(t, err)

	var (
		guard    sync.Mutex
		received []*pb.AlbumEvent
	)
	readerDone := make(chan struct{})
	go func() {
		defer close(readerDone)
		// Read until Recv fails, i.e. until the subscription is cancelled below.
		for ev, recvErr := subStream.Recv(); recvErr == nil; ev, recvErr = subStream.Recv() {
			guard.Lock()
			received = append(received, ev)
			guard.Unlock()
		}
	}()

	time.Sleep(100 * time.Millisecond)

	// Run both workflows in parallel and wait for both streams to finish.
	var workflows sync.WaitGroup
	for _, id := range []string{"album-1", "album-2"} {
		workflows.Add(1)
		go func(albumID string) {
			defer workflows.Done()
			startAutomaticWorkflow(t, suite.client, albumID)
		}(id)
	}
	workflows.Wait()

	time.Sleep(500 * time.Millisecond)
	stopSub()
	<-readerDone

	guard.Lock()
	defer guard.Unlock()

	require.NotEmpty(t, received, "expected to receive events from workflows")

	// Collect the distinct non-empty album IDs seen on the stream.
	seenAlbums := make(map[string]struct{})
	for _, ev := range received {
		if ev.AlbumId == "" {
			continue
		}
		seenAlbums[ev.AlbumId] = struct{}{}
	}
	assert.True(t, len(seenAlbums) >= 1, "expected events from at least one workflow")
}
// TestSubscribeEvents_ClientDisconnect opens a subscription whose context
// expires after 200ms while a workflow is being started in the background
// (the metadata stub delays its answer by 300ms), and then checks that at
// least one row exists in workflow_runs.
//
// The background workflow goroutine is joined through a done channel before
// the database is inspected. The earlier version launched it fire-and-forget
// and relied on a fixed 2-second sleep, which allowed the goroutine to keep
// using t after the test function had returned.
func TestSubscribeEvents_ClientDisconnect(t *testing.T) {
	suite := setupSuite(t)
	cleanTables(t, suite.pool)

	suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) {
		// Delay the lookup so it outlasts the 200ms subscription context.
		time.Sleep(300 * time.Millisecond)
		return &metadataPb.GetAlbumResponse{
			Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"),
		}, nil
	}
	suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) {
		return newSearchResponse(
			newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"),
		), nil
	}
	suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) {
		return newTorrentData(), nil
	}
	suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) {
		return []torrent.TorrentInfo{}, nil
	}
	suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	subStream, err := suite.client.SubscribeEvents(ctx, &pb.SubscribeEventsRequest{})
	require.NoError(t, err)

	workflowDone := make(chan struct{})
	go func() {
		// The deferred close also runs if the helper exits the goroutine
		// early, so the wait below cannot hang on it.
		defer close(workflowDone)
		startAutomaticWorkflow(t, suite.client, "test-album-ext-id")
	}()

	// Read until the subscription fails; the 200ms context guarantees that
	// happens at the latest when the deadline passes.
	for {
		if _, err := subStream.Recv(); err != nil {
			break
		}
	}

	// Join the workflow goroutine before asserting; the helper bounds itself
	// with its own stream timeout, so this wait is finite.
	<-workflowDone

	var workflowCount int
	err = suite.pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM workflow_runs").Scan(&workflowCount)
	require.NoError(t, err)
	assert.GreaterOrEqual(t, workflowCount, 1, "workflow should have been created")
}
// TestSubscribeEvents_ReplayFromSeq runs a workflow to completion, reads the
// smallest persisted seq from album_events, subscribes with SinceSeq set to
// that value, and checks that every event delivered has a strictly larger seq.
func TestSubscribeEvents_ReplayFromSeq(t *testing.T) {
	suite := setupSuite(t)
	cleanTables(t, suite.pool)

	// Stub every external dependency with a canned successful answer.
	suite.mocks.metadata.GetAlbumFunc = func(_ context.Context, _ *metadataPb.GetAlbumRequest, _ ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) {
		album := newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist")
		return &metadataPb.GetAlbumResponse{Album: album}, nil
	}
	suite.mocks.indexer.SearchFunc = func(_ string, _ int32, _ string) (*indexer.SearchResponse, error) {
		item := newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test")
		return newSearchResponse(item), nil
	}
	suite.mocks.magnet.ResolveFunc = func(_ string) ([]byte, error) {
		return newTorrentData(), nil
	}
	suite.mocks.torrent.FindFunc = func(_ torrent.FindOptions) ([]torrent.TorrentInfo, error) {
		return []torrent.TorrentInfo{}, nil
	}
	suite.mocks.torrent.AddMagnetFunc = func(_ string, _ string) error {
		return nil
	}

	// Produce events first; the subscription is opened only afterwards.
	startAutomaticWorkflow(t, suite.client, "test-album-ext-id")

	var minSeq int64
	row := suite.pool.QueryRow(context.Background(), "SELECT MIN(seq) FROM album_events")
	err := row.Scan(&minSeq)
	require.NoError(t, err)
	require.Greater(t, minSeq, int64(0), "expected events to be persisted")

	subCtx, stopSub := context.WithTimeout(context.Background(), 2*time.Second)
	defer stopSub()

	request := &pb.SubscribeEventsRequest{SinceSeq: minSeq}
	subStream, err := suite.client.SubscribeEvents(subCtx, request)
	require.NoError(t, err)

	replayed := collectSubscribeEvents(t, subStream, 2*time.Second)
	require.NotEmpty(t, replayed, "expected replayed events")

	for i := range replayed {
		assert.Greater(t, replayed[i].Seq, minSeq, "replayed events should have seq > since_seq")
	}
}