diff --git a/docs/architecture/flows/event-bus-architecture.puml b/docs/architecture/flows/event-bus-architecture.puml new file mode 100644 index 0000000..afaf1c3 --- /dev/null +++ b/docs/architecture/flows/event-bus-architecture.puml @@ -0,0 +1,86 @@ +@startuml Event Bus Architecture +skinparam componentAlign center +title Event Bus: In-Process Pub/Sub Architecture + +package "Publishers" { + [Workflow Goroutine 1\n(album A, LOSSLESS)] as WF1 + [Workflow Goroutine 2\n(album B, LOSSY)] as WF2 +} + +database "PostgreSQL" as DB { + [workflow_runs] as WR + [album_events] as AE +} + +package "Event Bus (in-memory)" { + [Topic: albumA:LOSSLESS] as T1 + [Topic: albumB:LOSSY] as T2 + [Global Subscribers] as GS +} + +package "Subscribers" { + [MonitorAlbumStream\nClient A (album A)] as S1 + [MonitorAlbumStream\nClient B (album A)] as S2 + [SubscribeEvents\nClient C (global)] as S3 +} + +WF1 --> DB : 1. Write event\n(synchronous) +WF1 --> T1 : 2. Publish\n(async notification) + +WF2 --> DB : 1. Write event +WF2 --> T2 : 2. Publish + +T1 --> S1 : Ring buffer\n(per subscriber) +T1 --> S2 : Ring buffer +T1 --> GS +T2 --> GS + +GS --> S3 : Ring buffer + +note right of DB + **Source of truth.** + Events survive restarts. + Replay via seq numbers. +end note + +note right of T1 + **Ephemeral notification.** + Ring buffer per subscriber. + Slow subscribers: overwrite oldest. + No backpressure on publishers. +end note + +note bottom of S1 + Client disconnect removes + subscriber from topic. + Workflow continues. +end note + +== Subscription Lifecycle == + +note as N1 +**Subscribe flow:** +1. Client calls MonitorAlbumStream or SubscribeEvents +2. Server subscribes to EventBus (per-topic or global) +3. Server queries DB for historical events (replay) +4. Server bridges: EventBus → gRPC stream +5. On disconnect: cleanup func unsubscribes + +**Topic cleanup:** +When last subscriber leaves AND workflow completed: +topic removed from EventBus map. +end note + +== Recovery on Restart == + +note as N2 +**Server restart recovery:** +1. Query workflow_runs WHERE status = 'running' +2. For each stale run: + - If active download exists → mark completed + - Otherwise → mark failed ("server restarted") +3. RecoverOrphanedDownloads reschedules poll jobs +4. New workflows start fresh (no goroutine resurrection) +end note + +@enduml diff --git a/docs/architecture/flows/monitor-album-stream-already-owned.puml b/docs/architecture/flows/monitor-album-stream-already-owned.puml index 34c3bbf..4d4400a 100644 --- a/docs/architecture/flows/monitor-album-stream-already-owned.puml +++ b/docs/architecture/flows/monitor-album-stream-already-owned.puml @@ -5,9 +5,16 @@ title MonitorAlbumStream: Already Owned Handling actor Client participant "monitorWorkflow" as Workflow +participant "EventBus" as Bus database "PostgreSQL" as DB participant "IndexerService" as Indexer +note over Client, Indexer #lightblue +All events are persisted to album_events table (DB first) +then published to EventBus for live subscribers. +In automatic mode, workflow runs as background goroutine. +end note + == Scenario A: Automatic Mode - Early Return == Client -> Workflow: StartMonitorRequest(mode=AUTOMATIC) diff --git a/docs/architecture/flows/monitor-album-stream-automatic.puml b/docs/architecture/flows/monitor-album-stream-automatic.puml index b029b28..f834957 100644 --- a/docs/architecture/flows/monitor-album-stream-automatic.puml +++ b/docs/architecture/flows/monitor-album-stream-automatic.puml @@ -1,113 +1,99 @@ @startuml MonitorAlbumStream - Automatic Mode Happy Path skinparam sequenceMessageAlign center skinparam responseMessageBelowArrow true -title MonitorAlbumStream: Automatic Mode (Status Updates Only) +title MonitorAlbumStream: Automatic Mode (Fire-and-Forget) actor Client participant "gRPC Server" as Server -participant "monitorWorkflow" as Workflow -participant "MusicAgregatorService" as Service -participant "MetadataService" as Metadata -database "metadata-agregator\n(gRPC)" as MetaGRPC +participant "WorkflowRegistry" as Registry +participant "EventBus" as Bus +participant "monitorWorkflow\n(background goroutine)" as Workflow database "PostgreSQL" as DB -participant "IndexerService\n(Jackett)" as Indexer +participant "MusicAgregatorService" as Service +participant "IndexerService" as Indexer participant "MagnetResolver" as Magnet participant "TorrentClient\n(qBittorrent)" as QBit -participant "River Queue" as River -== 1. Initialize Bidirectional Stream == +== 1. Initialize Stream == Client -> Server: MonitorAlbumStream() -note right: Opens bidirectional\ngRPC stream Server -> Client: stream established Client -> Server: StartMonitorRequest -note right: album_id, quality, tracker\nmode = AUTOMATIC +note right: album_id, quality\nmode = AUTOMATIC -Server -> Workflow: newMonitorWorkflow(stream, req, service) -note right: Creates workflow with\ndecisions channel and\nreceiver goroutine +== 2. Start or Subscribe to Workflow == -== 2. Fetch Album Metadata == +Server -> Registry: GetOrCreate(albumID, quality) -Workflow ->> Client: StatusUpdate(FETCHING_METADATA) -note right #lightblue: "Fetching metadata..." +alt New workflow + Registry --> Server: (entry, created=true) -Workflow -> Service: getAlbumWithPersist(ctx, album_id) -Service -> Metadata: GetAlbum(album_id) -Metadata -> MetaGRPC: GetAlbum(id) -MetaGRPC --> Metadata: Album -Metadata -> DB: artists.Create / albums.Create -Metadata --> Service: Album + Server -> Bus: Subscribe(topic) + note right #lightblue: Subscribe BEFORE\nstarting goroutine\n(no missed events) -Workflow ->> Client: StatusUpdate(FETCHING_METADATA) -note right #lightblue: "Got: Artist - Title"\nData: StreamAlbumInfo + Server -> Workflow: go workflow.run(entry.Ctx) + note right #lightyellow: Background goroutine.\nDecoupled from stream context.\nClient can disconnect freely. +else Existing workflow + Registry --> Server: (entry, created=false) -== 3. Check If Already Owned == + Server -> Bus: Subscribe(topic) -Workflow ->> Client: StatusUpdate(CHECKING_OWNED) -note right #lightblue: "Checking ownership..." - -Workflow -> DB: downloads.HasAlbumInQuality() -DB --> Workflow: false - -== 4. Search Indexers == - -Workflow ->> Client: StatusUpdate(SEARCHING_INDEXER) -note right #lightblue: "Searching indexers..." - -Workflow -> Indexer: Search(query, tracker) -Indexer --> Workflow: SearchResponse (N items) - -== 5. Parse Releases == - -loop for each search result - Workflow -> Magnet: Resolve(magnet_uri) - Magnet --> Workflow: torrent metadata - Workflow -> Workflow: ParseTorrent() + Server -> DB: Replay album_events\n(since last seq) + DB --> Server: historical events + Server ->> Client: replayed events end -Workflow ->> Client: StatusUpdate(PARSING_RESULTS) -note right #lightblue: "Parsed M from N torrents"\nData: TorrentList +== 3. Event Bridge (concurrent) == -== 6. Filter by Quality == +note over Client, Workflow #lightblue +**Left side**: Event bus → gRPC stream bridge +**Right side**: Workflow executing in background +Both run concurrently. Client disconnect only stops the bridge. +end note -Workflow ->> Client: StatusUpdate(FILTERING_QUALITY) -note right #lightblue: "Filtering by quality..." +||| -Workflow -> Workflow: filterByQuality(parsed, quality) +Workflow -> DB: album_events.Create(FETCHING_METADATA) +Workflow -> Bus: Publish(FETCHING_METADATA) +Bus ->> Server: event notification +Server ->> Client: StatusUpdate(FETCHING_METADATA) -== 7. Select Best Release == +Workflow -> Service: getAlbumWithPersist() +Workflow -> DB: workflow_runs.Create(albumID, quality) -Workflow -> Workflow: selectBestRelease(filtered) -note right: Highest seeder count wins\n(automatic selection) +Workflow -> DB: album_events.Create(CHECKING_OWNED) +Workflow -> Bus: Publish(CHECKING_OWNED) +Bus ->> Server: event notification +Server ->> Client: StatusUpdate(CHECKING_OWNED) -Workflow ->> Client: StatusUpdate(SELECTING_RELEASE) -note right #lightblue: "Selected: Title (N seeders)"\nData: ReleaseInfo +Workflow -> Indexer: Search() +Workflow -> DB: album_events.Create(PARSING_RESULTS) +Workflow -> Bus: Publish(PARSING_RESULTS) +Bus ->> Server: event notification +Server ->> Client: StatusUpdate(PARSING_RESULTS) -== 8. Add to Torrent Client == +Workflow -> Workflow: filterByQuality + selectBest -Workflow ->> Client: StatusUpdate(ADDING_TORRENT) -note right #lightblue: "Adding torrent..." +Workflow -> DB: saveTorrentAndDownload() +note right: DB save BEFORE qBit add\n(prevents orphan torrents) +Workflow -> QBit: AddMagnet() -Workflow -> QBit: AddMagnet(magnet_uri) -QBit --> Workflow: OK +Workflow -> DB: album_events.Create(COMPLETE) +Workflow -> Bus: Publish(COMPLETE) +Bus ->> Server: event notification +Server ->> Client: MonitorAlbumResponse +note right #lightgreen: Final result -== 9. Persist & Schedule == +Workflow -> DB: workflow_runs.SetCompleted() +Workflow -> Registry: Remove(albumID, quality) -Workflow ->> Client: StatusUpdate(SAVING) -note right #lightblue: "Saving to database..." +== 4. Client Disconnect (Fire-and-Forget) == -Workflow -> DB: torrents.Create / downloads.Create -Workflow -> River: Insert(PollDownloadArgs) - -== 10. Return Result == - -Workflow ->> Client: StatusUpdate(COMPLETE) -note right #lightblue: "Download started" - -Workflow ->> Client: MonitorAlbumResponse -note right #lightgreen: Final result with:\nalbum, artist, release, download - -Server -> Server: Close stream +note over Client, Workflow #lightyellow +Client can disconnect at ANY point during the workflow. +The workflow goroutine continues independently. +Another client can subscribe to the same workflow later. +end note @enduml diff --git a/docs/architecture/flows/monitor-album-stream-manual.puml b/docs/architecture/flows/monitor-album-stream-manual.puml index 7480595..4fcf944 100644 --- a/docs/architecture/flows/monitor-album-stream-manual.puml +++ b/docs/architecture/flows/monitor-album-stream-manual.puml @@ -113,6 +113,21 @@ note right #lightblue: "Download started" Workflow ->> Client: MonitorAlbumResponse note right #lightgreen: Final result +== Cancel Cleanup (Disconnect or Cancel Message) == + +note over Client, QBit #salmon +**Manual Mode: Disconnect = Cancel** + +When client disconnects or sends CancelRequest: +1. Workflow context is cancelled (stops further processing) +2. If torrent was added to qBit: **DeleteTorrent(hash)** removes it + files +3. If download record exists: marked as **cancelled** in DB +4. workflow_run marked as **cancelled** in DB +5. All events persisted to album_events for audit trail + +Cleanup uses a fresh context (not the cancelled one). +end note + == Decision Points Summary == note over Client, QBit #lightyellow diff --git a/docs/architecture/flows/monitor-album-stream-protocol.puml b/docs/architecture/flows/monitor-album-stream-protocol.puml index 3079ab2..93bb3b9 100644 --- a/docs/architecture/flows/monitor-album-stream-protocol.puml +++ b/docs/architecture/flows/monitor-album-stream-protocol.puml @@ -1,11 +1,11 @@ @startuml MonitorAlbumStream Protocol skinparam sequenceMessageAlign center -title MonitorAlbumStream: Message Protocol +title MonitorAlbumStream & SubscribeEvents: Message Protocol participant "Client" as C participant "Server" as S -== Stream Initialization == +== MonitorAlbumStream (Bidirectional) == C -> S: gRPC MonitorAlbumStream() note right: Opens bidirectional stream @@ -50,7 +50,44 @@ note left #lightgreen - recoverable: bool end note -== Monitor Steps (Status Updates) == +== SubscribeEvents (Server-Side Stream) == + +C -> S: SubscribeEvents(SubscribeEventsRequest) +note right #lightyellow +**SubscribeEventsRequest:** +- since_seq: int64 (0 = live only, >0 = replay from seq) + +**Response stream: AlbumEvent** +- seq: int64 (monotonic sequence number) +- workflow_run_id, album_id, quality +- event_type: status | error | result +- step: MonitorStep name +- message: human-readable text +- data_json: bytes (optional structured data) +- timestamp_ms: int64 + +**Global firehose**: receives events from ALL running workflows. +Client-side filtering by album_id if needed. +end note + +== Event Flow Architecture == + +note over C, S #lightblue +**Event Path (DB first, bus second):** + +1. Workflow step executes +2. Event written to **album_events** table (synchronous, durable) +3. Event published to **EventBus** (async, ephemeral notification) +4. Bus fans out to subscribers: + - MonitorAlbumStream clients (per-topic) + - SubscribeEvents clients (global) +5. Subscribers convert event to proto and stream.Send() + +**DB is source of truth. Bus is notification layer.** +Events are never lost, even if no subscribers are connected. +end note + +== Monitor Steps == note over C, S #lightyellow **MonitorStep Enum:** @@ -65,34 +102,34 @@ note over C, S #lightyellow 9. COMPLETE - Workflow finished end note -== Prompt Types (Manual Mode) == +== Mode Comparison == -note over C, S #orange -**CONFIRM** (yes/no decision) -- confirm_label, cancel_label -- default_value: bool +note over C, S #lightyellow +**AUTOMATIC mode:** +- Workflow runs as background goroutine +- Client can disconnect, workflow continues (fire-and-forget) +- Duplicate request for same album+quality subscribes to existing +- Events delivered via EventBus bridge -**SELECT_ONE** (pick one option) -- options: [{id, label, description}] -- default_id: pre-selected option - -**SELECT_MANY** (pick multiple options) -- options: [{id, label, description}] -- default_ids: pre-selected options -- min_selections, max_selections +**MANUAL mode:** +- Workflow runs inline in stream handler +- Interactive prompts at 4 decision points +- Disconnect = cancel = full cleanup (qBit delete + DB cancel) +- Events delivered directly via stream + persisted to DB end note -== Data Payloads == +== Persistence == -note over C, S #lightblue -**StreamAlbumInfo** (at FETCHING_METADATA) -- artist, title, release_date, already_owned, owned_quality +note over C, S #lightgreen +**workflow_runs table:** +- Tracks workflow lifecycle: running → completed | failed | cancelled +- Unique constraint: one running workflow per album+quality +- Used for deduplication, recovery, and audit -**TorrentList** (at PARSING_RESULTS) -- torrents: [{id, title, tracker, seeders, format, lossless}] - -**ReleaseInfo** (at SELECTING_RELEASE) -- info_hash, format, bit_depth, sample_rate, seeders, tracker +**album_events table:** +- Audit log of all workflow events +- seq BIGSERIAL for ordering and replay +- Supports subscribe-before-query replay pattern end note == Timeout Behavior == @@ -102,9 +139,6 @@ When prompt times out (max: 300s): - Server uses **default decision** value - Workflow continues automatically - No error is raised - -Timeout is capped server-side at 300s. -If timeout_seconds on prompt is 0 or exceeds max, 300s is used. end note @enduml