Add WorkflowRun and AlbumEvent repositories with download cancel support
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -0,0 +1,116 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AlbumEvent struct {
|
||||||
|
ID string
|
||||||
|
Seq int64
|
||||||
|
WorkflowRunID string
|
||||||
|
AlbumID string
|
||||||
|
EventType string
|
||||||
|
Step string
|
||||||
|
Message string
|
||||||
|
DataJSON []byte
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type AlbumEventRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAlbumEventRepository(pool *pgxpool.Pool) *AlbumEventRepository {
|
||||||
|
return &AlbumEventRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumEventRepository) Create(ctx context.Context, event *AlbumEvent) error {
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO album_events (workflow_run_id, album_id, event_type, step, message, data_json)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
|
RETURNING id, seq, created_at`,
|
||||||
|
event.WorkflowRunID, event.AlbumID, event.EventType, event.Step, event.Message, event.DataJSON,
|
||||||
|
).Scan(&event.ID, &event.Seq, &event.CreatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating album event: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumEventRepository) GetByWorkflowRun(ctx context.Context, workflowRunID string) ([]*AlbumEvent, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, seq, workflow_run_id, album_id, event_type, step, message, data_json, created_at
|
||||||
|
FROM album_events WHERE workflow_run_id = $1 ORDER BY seq`, workflowRunID,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing album events by workflow run: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var events []*AlbumEvent
|
||||||
|
for rows.Next() {
|
||||||
|
event := &AlbumEvent{}
|
||||||
|
if err := rows.Scan(&event.ID, &event.Seq, &event.WorkflowRunID, &event.AlbumID, &event.EventType, &event.Step, &event.Message, &event.DataJSON, &event.CreatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning album event: %w", err)
|
||||||
|
}
|
||||||
|
events = append(events, event)
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumEventRepository) GetByAlbum(ctx context.Context, albumID string, afterSeq int64, limit int) ([]*AlbumEvent, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, seq, workflow_run_id, album_id, event_type, step, message, data_json, created_at
|
||||||
|
FROM album_events WHERE album_id = $1 AND seq > $2 ORDER BY seq LIMIT $3`, albumID, afterSeq, limit,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing album events by album: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var events []*AlbumEvent
|
||||||
|
for rows.Next() {
|
||||||
|
event := &AlbumEvent{}
|
||||||
|
if err := rows.Scan(&event.ID, &event.Seq, &event.WorkflowRunID, &event.AlbumID, &event.EventType, &event.Step, &event.Message, &event.DataJSON, &event.CreatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning album event: %w", err)
|
||||||
|
}
|
||||||
|
events = append(events, event)
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumEventRepository) GetLatestSeq(ctx context.Context) (int64, error) {
|
||||||
|
var seq int64
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT COALESCE(MAX(seq), 0) FROM album_events`,
|
||||||
|
).Scan(&seq)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("getting latest album event seq: %w", err)
|
||||||
|
}
|
||||||
|
return seq, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AlbumEventRepository) GetAfterSeq(ctx context.Context, afterSeq int64) ([]*AlbumEvent, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, seq, workflow_run_id, album_id, event_type, step, message, data_json, created_at
|
||||||
|
FROM album_events WHERE seq > $1 ORDER BY seq LIMIT 1000`, afterSeq,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing album events after seq: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var events []*AlbumEvent
|
||||||
|
for rows.Next() {
|
||||||
|
event := &AlbumEvent{}
|
||||||
|
if err := rows.Scan(&event.ID, &event.Seq, &event.WorkflowRunID, &event.AlbumID, &event.EventType, &event.Step, &event.Message, &event.DataJSON, &event.CreatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning album event: %w", err)
|
||||||
|
}
|
||||||
|
events = append(events, event)
|
||||||
|
}
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
@@ -86,6 +86,26 @@ func (r *DownloadRepository) SetFailed(ctx context.Context, id string, errorMsg
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) SetCancelled(ctx context.Context, id string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE downloads SET state = 'cancelled', updated_at = NOW() WHERE id = $1`, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting download cancelled: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *DownloadRepository) SetCancelledByQbitHash(ctx context.Context, hash string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE downloads SET state = 'cancelled', updated_at = NOW() WHERE qbit_hash = $1 AND state NOT IN ('completed', 'failed', 'cancelled')`, hash,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting download cancelled by hash: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Download, error) {
|
func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Download, error) {
|
||||||
rows, err := r.pool.Query(ctx,
|
rows, err := r.pool.Query(ctx,
|
||||||
`SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at
|
`SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at
|
||||||
|
|||||||
@@ -0,0 +1,123 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrWorkflowAlreadyRunning = fmt.Errorf("workflow already running for this album and quality")
|
||||||
|
|
||||||
|
type WorkflowRun struct {
|
||||||
|
ID string
|
||||||
|
AlbumID string
|
||||||
|
Quality string
|
||||||
|
Status string
|
||||||
|
ErrorMessage *string
|
||||||
|
StartedAt time.Time
|
||||||
|
CompletedAt *time.Time
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type WorkflowRunRepository struct {
|
||||||
|
pool *pgxpool.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWorkflowRunRepository(pool *pgxpool.Pool) *WorkflowRunRepository {
|
||||||
|
return &WorkflowRunRepository{pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) Create(ctx context.Context, run *WorkflowRun) error {
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO workflow_runs (album_id, quality, status) VALUES ($1, $2, 'running')
|
||||||
|
ON CONFLICT ON CONSTRAINT idx_workflow_runs_active DO NOTHING
|
||||||
|
RETURNING id, started_at, created_at, updated_at`,
|
||||||
|
run.AlbumID, run.Quality,
|
||||||
|
).Scan(&run.ID, &run.StartedAt, &run.CreatedAt, &run.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
if err == pgx.ErrNoRows {
|
||||||
|
return ErrWorkflowAlreadyRunning
|
||||||
|
}
|
||||||
|
return fmt.Errorf("creating workflow run: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) SetCompleted(ctx context.Context, id string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE workflow_runs SET status = 'completed', completed_at = NOW(), updated_at = NOW() WHERE id = $1`, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting workflow run completed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) SetFailed(ctx context.Context, id string, errorMsg string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE workflow_runs SET status = 'failed', error_message = $1, completed_at = NOW(), updated_at = NOW() WHERE id = $2`, errorMsg, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting workflow run failed: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) SetCancelled(ctx context.Context, id string) error {
|
||||||
|
_, err := r.pool.Exec(ctx,
|
||||||
|
`UPDATE workflow_runs SET status = 'cancelled', completed_at = NOW(), updated_at = NOW() WHERE id = $1`, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("setting workflow run cancelled: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) GetByAlbumAndQuality(ctx context.Context, albumID string, quality string) (*WorkflowRun, error) {
|
||||||
|
run := &WorkflowRun{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at
|
||||||
|
FROM workflow_runs WHERE album_id = $1 AND quality = $2 AND status = 'running' LIMIT 1`, albumID, quality,
|
||||||
|
).Scan(&run.ID, &run.AlbumID, &run.Quality, &run.Status, &run.ErrorMessage, &run.StartedAt, &run.CompletedAt, &run.CreatedAt, &run.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting workflow run by album and quality: %w", err)
|
||||||
|
}
|
||||||
|
return run, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) GetRunning(ctx context.Context) ([]*WorkflowRun, error) {
|
||||||
|
rows, err := r.pool.Query(ctx,
|
||||||
|
`SELECT id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at
|
||||||
|
FROM workflow_runs WHERE status = 'running' ORDER BY started_at`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("listing running workflow runs: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var runs []*WorkflowRun
|
||||||
|
for rows.Next() {
|
||||||
|
run := &WorkflowRun{}
|
||||||
|
if err := rows.Scan(&run.ID, &run.AlbumID, &run.Quality, &run.Status, &run.ErrorMessage, &run.StartedAt, &run.CompletedAt, &run.CreatedAt, &run.UpdatedAt); err != nil {
|
||||||
|
return nil, fmt.Errorf("scanning workflow run: %w", err)
|
||||||
|
}
|
||||||
|
runs = append(runs, run)
|
||||||
|
}
|
||||||
|
return runs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowRunRepository) GetByID(ctx context.Context, id string) (*WorkflowRun, error) {
|
||||||
|
run := &WorkflowRun{}
|
||||||
|
err := r.pool.QueryRow(ctx,
|
||||||
|
`SELECT id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at
|
||||||
|
FROM workflow_runs WHERE id = $1`, id,
|
||||||
|
).Scan(&run.ID, &run.AlbumID, &run.Quality, &run.Status, &run.ErrorMessage, &run.StartedAt, &run.CompletedAt, &run.CreatedAt, &run.UpdatedAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("getting workflow run by id: %w", err)
|
||||||
|
}
|
||||||
|
return run, nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user