a1f6701bac
- gRPC service with MusicBrainz provider - PostgreSQL schema with migrations - Service layer with database-first caching - Repository pattern for data access - YAML configuration support - Research documentation for 17 music metadata projects
227 lines
5.1 KiB
Go
227 lines
5.1 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/metadata-agregator/internal/domain"
|
|
"github.com/metadata-agregator/internal/repository"
|
|
)
|
|
|
|
type TrackRepository struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
func NewTrackRepository(pool *pgxpool.Pool) *TrackRepository {
|
|
return &TrackRepository{pool: pool}
|
|
}
|
|
|
|
func (r *TrackRepository) GetByID(ctx context.Context, id string) (*domain.Track, error) {
|
|
query := `
|
|
SELECT id, title, duration_ms, isrc, explicit, source, source_id
|
|
FROM tracks
|
|
WHERE id = $1`
|
|
|
|
track, err := r.scanTrack(ctx, query, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := r.loadExternalIDs(ctx, track); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return track, nil
|
|
}
|
|
|
|
func (r *TrackRepository) GetByExternalID(ctx context.Context, source, sourceID string) (*domain.Track, error) {
|
|
query := `
|
|
SELECT t.id, t.title, t.duration_ms, t.isrc, t.explicit, t.source, t.source_id
|
|
FROM tracks t
|
|
JOIN track_external_ids e ON t.id = e.track_id
|
|
WHERE e.source = $1 AND e.source_id = $2`
|
|
|
|
track, err := r.scanTrack(ctx, query, source, sourceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := r.loadExternalIDs(ctx, track); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return track, nil
|
|
}
|
|
|
|
func (r *TrackRepository) GetByISRC(ctx context.Context, isrc string) (*domain.Track, error) {
|
|
query := `
|
|
SELECT id, title, duration_ms, isrc, explicit, source, source_id
|
|
FROM tracks
|
|
WHERE isrc = $1`
|
|
|
|
track, err := r.scanTrack(ctx, query, isrc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := r.loadExternalIDs(ctx, track); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return track, nil
|
|
}
|
|
|
|
func (r *TrackRepository) GetByAlbumID(ctx context.Context, albumID string) ([]domain.Track, error) {
|
|
query := `
|
|
SELECT t.id, t.title, t.duration_ms, t.isrc, t.explicit, t.source, t.source_id,
|
|
at.disc_number, at.track_number
|
|
FROM tracks t
|
|
JOIN album_tracks at ON t.id = at.track_id
|
|
JOIN album_external_ids ae ON at.album_id = ae.album_id
|
|
WHERE ae.source_id = $1
|
|
ORDER BY at.disc_number, at.track_number`
|
|
|
|
rows, err := r.pool.Query(ctx, query, albumID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var tracks []domain.Track
|
|
for rows.Next() {
|
|
var (
|
|
track domain.Track
|
|
durationMs *int
|
|
isrc *string
|
|
explicit *bool
|
|
source string
|
|
sourceID *string
|
|
)
|
|
|
|
err := rows.Scan(
|
|
&track.ID, &track.Title, &durationMs, &isrc, &explicit,
|
|
&source, &sourceID, &track.DiscNumber, &track.TrackNumber,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if durationMs != nil {
|
|
track.DurationMs = *durationMs
|
|
}
|
|
track.ISRC = derefString(isrc)
|
|
if explicit != nil {
|
|
track.Explicit = *explicit
|
|
}
|
|
|
|
tracks = append(tracks, track)
|
|
}
|
|
|
|
return tracks, rows.Err()
|
|
}
|
|
|
|
func (r *TrackRepository) Save(ctx context.Context, track *domain.Track) error {
|
|
tx, err := r.pool.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
var source, sourceID string
|
|
if len(track.ExternalIDs) > 0 {
|
|
source = track.ExternalIDs[0].Source
|
|
sourceID = track.ExternalIDs[0].SourceID
|
|
}
|
|
|
|
query := `
|
|
INSERT INTO tracks (id, title, duration_ms, isrc, explicit, source, source_id)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
ON CONFLICT (id) DO UPDATE SET
|
|
title = EXCLUDED.title,
|
|
duration_ms = EXCLUDED.duration_ms,
|
|
isrc = EXCLUDED.isrc,
|
|
explicit = EXCLUDED.explicit,
|
|
updated_at = now()`
|
|
|
|
_, err = tx.Exec(ctx, query,
|
|
track.ID, track.Title, track.DurationMs, nullString(track.ISRC),
|
|
track.Explicit, source, sourceID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, ext := range track.ExternalIDs {
|
|
extQuery := `
|
|
INSERT INTO track_external_ids (track_id, source, source_id, url)
|
|
VALUES ($1, $2, $3, $4)
|
|
ON CONFLICT (track_id, source, source_id) DO UPDATE SET
|
|
url = EXCLUDED.url,
|
|
fetched_at = now()`
|
|
|
|
_, err = tx.Exec(ctx, extQuery, track.ID, ext.Source, ext.SourceID, nullString(ext.URL))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
func (r *TrackRepository) scanTrack(ctx context.Context, query string, args ...any) (*domain.Track, error) {
|
|
row := r.pool.QueryRow(ctx, query, args...)
|
|
|
|
var (
|
|
track domain.Track
|
|
durationMs *int
|
|
isrc *string
|
|
explicit *bool
|
|
source string
|
|
sourceID *string
|
|
)
|
|
|
|
err := row.Scan(
|
|
&track.ID, &track.Title, &durationMs, &isrc, &explicit, &source, &sourceID,
|
|
)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, repository.ErrNotFound
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if durationMs != nil {
|
|
track.DurationMs = *durationMs
|
|
}
|
|
track.ISRC = derefString(isrc)
|
|
if explicit != nil {
|
|
track.Explicit = *explicit
|
|
}
|
|
|
|
return &track, nil
|
|
}
|
|
|
|
func (r *TrackRepository) loadExternalIDs(ctx context.Context, track *domain.Track) error {
|
|
query := `SELECT source, source_id, url FROM track_external_ids WHERE track_id = $1`
|
|
|
|
rows, err := r.pool.Query(ctx, query, track.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var ext domain.ExternalID
|
|
var url *string
|
|
if err := rows.Scan(&ext.Source, &ext.SourceID, &url); err != nil {
|
|
return err
|
|
}
|
|
ext.URL = derefString(url)
|
|
track.ExternalIDs = append(track.ExternalIDs, ext)
|
|
}
|
|
|
|
return rows.Err()
|
|
}
|