metadata-agregator/docs/research/acoustid/analysis/INTEGRATIONS.md
Alexander a1f6701bac feat: initial implementation of metadata aggregator
- gRPC service with MusicBrainz provider
- PostgreSQL schema with migrations
- Service layer with database-first caching
- Repository pattern for data access
- YAML configuration support
- Research documentation for 17 music metadata projects
2026-04-28 16:28:53 +02:00


# AcoustID Integrations
## Overview
AcoustID integrates with multiple external services and libraries to provide comprehensive audio fingerprinting and metadata enrichment. The system's architecture separates concerns between fingerprint generation (Chromaprint), fingerprint indexing (acoustid-index), metadata enrichment (MusicBrainz), and supporting infrastructure (Redis, NATS).
## MusicBrainz Integration
### Connection Method
**Type**: Direct PostgreSQL database connection (NOT REST API)
**Database**: `musicbrainz_db` (read-only replica)
**Access**: Separate database connection pool
**Configuration** (`acoustid.conf`):
```ini
[musicbrainz]
host = musicbrainz-db.example.com
port = 5432
name = musicbrainz_db
user = acoustid_readonly
password_file = /run/secrets/mb_password
```
**File**: `acoustid/data/musicbrainz.py`
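
To illustrate how the `[musicbrainz]` section above could be turned into a connection URL, here is a minimal sketch using only the standard library. The function name `musicbrainz_dsn` is hypothetical and not taken from the AcoustID code base; the sketch only assumes that `password_file` holds the password as plain text.

```python
import configparser
from pathlib import Path
from urllib.parse import quote


def musicbrainz_dsn(config_text: str) -> str:
    """Build a PostgreSQL URL from the [musicbrainz] section (illustrative)."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    section = parser["musicbrainz"]
    # The secret lives in a separate file; strip the trailing newline.
    password = Path(section["password_file"]).read_text().strip()
    # Percent-encode the password so characters like "@" or "/" stay valid.
    return (
        f"postgresql://{section['user']}:{quote(password, safe='')}"
        f"@{section['host']}:{section.getint('port')}/{section['name']}"
    )
```

Keeping the password in a file rather than inline means the configuration itself can be committed or logged without exposing the secret.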
### Queried Tables
The integration queries the following MusicBrainz tables directly:
| Table | Purpose | Columns Used |
|-------|---------|--------------|
| `artist_credit` | Artist information | `id`, `name`, `artist_count` |
| `artist_credit_name` | Artist credit details | `artist_credit`, `position`, `artist`, `name`, `join_phrase` |
| `artist` | Artist entities | `id`, `gid`, `name`, `sort_name` |
| `recording` | Recording metadata | `id`, `gid`, `name`, `length`, `artist_credit`, `comment` |
| `release` | Release information | `id`, `gid`, `name`, `artist_credit`, `release_group`, `status`, `packaging`, `barcode` |
| `release_group` | Release group data | `id`, `gid`, `name`, `artist_credit`, `type`, `comment` |
| `track` | Track listings | `id`, `gid`, `recording`, `medium`, `position`, `number`, `name`, `length`, `artist_credit` |
| `medium` | Medium information | `id`, `release`, `position`, `format`, `track_count` |
| `release_country` | Release countries | `release`, `country`, `date_year`, `date_month`, `date_day` |
### Query Patterns
**Fetch Recording by MBID**:
```python
def get_recording_by_mbid(db, mbid):
    """Fetch recording with artist credits and releases."""
    query = """
        SELECT
            r.gid AS recording_mbid,
            r.name AS recording_title,
            r.length AS duration,
            ac.name AS artist_credit_name,
            array_agg(DISTINCT rel.gid) AS release_mbids
        FROM recording r
        JOIN artist_credit ac ON r.artist_credit = ac.id
        LEFT JOIN track t ON t.recording = r.id
        LEFT JOIN medium m ON t.medium = m.id
        LEFT JOIN release rel ON m.release = rel.id
        WHERE r.gid = :mbid
        GROUP BY r.gid, r.name, r.length, ac.name
    """
    return db.execute(query, {'mbid': mbid}).fetchone()
```
**Fetch Release with Tracks**:
```python
def get_release_with_tracks(db, release_mbid):
    """Fetch complete release with all tracks."""
    query = """
        SELECT
            rel.gid AS release_mbid,
            rel.name AS release_title,
            rel.barcode,
            rc.country,
            rc.date_year,
            rc.date_month,
            rc.date_day,
            m.position AS medium_position,
            m.format AS medium_format,
            t.position AS track_position,
            t.number AS track_number,
            t.name AS track_title,
            rec.gid AS recording_mbid,
            ac.name AS artist_credit
        FROM release rel
        LEFT JOIN release_country rc ON rel.id = rc.release
        LEFT JOIN medium m ON rel.id = m.release
        LEFT JOIN track t ON m.id = t.medium
        LEFT JOIN recording rec ON t.recording = rec.id
        LEFT JOIN artist_credit ac ON rec.artist_credit = ac.id
        WHERE rel.gid = :mbid
        ORDER BY m.position, t.position
    """
    return db.execute(query, {'mbid': release_mbid}).fetchall()
```
**Fetch Artist Credits**:
```python
def get_artist_credit(db, artist_credit_id):
    """Fetch artist credit with all artists."""
    query = """
        SELECT
            acn.position,
            a.gid AS artist_mbid,
            a.name AS artist_name,
            a.sort_name AS artist_sort_name,
            acn.name AS credited_name,
            acn.join_phrase
        FROM artist_credit_name acn
        JOIN artist a ON acn.artist = a.id
        WHERE acn.artist_credit = :ac_id
        ORDER BY acn.position
    """
    return db.execute(query, {'ac_id': artist_credit_id}).fetchall()
```
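
The rows returned by the artist credit query can be folded into a single display string by concatenating each credited name with its join phrase in position order. The sketch below assumes rows are dict-like with the column aliases used above; `format_artist_credit` is a hypothetical helper name.

```python
def format_artist_credit(rows):
    """Concatenate credited names and join phrases in position order."""
    ordered = sorted(rows, key=lambda row: row["position"])
    # join_phrase may be empty or NULL for the last artist in the credit.
    return "".join(
        row["credited_name"] + (row["join_phrase"] or "") for row in ordered
    )


rows = [
    {"position": 1, "credited_name": "David Bowie", "join_phrase": None},
    {"position": 0, "credited_name": "Queen", "join_phrase": " & "},
]
print(format_artist_credit(rows))  # Queen & David Bowie
```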
### MBID Redirect Resolution
MusicBrainz uses MBID redirects when entities are merged. AcoustID resolves these automatically.
**File**: `acoustid/data/musicbrainz.py`
```python
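def resolve_recording_mbid(db, mbid):
    """Resolve a recording MBID redirect to the current MBID."""
    # new_id is the integer recording.id of the merge target, not a UUID,
    # so join back to recording to obtain the current gid.
    query = """
        SELECT r.gid AS new_gid
        FROM recording_gid_redirect rgr
        JOIN recording r ON r.id = rgr.new_id
        WHERE rgr.gid = :mbid
    """
    result = db.execute(query, {'mbid': mbid}).fetchone()
    if result:
        # The redirect points at a live row, so no recursion is needed
        return result['new_gid']
    return mbid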
```
**Redirect Tables Used**:
- `recording_gid_redirect`
- `release_gid_redirect`
- `release_group_gid_redirect`
- `artist_gid_redirect`
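
Since the four redirect tables share the same shape (`gid` is the old MBID, `new_id` is the integer id of the surviving row), one lookup can be parameterized by entity type. This is a sketch under that assumption; `redirect_lookup_sql` is a hypothetical helper, and the table name is taken from a fixed whitelist so it is never built from user input.

```python
REDIRECT_TABLES = {
    "recording": "recording_gid_redirect",
    "release": "release_gid_redirect",
    "release_group": "release_group_gid_redirect",
    "artist": "artist_gid_redirect",
}


def redirect_lookup_sql(entity: str) -> str:
    """Return SQL that maps an old MBID to the current one for an entity."""
    if entity not in REDIRECT_TABLES:
        raise ValueError(f"no redirect table for entity {entity!r}")
    table = REDIRECT_TABLES[entity]
    # Join back to the entity table because new_id is an integer row id.
    return (
        f"SELECT e.gid FROM {table} AS r "
        f"JOIN {entity} AS e ON e.id = r.new_id "
        "WHERE r.gid = :mbid"
    )
```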
### Metadata Enrichment
When a lookup request includes metadata flags, AcoustID fetches additional data from MusicBrainz:
**Metadata Levels**:
| Flag | Data Fetched | Query Complexity |
|------|--------------|------------------|
| `recordingids` | Recording MBIDs only | Low (join only) |
| `recordings` | Full recording metadata | Medium (artist credits) |
| `releaseids` | Release MBIDs only | Low (join only) |
| `releases` | Full release metadata | High (tracks, mediums, countries) |
| `releasegroupids` | Release group MBIDs only | Low (join only) |
| `releasegroups` | Full release group metadata | Medium (artist credits) |
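
As a rough illustration of how the flags in the table could drive query planning, the sketch below parses a flag string and reports the most expensive level requested. The space-separated input format and the names `FLAG_COMPLEXITY` and `plan_enrichment` are assumptions made for this example, not part of the AcoustID code.

```python
# Complexity per flag, copied from the table above.
FLAG_COMPLEXITY = {
    "recordingids": "low",
    "recordings": "medium",
    "releaseids": "low",
    "releases": "high",
    "releasegroupids": "low",
    "releasegroups": "medium",
}
_ORDER = ["low", "medium", "high"]


def plan_enrichment(meta: str):
    """Split a space-separated flag string and report the costliest level."""
    flags = meta.split()
    unknown = [flag for flag in flags if flag not in FLAG_COMPLEXITY]
    if unknown:
        raise ValueError(f"unknown metadata flags: {unknown}")
    if not flags:
        return [], None  # no enrichment requested
    worst = max((FLAG_COMPLEXITY[flag] for flag in flags), key=_ORDER.index)
    return flags, worst
```

A caller could use the returned level to decide, for example, whether a request is cheap enough to serve without batching.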
**Example Enriched Response**:
```json
{
  "recordings": [
    {
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "title": "Example Song",
      "duration": 240000,
      "artists": [
        {
          "id": "12345678-90ab-cdef-1234-567890abcdef",
          "name": "Example Artist",
          "joinphrase": " & "
        }
      ],
      "releases": [
        {
          "id": "abcdef12-3456-7890-abcd-ef1234567890",
          "title": "Example Album",
          "country": "US",
          "date": {
            "year": 2020,
            "month": 5,
            "day": 15
          },
          "track_count": 12,
          "medium_count": 1,
          "releasegroup": {
            "id": "fedcba98-7654-3210-fedc-ba9876543210",
            "type": "Album"
          }
        }
      ]
    }
  ]
}
```
### Performance Considerations
**Connection Pooling**:
- Separate pool for MusicBrainz database
- Pool size: 10 connections (configurable)
- Pool recycle: 3600 seconds
**Query Optimization**:
- Indexes on `gid` columns (MusicBrainz maintains these)
- Batch queries when possible
- Limit joins to requested metadata only
**Caching**:
- Unknown MBID cache (Redis, 1 hour TTL)
- Avoids repeated queries for non-existent MBIDs
**Fallback**:
- If MusicBrainz database unavailable, return AcoustID data only
- Graceful degradation (no metadata enrichment)
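
The fallback behaviour can be sketched as a thin wrapper around the enrichment step. The function name, the `enriched` flag, and the choice of `ConnectionError` as the failure signal are all assumptions for illustration; a real database driver raises its own exception types, which would be passed in via `unavailable_errors`.

```python
import logging

logger = logging.getLogger("acoustid.enrich")


def lookup_with_fallback(results, enrich, unavailable_errors=(ConnectionError,)):
    """Return enriched results, or the bare AcoustID results on failure."""
    try:
        return {"results": enrich(results), "enriched": True}
    except unavailable_errors:
        # MusicBrainz is unreachable: degrade instead of failing the lookup.
        logger.warning("MusicBrainz unavailable, skipping enrichment", exc_info=True)
        return {"results": results, "enriched": False}
```

Only the listed exception types trigger the fallback, so programming errors inside `enrich` still surface instead of being silently swallowed.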
## Chromaprint Integration
### Library Information
**Name**: Chromaprint
**Version**: Built from source (commit `41a3e8fb`)
**License**: MIT
**Language**: C++
**Wrapper**: acoustid-ext (C extension for Python)
**Repository**: https://github.com/acoustid/chromaprint
### Build Process
**Dockerfile** (`docker/Dockerfile`):
```dockerfile
# Stage 1: Build Chromaprint
FROM ubuntu:24.04 AS chromaprint-build
RUN apt-get update && apt-get install -y \
    git cmake build-essential libfftw3-dev
WORKDIR /build
RUN git clone https://github.com/acoustid/chromaprint.git && \
    cd chromaprint && \
    git checkout 41a3e8fb && \
    cmake -DCMAKE_BUILD_TYPE=Release . && \
    make && \
    make install

# Stage 2: Build acoustid-ext
FROM ubuntu:24.04 AS builder
COPY --from=chromaprint-build /usr/local/lib/libchromaprint.so* /usr/local/lib/
COPY --from=chromaprint-build /usr/local/include/chromaprint.h /usr/local/include/
RUN pip install acoustid-ext
```
### Python Extension (acoustid-ext)
**Package**: `acoustid-ext`
**File**: `acoustid/fingerprint.py`
**Functions Exposed**:
```python
from acoustid_ext import (
decode_fingerprint,
encode_fingerprint,
compress_fingerprint,
decompress_fingerprint,
fingerprint_compare
)
```
**Function Signatures**:
| Function | Input | Output | Purpose |
|----------|-------|--------|---------|
| `decode_fingerprint(data)` | bytes/str | list[int] | Decode base64/compressed fingerprint |
| `encode_fingerprint(hashes)` | list[int] | str | Encode fingerprint to base64 |
| `compress_fingerprint(hashes)` | list[int] | bytes | Compress fingerprint (zstd) |
| `decompress_fingerprint(data)` | bytes | list[int] | Decompress fingerprint |
| `fingerprint_compare(fp1, fp2)` | list[int], list[int] | float | Compare similarity (0.0-1.0) |
### Fingerprint Format
**Raw Format** (Chromaprint output):
- Array of 32-bit unsigned integers
- Each integer represents a hash of audio features
- Typical length: about 8 hashes per second of audio, so roughly 950 hashes for the 120 seconds that `fpcalc` analyzes by default
**Compressed Format** (for transmission):
- Base64-encoded compressed data
- Compression: zstd or custom Chromaprint compression
- Typical size: 200-500 bytes
**Example**:
```python
# Raw fingerprint
fingerprint = [123456789, 987654321, 456789123, ...]
# Encoded (base64)
encoded = "AQADtNGiJEqUHUemR..."
# Compressed (bytes)
compressed = b'\x28\xb5\x2f\xfd...'
```
### Query Extraction
**File**: `acoustid/fingerprint.py`
```python
def extract_query(fingerprint, max_terms=100):
    """Extract query terms from fingerprint for index search.

    Args:
        fingerprint: List of 32-bit hash integers
        max_terms: Maximum number of terms to extract

    Returns:
        List of term IDs (subset of fingerprint hashes)
    """
    # Select most discriminative terms
    # (implementation uses simhash or random sampling;
    # select_discriminative_terms is not shown here)
    terms = select_discriminative_terms(fingerprint, max_terms)
    return terms
```
**Query Strategy**:
- Extract subset of hashes (typically 50-100 terms)
- Prioritize discriminative hashes (high entropy)
- Balance between precision and recall
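
The selection step is not shown in the source, so the following is only a stand-in that makes the shape of the operation concrete: it deduplicates the hashes and samples them evenly across the fingerprint. Even sampling is an assumption chosen for simplicity; it does not implement the entropy-based prioritization described above, and `select_query_terms` is a hypothetical name.

```python
def select_query_terms(fingerprint, max_terms=100):
    """Pick at most max_terms distinct hashes, spread evenly over the track."""
    # Deduplicate while keeping the original order of first occurrence.
    unique = list(dict.fromkeys(fingerprint))
    if len(unique) <= max_terms:
        return unique
    # step > 1 here, so consecutive indices are always distinct.
    step = len(unique) / max_terms
    return [unique[int(i * step)] for i in range(max_terms)]
```

Fewer terms make the index query cheaper but reduce recall, which is the precision and recall trade-off the list above refers to.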
### Fingerprint Comparison
**PostgreSQL Function** (custom extension):
```sql
CREATE FUNCTION acoustid_compare(fp1 INTEGER[], fp2 INTEGER[])
RETURNS FLOAT AS $$
    -- Calculate Jaccard similarity
    SELECT COUNT(*)::FLOAT /
           (array_length(fp1, 1) + array_length(fp2, 1) - COUNT(*))
    FROM unnest(fp1) AS h1
    JOIN unnest(fp2) AS h2 ON h1 = h2
$$ LANGUAGE SQL IMMUTABLE;
```
**Python Implementation**:
```python
def compare_fingerprints(fp1, fp2):
    """Calculate similarity between two fingerprints.

    Returns:
        Float between 0.0 (no match) and 1.0 (identical)
    """
    set1 = set(fp1)
    set2 = set(fp2)
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return intersection / union if union > 0 else 0.0
```
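
A small worked example makes the Jaccard arithmetic concrete. The hash values below are made up for illustration, and `jaccard` is a compact restatement of the set-based comparison rather than a separate algorithm.

```python
def jaccard(fp1, fp2):
    """Set-based Jaccard similarity: |A ∩ B| / |A ∪ B|."""
    set1, set2 = set(fp1), set(fp2)
    union = set1 | set2
    return len(set1 & set2) / len(union) if union else 0.0


# Two made-up fingerprints sharing two of their four hashes.
fp_a = [0x1A2B3C4D, 0x5E6F7081, 0x92A3B4C5, 0xD6E7F809]
fp_b = [0x92A3B4C5, 0xD6E7F809, 0x0A1B2C3D, 0x4E5F6071]

# 2 shared hashes out of 6 distinct hashes gives 2 / 6.
score = jaccard(fp_a, fp_b)
```

Because the comparison works on sets, it ignores where in the track a hash occurs and how often it repeats.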
## AcoustID Index Integration
### Client Implementations
AcoustID server has two index client implementations:
#### Legacy TCP Client (indexclient.py)
**Status**: Deprecated, being phased out
**Protocol**: Custom binary over TCP
**Port**: 6080 (default)
**File**: `acoustid/indexclient.py`
```python
from queue import Queue


class IndexClientPool:
    """Connection pool for legacy TCP index."""

    def __init__(self, host, port, pool_size=10):
        self.host = host
        self.port = port
        # The queue must be filled with connected clients before search()
        # is called; an empty queue makes pool.get() block.
        self.pool = Queue(maxsize=pool_size)

    def search(self, fingerprint, limit=10):
        """Search index for similar fingerprints."""
        client = self.pool.get()
        try:
            # Send search command (CMD_SEARCH is defined elsewhere)
            client.send_command(CMD_SEARCH, {
                'fingerprint': fingerprint,
                'limit': limit
            })
            # Receive results
            results = client.receive_response()
            return results
        finally:
            # Always return the client to the pool, even on error
            self.pool.put(client)
```
**Message Format**:
```
┌────────────┬─────────┬──────────────────┐
│ Length (4B)│ Cmd (1B)│ Payload (msgpack)│
└────────────┴─────────┴──────────────────┘
```
#### Modern HTTP Client (fpstore.py)
**Status**: Current, recommended
**Protocol**: HTTP/1.1 with MessagePack
**Port**: 6081 (default)
**File**: `acoustid/fpstore.py`
```python
import aiohttp
import msgspec


class FingerprintIndexClient:
    """Async HTTP client for fingerprint index."""

    def __init__(self, base_url, index_name='fingerprints'):
        self.base_url = base_url
        self.index_name = index_name
        self.session = aiohttp.ClientSession()

    async def search(self, query_terms, limit=10, min_score=0.5):
        """Search index for matching fingerprints.

        Args:
            query_terms: List of hash integers
            limit: Maximum results to return
            min_score: Minimum similarity score

        Returns:
            List of (fingerprint_id, score) tuples
        """
        url = f"{self.base_url}/{self.index_name}/_search"
        payload = msgspec.msgpack.encode({
            'query': query_terms,
            'limit': limit,
            'min_score': min_score
        })
        async with self.session.post(url, data=payload) as resp:
            data = await resp.read()
            result = msgspec.msgpack.decode(data)
            return [(r['id'], r['score']) for r in result['results']]

    async def insert(self, fingerprint_id, terms):
        """Insert or update fingerprint in index."""
        url = f"{self.base_url}/{self.index_name}/{fingerprint_id}"
        payload = msgspec.msgpack.encode({'terms': terms})
        async with self.session.put(url, data=payload) as resp:
            return resp.status == 200

    async def delete(self, fingerprint_id):
        """Delete fingerprint from index."""
        url = f"{self.base_url}/{self.index_name}/{fingerprint_id}"
        async with self.session.delete(url) as resp:
            return resp.status == 200
```
### Index Operations
**Search Flow**:
1. Extract query terms from fingerprint (50-100 hashes)
2. Encode query as MessagePack
3. POST to `/:index/_search`
4. Decode MessagePack response
5. Return list of (fingerprint_id, score) tuples
**Insert Flow**:
1. Extract all terms from fingerprint
2. Encode as MessagePack
3. PUT to `/:index/:fingerprint_id`
4. Index adds to MemorySegment
5. Appends to Oplog for durability
**Batch Update Flow**:
1. Collect multiple fingerprint updates
2. Encode batch as MessagePack
3. POST to `/:index/_update`
4. Index processes all updates atomically
### Error Handling
**Retry Strategy**:
```python
import asyncio

import aiohttp


async def search_with_retry(client, query, max_retries=3):
    """Search with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            return await client.search(query)
        except aiohttp.ClientError:
            # Out of attempts: propagate the last error to the caller
            if attempt == max_retries - 1:
                raise
            # Wait 1s, 2s, 4s, ... before the next attempt
            wait_time = 2 ** attempt
            await asyncio.sleep(wait_time)
```
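
To make the retry timing concrete, the helper below lists the delays such a loop sleeps between attempts. `backoff_delays` and its `cap` parameter are hypothetical additions for illustration; the loop above has no upper bound on the delay.

```python
def backoff_delays(max_retries=3, base=2, cap=None):
    """Delays slept between attempts; no sleep follows the final attempt."""
    delays = [base ** attempt for attempt in range(max_retries - 1)]
    if cap is not None:
        # Optional ceiling so long retry chains do not wait unboundedly.
        delays = [min(delay, cap) for delay in delays]
    return delays


print(backoff_delays(3))         # [1, 2]
print(backoff_delays(5, cap=4))  # [1, 2, 4, 4]
```

With the default of three attempts, a request that keeps failing therefore adds three seconds of waiting before the error reaches the caller.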
**Circuit Breaker**:
```python
import time


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Prevent cascading failures to index."""

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    async def call(self, func, *args, **kwargs):
        if self.state == 'open':
            # After the timeout, let one trial call through (half-open)
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'half-open'
            else:
                raise CircuitBreakerOpen()
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'open'
            raise
        # Any success closes the circuit and clears the failure count,
        # so only consecutive failures can trip the breaker.
        self.state = 'closed'
        self.failure_count = 0
        return result
```
## Fingerprint Store (fpstore)
### Optional Service
**Purpose**: Separate storage for raw fingerprint data
**Status**: Optional (can use PostgreSQL instead)
**Protocol**: HTTP with MessagePack
**Configuration**:
```ini
[fingerprint_store]
enabled = true
base_url = http://fpstore:8080
```
**Operations**:
```python
import msgspec


class FingerprintStore:
    """Client for fingerprint storage service."""

    def __init__(self, base_url, session):
        # Constructor is an assumption: base_url comes from the
        # [fingerprint_store] section, session is an aiohttp.ClientSession.
        self.base_url = base_url
        self.session = session

    async def store(self, fingerprint_id, fingerprint_data):
        """Store raw fingerprint data."""
        url = f"{self.base_url}/fingerprints/{fingerprint_id}"
        payload = msgspec.msgpack.encode({
            'data': fingerprint_data
        })
        async with self.session.put(url, data=payload) as resp:
            return resp.status == 200

    async def retrieve(self, fingerprint_id):
        """Retrieve raw fingerprint data."""
        url = f"{self.base_url}/fingerprints/{fingerprint_id}"
        async with self.session.get(url) as resp:
            data = await resp.read()
            result = msgspec.msgpack.decode(data)
            return result['data']
```
## NATS Integration
### Message Queue
**Purpose**: Async submission processing
**Technology**: NATS with JetStream (persistent queue)
**Library**: `nats-py`
**Configuration**:
```ini
[nats]
servers = nats://nats:4222
stream = acoustid_submissions
consumer = acoustid_worker
```
**File**: `acoustid/worker.py`
### Publisher (API Server)
```python
import time

import msgspec
import nats
from nats.js import JetStreamContext


async def publish_submission(submission_id):
    """Publish submission to NATS queue."""
    nc = await nats.connect(servers=["nats://nats:4222"])
    js: JetStreamContext = nc.jetstream()
    # Ensure stream exists
    await js.add_stream(
        name="acoustid_submissions",
        subjects=["submissions.*"],
        retention="workqueue"
    )
    # Publish message
    await js.publish(
        subject="submissions.new",
        payload=msgspec.json.encode({
            'submission_id': submission_id,
            'timestamp': time.time()
        })
    )
    await nc.close()
```
### Consumer (Worker)
```python
import logging

import msgspec
import nats
from nats.errors import TimeoutError as NatsTimeoutError
from nats.js.api import ConsumerConfig

logger = logging.getLogger(__name__)


async def consume_submissions():
    """Consume submissions from NATS queue."""
    nc = await nats.connect(servers=["nats://nats:4222"])
    js = nc.jetstream()
    # Create a durable pull consumer
    consumer = await js.pull_subscribe(
        subject="submissions.*",
        durable="acoustid_worker",
        config=ConsumerConfig(
            ack_policy="explicit",
            max_deliver=3,
            ack_wait=300,  # 5 minutes
        ),
    )
    while True:
        try:
            # Fetch batch of messages
            messages = await consumer.fetch(batch=10, timeout=5)
        except NatsTimeoutError:
            # fetch() raises when no message arrives in time; keep polling
            continue
        for msg in messages:
            try:
                data = msgspec.json.decode(msg.data)
                # process_submission is defined elsewhere in the worker
                await process_submission(data['submission_id'])
                await msg.ack()
            except Exception as e:
                logger.error(f"Failed to process submission: {e}")
                await msg.nak(delay=60)  # Retry after 1 minute
```
### JetStream Configuration
**Stream Settings**:
- Retention: WorkQueue (messages deleted after ack)
- Max age: 7 days (unprocessed messages)
- Max messages: 1,000,000
- Storage: File (persistent)
**Consumer Settings**:
- Ack policy: Explicit (manual acknowledgment)
- Max deliver: 3 (at most 3 delivery attempts in total, including the first)
- Ack wait: 300 seconds (5 minutes timeout)
- Max ack pending: 100 (max unacked messages)
## Redis Integration
### Use Cases
1. **Rate Limiting**: Sliding window counters
2. **Task Queue** (legacy): RPUSH/LPOP queue
3. **Caching**: API key validation, MBID existence
4. **State Management**: Backfill progress, worker state
**Configuration**:
```ini
[redis]
host = redis
port = 6379
db = 0
password_file = /run/secrets/redis_password
```
**File**: `acoustid/redis.py`
### Connection Pool
```python
import redis

redis_pool = redis.ConnectionPool(
    host='redis',
    port=6379,
    db=0,
    max_connections=50,
    socket_timeout=5,
    socket_connect_timeout=5
)
redis_client = redis.Redis(connection_pool=redis_pool)
```
### Rate Limiting Implementation
See DATA.md for detailed rate limiting data structures.
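
For readers without DATA.md at hand, the idea of a sliding window counter can be sketched in memory. This is not the Redis-backed implementation: the class name, the dictionary standing in for Redis keys, and the two-bucket weighting are assumptions chosen to show the general technique.

```python
class SlidingWindowCounter:
    """Approximate sliding window using the current and previous buckets."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        # (key, bucket index) -> request count; stands in for Redis keys,
        # which would normally expire on their own.
        self.counts = {}

    def allow(self, key, now):
        index = int(now // self.window)
        # Fraction of the current bucket that has already elapsed.
        elapsed = (now % self.window) / self.window
        current = self.counts.get((key, index), 0)
        previous = self.counts.get((key, index - 1), 0)
        # Weight the previous bucket by how much of it still overlaps
        # the window that ends at `now`.
        estimate = current + previous * (1.0 - elapsed)
        if estimate >= self.limit:
            return False
        self.counts[(key, index)] = current + 1
        return True
```

Passing `now` explicitly keeps the sketch deterministic; a caller would normally supply `time.time()`.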
### Caching Patterns
**API Key Cache**:
```python
from cachetools import TTLCache

# In-process cache (per worker, not shared through Redis): 60-second TTL
api_key_cache = TTLCache(maxsize=1000, ttl=60)


def get_application_by_key(api_key):
    if api_key in api_key_cache:
        return api_key_cache[api_key]
    app = db.query(Application).filter_by(apikey=api_key).first()
    # Only cache hits, so a newly created key is visible immediately
    if app:
        api_key_cache[api_key] = app
    return app
```
**Unknown MBID Cache**:
```python
def is_mbid_known(mbid):
    """Check if MBID exists in MusicBrainz."""
    cache_key = f"unknown_mbid:{mbid}"
    # Check cache
    if redis_client.exists(cache_key):
        return False
    # Query MusicBrainz
    exists = mb_db.query(Recording).filter_by(gid=mbid).count() > 0
    # Cache negative result
    if not exists:
        redis_client.setex(cache_key, 3600, '1')
    return exists
```
## Integration Summary
| Service | Protocol | Purpose | Criticality |
|---------|----------|---------|-------------|
| MusicBrainz | PostgreSQL | Metadata enrichment | High |
| Chromaprint | C library | Fingerprint generation | Critical |
| Index (HTTP) | HTTP/MessagePack | Fingerprint search | Critical |
| Index (TCP) | TCP binary | Legacy fingerprint search | Low (deprecated) |
| Fingerprint Store | HTTP/MessagePack | Raw fingerprint storage | Low (optional) |
| NATS | NATS protocol | Async job queue | High |
| Redis | Redis protocol | Caching, rate limiting | High |