diff --git a/internal/database/album_event_repository.go b/internal/database/album_event_repository.go new file mode 100644 index 0000000..1322807 --- /dev/null +++ b/internal/database/album_event_repository.go @@ -0,0 +1,116 @@ +package database + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type AlbumEvent struct { + ID string + Seq int64 + WorkflowRunID string + AlbumID string + EventType string + Step string + Message string + DataJSON []byte + CreatedAt time.Time +} + +type AlbumEventRepository struct { + pool *pgxpool.Pool +} + +func NewAlbumEventRepository(pool *pgxpool.Pool) *AlbumEventRepository { + return &AlbumEventRepository{pool: pool} +} + +func (r *AlbumEventRepository) Create(ctx context.Context, event *AlbumEvent) error { + err := r.pool.QueryRow(ctx, + `INSERT INTO album_events (workflow_run_id, album_id, event_type, step, message, data_json) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, seq, created_at`, + event.WorkflowRunID, event.AlbumID, event.EventType, event.Step, event.Message, event.DataJSON, + ).Scan(&event.ID, &event.Seq, &event.CreatedAt) + if err != nil { + return fmt.Errorf("creating album event: %w", err) + } + return nil +} + +func (r *AlbumEventRepository) GetByWorkflowRun(ctx context.Context, workflowRunID string) ([]*AlbumEvent, error) { + rows, err := r.pool.Query(ctx, + `SELECT id, seq, workflow_run_id, album_id, event_type, step, message, data_json, created_at + FROM album_events WHERE workflow_run_id = $1 ORDER BY seq`, workflowRunID, + ) + if err != nil { + return nil, fmt.Errorf("listing album events by workflow run: %w", err) + } + defer rows.Close() + + var events []*AlbumEvent + for rows.Next() { + event := &AlbumEvent{} + if err := rows.Scan(&event.ID, &event.Seq, &event.WorkflowRunID, &event.AlbumID, &event.EventType, &event.Step, &event.Message, &event.DataJSON, &event.CreatedAt); err != nil { + return nil, fmt.Errorf("scanning album event: %w", err) + } + events = append(events, event) + } + return events, nil +} + +func (r *AlbumEventRepository) GetByAlbum(ctx context.Context, albumID string, afterSeq int64, limit int) ([]*AlbumEvent, error) { + rows, err := r.pool.Query(ctx, + `SELECT id, seq, workflow_run_id, album_id, event_type, step, message, data_json, created_at + FROM album_events WHERE album_id = $1 AND seq > $2 ORDER BY seq LIMIT $3`, albumID, afterSeq, limit, + ) + if err != nil { + return nil, fmt.Errorf("listing album events by album: %w", err) + } + defer rows.Close() + + var events []*AlbumEvent + for rows.Next() { + event := &AlbumEvent{} + if err := rows.Scan(&event.ID, &event.Seq, &event.WorkflowRunID, &event.AlbumID, &event.EventType, &event.Step, &event.Message, &event.DataJSON, &event.CreatedAt); err != nil { + return nil, fmt.Errorf("scanning album event: %w", err) + } + events = append(events, event) + } + return events, nil +} + +func (r *AlbumEventRepository) GetLatestSeq(ctx context.Context) (int64, error) { + var seq int64 + err := r.pool.QueryRow(ctx, + `SELECT COALESCE(MAX(seq), 0) FROM album_events`, + ).Scan(&seq) + if err != nil { + return 0, fmt.Errorf("getting latest album event seq: %w", err) + } + return seq, nil +} + +func (r *AlbumEventRepository) GetAfterSeq(ctx context.Context, afterSeq int64) ([]*AlbumEvent, error) { + rows, err := r.pool.Query(ctx, + `SELECT id, seq, workflow_run_id, album_id, event_type, step, message, data_json, created_at + FROM album_events WHERE seq > $1 ORDER BY seq LIMIT 1000`, afterSeq, + ) + if err != nil { + return nil, fmt.Errorf("listing album events after seq: %w", err) + } + defer rows.Close() + + var events []*AlbumEvent + for rows.Next() { + event := &AlbumEvent{} + if err := rows.Scan(&event.ID, &event.Seq, &event.WorkflowRunID, &event.AlbumID, &event.EventType, &event.Step, &event.Message, &event.DataJSON, &event.CreatedAt); err != nil { + return nil, fmt.Errorf("scanning album event: %w", err) + } + events = append(events, event) + } + return events, nil +} diff --git a/internal/database/download_repository.go b/internal/database/download_repository.go index 528e5ed..cc2bdaa 100644 --- a/internal/database/download_repository.go +++ b/internal/database/download_repository.go @@ -86,6 +86,26 @@ func (r *DownloadRepository) SetFailed(ctx context.Context, id string, errorMsg return nil } +func (r *DownloadRepository) SetCancelled(ctx context.Context, id string) error { + _, err := r.pool.Exec(ctx, + `UPDATE downloads SET state = 'cancelled', updated_at = NOW() WHERE id = $1`, id, + ) + if err != nil { + return fmt.Errorf("setting download cancelled: %w", err) + } + return nil +} + +func (r *DownloadRepository) SetCancelledByQbitHash(ctx context.Context, hash string) error { + _, err := r.pool.Exec(ctx, + `UPDATE downloads SET state = 'cancelled', updated_at = NOW() WHERE qbit_hash = $1 AND state NOT IN ('completed', 'failed', 'cancelled')`, hash, + ) + if err != nil { + return fmt.Errorf("setting download cancelled by hash: %w", err) + } + return nil +} + func (r *DownloadRepository) GetByAlbumID(ctx context.Context, albumID string) ([]*Download, error) { rows, err := r.pool.Query(ctx, `SELECT id, torrent_id, album_id, format, quality, state, qbit_hash, save_path, error_message, queued_at, started_at, completed_at, created_at, updated_at diff --git a/internal/database/workflow_run_repository.go b/internal/database/workflow_run_repository.go new file mode 100644 index 0000000..286b619 --- /dev/null +++ b/internal/database/workflow_run_repository.go @@ -0,0 +1,123 @@ +package database + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +var ErrWorkflowAlreadyRunning = fmt.Errorf("workflow already running for this album and quality") + +type WorkflowRun struct { + ID string + AlbumID string + Quality string + Status string + ErrorMessage *string + StartedAt time.Time + CompletedAt *time.Time + CreatedAt time.Time + UpdatedAt time.Time +} + +type WorkflowRunRepository struct { + pool *pgxpool.Pool +} + +func NewWorkflowRunRepository(pool *pgxpool.Pool) *WorkflowRunRepository { + return &WorkflowRunRepository{pool: pool} +} + +func (r *WorkflowRunRepository) Create(ctx context.Context, run *WorkflowRun) error { + err := r.pool.QueryRow(ctx, + `INSERT INTO workflow_runs (album_id, quality, status) VALUES ($1, $2, 'running') + ON CONFLICT ON CONSTRAINT idx_workflow_runs_active DO NOTHING + RETURNING id, started_at, created_at, updated_at`, + run.AlbumID, run.Quality, + ).Scan(&run.ID, &run.StartedAt, &run.CreatedAt, &run.UpdatedAt) + if err != nil { + if err == pgx.ErrNoRows { + return ErrWorkflowAlreadyRunning + } + return fmt.Errorf("creating workflow run: %w", err) + } + return nil +} + +func (r *WorkflowRunRepository) SetCompleted(ctx context.Context, id string) error { + _, err := r.pool.Exec(ctx, + `UPDATE workflow_runs SET status = 'completed', completed_at = NOW(), updated_at = NOW() WHERE id = $1`, id, + ) + if err != nil { + return fmt.Errorf("setting workflow run completed: %w", err) + } + return nil +} + +func (r *WorkflowRunRepository) SetFailed(ctx context.Context, id string, errorMsg string) error { + _, err := r.pool.Exec(ctx, + `UPDATE workflow_runs SET status = 'failed', error_message = $1, completed_at = NOW(), updated_at = NOW() WHERE id = $2`, errorMsg, id, + ) + if err != nil { + return fmt.Errorf("setting workflow run failed: %w", err) + } + return nil +} + +func (r *WorkflowRunRepository) SetCancelled(ctx context.Context, id string) error { + _, err := r.pool.Exec(ctx, + `UPDATE workflow_runs SET status = 'cancelled', completed_at = NOW(), updated_at = NOW() WHERE id = $1`, id, + ) + if err != nil { + return fmt.Errorf("setting workflow run cancelled: %w", err) + } + return nil +} + +func (r *WorkflowRunRepository) GetByAlbumAndQuality(ctx context.Context, albumID string, quality string) (*WorkflowRun, error) { + run := &WorkflowRun{} + err := r.pool.QueryRow(ctx, + `SELECT id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at + FROM workflow_runs WHERE album_id = $1 AND quality = $2 AND status = 'running' LIMIT 1`, albumID, quality, + ).Scan(&run.ID, &run.AlbumID, &run.Quality, &run.Status, &run.ErrorMessage, &run.StartedAt, &run.CompletedAt, &run.CreatedAt, &run.UpdatedAt) + if err != nil { + return nil, fmt.Errorf("getting workflow run by album and quality: %w", err) + } + return run, nil +} + +func (r *WorkflowRunRepository) GetRunning(ctx context.Context) ([]*WorkflowRun, error) { + rows, err := r.pool.Query(ctx, + `SELECT id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at + FROM workflow_runs WHERE status = 'running' ORDER BY started_at`, + ) + if err != nil { + return nil, fmt.Errorf("listing running workflow runs: %w", err) + } + defer rows.Close() + + var runs []*WorkflowRun + for rows.Next() { + run := &WorkflowRun{} + if err := rows.Scan(&run.ID, &run.AlbumID, &run.Quality, &run.Status, &run.ErrorMessage, &run.StartedAt, &run.CompletedAt, &run.CreatedAt, &run.UpdatedAt); err != nil { + return nil, fmt.Errorf("scanning workflow run: %w", err) + } + runs = append(runs, run) + } + return runs, nil +} + +func (r *WorkflowRunRepository) GetByID(ctx context.Context, id string) (*WorkflowRun, error) { + run := &WorkflowRun{} + err := r.pool.QueryRow(ctx, + `SELECT id, album_id, quality, status, error_message, started_at, completed_at, created_at, updated_at + FROM workflow_runs WHERE id = $1`, id, + ).Scan(&run.ID, &run.AlbumID, &run.Quality, &run.Status, &run.ErrorMessage, &run.StartedAt, &run.CompletedAt, &run.CreatedAt, &run.UpdatedAt) + if err != nil { + return nil, fmt.Errorf("getting workflow run by id: %w", err) + } + return run, nil +}