Update flow diagrams for event bus architecture, cancel cleanup, and SubscribeEvents

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Alexander
2026-05-11 15:54:32 +02:00
parent 93821ab214
commit 69752bd6a2
5 changed files with 231 additions and 103 deletions
@@ -0,0 +1,86 @@
@startuml Event Bus Architecture
skinparam componentAlign center
title Event Bus: In-Process Pub/Sub Architecture
package "Publishers" {
[Workflow Goroutine 1\n(album A, LOSSLESS)] as WF1
[Workflow Goroutine 2\n(album B, LOSSY)] as WF2
}
database "PostgreSQL" as DB {
[workflow_runs] as WR
[album_events] as AE
}
package "Event Bus (in-memory)" {
[Topic: albumA:LOSSLESS] as T1
[Topic: albumB:LOSSY] as T2
[Global Subscribers] as GS
}
package "Subscribers" {
[MonitorAlbumStream\nClient A (album A)] as S1
[MonitorAlbumStream\nClient B (album A)] as S2
[SubscribeEvents\nClient C (global)] as S3
}
WF1 --> DB : 1. Write event\n(synchronous)
WF1 --> T1 : 2. Publish\n(async notification)
WF2 --> DB : 1. Write event
WF2 --> T2 : 2. Publish
T1 --> S1 : Ring buffer\n(per subscriber)
T1 --> S2 : Ring buffer
T1 --> GS
T2 --> GS
GS --> S3 : Ring buffer
note right of DB
**Source of truth.**
Events survive restarts.
Replay via seq numbers.
end note
note right of T1
**Ephemeral notification.**
Ring buffer per subscriber.
Slow subscribers: overwrite oldest.
No backpressure on publishers.
end note
note bottom of S1
Client disconnect removes
subscriber from topic.
Workflow continues.
end note
' --- Subscription Lifecycle ---
note as N1
**Subscribe flow:**
1. Client calls MonitorAlbumStream or SubscribeEvents
2. Server subscribes to EventBus (per-topic or global)
3. Server queries DB for historical events (replay)
4. Server bridges: EventBus → gRPC stream
5. On disconnect: cleanup func unsubscribes
**Topic cleanup:**
When last subscriber leaves AND workflow completed:
topic removed from EventBus map.
end note
' --- Recovery on Restart ---
note as N2
**Server restart recovery:**
1. Query workflow_runs WHERE status = 'running'
2. For each stale run:
- If active download exists → mark completed
- Otherwise → mark failed ("server restarted")
3. RecoverOrphanedDownloads reschedules poll jobs
4. New workflows start fresh (no goroutine resurrection)
end note
@enduml
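The notes in this diagram describe a bus where each subscriber owns a ring buffer, slow subscribers lose their oldest events, and publishers never block. A minimal Go sketch of that behaviour follows. The type and method names (`Event`, `Ring`, `Bus`, `Subscribe`, `Publish`, `Drain`) are illustrative assumptions, not the repository's actual API. Topic removal after workflow completion and the database write that precedes each publish are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for a row in album_events.
type Event struct {
	Seq  int64
	Step string
}

// Ring is a fixed-capacity buffer (capacity must be > 0) that overwrites its
// oldest entry when full, so a slow subscriber never blocks a publisher.
type Ring struct {
	mu    sync.Mutex
	buf   []Event
	start int // index of the oldest buffered event
	size  int // number of buffered events
}

// push stores e, dropping the oldest event if the buffer is already full.
func (r *Ring) push(e Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.size == len(r.buf) {
		r.buf[r.start] = e
		r.start = (r.start + 1) % len(r.buf)
		return
	}
	r.buf[(r.start+r.size)%len(r.buf)] = e
	r.size++
}

// Drain returns the buffered events oldest-first and empties the buffer.
func (r *Ring) Drain() []Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]Event, 0, r.size)
	for i := 0; i < r.size; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	r.start, r.size = 0, 0
	return out
}

// Bus fans events out to per-topic subscribers and to global subscribers.
type Bus struct {
	mu     sync.Mutex
	topics map[string]map[*Ring]struct{}
	global map[*Ring]struct{}
}

func NewBus() *Bus {
	return &Bus{topics: map[string]map[*Ring]struct{}{}, global: map[*Ring]struct{}{}}
}

// Subscribe registers a ring for topic ("" means global) and returns it
// together with the cleanup func to call when the client disconnects.
func (b *Bus) Subscribe(topic string, capacity int) (*Ring, func()) {
	r := &Ring{buf: make([]Event, capacity)}
	b.mu.Lock()
	defer b.mu.Unlock()
	set := b.global
	if topic != "" {
		if b.topics[topic] == nil {
			b.topics[topic] = map[*Ring]struct{}{}
		}
		set = b.topics[topic]
	}
	set[r] = struct{}{}
	return r, func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(set, r)
	}
}

// Publish copies e into every matching ring; it never waits for a reader.
func (b *Bus) Publish(topic string, e Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for r := range b.topics[topic] {
		r.push(e)
	}
	for r := range b.global {
		r.push(e)
	}
}

func main() {
	bus := NewBus()
	sub, unsubscribe := bus.Subscribe("albumA:LOSSLESS", 2)
	defer unsubscribe()
	for seq := int64(1); seq <= 3; seq++ {
		bus.Publish("albumA:LOSSLESS", Event{Seq: seq, Step: "STEP"})
	}
	fmt.Println(sub.Drain()) // capacity 2, so the oldest of the three is gone
}
```

Because the oldest entry is overwritten, a subscriber that falls behind sees a gap in `Seq`. In the design described above it could close that gap by replaying from the database, which remains the source of truth.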
@@ -5,9 +5,16 @@ title MonitorAlbumStream: Already Owned Handling
 actor Client
 participant "monitorWorkflow" as Workflow
+participant "EventBus" as Bus
 database "PostgreSQL" as DB
 participant "IndexerService" as Indexer
+note over Client, Indexer #lightblue
+All events are persisted to album_events table (DB first)
+then published to EventBus for live subscribers.
+In automatic mode, workflow runs as background goroutine.
+end note
 == Scenario A: Automatic Mode - Early Return ==
 Client -> Workflow: StartMonitorRequest(mode=AUTOMATIC)
@@ -1,113 +1,99 @@
 @startuml MonitorAlbumStream - Automatic Mode Happy Path
 skinparam sequenceMessageAlign center
 skinparam responseMessageBelowArrow true
-title MonitorAlbumStream: Automatic Mode (Status Updates Only)
+title MonitorAlbumStream: Automatic Mode (Fire-and-Forget)
 actor Client
 participant "gRPC Server" as Server
-participant "monitorWorkflow" as Workflow
+participant "WorkflowRegistry" as Registry
-participant "MusicAgregatorService" as Service
+participant "EventBus" as Bus
-participant "MetadataService" as Metadata
+participant "monitorWorkflow\n(background goroutine)" as Workflow
-database "metadata-agregator\n(gRPC)" as MetaGRPC
 database "PostgreSQL" as DB
-participant "IndexerService\n(Jackett)" as Indexer
+participant "MusicAgregatorService" as Service
+participant "IndexerService" as Indexer
 participant "MagnetResolver" as Magnet
 participant "TorrentClient\n(qBittorrent)" as QBit
-participant "River Queue" as River
-== 1. Initialize Bidirectional Stream ==
+== 1. Initialize Stream ==
 Client -> Server: MonitorAlbumStream()
-note right: Opens bidirectional\ngRPC stream
 Server -> Client: stream established
 Client -> Server: StartMonitorRequest
-note right: album_id, quality, tracker\nmode = AUTOMATIC
+note right: album_id, quality\nmode = AUTOMATIC
-Server -> Workflow: newMonitorWorkflow(stream, req, service)
+== 2. Start or Subscribe to Workflow ==
-note right: Creates workflow with\ndecisions channel and\nreceiver goroutine
-== 2. Fetch Album Metadata ==
+Server -> Registry: GetOrCreate(albumID, quality)
-Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
+alt New workflow
-note right #lightblue: "Fetching metadata..."
+Registry --> Server: (entry, created=true)
-Workflow -> Service: getAlbumWithPersist(ctx, album_id)
+Server -> Bus: Subscribe(topic)
-Service -> Metadata: GetAlbum(album_id)
+note right #lightblue: Subscribe BEFORE\nstarting goroutine\n(no missed events)
-Metadata -> MetaGRPC: GetAlbum(id)
-MetaGRPC --> Metadata: Album
-Metadata -> DB: artists.Create / albums.Create
-Metadata --> Service: Album
-Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
+Server -> Workflow: go workflow.run(entry.Ctx)
-note right #lightblue: "Got: Artist - Title"\nData: StreamAlbumInfo
+note right #lightyellow: Background goroutine.\nDecoupled from stream context.\nClient can disconnect freely.
+else Existing workflow
+Registry --> Server: (entry, created=false)
-== 3. Check If Already Owned ==
+Server -> Bus: Subscribe(topic)
-Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
+Server -> DB: Replay album_events\n(since last seq)
-note right #lightblue: "Checking ownership..."
+DB --> Server: historical events
+Server ->> Client: replayed events
-Workflow -> DB: downloads.HasAlbumInQuality()
-DB --> Workflow: false
-== 4. Search Indexers ==
-Workflow ->> Client: StatusUpdate(SEARCHING_INDEXER)
-note right #lightblue: "Searching indexers..."
-Workflow -> Indexer: Search(query, tracker)
-Indexer --> Workflow: SearchResponse (N items)
-== 5. Parse Releases ==
-loop for each search result
-Workflow -> Magnet: Resolve(magnet_uri)
-Magnet --> Workflow: torrent metadata
-Workflow -> Workflow: ParseTorrent()
 end
-Workflow ->> Client: StatusUpdate(PARSING_RESULTS)
+== 3. Event Bridge (concurrent) ==
-note right #lightblue: "Parsed M from N torrents"\nData: TorrentList
-== 6. Filter by Quality ==
+note over Client, Workflow #lightblue
+**Left side**: Event bus → gRPC stream bridge
+**Right side**: Workflow executing in background
+Both run concurrently. Client disconnect only stops the bridge.
+end note
-Workflow ->> Client: StatusUpdate(FILTERING_QUALITY)
+|||
-note right #lightblue: "Filtering by quality..."
-Workflow -> Workflow: filterByQuality(parsed, quality)
+Workflow -> DB: album_events.Create(FETCHING_METADATA)
+Workflow -> Bus: Publish(FETCHING_METADATA)
+Bus ->> Server: event notification
+Server ->> Client: StatusUpdate(FETCHING_METADATA)
-== 7. Select Best Release ==
+Workflow -> Service: getAlbumWithPersist()
+Workflow -> DB: workflow_runs.Create(albumID, quality)
-Workflow -> Workflow: selectBestRelease(filtered)
+Workflow -> DB: album_events.Create(CHECKING_OWNED)
-note right: Highest seeder count wins\n(automatic selection)
+Workflow -> Bus: Publish(CHECKING_OWNED)
+Bus ->> Server: event notification
+Server ->> Client: StatusUpdate(CHECKING_OWNED)
-Workflow ->> Client: StatusUpdate(SELECTING_RELEASE)
+Workflow -> Indexer: Search()
-note right #lightblue: "Selected: Title (N seeders)"\nData: ReleaseInfo
+Workflow -> DB: album_events.Create(PARSING_RESULTS)
+Workflow -> Bus: Publish(PARSING_RESULTS)
+Bus ->> Server: event notification
+Server ->> Client: StatusUpdate(PARSING_RESULTS)
-== 8. Add to Torrent Client ==
+Workflow -> Workflow: filterByQuality + selectBest
-Workflow ->> Client: StatusUpdate(ADDING_TORRENT)
+Workflow -> DB: saveTorrentAndDownload()
-note right #lightblue: "Adding torrent..."
+note right: DB save BEFORE qBit add\n(prevents orphan torrents)
+Workflow -> QBit: AddMagnet()
-Workflow -> QBit: AddMagnet(magnet_uri)
+Workflow -> DB: album_events.Create(COMPLETE)
-QBit --> Workflow: OK
+Workflow -> Bus: Publish(COMPLETE)
+Bus ->> Server: event notification
+Server ->> Client: MonitorAlbumResponse
+note right #lightgreen: Final result
-== 9. Persist & Schedule ==
+Workflow -> DB: workflow_runs.SetCompleted()
+Workflow -> Registry: Remove(albumID, quality)
-Workflow ->> Client: StatusUpdate(SAVING)
+== 4. Client Disconnect (Fire-and-Forget) ==
-note right #lightblue: "Saving to database..."
-Workflow -> DB: torrents.Create / downloads.Create
+note over Client, Workflow #lightyellow
-Workflow -> River: Insert(PollDownloadArgs)
+Client can disconnect at ANY point during the workflow.
+The workflow goroutine continues independently.
-== 10. Return Result ==
+Another client can subscribe to the same workflow later.
+end note
-Workflow ->> Client: StatusUpdate(COMPLETE)
-note right #lightblue: "Download started"
-Workflow ->> Client: MonitorAlbumResponse
-note right #lightgreen: Final result with:\nalbum, artist, release, download
-Server -> Server: Close stream
 @enduml
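The new automatic-mode flow relies on two ideas: `GetOrCreate` deduplicates workflows per album and quality, and the server subscribes before it starts the goroutine so that no early event is missed. The Go sketch below illustrates both under stated assumptions. `Entry`, `Registry`, and `runWorkflow` are hypothetical shapes (the diagram only shows `entry.Ctx`, `GetOrCreate`, and `Remove`), a buffered channel stands in for the EventBus subscription, and cancelling the context on `Remove` is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Entry is a hypothetical registry record; the diagram only shows entry.Ctx.
type Entry struct {
	Ctx    context.Context
	cancel context.CancelFunc
}

// Registry tracks at most one running workflow per album+quality key.
type Registry struct {
	mu      sync.Mutex
	entries map[string]*Entry
}

func NewRegistry() *Registry { return &Registry{entries: map[string]*Entry{}} }

// GetOrCreate returns the existing entry for the key, or creates one whose
// context is detached from any client stream (fire-and-forget).
func (r *Registry) GetOrCreate(albumID, quality string) (*Entry, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := albumID + ":" + quality
	if e, ok := r.entries[key]; ok {
		return e, false
	}
	ctx, cancel := context.WithCancel(context.Background())
	e := &Entry{Ctx: ctx, cancel: cancel}
	r.entries[key] = e
	return e, true
}

// Remove drops the entry and releases its context (an assumption of this sketch).
func (r *Registry) Remove(albumID, quality string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := albumID + ":" + quality
	if e, ok := r.entries[key]; ok {
		e.cancel()
		delete(r.entries, key)
	}
}

// runWorkflow emits a few step names and stops early if ctx is cancelled.
func runWorkflow(ctx context.Context, out chan<- string) {
	defer close(out)
	for _, step := range []string{"FETCHING_METADATA", "CHECKING_OWNED", "COMPLETE"} {
		select {
		case <-ctx.Done():
			return
		case out <- step:
		}
	}
}

func main() {
	reg := NewRegistry()
	entry, created := reg.GetOrCreate("albumA", "LOSSLESS")

	// Subscribe first: the channel exists before the goroutine can publish.
	events := make(chan string, 4)
	if created {
		go runWorkflow(entry.Ctx, events)
	}
	for step := range events {
		fmt.Println(step)
	}
	reg.Remove("albumA", "LOSSLESS")
}
```

Deriving the entry context from `context.Background()` rather than from the stream context is what lets the client disconnect without stopping the workflow.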
@@ -113,6 +113,21 @@ note right #lightblue: "Download started"
 Workflow ->> Client: MonitorAlbumResponse
 note right #lightgreen: Final result
+== Cancel Cleanup (Disconnect or Cancel Message) ==
+note over Client, QBit #salmon
+**Manual Mode: Disconnect = Cancel**
+When client disconnects or sends CancelRequest:
+1. Workflow context is cancelled (stops further processing)
+2. If torrent was added to qBit: **DeleteTorrent(hash)** removes it + files
+3. If download record exists: marked as **cancelled** in DB
+4. workflow_run marked as **cancelled** in DB
+5. All events persisted to album_events for audit trail
+Cleanup uses a fresh context (not the cancelled one).
+end note
 == Decision Points Summary ==
 note over Client, QBit #lightyellow
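The cancel-cleanup note added in this hunk says cleanup must use a fresh context, because calls made with the already cancelled workflow context would fail immediately. A hedged Go sketch of that pattern follows. The `TorrentClient` and `Store` interfaces, the `RunState` struct, and the 10-second timeout are assumptions made for illustration. Only `DeleteTorrent(hash)` and the "cancelled" status come from the diagram.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// TorrentClient and Store are hypothetical interfaces standing in for the
// qBittorrent client and the PostgreSQL repositories.
type TorrentClient interface {
	DeleteTorrent(ctx context.Context, hash string) error
}

type Store interface {
	MarkDownloadCancelled(ctx context.Context, downloadID int64) error
	MarkRunCancelled(ctx context.Context, runID int64) error
}

// RunState records what the workflow had created before it was cancelled.
type RunState struct {
	RunID       int64
	DownloadID  int64  // 0 if no download record exists yet
	TorrentHash string // "" if nothing was added to qBittorrent
}

// cleanupCancelled runs the cleanup steps under a fresh, time-limited context,
// because the workflow's own context is already cancelled at this point.
func cleanupCancelled(qbit TorrentClient, db Store, st RunState) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var errs []error
	if st.TorrentHash != "" {
		errs = append(errs, qbit.DeleteTorrent(ctx, st.TorrentHash))
	}
	if st.DownloadID != 0 {
		errs = append(errs, db.MarkDownloadCancelled(ctx, st.DownloadID))
	}
	errs = append(errs, db.MarkRunCancelled(ctx, st.RunID))
	return errors.Join(errs...) // nil when every step succeeded
}

// fake records calls and fails if it is handed a cancelled context.
type fake struct{ calls []string }

func (f *fake) record(ctx context.Context, name string) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("%s: %w", name, err)
	}
	f.calls = append(f.calls, name)
	return nil
}

func (f *fake) DeleteTorrent(ctx context.Context, hash string) error {
	return f.record(ctx, "delete:"+hash)
}

func (f *fake) MarkDownloadCancelled(ctx context.Context, id int64) error {
	return f.record(ctx, "download-cancelled")
}

func (f *fake) MarkRunCancelled(ctx context.Context, id int64) error {
	return f.record(ctx, "run-cancelled")
}

func main() {
	workflowCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulates a client disconnect in manual mode

	f := &fake{}
	err := cleanupCancelled(f, f, RunState{RunID: 1, DownloadID: 2, TorrentHash: "abc"})
	fmt.Println(workflowCtx.Err(), err, f.calls)
}
```

Collecting errors with `errors.Join` (Go 1.20 or later) lets every cleanup step run even if an earlier one fails, which seems to fit the "full cleanup" intent. Whether the real code continues after a failed step is not stated in the diagram.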
@@ -1,11 +1,11 @@
 @startuml MonitorAlbumStream Protocol
 skinparam sequenceMessageAlign center
-title MonitorAlbumStream: Message Protocol
+title MonitorAlbumStream & SubscribeEvents: Message Protocol
 participant "Client" as C
 participant "Server" as S
-== Stream Initialization ==
+== MonitorAlbumStream (Bidirectional) ==
 C -> S: gRPC MonitorAlbumStream()
 note right: Opens bidirectional stream
@@ -50,7 +50,44 @@ note left #lightgreen
 - recoverable: bool
 end note
-== Monitor Steps (Status Updates) ==
+== SubscribeEvents (Server-Side Stream) ==
+C -> S: SubscribeEvents(SubscribeEventsRequest)
+note right #lightyellow
+**SubscribeEventsRequest:**
+- since_seq: int64 (0 = live only, >0 = replay from seq)
+**Response stream: AlbumEvent**
+- seq: int64 (monotonic sequence number)
+- workflow_run_id, album_id, quality
+- event_type: status | error | result
+- step: MonitorStep name
+- message: human-readable text
+- data_json: bytes (optional structured data)
+- timestamp_ms: int64
+**Global firehose**: receives events from ALL running workflows.
+Client-side filtering by album_id if needed.
+end note
+== Event Flow Architecture ==
+note over C, S #lightblue
+**Event Path (DB first, bus second):**
+1. Workflow step executes
+2. Event written to **album_events** table (synchronous, durable)
+3. Event published to **EventBus** (async, ephemeral notification)
+4. Bus fans out to subscribers:
+- MonitorAlbumStream clients (per-topic)
+- SubscribeEvents clients (global)
+5. Subscribers convert event to proto and stream.Send()
+**DB is source of truth. Bus is notification layer.**
+Events are never lost, even if no subscribers are connected.
+end note
+== Monitor Steps ==
 note over C, S #lightyellow
 **MonitorStep Enum:**
@@ -65,34 +102,34 @@ note over C, S #lightyellow
 9. COMPLETE - Workflow finished
 end note
-== Prompt Types (Manual Mode) ==
+== Mode Comparison ==
-note over C, S #orange
+note over C, S #lightyellow
-**CONFIRM** (yes/no decision)
+**AUTOMATIC mode:**
-- confirm_label, cancel_label
+- Workflow runs as background goroutine
-- default_value: bool
+- Client can disconnect, workflow continues (fire-and-forget)
+- Duplicate request for same album+quality subscribes to existing
+- Events delivered via EventBus bridge
-**SELECT_ONE** (pick one option)
+**MANUAL mode:**
-- options: [{id, label, description}]
+- Workflow runs inline in stream handler
-- default_id: pre-selected option
+- Interactive prompts at 4 decision points
+- Disconnect = cancel = full cleanup (qBit delete + DB cancel)
-**SELECT_MANY** (pick multiple options)
+- Events delivered directly via stream + persisted to DB
-- options: [{id, label, description}]
-- default_ids: pre-selected options
-- min_selections, max_selections
 end note
-== Data Payloads ==
+== Persistence ==
-note over C, S #lightblue
+note over C, S #lightgreen
-**StreamAlbumInfo** (at FETCHING_METADATA)
+**workflow_runs table:**
-- artist, title, release_date, already_owned, owned_quality
+- Tracks workflow lifecycle: running → completed | failed | cancelled
+- Unique constraint: one running workflow per album+quality
+- Used for deduplication, recovery, and audit
-**TorrentList** (at PARSING_RESULTS)
+**album_events table:**
-- torrents: [{id, title, tracker, seeders, format, lossless}]
+- Audit log of all workflow events
+- seq BIGSERIAL for ordering and replay
-**ReleaseInfo** (at SELECTING_RELEASE)
+- Supports subscribe-before-query replay pattern
-- info_hash, format, bit_depth, sample_rate, seeders, tracker
 end note
 == Timeout Behavior ==
@@ -102,9 +139,6 @@ When prompt times out (max: 300s):
 - Server uses **default decision** value
 - Workflow continues automatically
 - No error is raised
-Timeout is capped server-side at 300s.
-If timeout_seconds on prompt is 0 or exceeds max, 300s is used.
 end note
 @enduml
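The protocol diagram names a "subscribe-before-query replay pattern": the server subscribes to the bus first and queries `album_events` second, so an event written in between can arrive both in the replay and on the live subscription. One way to handle that overlap is to drop anything whose `seq` has already been sent, as in the Go sketch below. The `AlbumEvent` struct is trimmed to two fields and `bridge` is a hypothetical helper. The sketch assumes replayed rows are ordered by ascending `seq`, and the diagrams do not say how the real server deduplicates.

```go
package main

import "fmt"

// AlbumEvent is a trimmed, hypothetical version of the AlbumEvent message.
type AlbumEvent struct {
	Seq  int64
	Step string
}

// bridge sends replayed events first, then live ones, and skips any event
// whose sequence number is not greater than the last one already sent.
func bridge(replayed []AlbumEvent, live <-chan AlbumEvent, send func(AlbumEvent)) {
	var lastSeq int64
	deliver := func(e AlbumEvent) {
		if e.Seq <= lastSeq {
			return // already delivered during replay
		}
		lastSeq = e.Seq
		send(e)
	}
	for _, e := range replayed {
		deliver(e)
	}
	for e := range live {
		deliver(e)
	}
}

func main() {
	// Seq 3 was persisted after the subscription was opened, so it shows up
	// both in the DB replay and on the live channel.
	replayed := []AlbumEvent{{1, "FETCHING_METADATA"}, {2, "CHECKING_OWNED"}, {3, "PARSING_RESULTS"}}
	live := make(chan AlbumEvent, 2)
	live <- AlbumEvent{3, "PARSING_RESULTS"}
	live <- AlbumEvent{4, "COMPLETE"}
	close(live)

	bridge(replayed, live, func(e AlbumEvent) { fmt.Println(e.Seq, e.Step) })
}
```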