package database

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// ErrWorkflowAlreadyRunning is returned by WorkflowRunRepository.Create when
// the INSERT is skipped by its ON CONFLICT clause and therefore returns no row.
var ErrWorkflowAlreadyRunning = errors.New("workflow already running for this album and quality")

// workflowRunColumns is the column list shared by every SELECT in this file.
// Its order must match the Scan destinations in scanWorkflowRun.
const workflowRunColumns = `id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at`

// WorkflowRun mirrors one row of the workflow_runs table.
type WorkflowRun struct {
	ID      string
	AlbumID string
	Quality string
	Status  string
	// ErrorMessage is nil when the error_message column is NULL.
	ErrorMessage *string
	StartedAt    time.Time
	// CompletedAt is nil when the completed_at column is NULL.
	CompletedAt *time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

// WorkflowRunRepository reads and writes workflow_runs rows through a pgx
// connection pool.
type WorkflowRunRepository struct {
	pool *pgxpool.Pool
}

// NewWorkflowRunRepository returns a repository that issues its queries on pool.
func NewWorkflowRunRepository(pool *pgxpool.Pool) *WorkflowRunRepository {
	return &WorkflowRunRepository{pool: pool}
}

// scanWorkflowRun reads one row laid out as workflowRunColumns into a new
// WorkflowRun. It returns the raw Scan error so each caller can wrap it with
// its own context; the run is nil whenever the error is non-nil.
func scanWorkflowRun(row pgx.Row) (*WorkflowRun, error) {
	run := &WorkflowRun{}
	err := row.Scan(
		&run.ID,
		&run.AlbumID,
		&run.Quality,
		&run.Status,
		&run.ErrorMessage,
		&run.StartedAt,
		&run.CompletedAt,
		&run.CreatedAt,
		&run.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}
	return run, nil
}

// Create inserts a row with status 'running' for run.AlbumID and run.Quality.
// On success it fills in run.ID, run.StartedAt, run.CreatedAt and
// run.UpdatedAt from the inserted row; run.Status is left untouched.
//
// It returns ErrWorkflowAlreadyRunning when the ON CONFLICT clause suppresses
// the insert, and a wrapped error for any other failure.
func (r *WorkflowRunRepository) Create(ctx context.Context, run *WorkflowRun) error {
	// NOTE(review): the arbiter is named like an index. PostgreSQL's
	// ON CONFLICT ON CONSTRAINT accepts only a constraint name, so confirm
	// against the schema that idx_workflow_runs_active is a real constraint.
	err := r.pool.QueryRow(ctx,
		`INSERT INTO workflow_runs (album_id, quality, status) `+
			`VALUES ($1, $2, 'running') `+
			`ON CONFLICT ON CONSTRAINT idx_workflow_runs_active DO NOTHING `+
			`RETURNING id, started_at, created_at, updated_at`,
		run.AlbumID, run.Quality,
	).Scan(&run.ID, &run.StartedAt, &run.CreatedAt, &run.UpdatedAt)
	if err != nil {
		// DO NOTHING yields zero rows, which QueryRow reports as ErrNoRows.
		if errors.Is(err, pgx.ErrNoRows) {
			return ErrWorkflowAlreadyRunning
		}
		return fmt.Errorf("creating workflow run: %w", err)
	}
	return nil
}

// SetCompleted sets status to 'completed' and stamps completed_at and
// updated_at with NOW() for the row with the given id. The affected-row count
// is not inspected, so an id that matches no row is not reported as an error.
func (r *WorkflowRunRepository) SetCompleted(ctx context.Context, id string) error {
	_, err := r.pool.Exec(ctx,
		`UPDATE workflow_runs SET status = 'completed', completed_at = NOW(), updated_at = NOW() `+
			`WHERE id = $1`,
		id,
	)
	if err != nil {
		return fmt.Errorf("setting workflow run completed: %w", err)
	}
	return nil
}

// SetFailed sets status to 'failed', stores errorMsg in error_message and
// stamps completed_at and updated_at with NOW() for the row with the given id.
// The affected-row count is not inspected, so an id that matches no row is not
// reported as an error.
func (r *WorkflowRunRepository) SetFailed(ctx context.Context, id string, errorMsg string) error {
	_, err := r.pool.Exec(ctx,
		`UPDATE workflow_runs SET status = 'failed', error_message = $1, completed_at = NOW(), updated_at = NOW() `+
			`WHERE id = $2`,
		errorMsg, id,
	)
	if err != nil {
		return fmt.Errorf("setting workflow run failed: %w", err)
	}
	return nil
}

// SetCancelled sets status to 'cancelled' and stamps completed_at and
// updated_at with NOW() for the row with the given id. The affected-row count
// is not inspected, so an id that matches no row is not reported as an error.
func (r *WorkflowRunRepository) SetCancelled(ctx context.Context, id string) error {
	_, err := r.pool.Exec(ctx,
		`UPDATE workflow_runs SET status = 'cancelled', completed_at = NOW(), updated_at = NOW() `+
			`WHERE id = $1`,
		id,
	)
	if err != nil {
		return fmt.Errorf("setting workflow run cancelled: %w", err)
	}
	return nil
}

// GetByAlbumAndQuality returns a run with status 'running' for the given
// album and quality. Only rows in the 'running' state are considered. When no
// such row exists the returned error wraps pgx.ErrNoRows.
func (r *WorkflowRunRepository) GetByAlbumAndQuality(ctx context.Context, albumID string, quality string) (*WorkflowRun, error) {
	run, err := scanWorkflowRun(r.pool.QueryRow(ctx,
		`SELECT `+workflowRunColumns+` FROM workflow_runs `+
			`WHERE album_id = $1 AND quality = $2 AND status = 'running' LIMIT 1`,
		albumID, quality,
	))
	if err != nil {
		return nil, fmt.Errorf("getting workflow run by album and quality: %w", err)
	}
	return run, nil
}

// GetRunning returns every run whose status is 'running', ordered by
// started_at. The slice is nil when there are no such rows.
func (r *WorkflowRunRepository) GetRunning(ctx context.Context) ([]*WorkflowRun, error) {
	rows, err := r.pool.Query(ctx,
		`SELECT `+workflowRunColumns+` FROM workflow_runs `+
			`WHERE status = 'running' ORDER BY started_at`,
	)
	if err != nil {
		return nil, fmt.Errorf("listing running workflow runs: %w", err)
	}
	defer rows.Close()

	var runs []*WorkflowRun
	for rows.Next() {
		run, err := scanWorkflowRun(rows)
		if err != nil {
			return nil, fmt.Errorf("scanning workflow run: %w", err)
		}
		runs = append(runs, run)
	}
	// Next returns false both at end-of-results and on failure; without this
	// check a mid-stream error would be returned as a short, "successful" list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating running workflow runs: %w", err)
	}
	return runs, nil
}

// GetByID returns the run with the given id regardless of its status. When no
// such row exists the returned error wraps pgx.ErrNoRows.
func (r *WorkflowRunRepository) GetByID(ctx context.Context, id string) (*WorkflowRun, error) {
	run, err := scanWorkflowRun(r.pool.QueryRow(ctx,
		`SELECT `+workflowRunColumns+` FROM workflow_runs WHERE id = $1`,
		id,
	))
	if err != nil {
		return nil, fmt.Errorf("getting workflow run by id: %w", err)
	}
	return run, nil
}