Add streaming flow diagrams, update existing flows with streaming references

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
Alexander
2026-05-11 10:26:45 +02:00
parent 24f355c5ae
commit ad03caa3f4
4 changed files with 467 additions and 0 deletions
@@ -0,0 +1,102 @@
@startuml MonitorAlbumStream - Already Owned Scenarios
skinparam sequenceMessageAlign center
skinparam responseMessageBelowArrow true
title MonitorAlbumStream: Already Owned Handling
actor Client
participant "monitorWorkflow" as Workflow
database "PostgreSQL" as DB
participant "IndexerService" as Indexer
== Scenario A: Automatic Mode - Early Return ==
Client -> Workflow: StartMonitorRequest(mode=AUTOMATIC)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
note right #lightblue: "Fetching album metadata..."
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
note right #lightblue: "Got: Artist - Title"\nData: StreamAlbumInfo
Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
Workflow -> DB: downloads.HasAlbumInQuality()
DB --> Workflow: true
Workflow ->> Client: StatusUpdate(COMPLETE)
note right #lightgreen: "Already owned"
Workflow ->> Client: MonitorAlbumResponse
note right: album: monitored\ndownload: existing info\nrelease: nil (no search)
note over Client, Workflow #lightblue
**Automatic Mode**: Skips search entirely.
Returns immediately with existing download info.
end note
== Scenario B: Manual Mode - User Confirms Continue ==
Client -> Workflow: StartMonitorRequest(mode=MANUAL)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
Workflow -> DB: downloads.HasAlbumInQuality()
DB --> Workflow: true
Workflow ->> Client: PromptForDecision
note right #orange: type: CONFIRM\nmessage: "Album already owned. Download anyway?"\ndefault: false
Client -> Workflow: UserDecision(confirm=true)
note right #lightgreen: User chooses\nto continue
Workflow ->> Client: StatusUpdate(SEARCHING_INDEXER)
note right: Proceeds with normal flow...
Workflow -> Indexer: Search()
note right: ... continues to completion
== Scenario C: Manual Mode - User Skips ==
Client -> Workflow: StartMonitorRequest(mode=MANUAL)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
Workflow -> DB: downloads.HasAlbumInQuality()
DB --> Workflow: true
Workflow ->> Client: PromptForDecision
note right #orange: type: CONFIRM\nmessage: "Album already owned. Download anyway?"
Client -> Workflow: UserDecision(confirm=false)
note right #lightyellow: User chooses\nto skip
Workflow ->> Client: StatusUpdate(COMPLETE)
note right: "Skipped - already owned"
Workflow ->> Client: MonitorAlbumResponse
note right: album: monitored\ndownload: existing info\nrelease: nil
== Scenario D: Manual Mode - Timeout ==
Client -> Workflow: StartMonitorRequest(mode=MANUAL)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
Workflow ->> Client: PromptForDecision
note over Client, Workflow #lightyellow
Client does not respond within timeout (max: 300s)
end note
Workflow -> Workflow: Use default decision
note right: default: false\n(skip download)
Workflow ->> Client: StatusUpdate(COMPLETE)
note right: "Skipped - already owned"
Workflow ->> Client: MonitorAlbumResponse
@enduml
@@ -0,0 +1,113 @@
@startuml MonitorAlbumStream - Automatic Mode Happy Path
skinparam sequenceMessageAlign center
skinparam responseMessageBelowArrow true
title MonitorAlbumStream: Automatic Mode (Status Updates Only)
actor Client
participant "gRPC Server" as Server
participant "monitorWorkflow" as Workflow
participant "MusicAgregatorService" as Service
participant "MetadataService" as Metadata
database "metadata-agregator\n(gRPC)" as MetaGRPC
database "PostgreSQL" as DB
participant "IndexerService\n(Jackett)" as Indexer
participant "MagnetResolver" as Magnet
participant "TorrentClient\n(qBittorrent)" as QBit
participant "River Queue" as River
== 1. Initialize Bidirectional Stream ==
Client -> Server: MonitorAlbumStream()
note right: Opens bidirectional\ngRPC stream
Server -> Client: stream established
Client -> Server: StartMonitorRequest
note right: album_id, quality, tracker\nmode = AUTOMATIC
Server -> Workflow: newMonitorWorkflow(stream, req, service)
note right: Creates workflow with\ndecisions channel and\nreceiver goroutine
== 2. Fetch Album Metadata ==
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
note right #lightblue: "Fetching metadata..."
Workflow -> Service: getAlbumWithPersist(ctx, album_id)
Service -> Metadata: GetAlbum(album_id)
Metadata -> MetaGRPC: GetAlbum(id)
MetaGRPC --> Metadata: Album
Metadata -> DB: artists.Create / albums.Create
Metadata --> Service: Album
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
note right #lightblue: "Got: Artist - Title"\nData: StreamAlbumInfo
== 3. Check If Already Owned ==
Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
note right #lightblue: "Checking ownership..."
Workflow -> DB: downloads.HasAlbumInQuality()
DB --> Workflow: false
== 4. Search Indexers ==
Workflow ->> Client: StatusUpdate(SEARCHING_INDEXER)
note right #lightblue: "Searching indexers..."
Workflow -> Indexer: Search(query, tracker)
Indexer --> Workflow: SearchResponse (N items)
== 5. Parse Releases ==
loop for each search result
Workflow -> Magnet: Resolve(magnet_uri)
Magnet --> Workflow: torrent metadata
Workflow -> Workflow: ParseTorrent()
end
Workflow ->> Client: StatusUpdate(PARSING_RESULTS)
note right #lightblue: "Parsed M from N torrents"\nData: TorrentList
== 6. Filter by Quality ==
Workflow ->> Client: StatusUpdate(FILTERING_QUALITY)
note right #lightblue: "Filtering by quality..."
Workflow -> Workflow: filterByQuality(parsed, quality)
== 7. Select Best Release ==
Workflow -> Workflow: selectBestRelease(filtered)
note right: Highest seeder count wins\n(automatic selection)
Workflow ->> Client: StatusUpdate(SELECTING_RELEASE)
note right #lightblue: "Selected: Title (N seeders)"\nData: ReleaseInfo
== 8. Add to Torrent Client ==
Workflow ->> Client: StatusUpdate(ADDING_TORRENT)
note right #lightblue: "Adding torrent..."
Workflow -> QBit: AddMagnet(magnet_uri)
QBit --> Workflow: OK
== 9. Persist & Schedule ==
Workflow ->> Client: StatusUpdate(SAVING)
note right #lightblue: "Saving to database..."
Workflow -> DB: torrents.Create / downloads.Create
Workflow -> River: Insert(PollDownloadArgs)
== 10. Return Result ==
Workflow ->> Client: StatusUpdate(COMPLETE)
note right #lightblue: "Download started"
Workflow ->> Client: MonitorAlbumResponse
note right #lightgreen: Final result with:\nalbum, artist, release, download
Server -> Server: Close stream
@enduml
@@ -0,0 +1,142 @@
@startuml MonitorAlbumStream - Manual Mode Happy Path
skinparam sequenceMessageAlign center
skinparam responseMessageBelowArrow true
title MonitorAlbumStream: Manual Mode (Interactive Prompts)
actor Client
participant "gRPC Server" as Server
participant "monitorWorkflow" as Workflow
participant "MusicAgregatorService" as Service
database "PostgreSQL" as DB
participant "IndexerService" as Indexer
participant "MagnetResolver" as Magnet
participant "TorrentClient\n(qBittorrent)" as QBit
== 1. Initialize Stream ==
Client -> Server: MonitorAlbumStream()
Server -> Client: stream established
Client -> Server: StartMonitorRequest
note right: album_id, quality\nmode = MANUAL
Server -> Workflow: newMonitorWorkflow()
== 2. Fetch Metadata ==
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
note right #lightblue: "Fetching album metadata..."
Workflow -> Service: getAlbumWithPersist()
Workflow ->> Client: StatusUpdate(FETCHING_METADATA)
note right #lightblue: Data: StreamAlbumInfo\n{artist, title, release_date,\nalready_owned, owned_quality}
== 3. Check Ownership (Interactive) ==
Workflow -> DB: downloads.HasAlbumInQuality()
DB --> Workflow: true (already owned!)
Workflow ->> Client: StatusUpdate(CHECKING_OWNED)
note right #lightyellow: "Already owned in FLAC quality"
Workflow ->> Client: PromptForDecision
note right #orange: type: CONFIRM\nmessage: "Album already owned. Download anyway?"\ndefault: false\ntimeout: max 300s
Client -> Server: UserDecision
note right: confirm: true\n(user chooses to continue)
Workflow -> Workflow: Continue with search
== 4. Search & Parse ==
Workflow ->> Client: StatusUpdate(SEARCHING_INDEXER)
Workflow -> Indexer: Search()
Indexer --> Workflow: 3 results
loop parse results
Workflow -> Magnet: Resolve()
end
Workflow ->> Client: StatusUpdate(PARSING_RESULTS)
note right #lightblue: Data: TorrentList\n[{id, title, seeders, format}, ...]
== 5. Select Torrents (Interactive) ==
Workflow ->> Client: PromptForDecision
note right #orange: type: SELECT_MANY\nmessage: "Select torrents to consider"\noptions: [{id, label, description}, ...]\ndefault: all selected\nmin: 1, max: N
Client -> Server: UserDecision
note right: selected_ids: ["torrent-0", "torrent-2"]\n(user deselects torrent-1)
Workflow -> Workflow: Filter to selected torrents
== 6. Filter by Quality ==
Workflow ->> Client: StatusUpdate(FILTERING_QUALITY)
Workflow -> Workflow: filterByQuality()
note right: 2 releases remain\nafter quality filter
== 7. Select Release (Interactive) ==
Workflow ->> Client: PromptForDecision
note right #orange: type: SELECT_ONE\nmessage: "Select release"\noptions: [{id, label, description}, ...]\ndefault: highest seeders
Client -> Server: UserDecision
note right: selected_id: "release-1"\n(user picks specific release)
Workflow ->> Client: StatusUpdate(SELECTING_RELEASE)
note right #lightblue: Data: ReleaseInfo\n{hash, format, seeders, tracker}
== 8. Confirm Add (Interactive) ==
Workflow ->> Client: StatusUpdate(ADDING_TORRENT)
note right #lightyellow: "Adding torrent: Title..."
Workflow ->> Client: PromptForDecision
note right #orange: type: CONFIRM\nmessage: "Add torrent 'Title' to client?"\nconfirm_label: "Add"\ncancel_label: "Skip"\ndefault: true
Client -> Server: UserDecision
note right: confirm: true
== 9. Add & Save ==
Workflow -> QBit: AddMagnet()
QBit --> Workflow: OK
Workflow ->> Client: StatusUpdate(SAVING)
Workflow -> DB: Create torrent & download
== 10. Complete ==
Workflow ->> Client: StatusUpdate(COMPLETE)
note right #lightblue: "Download started"
Workflow ->> Client: MonitorAlbumResponse
note right #lightgreen: Final result
== Decision Points Summary ==
note over Client, QBit #lightyellow
**Manual Mode Decision Points:**
1. **CHECKING_OWNED** (CONFIRM)
- Triggered when: Album already owned in requested quality
- Default: false (skip)
- Timeout action: Use default
2. **PARSING_RESULTS** (SELECT_MANY)
- Triggered when: Multiple torrents found (>1)
- Default: All selected
- Timeout action: Use defaults
3. **SELECTING_RELEASE** (SELECT_ONE)
- Triggered when: Multiple releases after quality filter (>1)
- Default: Highest seeders
- Timeout action: Use default
4. **ADDING_TORRENT** (CONFIRM)
- Triggered: Always in manual mode
- Default: true (add)
- Timeout action: Use default
end note
@enduml
@@ -0,0 +1,110 @@
@startuml MonitorAlbumStream Protocol
skinparam sequenceMessageAlign center
title MonitorAlbumStream: Message Protocol
participant "Client" as C
participant "Server" as S
== Stream Initialization ==
C -> S: gRPC MonitorAlbumStream()
note right: Opens bidirectional stream
== Request Messages (Client -> Server) ==
C -> S: MonitorAlbumStreamRequest
note right #lightblue
**oneof message:**
- **start**: StartMonitorRequest
- album_id (required)
- quality: LOSSLESS | LOSSY | UNSPECIFIED
- mode: AUTOMATIC (0) | MANUAL (1)
- indexer_options: {tracker}
- **decision**: UserDecision
- prompt_id (must match pending prompt)
- confirm | selected_id | selected_ids
- **cancel**: CancelRequest
- Gracefully terminates workflow
end note
== Response Messages (Server -> Client) ==
S -> C: MonitorAlbumStreamResponse
note left #lightgreen
**oneof message:**
- **status**: StatusUpdate
- step: MonitorStep enum
- message: human-readable text
- data: StreamAlbumInfo | TorrentList | ReleaseInfo
- **prompt**: PromptForDecision
- prompt_id: unique identifier
- type: CONFIRM | SELECT_ONE | SELECT_MANY
- message: prompt text
- timeout_seconds: response deadline
- options: confirm | select_one | select_many config
- **result**: MonitorAlbumResponse
- Final response (stream ends after this)
- **error**: ErrorUpdate
- failed_step: where error occurred
- message: error description
- recoverable: bool
end note
== Monitor Steps (Status Updates) ==
note over C, S #lightyellow
**MonitorStep Enum:**
1. FETCHING_METADATA - Getting album info from metadata service
2. CHECKING_OWNED - Checking if already downloaded
3. SEARCHING_INDEXER - Querying Jackett/indexers
4. PARSING_RESULTS - Resolving magnets, parsing torrents
5. FILTERING_QUALITY - Applying quality filter
6. SELECTING_RELEASE - Choosing best/user-selected release
7. ADDING_TORRENT - Adding to qBittorrent
8. SAVING - Persisting to database
9. COMPLETE - Workflow finished
end note
== Prompt Types (Manual Mode) ==
note over C, S #orange
**CONFIRM** (yes/no decision)
- confirm_label, cancel_label
- default_value: bool
**SELECT_ONE** (pick one option)
- options: [{id, label, description}]
- default_id: pre-selected option
**SELECT_MANY** (pick multiple options)
- options: [{id, label, description}]
- default_ids: pre-selected options
- min_selections, max_selections
end note
== Data Payloads ==
note over C, S #lightblue
**StreamAlbumInfo** (at FETCHING_METADATA)
- artist, title, release_date, already_owned, owned_quality
**TorrentList** (at PARSING_RESULTS)
- torrents: [{id, title, tracker, seeders, format, lossless}]
**ReleaseInfo** (at SELECTING_RELEASE)
- info_hash, format, bit_depth, sample_rate, seeders, tracker
end note
== Timeout Behavior ==
note over C, S #pink
When prompt times out (max: 300s):
- Server uses **default decision** value
- Workflow continues automatically
- No error is raised
Timeout is capped server-side at 300s.
If timeout_seconds on prompt is 0 or exceeds max, 300s is used.
end note
@enduml