69752bd6a2
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
145 lines
4.2 KiB
Plaintext
145 lines
4.2 KiB
Plaintext
@startuml MonitorAlbumStream Protocol
|
|
skinparam sequenceMessageAlign center
|
|
title MonitorAlbumStream & SubscribeEvents: Message Protocol
|
|
|
|
participant "Client" as C
|
|
participant "Server" as S
|
|
|
|
== MonitorAlbumStream (Bidirectional) ==
|
|
|
|
C -> S: gRPC MonitorAlbumStream()
|
|
note right: Opens bidirectional stream
|
|
|
|
== Request Messages (Client -> Server) ==
|
|
|
|
C -> S: MonitorAlbumStreamRequest
|
|
note right #lightblue
|
|
**oneof message:**
|
|
- **start**: StartMonitorRequest
|
|
- album_id (required)
|
|
- quality: LOSSLESS | LOSSY | UNSPECIFIED
|
|
- mode: AUTOMATIC (0) | MANUAL (1)
|
|
- indexer_options: {tracker}
|
|
- **decision**: UserDecision
|
|
- prompt_id (must match pending prompt)
|
|
- confirm | selected_id | selected_ids
|
|
- **cancel**: CancelRequest
|
|
- Gracefully terminates workflow
|
|
end note
|
|
|
|
== Response Messages (Server -> Client) ==
|
|
|
|
S -> C: MonitorAlbumStreamResponse
|
|
note left #lightgreen
|
|
**oneof message:**
|
|
- **status**: StatusUpdate
|
|
- step: MonitorStep enum
|
|
- message: human-readable text
|
|
- data: StreamAlbumInfo | TorrentList | ReleaseInfo
|
|
- **prompt**: PromptForDecision
|
|
- prompt_id: unique identifier
|
|
- type: CONFIRM | SELECT_ONE | SELECT_MANY
|
|
- message: prompt text
|
|
- timeout_seconds: response deadline
|
|
- options: confirm | select_one | select_many config
|
|
- **result**: MonitorAlbumResponse
|
|
- Final response (stream ends after this)
|
|
- **error**: ErrorUpdate
|
|
- failed_step: where error occurred
|
|
- message: error description
|
|
- recoverable: bool
|
|
end note
|
|
|
|
== SubscribeEvents (Server-Side Stream) ==
|
|
|
|
C -> S: SubscribeEvents(SubscribeEventsRequest)
|
|
note right #lightyellow
|
|
**SubscribeEventsRequest:**
|
|
- since_seq: int64 (0 = live only, >0 = replay from seq)
|
|
|
|
**Response stream: AlbumEvent**
|
|
- seq: int64 (monotonic sequence number)
|
|
- workflow_run_id, album_id, quality
|
|
- event_type: status | error | result
|
|
- step: MonitorStep name
|
|
- message: human-readable text
|
|
- data_json: bytes (optional structured data)
|
|
- timestamp_ms: int64
|
|
|
|
**Global firehose**: receives events from ALL running workflows.
|
|
Client-side filtering by album_id if needed.
|
|
end note
|
|
|
|
== Event Flow Architecture ==
|
|
|
|
note over C, S #lightblue
|
|
**Event Path (DB first, bus second):**
|
|
|
|
1. Workflow step executes
|
|
2. Event written to **album_events** table (synchronous, durable)
|
|
3. Event published to **EventBus** (async, ephemeral notification)
|
|
4. Bus fans out to subscribers:
|
|
- MonitorAlbumStream clients (per-topic)
|
|
- SubscribeEvents clients (global)
|
|
5. Subscribers convert event to proto and stream.Send()
|
|
|
|
**DB is source of truth. Bus is notification layer.**
|
|
Events are never lost, even if no subscribers are connected.
|
|
end note
|
|
|
|
== Monitor Steps ==
|
|
|
|
note over C, S #lightyellow
|
|
**MonitorStep Enum:**
|
|
1. FETCHING_METADATA - Getting album info from metadata service
|
|
2. CHECKING_OWNED - Checking if already downloaded
|
|
3. SEARCHING_INDEXER - Querying Jackett/indexers
|
|
4. PARSING_RESULTS - Resolving magnets, parsing torrents
|
|
5. FILTERING_QUALITY - Applying quality filter
|
|
6. SELECTING_RELEASE - Choosing best/user-selected release
|
|
7. ADDING_TORRENT - Adding to qBittorrent
|
|
8. SAVING - Persisting to database
|
|
9. COMPLETE - Workflow finished
|
|
end note
|
|
|
|
== Mode Comparison ==
|
|
|
|
note over C, S #lightyellow
|
|
**AUTOMATIC mode:**
|
|
- Workflow runs as background goroutine
|
|
- Client can disconnect, workflow continues (fire-and-forget)
|
|
- Duplicate request for same album+quality subscribes to existing
|
|
- Events delivered via EventBus bridge
|
|
|
|
**MANUAL mode:**
|
|
- Workflow runs inline in stream handler
|
|
- Interactive prompts at 4 decision points
|
|
- Disconnect = cancel = full cleanup (qBit delete + DB cancel)
|
|
- Events delivered directly via stream + persisted to DB
|
|
end note
|
|
|
|
== Persistence ==
|
|
|
|
note over C, S #lightgreen
|
|
**workflow_runs table:**
|
|
- Tracks workflow lifecycle: running → completed | failed | cancelled
|
|
- Unique constraint: one running workflow per album+quality
|
|
- Used for deduplication, recovery, and audit
|
|
|
|
**album_events table:**
|
|
- Audit log of all workflow events
|
|
- seq BIGSERIAL for ordering and replay
|
|
- Supports subscribe-before-query replay pattern
|
|
end note
|
|
|
|
== Timeout Behavior ==
|
|
|
|
note over C, S #pink
|
|
When prompt times out (max: 300s):
|
|
- Server uses **default decision** value
|
|
- Workflow continues automatically
|
|
- No error is raised
|
|
end note
|
|
|
|
@enduml
|