diff --git a/test/component/mocks_test.go b/test/component/mocks_test.go index faad61e..a0e6d8a 100644 --- a/test/component/mocks_test.go +++ b/test/component/mocks_test.go @@ -86,6 +86,7 @@ type mockTorrentClient struct { FindFunc func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) AddTorrentFunc func(file torrent.TorrentFile, savePath string) error AddMagnetFunc func(magnetURI string, savePath string) error + DeleteTorrentFunc func(hash string) error DefaultSavePathFunc func() (string, error) } @@ -124,6 +125,13 @@ func (m *mockTorrentClient) AddMagnet(magnetURI string, savePath string) error { return fmt.Errorf("not mocked") } +func (m *mockTorrentClient) DeleteTorrent(hash string) error { + if m.DeleteTorrentFunc != nil { + return m.DeleteTorrentFunc(hash) + } + return nil +} + func (m *mockTorrentClient) DefaultSavePath() (string, error) { if m.DefaultSavePathFunc != nil { return m.DefaultSavePathFunc() diff --git a/test/component/monitor_album_stream_test.go b/test/component/monitor_album_stream_test.go index f749cd4..2a26f00 100644 --- a/test/component/monitor_album_stream_test.go +++ b/test/component/monitor_album_stream_test.go @@ -699,7 +699,7 @@ func TestMonitorAlbumStream_AutomaticQBitDown(t *testing.T) { var downloadCount int err := suite.pool.QueryRow(ctx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) require.NoError(t, err) - assert.Equal(t, 0, downloadCount) + assert.Equal(t, 1, downloadCount, "download record should exist even when qBit fails (DB save happens before qBit)") } func TestMonitorAlbumStream_AutomaticTorrentExists(t *testing.T) { @@ -1525,3 +1525,562 @@ func TestMonitorAlbumStream_InvalidPromptId(t *testing.T) { } assert.True(t, hasError, "expected error for invalid prompt ID") } + +func TestMonitorAlbumStream_ManualCancelBeforeQBit(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + indexerCalled := make(chan struct{}) + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + close(indexerCalled) + time.Sleep(2 * time.Second) + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + var addMagnetCalled bool + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + addMagnetCalled = true + return nil + } + + var deleteTorrentCalled bool + suite.mocks.torrent.DeleteTorrentFunc = func(hash string) error { + deleteTorrentCalled = true + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + <-indexerCalled + + cancel() + + for { + _, err = stream.Recv() + if err != nil { + break + } + } + + assert.False(t, addMagnetCalled, "AddMagnet should not be called when cancelled before qbit") + assert.False(t, deleteTorrentCalled, "DeleteTorrent should not be called when no torrent was added") + + bgCtx := context.Background() + var downloadCount int + err = suite.pool.QueryRow(bgCtx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 0, downloadCount) +} + +func TestMonitorAlbumStream_ManualCancelAfterQBit(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + torrentAddedCh := make(chan struct{}) + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + close(torrentAddedCh) + return nil + } + + var deletedHash string + suite.mocks.torrent.DeleteTorrentFunc = func(hash string) error { + deletedHash = hash + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + _, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + assert.Equal(t, pb.PromptType_PROMPT_TYPE_CONFIRM, prompt.Type) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + select { + case <-torrentAddedCh: + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for torrent to be added") + } + + cancel() + + for { + _, err = stream.Recv() + if err != nil { + break + } + } + + time.Sleep(100 * time.Millisecond) + + assert.NotEmpty(t, deletedHash, "DeleteTorrent should be called with the torrent hash") + assert.Equal(t, "6ff7af15d0745a3e29d1b9620191cfe01ad3cc70", deletedHash) + + bgCtx := context.Background() + var downloadState string + err = suite.pool.QueryRow(bgCtx, "SELECT state FROM downloads WHERE qbit_hash = $1", deletedHash).Scan(&downloadState) + if err == nil { + assert.Equal(t, "cancelled", downloadState) + } +} + +func TestMonitorAlbumStream_AutomaticFireAndForget(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + time.Sleep(1 * time.Second) + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }, + }, + }) + require.NoError(t, err) + + var statusCount int + for i := 0; i < 3; i++ { + msg, err := stream.Recv() + if err != nil { + break + } + if msg.GetStatus() != nil { + statusCount++ + } + } + assert.GreaterOrEqual(t, statusCount, 1, "should receive at least one status before disconnect") + + cancel() + + time.Sleep(3 * time.Second) + + bgCtx := context.Background() + var downloadCount int + err = suite.pool.QueryRow(bgCtx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 1, downloadCount, "workflow should complete and create download despite client disconnect") +} + +func TestMonitorAlbumStream_AutomaticDuplicateSubscribes(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + time.Sleep(500 * time.Millisecond) + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel1() + + stream1, err := suite.client.MonitorAlbumStream(ctx1) + require.NoError(t, err) + + err = stream1.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }, + }, + }) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel2() + + stream2, err := suite.client.MonitorAlbumStream(ctx2) + require.NoError(t, err) + + err = stream2.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }, + }, + }) + require.NoError(t, err) + + var stream1Events, stream2Events int + + done1 := make(chan struct{}) + go func() { + defer close(done1) + for { + msg, err := stream1.Recv() + if err != nil { + return + } + if msg.GetStatus() != nil || msg.GetResult() != nil || msg.GetError() != nil { + stream1Events++ + } + if msg.GetResult() != nil || msg.GetError() != nil { + return + } + } + }() + + done2 := make(chan struct{}) + go func() { + defer close(done2) + for { + msg, err := stream2.Recv() + if err != nil { + return + } + if msg.GetStatus() != nil || msg.GetResult() != nil || msg.GetError() != nil { + stream2Events++ + } + if msg.GetResult() != nil || msg.GetError() != nil { + return + } + } + }() + + select { + case <-done1: + case <-time.After(10 * time.Second): + t.Fatal("stream1 timed out") + } + + select { + case <-done2: + case <-time.After(10 * time.Second): + t.Fatal("stream2 timed out") + } + + assert.Greater(t, stream1Events, 0, "stream1 should receive events") + assert.Greater(t, stream2Events, 0, "stream2 should receive events") + + bgCtx := context.Background() + var downloadCount int + err = suite.pool.QueryRow(bgCtx, "SELECT COUNT(*) FROM downloads").Scan(&downloadCount) + require.NoError(t, err) + assert.Equal(t, 1, downloadCount, "only one download should be created despite two subscribers") +} + +func TestMonitorAlbumStream_AutomaticReplayOnReconnect(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + stream1 := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages1 := collectAllMessages(t, stream1, 0) + + var hasResult1 bool + for _, msg := range messages1 { + if msg.GetResult() != nil { + hasResult1 = true + break + } + } + require.True(t, hasResult1, "first workflow should complete") + + time.Sleep(200 * time.Millisecond) + + stream2 := startMonitorStream(t, suite.client, &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }) + + messages2 := collectAllMessages(t, stream2, 0) + + var hasResult2 bool + for _, msg := range messages2 { + if msg.GetResult() != nil { + hasResult2 = true + break + } + } + assert.True(t, hasResult2, "new workflow should start and complete after registry cleanup") + + bgCtx := context.Background() + var eventCount int + err := suite.pool.QueryRow(bgCtx, "SELECT COUNT(*) FROM album_events").Scan(&eventCount) + require.NoError(t, err) + assert.Greater(t, eventCount, len(messages1), "should have events from both runs") +} + +func TestMonitorAlbumStream_ManualCancelAfterDownloadSaved(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + savingCh := make(chan struct{}) + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + close(savingCh) + time.Sleep(2 * time.Second) + return nil + } + + var deleteTorrentCalled bool + suite.mocks.torrent.DeleteTorrentFunc = func(hash string) error { + deleteTorrentCalled = true + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + _, prompt, err := collectUntilPrompt(t, stream, 0) + require.NoError(t, err) + require.NotNil(t, prompt) + + sendDecision(t, stream, prompt.PromptId, &pb.UserDecision{ + Decision: &pb.UserDecision_Confirm{Confirm: true}, + }) + + select { + case <-savingCh: + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for saving to start") + } + + cancel() + + for { + _, err = stream.Recv() + if err != nil { + break + } + } + + time.Sleep(3 * time.Second) + + assert.True(t, deleteTorrentCalled, "DeleteTorrent should be called during cleanup") + + bgCtx := context.Background() + var downloadState string + err = suite.pool.QueryRow(bgCtx, "SELECT state FROM downloads WHERE qbit_hash = $1", "6ff7af15d0745a3e29d1b9620191cfe01ad3cc70").Scan(&downloadState) + if err == nil { + assert.Equal(t, "cancelled", downloadState) + } +} + +func TestMonitorAlbumStream_ManualDisconnectCancels(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + time.Sleep(2 * time.Second) + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123"), + ), nil + } + + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := suite.client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: "test-album-ext-id", + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_MANUAL, + }, + }, + }) + require.NoError(t, err) + + var gotStatus bool + for i := 0; i < 3; i++ { + msg, err := stream.Recv() + if err != nil { + break + } + if msg.GetStatus() != nil { + gotStatus = true + break + } + } + assert.True(t, gotStatus, "should receive at least one status before disconnect") + + cancel() + + _, err = stream.Recv() + assert.Error(t, err, "stream.Recv should return error after disconnect") +} diff --git a/test/component/setup_test.go b/test/component/setup_test.go index 6ae6590..7c63c13 100644 --- a/test/component/setup_test.go +++ b/test/component/setup_test.go @@ -48,6 +48,10 @@ func setupSuite(t *testing.T) *testSuite { schemaSQL, err := os.ReadFile(schemaPath) require.NoError(t, err, "failed to read schema file") + migrationPath := getMigrationPath(t, "003_event_bus.sql") + migrationSQL, err := os.ReadFile(migrationPath) + require.NoError(t, err, "failed to read migration file") + pgContainer, err := postgres.Run(ctx, "postgres:16-alpine", postgres.WithDatabase("music_agregator_test"), @@ -81,6 +85,9 @@ func setupSuite(t *testing.T) *testSuite { _, err = pool.Exec(ctx, string(schemaSQL)) require.NoError(t, err, "failed to apply schema") + _, err = pool.Exec(ctx, string(migrationSQL)) + require.NoError(t, err, "failed to apply migration") + db := &database.DB{Pool: pool} mocks := &testMocks{ @@ -156,10 +163,26 @@ func getSchemaPath(t *testing.T) string { return schemaPath } +func getMigrationPath(t *testing.T, filename string) string { + _, currentFile, _, ok := runtime.Caller(0) + require.True(t, ok, "failed to get current file path") + + testDir := filepath.Dir(currentFile) + migrationPath := filepath.Join(testDir, "..", "..", "..", "containers", "database", "music-agregator", filename) + + if _, err := os.Stat(migrationPath); os.IsNotExist(err) { + migrationPath = filepath.Join(testDir, "..", "..", "containers", "database", "music-agregator", filename) + } + + return migrationPath +} + func cleanTables(t *testing.T, pool *pgxpool.Pool) { ctx := context.Background() tables := []string{ + "album_events", + "workflow_runs", "download_files", "downloads", "torrents", diff --git a/test/component/subscribe_events_test.go b/test/component/subscribe_events_test.go new file mode 100644 index 0000000..8258917 --- /dev/null +++ b/test/component/subscribe_events_test.go @@ -0,0 +1,362 @@ +package component + +import ( + "context" + "io" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + metadataPb "homelab.lan/music-agregator/gen/metadata/v1" + pb "homelab.lan/music-agregator/gen/music_agregator/v1" + "homelab.lan/music-agregator/internal/indexer" + "homelab.lan/music-agregator/internal/torrent" +) + +func startAutomaticWorkflow(t *testing.T, client pb.MusicAgregatorServiceClient, albumID string) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := client.MonitorAlbumStream(ctx) + require.NoError(t, err) + + err = stream.Send(&pb.MonitorAlbumStreamRequest{ + Message: &pb.MonitorAlbumStreamRequest_Start{ + Start: &pb.StartMonitorRequest{ + AlbumId: albumID, + Quality: pb.QualityType_QUALITY_LOSSLESS, + Mode: pb.InteractionMode_INTERACTION_MODE_AUTOMATIC, + }, + }, + }) + require.NoError(t, err) + + for { + _, err := stream.Recv() + if err != nil { + break + } + } +} + +func collectSubscribeEvents( + t *testing.T, + stream grpc.ServerStreamingClient[pb.AlbumEvent], + timeout time.Duration, +) []*pb.AlbumEvent { + t.Helper() + + if timeout == 0 { + timeout = 5 * time.Second + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + var events []*pb.AlbumEvent + done := make(chan struct{}) + + go func() { + defer close(done) + for { + event, err := stream.Recv() + if err == io.EOF { + return + } + if err != nil { + return + } + events = append(events, event) + } + }() + + select { + case <-done: + case <-ctx.Done(): + } + + return events +} + +func TestSubscribeEvents_ReceivesWorkflowEvents(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + subStream, err := suite.client.SubscribeEvents(ctx, &pb.SubscribeEventsRequest{}) + require.NoError(t, err) + + var events []*pb.AlbumEvent + var mu sync.Mutex + done := make(chan struct{}) + + go func() { + defer close(done) + for { + event, err := subStream.Recv() + if err != nil { + return + } + mu.Lock() + events = append(events, event) + mu.Unlock() + } + }() + + time.Sleep(100 * time.Millisecond) + + startAutomaticWorkflow(t, suite.client, "test-album-ext-id") + + time.Sleep(500 * time.Millisecond) + cancel() + <-done + + mu.Lock() + defer mu.Unlock() + + require.NotEmpty(t, events, "expected to receive events from workflow") + + var hasStatusEvent bool + for _, e := range events { + if e.EventType == "status" { + hasStatusEvent = true + break + } + } + assert.True(t, hasStatusEvent, "expected at least one status event") + + var prevSeq int64 + for _, e := range events { + if e.Seq > 0 { + assert.Greater(t, e.Seq, prevSeq, "seq numbers should be monotonically increasing") + prevSeq = e.Seq + } + } +} + +func TestSubscribeEvents_MultipleWorkflows(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + albumID := in.GetId() + if albumID == "" { + albumID = "test-album" + } + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum(albumID, "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + subStream, err := suite.client.SubscribeEvents(ctx, &pb.SubscribeEventsRequest{}) + require.NoError(t, err) + + var events []*pb.AlbumEvent + var mu sync.Mutex + done := make(chan struct{}) + + go func() { + defer close(done) + for { + event, err := subStream.Recv() + if err != nil { + return + } + mu.Lock() + events = append(events, event) + mu.Unlock() + } + }() + + time.Sleep(100 * time.Millisecond) + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + startAutomaticWorkflow(t, suite.client, "album-1") + }() + + go func() { + defer wg.Done() + startAutomaticWorkflow(t, suite.client, "album-2") + }() + + wg.Wait() + time.Sleep(500 * time.Millisecond) + cancel() + <-done + + mu.Lock() + defer mu.Unlock() + + require.NotEmpty(t, events, "expected to receive events from workflows") + + albumIDs := make(map[string]bool) + for _, e := range events { + if e.AlbumId != "" { + albumIDs[e.AlbumId] = true + } + } + + assert.True(t, len(albumIDs) >= 1, "expected events from at least one workflow") +} + +func TestSubscribeEvents_ClientDisconnect(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + time.Sleep(300 * time.Millisecond) + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + subStream, err := suite.client.SubscribeEvents(ctx, &pb.SubscribeEventsRequest{}) + require.NoError(t, err) + + go startAutomaticWorkflow(t, suite.client, "test-album-ext-id") + + for { + _, err := subStream.Recv() + if err != nil { + break + } + } + + time.Sleep(2 * time.Second) + + bgCtx := context.Background() + var workflowCount int + err = suite.pool.QueryRow(bgCtx, "SELECT COUNT(*) FROM workflow_runs").Scan(&workflowCount) + require.NoError(t, err) + assert.GreaterOrEqual(t, workflowCount, 1, "workflow should have been created") +} + +func TestSubscribeEvents_ReplayFromSeq(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + + suite.mocks.metadata.GetAlbumFunc = func(ctx context.Context, in *metadataPb.GetAlbumRequest, opts ...grpc.CallOption) (*metadataPb.GetAlbumResponse, error) { + return &metadataPb.GetAlbumResponse{ + Album: newMetadataAlbum("test-album-ext-id", "Test Album", "artist-ext-id", "Test Artist"), + }, nil + } + + suite.mocks.indexer.SearchFunc = func(query string, limit int32, idx string) (*indexer.SearchResponse, error) { + return newSearchResponse( + newSearchItem("Test Artist - Test Album [FLAC]", 50, "magnet:?xt=urn:btih:abc123&dn=test"), + ), nil + } + + suite.mocks.magnet.ResolveFunc = func(magnetURI string) ([]byte, error) { + return newTorrentData(), nil + } + + suite.mocks.torrent.FindFunc = func(opts torrent.FindOptions) ([]torrent.TorrentInfo, error) { + return []torrent.TorrentInfo{}, nil + } + + suite.mocks.torrent.AddMagnetFunc = func(magnetURI string, savePath string) error { + return nil + } + + startAutomaticWorkflow(t, suite.client, "test-album-ext-id") + + bgCtx := context.Background() + var firstSeq int64 + err := suite.pool.QueryRow(bgCtx, "SELECT MIN(seq) FROM album_events").Scan(&firstSeq) + require.NoError(t, err) + require.Greater(t, firstSeq, int64(0), "expected events to be persisted") + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + subStream, err := suite.client.SubscribeEvents(ctx, &pb.SubscribeEventsRequest{ + SinceSeq: firstSeq, + }) + require.NoError(t, err) + + events := collectSubscribeEvents(t, subStream, 2*time.Second) + + require.NotEmpty(t, events, "expected replayed events") + + for _, e := range events { + assert.Greater(t, e.Seq, firstSeq, "replayed events should have seq > since_seq") + } +} diff --git a/test/component/workflow_event_test.go b/test/component/workflow_event_test.go new file mode 100644 index 0000000..1abe574 --- /dev/null +++ b/test/component/workflow_event_test.go @@ -0,0 +1,422 @@ +package component + +import ( + "context" + "errors" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "homelab.lan/music-agregator/internal" + "homelab.lan/music-agregator/internal/database" +) + +func insertTestArtistAndAlbum(t *testing.T, suite *testSuite) string { + ctx := context.Background() + + var artistID string + err := suite.pool.QueryRow(ctx, ` + INSERT INTO artists (external_id, name, artist_type, country, genres, image_url, monitor_state) + VALUES ('test-artist-ext', 'Test Artist', 'person', 'US', ARRAY['Rock'], 'http://img.com', 'monitored') + RETURNING id + `).Scan(&artistID) + require.NoError(t, err) + + var albumID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO albums (external_id, artist_id, title, album_type, total_tracks, total_discs, label, genres, cover_url, monitor_state) + VALUES ('test-album-ext', $1, 'Test Album', 'album', 10, 1, 'Test Label', ARRAY['Rock'], 'http://cover.com', 'monitored') + RETURNING id + `, artistID).Scan(&albumID) + require.NoError(t, err) + + return albumID +} + +func TestWorkflowRun_Create(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run := &database.WorkflowRun{ + AlbumID: albumID, + Quality: "flac", + } + err := repo.Create(ctx, run) + require.NoError(t, err) + + assert.NotEmpty(t, run.ID) + assert.False(t, run.StartedAt.IsZero()) + + fetched, err := repo.GetByID(ctx, run.ID) + require.NoError(t, err) + assert.Equal(t, "running", fetched.Status) +} + +func TestWorkflowRun_DuplicateRunningRejected(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run1 := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run1) + require.NoError(t, err) + + run2 := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err = repo.Create(ctx, run2) + assert.ErrorIs(t, err, database.ErrWorkflowAlreadyRunning) +} + +func TestWorkflowRun_DuplicateAllowedAfterCompletion(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run1 := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run1) + require.NoError(t, err) + + err = repo.SetCompleted(ctx, run1.ID) + require.NoError(t, err) + + run2 := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err = repo.Create(ctx, run2) + require.NoError(t, err) + assert.NotEmpty(t, run2.ID) + assert.NotEqual(t, run1.ID, run2.ID) +} + +func TestWorkflowRun_SetCompleted(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run) + require.NoError(t, err) + + err = repo.SetCompleted(ctx, run.ID) + require.NoError(t, err) + + updated, err := repo.GetByID(ctx, run.ID) + require.NoError(t, err) + assert.Equal(t, "completed", updated.Status) + assert.NotNil(t, updated.CompletedAt) +} + +func TestWorkflowRun_SetFailed(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run) + require.NoError(t, err) + + err = repo.SetFailed(ctx, run.ID, "something went wrong") + require.NoError(t, err) + + updated, err := repo.GetByID(ctx, run.ID) + require.NoError(t, err) + assert.Equal(t, "failed", updated.Status) + require.NotNil(t, updated.ErrorMessage) + assert.Equal(t, "something went wrong", *updated.ErrorMessage) +} + +func TestWorkflowRun_SetCancelled(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run) + require.NoError(t, err) + + err = repo.SetCancelled(ctx, run.ID) + require.NoError(t, err) + + updated, err := repo.GetByID(ctx, run.ID) + require.NoError(t, err) + assert.Equal(t, "cancelled", updated.Status) +} + +func TestWorkflowRun_GetByAlbumAndQuality(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run) + require.NoError(t, err) + + found, err := repo.GetByAlbumAndQuality(ctx, albumID, "flac") + require.NoError(t, err) + assert.Equal(t, run.ID, found.ID) + + _, err = repo.GetByAlbumAndQuality(ctx, albumID, "mp3") + assert.True(t, errors.Is(err, pgx.ErrNoRows) || err != nil) +} + +func TestWorkflowRun_GetRunning(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + repo := database.NewWorkflowRunRepository(suite.pool) + + run1 := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := repo.Create(ctx, run1) + require.NoError(t, err) + + run2 := &database.WorkflowRun{AlbumID: albumID, Quality: "mp3"} + err = repo.Create(ctx, run2) + require.NoError(t, err) + + run3 := &database.WorkflowRun{AlbumID: albumID, Quality: "opus"} + err = repo.Create(ctx, run3) + require.NoError(t, err) + err = repo.SetCompleted(ctx, run3.ID) + require.NoError(t, err) + + running, err := repo.GetRunning(ctx) + require.NoError(t, err) + assert.Len(t, running, 2) + + ids := []string{running[0].ID, running[1].ID} + assert.Contains(t, ids, run1.ID) + assert.Contains(t, ids, run2.ID) +} + +func TestAlbumEvent_Create(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + workflowRepo := database.NewWorkflowRunRepository(suite.pool) + eventRepo := database.NewAlbumEventRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := workflowRepo.Create(ctx, run) + require.NoError(t, err) + + event := &database.AlbumEvent{ + WorkflowRunID: run.ID, + AlbumID: albumID, + EventType: "info", + Step: "searching", + Message: "started search", + DataJSON: []byte(`{"query":"test"}`), + } + err = eventRepo.Create(ctx, event) + require.NoError(t, err) + + assert.NotEmpty(t, event.ID) + assert.Greater(t, event.Seq, int64(0)) +} + +func TestAlbumEvent_GetByWorkflowRun(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + workflowRepo := database.NewWorkflowRunRepository(suite.pool) + eventRepo := database.NewAlbumEventRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := workflowRepo.Create(ctx, run) + require.NoError(t, err) + + for i := 0; i < 3; i++ { + event := &database.AlbumEvent{ + WorkflowRunID: run.ID, + AlbumID: albumID, + EventType: "info", + Step: "step", + Message: "msg", + } + err = eventRepo.Create(ctx, event) + require.NoError(t, err) + } + + events, err := eventRepo.GetByWorkflowRun(ctx, run.ID) + require.NoError(t, err) + assert.Len(t, events, 3) + assert.Less(t, events[0].Seq, events[1].Seq) + assert.Less(t, events[1].Seq, events[2].Seq) +} + +func TestAlbumEvent_GetByAlbum(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + workflowRepo := database.NewWorkflowRunRepository(suite.pool) + eventRepo := database.NewAlbumEventRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := workflowRepo.Create(ctx, run) + require.NoError(t, err) + + var seqs []int64 + for i := 0; i < 5; i++ { + event := &database.AlbumEvent{ + WorkflowRunID: run.ID, + AlbumID: albumID, + EventType: "info", + Step: "step", + Message: "msg", + } + err = eventRepo.Create(ctx, event) + require.NoError(t, err) + seqs = append(seqs, event.Seq) + } + + events, err := eventRepo.GetByAlbum(ctx, albumID, seqs[1], 10) + require.NoError(t, err) + assert.Len(t, events, 3) + assert.Equal(t, seqs[2], events[0].Seq) + assert.Equal(t, seqs[3], events[1].Seq) + assert.Equal(t, seqs[4], events[2].Seq) +} + +func TestAlbumEvent_GetLatestSeq(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + eventRepo := database.NewAlbumEventRepository(suite.pool) + + seq, err := eventRepo.GetLatestSeq(ctx) + require.NoError(t, err) + assert.Equal(t, int64(0), seq) + + albumID := insertTestArtistAndAlbum(t, suite) + workflowRepo := database.NewWorkflowRunRepository(suite.pool) + + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err = workflowRepo.Create(ctx, run) + require.NoError(t, err) + + var lastSeq int64 + for i := 0; i < 3; i++ { + event := &database.AlbumEvent{ + WorkflowRunID: run.ID, + AlbumID: albumID, + EventType: "info", + Step: "step", + Message: "msg", + } + err = eventRepo.Create(ctx, event) + require.NoError(t, err) + lastSeq = event.Seq + } + + seq, err = eventRepo.GetLatestSeq(ctx) + require.NoError(t, err) + assert.Equal(t, lastSeq, seq) +} + +func TestRecovery_StaleWorkflowWithDownload(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + + workflowRepo := database.NewWorkflowRunRepository(suite.pool) + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := workflowRepo.Create(ctx, run) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + UPDATE workflow_runs SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = $1 + `, run.ID) + require.NoError(t, err) + + var torrentID string + err = suite.pool.QueryRow(ctx, ` + INSERT INTO torrents (album_id, info_hash, tracker, title, format, quality, source, seeders, peers, size) + VALUES ($1, 'test-hash', 'test-tracker', 'Test Torrent', 'FLAC', '16-44', 'CD', 100, 50, 500000000) + RETURNING id + `, albumID).Scan(&torrentID) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + INSERT INTO downloads (torrent_id, album_id, format, quality, state, qbit_hash) + VALUES ($1, $2, 'FLAC', '16-44', 'downloading', 'test-hash') + `, torrentID, albumID) + require.NoError(t, err) + + service := createRecoveryTestService(suite) + service.RecoverWorkflows(ctx) + + updated, err := workflowRepo.GetByID(ctx, run.ID) + require.NoError(t, err) + assert.Equal(t, "completed", updated.Status) +} + +func TestRecovery_StaleWorkflowWithoutDownload(t *testing.T) { + suite := setupSuite(t) + cleanTables(t, suite.pool) + ctx := context.Background() + + albumID := insertTestArtistAndAlbum(t, suite) + + workflowRepo := database.NewWorkflowRunRepository(suite.pool) + run := &database.WorkflowRun{AlbumID: albumID, Quality: "flac"} + err := workflowRepo.Create(ctx, run) + require.NoError(t, err) + + _, err = suite.pool.Exec(ctx, ` + UPDATE workflow_runs SET started_at = NOW() - INTERVAL '10 minutes' WHERE id = $1 + `, run.ID) + require.NoError(t, err) + + service := createRecoveryTestService(suite) + service.RecoverWorkflows(ctx) + + updated, err := workflowRepo.GetByID(ctx, run.ID) + require.NoError(t, err) + assert.Equal(t, "failed", updated.Status) + require.NotNil(t, updated.ErrorMessage) + assert.Contains(t, *updated.ErrorMessage, "server restarted") +} + +func createRecoveryTestService(suite *testSuite) *internal.MusicAgregatorService { + return internal.NewMusicAgregatorServiceWithDeps( + nil, + nil, + nil, + nil, + nil, + nil, + suite.db, + ) +}