# AcoustID Codebase Analysis ## Configuration System ### Configuration File Format **File**: `acoustid.conf` (INI format) **Template**: `acoustid.conf.dist` **Structure**: ```ini [database] name = acoustid_app user = acoustid password_file = /run/secrets/db_password host = postgres port = 5432 pool_size = 20 pool_recycle = 3600 [database_fingerprint] name = acoustid_fingerprint user = acoustid password_file = /run/secrets/db_password host = postgres port = 5432 [database_ingest] name = acoustid_ingest user = acoustid password_file = /run/secrets/db_password host = postgres port = 5432 [database_musicbrainz] name = musicbrainz_db user = acoustid_readonly password_file = /run/secrets/mb_password host = musicbrainz-db port = 5432 [redis] host = redis port = 6379 db = 0 password_file = /run/secrets/redis_password [nats] servers = nats://nats:4222 stream = acoustid_submissions consumer = acoustid_worker [fingerprint_index] host = index port = 6081 protocol = http [cluster] role = master name = acoustid-prod [cluster.rate_limiter] global_limit = 3 ip_limit = 3 [sentry] dsn = https://...@sentry.io/... environment = production traces_sample_rate = 0.1 [logging] level = INFO ``` ### Environment Variable Overrides **Pattern**: `ACOUSTID_
_` **Examples**: ```bash ACOUSTID_DATABASE_NAME=acoustid_app ACOUSTID_DATABASE_PASSWORD=secret123 ACOUSTID_REDIS_HOST=redis.example.com ACOUSTID_FINGERPRINT_INDEX_HOST=index.example.com ``` **Secret Files** (suffix `_file`): ```bash ACOUSTID_DATABASE_PASSWORD_FILE=/run/secrets/db_password ACOUSTID_REDIS_PASSWORD_FILE=/run/secrets/redis_password ``` ### Configuration Loading **File**: `acoustid/config.py` ```python import os import configparser from typing import Any, Optional class Config: """Configuration manager with environment variable overrides.""" def __init__(self, config_file: Optional[str] = None): self.config = configparser.ConfigParser() # Load from file if config_file: self.config.read(config_file) # Apply environment variable overrides self._apply_env_overrides() def _apply_env_overrides(self): """Apply ACOUSTID_* environment variables.""" prefix = 'ACOUSTID_' for key, value in os.environ.items(): if not key.startswith(prefix): continue # Parse ACOUSTID_SECTION_KEY parts = key[len(prefix):].lower().split('_', 1) if len(parts) != 2: continue section, option = parts # Handle _file suffix (read from file) if option.endswith('_file'): option = option[:-5] with open(value) as f: value = f.read().strip() # Set config value if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, option, value) def get(self, section: str, key: str, default: Any = None) -> Any: """Get configuration value.""" try: return self.config.get(section, key) except (configparser.NoSectionError, configparser.NoOptionError): return default def getint(self, section: str, key: str, default: int = 0) -> int: """Get integer configuration value.""" try: return self.config.getint(section, key) except (configparser.NoSectionError, configparser.NoOptionError): return default def getboolean(self, section: str, key: str, default: bool = False) -> bool: """Get boolean configuration value.""" try: return self.config.getboolean(section, key) except (configparser.NoSectionError, configparser.NoOptionError): return default # Global config instance config = Config(os.environ.get('ACOUSTID_CONFIG', 'acoustid.conf')) ``` ### Configuration Access Patterns **Database Configuration**: ```python from acoustid.config import config db_config = { 'name': config.get('database', 'name'), 'user': config.get('database', 'user'), 'password': config.get('database', 'password'), 'host': config.get('database', 'host', 'localhost'), 'port': config.getint('database', 'port', 5432), 'pool_size': config.getint('database', 'pool_size', 20), 'pool_recycle': config.getint('database', 'pool_recycle', 3600) } ``` **Feature Flags**: ```python # Check if feature is enabled use_async_api = config.getboolean('features', 'async_api', False) use_fpstore = config.getboolean('fingerprint_store', 'enabled', False) ``` ## Logging System ### Logging Configuration **File**: `acoustid/logging.py` ```python import logging import sys from acoustid.config import config def setup_logging(): """Configure logging for the application.""" # Root logger level root_level = config.get('logging', 'level', 'INFO') logging.basicConfig( level=getattr(logging, root_level), format='%(asctime)s [%(process)d] [%(levelname)s] %(name)s: %(message)s', stream=sys.stdout ) # Per-module log levels for module in ['acoustid', 'sqlalchemy', 'werkzeug', 'uvicorn']: level_key = f'level_{module}' level = config.get('logging', level_key) if level: logging.getLogger(module).setLevel(getattr(logging, level)) ``` ### Environment Variable Log Levels **Pattern**: `ACOUSTID_LOGGING_LEVEL_` **Examples**: ```bash ACOUSTID_LOGGING_LEVEL=INFO ACOUSTID_LOGGING_LEVEL_ACOUSTID=DEBUG ACOUSTID_LOGGING_LEVEL_SQLALCHEMY=WARNING ACOUSTID_LOGGING_LEVEL_WERKZEUG=ERROR ``` ### Logger Usage **Module-Level Logger**: ```python import logging logger = logging.getLogger(__name__) def process_submission(submission_id): logger.info("Processing submission %d", submission_id) try: # ... processing logic ... logger.debug("Submission %d processed successfully", submission_id) except Exception as e: logger.error("Failed to process submission %d: %s", submission_id, e, exc_info=True) ``` **Structured Logging** (future): ```python import structlog logger = structlog.get_logger() logger.info("submission.processed", submission_id=submission_id, track_id=track_id, duration_ms=duration) ``` ## Metrics and Monitoring ### StatsD Metrics **File**: `acoustid/metrics.py` ```python import statsd from acoustid.config import config # Initialize StatsD client statsd_client = statsd.StatsClient( host=config.get('statsd', 'host', 'localhost'), port=config.getint('statsd', 'port', 8125), prefix='acoustid' ) def record_api_request(endpoint: str, method: str, status: int, duration: float): """Record API request metrics.""" # Counter: total requests statsd_client.incr(f'api.requests_total.{endpoint}.{method}.{status}') # Histogram: request duration statsd_client.timing(f'api.request_duration_seconds.{endpoint}.{method}', duration * 1000) # Convert to ms def record_lookup_search(hit: bool): """Record lookup search result.""" statsd_client.incr('api.lookup.searches.total') if hit: statsd_client.incr('api.lookup.matches.total') def record_submission(): """Record new submission.""" statsd_client.incr('new_submissions') def record_error(error_code: int, handled: bool = True): """Record error occurrence.""" if handled: statsd_client.incr(f'api.handled_errors_total.{error_code}') else: statsd_client.incr('api.unhandled_errors_total') ``` ### Metrics Collection Points **API Request Handler**: ```python from acoustid.metrics import record_api_request import time def handle_request(request): start_time = time.time() try: response = process_request(request) duration = time.time() - start_time record_api_request( endpoint=request.endpoint, method=request.method, status=response.status_code, duration=duration ) return response except Exception as e: duration = time.time() - start_time record_api_request( endpoint=request.endpoint, method=request.method, status=500, duration=duration ) raise ``` **Lookup Handler**: ```python from acoustid.metrics import record_lookup_search def lookup_fingerprint(fingerprint): results = search_index(fingerprint) record_lookup_search(hit=len(results) > 0) return results ``` ### Prometheus Metrics (Index) **File**: `src/metrics.zig` (index) ```zig const std = @import("std"); const prometheus = @import("metrics"); pub const Metrics = struct { search_duration: prometheus.Histogram, insert_duration: prometheus.Histogram, segment_count: prometheus.Gauge, memory_segment_size: prometheus.Gauge, file_segment_size: prometheus.Gauge, merge_duration: prometheus.Histogram, pub fn init(allocator: std.mem.Allocator) !Metrics { return Metrics{ .search_duration = try prometheus.Histogram.init( allocator, "fpindex_search_duration_seconds", "Search operation duration", &[_]f64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0} ), .insert_duration = try prometheus.Histogram.init( allocator, "fpindex_insert_duration_seconds", "Insert operation duration", &[_]f64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0} ), .segment_count = try prometheus.Gauge.init( allocator, "fpindex_segment_count", "Number of segments" ), .memory_segment_size = try prometheus.Gauge.init( allocator, "fpindex_memory_segment_size_bytes", "Memory segment size in bytes" ), .file_segment_size = try prometheus.Gauge.init( allocator, "fpindex_file_segment_size_bytes", "File segment size in bytes" ), .merge_duration = try prometheus.Histogram.init( allocator, "fpindex_merge_duration_seconds", "Segment merge duration", &[_]f64{0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0} ), }; } pub fn recordSearch(self: *Metrics, duration: f64) void { self.search_duration.observe(duration); } pub fn recordInsert(self: *Metrics, duration: f64) void { self.insert_duration.observe(duration); } pub fn updateSegmentCount(self: *Metrics, count: u64) void { self.segment_count.set(@intToFloat(f64, count)); } }; ``` ## Health Check System ### Health Check Endpoints **File**: `acoustid/api/health.py` ```python from flask import Blueprint, jsonify from acoustid.db import get_db_session from acoustid.redis import get_redis_client import logging logger = logging.getLogger(__name__) health_bp = Blueprint('health', __name__) @health_bp.route('/_health') def health_check(): """Full health check with database write test.""" try: # Test database write db = get_db_session() db.execute("SELECT 1") db.execute("CREATE TEMP TABLE health_check (id INT)") db.execute("INSERT INTO health_check VALUES (1)") db.execute("DROP TABLE health_check") db.commit() # Test Redis redis = get_redis_client() redis.ping() return jsonify({'status': 'ok'}), 200 except Exception as e: logger.error("Health check failed: %s", e, exc_info=True) return jsonify({'status': 'error', 'message': str(e)}), 503 @health_bp.route('/_health_ro') def health_check_readonly(): """Read-only health check (database read test only).""" try: # Test database read db = get_db_session() db.execute("SELECT 1") # Test Redis redis = get_redis_client() redis.ping() return jsonify({'status': 'ok'}), 200 except Exception as e: logger.error("Read-only health check failed: %s", e, exc_info=True) return jsonify({'status': 'error', 'message': str(e)}), 503 @health_bp.route('/_health_docker') def health_check_docker(): """Minimal health check for Docker (no external dependencies).""" return jsonify({'status': 'ok'}), 200 ``` ### Health Check Usage **Docker Compose**: ```yaml healthcheck: test: ["CMD", "wget", "-q", "-O-", "http://localhost:5000/_health"] interval: 30s timeout: 10s retries: 3 start_period: 40s ``` **Kubernetes**: ```yaml livenessProbe: httpGet: path: /_health_docker port: 5000 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /_health_ro port: 5000 initialDelaySeconds: 10 periodSeconds: 5 ``` ## Error Tracking (Sentry) ### Sentry Integration **File**: `acoustid/sentry.py` ```python import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.integrations.redis import RedisIntegration from acoustid.config import config def init_sentry(): """Initialize Sentry error tracking.""" dsn = config.get('sentry', 'dsn') if not dsn: return sentry_sdk.init( dsn=dsn, environment=config.get('sentry', 'environment', 'production'), traces_sample_rate=config.getfloat('sentry', 'traces_sample_rate', 0.1), integrations=[ FlaskIntegration(), SqlalchemyIntegration(), RedisIntegration(), ], before_send=before_send_filter, ) def before_send_filter(event, hint): """Filter events before sending to Sentry.""" # Ignore certain exceptions if 'exc_info' in hint: exc_type, exc_value, tb = hint['exc_info'] if isinstance(exc_value, (KeyboardInterrupt, SystemExit)): return None # Add custom context event.setdefault('tags', {}) event['tags']['cluster'] = config.get('cluster', 'name', 'unknown') return event ``` ### Error Context **Adding Context**: ```python from sentry_sdk import set_context, set_tag, set_user def handle_submission(submission_id, user_id): # Set user context set_user({'id': user_id}) # Set custom tags set_tag('submission_id', submission_id) # Set custom context set_context('submission', { 'id': submission_id, 'user_id': user_id, 'timestamp': time.time() }) try: process_submission(submission_id) except Exception as e: # Exception automatically sent to Sentry with context raise ``` ## Authentication System ### API Key Types **File**: `acoustid/auth.py` ```python from acoustid.db import get_db_session from acoustid.tables import Application, Account from cachetools import TTLCache import logging logger = logging.getLogger(__name__) # API key cache (1000 keys, 60 second TTL) api_key_cache = TTLCache(maxsize=1000, ttl=60) class APIKeyType: APPLICATION = 'application' USER = 'user' DEMO = 'demo' DEMO_API_KEY = '8XaBELgH' def validate_application_key(api_key: str) -> Optional[Application]: """Validate application API key. Returns: Application object if valid, None otherwise """ # Check demo key if api_key == DEMO_API_KEY: return Application( id=0, name='Demo Application', apikey=DEMO_API_KEY, active=True, rate_limit=3 ) # Check cache if api_key in api_key_cache: return api_key_cache[api_key] # Query database db = get_db_session() app = db.query(Application).filter_by(apikey=api_key, active=True).first() if app: api_key_cache[api_key] = app return app def validate_user_key(api_key: str) -> Optional[Account]: """Validate user API key. Returns: Account object if valid, None otherwise """ # Check cache cache_key = f'user:{api_key}' if cache_key in api_key_cache: return api_key_cache[cache_key] # Query database db = get_db_session() account = db.query(Account).filter_by(apikey=api_key).first() if account: api_key_cache[cache_key] = account return account def require_api_key(key_type: str = APIKeyType.APPLICATION): """Decorator to require API key authentication. Args: key_type: Type of API key required (application or user) """ def decorator(func): def wrapper(*args, **kwargs): from flask import request, jsonify # Get API key from request api_key = request.values.get('client' if key_type == APIKeyType.APPLICATION else 'user') if not api_key: return jsonify({ 'status': 'error', 'error': { 'code': 1, 'message': f'Missing {key_type} API key' } }), 401 # Validate API key if key_type == APIKeyType.APPLICATION: entity = validate_application_key(api_key) else: entity = validate_user_key(api_key) if not entity: return jsonify({ 'status': 'error', 'error': { 'code': 1, 'message': f'Invalid {key_type} API key' } }), 401 # Store in request context request.api_application = entity if key_type == APIKeyType.APPLICATION else None request.api_account = entity if key_type == APIKeyType.USER else None return func(*args, **kwargs) wrapper.__name__ = func.__name__ return wrapper return decorator ``` ### Authentication Usage **Lookup Endpoint** (application key only): ```python from acoustid.auth import require_api_key, APIKeyType @app.route('/v2/lookup', methods=['GET', 'POST']) @require_api_key(APIKeyType.APPLICATION) def lookup(): # request.api_application is available application = request.api_application # ... lookup logic ... ``` **Submit Endpoint** (application + user key): ```python @app.route('/v2/submit', methods=['POST']) @require_api_key(APIKeyType.APPLICATION) def submit(): from flask import request # Validate user key user_key = request.values.get('user') if not user_key: return jsonify({'status': 'error', 'error': {'code': 1, 'message': 'Missing user API key'}}), 401 account = validate_user_key(user_key) if not account: return jsonify({'status': 'error', 'error': {'code': 1, 'message': 'Invalid user API key'}}), 401 # Both application and user are authenticated application = request.api_application # ... submit logic ... ``` ## Rate Limiting ### Rate Limiter Implementation **File**: `acoustid/api/ratelimit.py` ```python from acoustid.redis import get_redis_client from acoustid.config import config from flask import request import time import logging logger = logging.getLogger(__name__) class RateLimiter: """Redis-based sliding window rate limiter.""" def __init__(self): self.redis = get_redis_client() self.window_duration = 20 # seconds self.window_steps = 4 self.bucket_duration = self.window_duration // self.window_steps self.ttl = self.window_duration + 5 # cleanup buffer def check_limit(self, scope: str, identifier: str, limit: int) -> tuple[bool, dict]: """Check if request is within rate limit. Args: scope: Rate limit scope (global, app, ip) identifier: Unique identifier for scope limit: Maximum requests per window Returns: Tuple of (allowed, info_dict) """ current_time = int(time.time()) # Calculate window buckets buckets = [] for i in range(self.window_steps): bucket_time = current_time - (i * self.bucket_duration) bucket_time = (bucket_time // self.bucket_duration) * self.bucket_duration buckets.append(bucket_time) # Increment current bucket current_bucket_key = f"rl:bucket:{scope}:{identifier}:{buckets[0]}" count = self.redis.incr(current_bucket_key) self.redis.expire(current_bucket_key, self.ttl) # Sum all buckets in window total = 0 for bucket_time in buckets: bucket_key = f"rl:bucket:{scope}:{identifier}:{bucket_time}" bucket_count = self.redis.get(bucket_key) if bucket_count: total += int(bucket_count) # Check limit allowed = total <= limit info = { 'limit': limit, 'remaining': max(0, limit - total), 'reset': buckets[0] + self.window_duration } if not allowed: logger.warning("Rate limit exceeded: scope=%s, identifier=%s, total=%d, limit=%d", scope, identifier, total, limit) return allowed, info rate_limiter = RateLimiter() def check_rate_limit(application=None): """Check rate limits for current request. Checks three tiers: 1. Global limit (all requests) 2. Application limit (per API key) 3. IP limit (per client IP) Returns: Tuple of (allowed, info_dict) """ # Global limit global_limit = config.getint('cluster.rate_limiter', 'global_limit', 3) allowed, info = rate_limiter.check_limit('global', 'all', global_limit) if not allowed: return False, info # Application limit if application: app_limit = application.rate_limit or config.getint('cluster.rate_limiter', 'app_limit', 10) allowed, info = rate_limiter.check_limit('app', application.apikey, app_limit) if not allowed: return False, info # IP limit ip_limit = config.getint('cluster.rate_limiter', 'ip_limit', 3) client_ip = request.remote_addr allowed, info = rate_limiter.check_limit('ip', client_ip, ip_limit) return allowed, info ``` ### Rate Limit Middleware **File**: `acoustid/api/middleware.py` ```python from acoustid.api.ratelimit import check_rate_limit from flask import request, jsonify def rate_limit_middleware(): """Flask before_request handler for rate limiting.""" # Skip health checks if request.path.startswith('/_health'): return None # Check rate limits application = getattr(request, 'api_application', None) allowed, info = check_rate_limit(application) # Add rate limit headers response_headers = { 'X-RateLimit-Limit': str(info['limit']), 'X-RateLimit-Remaining': str(info['remaining']), 'X-RateLimit-Reset': str(info['reset']) } if not allowed: response = jsonify({ 'status': 'error', 'error': { 'code': 5, 'message': 'Rate limit exceeded' } }) response.status_code = 429 for key, value in response_headers.items(): response.headers[key] = value return response # Store headers for later request.rate_limit_headers = response_headers return None ``` ## Testing Framework ### Test Configuration **File**: `tests/conftest.py` ```python import pytest from acoustid.db import create_engine, create_session, Base from acoustid.config import Config import tempfile import os @pytest.fixture(scope='session') def test_config(): """Create test configuration.""" config = Config() config.config.add_section('database') config.config.set('database', 'name', 'acoustid_test') config.config.set('database', 'user', 'acoustid') config.config.set('database', 'password', 'acoustid') config.config.set('database', 'host', 'localhost') return config @pytest.fixture def with_database(test_config): """Provide test database session.""" engine = create_engine(test_config) # Create all tables Base.metadata.create_all(engine) # Create session session = create_session(engine) yield session # Rollback and cleanup session.rollback() session.close() Base.metadata.drop_all(engine) @pytest.fixture def with_script(test_config): """Provide script context with database.""" from acoustid.script import Script script = Script('test', config=test_config) script.setup() yield script script.teardown() @pytest.fixture def fingerprint_fixture(): """Predefined test fingerprint.""" return [ 123456789, 987654321, 456789123, 321987654, 789123456, 654321987, 147258369, 963852741 ] * 30 # ~240 hashes for 3-minute track ``` ### Test Decorators **File**: `tests/helpers.py` ```python import functools from tests.conftest import with_database, with_script def requires_database(func): """Decorator to inject database session.""" @functools.wraps(func) def wrapper(*args, **kwargs): # Use pytest fixture return func(*args, **kwargs) return pytest.mark.usefixtures('with_database')(wrapper) def requires_script(func): """Decorator to inject script context.""" @functools.wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return pytest.mark.usefixtures('with_script')(wrapper) ``` ### Example Tests **File**: `tests/test_api_lookup.py` ```python import pytest from acoustid.api.v2.lookup import LookupHandler from tests.conftest import with_database, fingerprint_fixture class TestLookupAPI: """Test lookup API endpoint.""" def test_lookup_with_fingerprint(self, with_database, fingerprint_fixture): """Test lookup with valid fingerprint.""" # Setup test data track = create_test_track(with_database) create_test_fingerprint(with_database, track.id, fingerprint_fixture) # Perform lookup handler = LookupHandler(with_database) results = handler.lookup( fingerprint=fingerprint_fixture, duration=240 ) # Verify results assert len(results) > 0 assert results[0]['id'] == str(track.gid) assert results[0]['score'] > 0.9 def test_lookup_with_track_id(self, with_database): """Test lookup with track ID.""" # Setup test data track = create_test_track(with_database) # Perform lookup handler = LookupHandler(with_database) results = handler.lookup(trackid=str(track.gid)) # Verify results assert len(results) == 1 assert results[0]['id'] == str(track.gid) def test_lookup_no_match(self, with_database, fingerprint_fixture): """Test lookup with no matching fingerprint.""" handler = LookupHandler(with_database) results = handler.lookup( fingerprint=fingerprint_fixture, duration=240 ) assert len(results) == 0 @pytest.mark.parametrize('duration', [0, -1, 10000]) def test_lookup_invalid_duration(self, with_database, fingerprint_fixture, duration): """Test lookup with invalid duration.""" handler = LookupHandler(with_database) with pytest.raises(ValueError): handler.lookup(fingerprint=fingerprint_fixture, duration=duration) ``` **File**: `tests/test_fingerprint.py` ```python import pytest from acoustid.fingerprint import ( decode_fingerprint, encode_fingerprint, extract_query, compare_fingerprints ) from tests.conftest import fingerprint_fixture class TestFingerprint: """Test fingerprint utilities.""" def test_encode_decode(self, fingerprint_fixture): """Test fingerprint encoding and decoding.""" encoded = encode_fingerprint(fingerprint_fixture) decoded = decode_fingerprint(encoded) assert decoded == fingerprint_fixture def test_extract_query(self, fingerprint_fixture): """Test query extraction.""" query = extract_query(fingerprint_fixture, max_terms=50) assert len(query) <= 50 assert all(term in fingerprint_fixture for term in query) def test_compare_identical(self, fingerprint_fixture): """Test comparison of identical fingerprints.""" score = compare_fingerprints(fingerprint_fixture, fingerprint_fixture) assert score == 1.0 def test_compare_different(self, fingerprint_fixture): """Test comparison of different fingerprints.""" other_fp = [x + 1000 for x in fingerprint_fixture] score = compare_fingerprints(fingerprint_fixture, other_fp) assert score < 0.1 ``` ## Code Organization Patterns ### Service Layer Pattern **File**: `acoustid/data/fingerprint.py` ```python from acoustid.db import get_db_session from acoustid.tables import Fingerprint, Track from acoustid.fpstore import FingerprintIndexClient import logging logger = logging.getLogger(__name__) class FingerprintService: """Service for fingerprint operations.""" def __init__(self, db_session=None, index_client=None): self.db = db_session or get_db_session() self.index = index_client or FingerprintIndexClient() def search(self, fingerprint, duration, limit=10): """Search for matching fingerprints.""" # Extract query terms query_terms = extract_query(fingerprint) # Search index candidates = self.index.search(query_terms, limit=limit * 2) # Fetch from database fp_ids = [c[0] for c in candidates] fingerprints = self.db.query(Fingerprint).filter( Fingerprint.id.in_(fp_ids), Fingerprint.length.between(duration - 5, duration + 5) ).all() # Score and sort results = [] for fp in fingerprints: score = compare_fingerprints(fingerprint, fp.fingerprint) results.append((fp, score)) results.sort(key=lambda x: x[1], reverse=True) return results[:limit] def insert(self, track_id, fingerprint, duration, **metadata): """Insert new fingerprint.""" # Create fingerprint record fp = Fingerprint( track_id=track_id, fingerprint=fingerprint, length=duration, **metadata ) self.db.add(fp) self.db.flush() # Update index query_terms = extract_query(fingerprint) self.index.insert(fp.id, query_terms) self.db.commit() logger.info("Inserted fingerprint %d for track %d", fp.id, track_id) return fp ``` ### Repository Pattern **File**: `acoustid/data/track.py` ```python from acoustid.tables import Track, TrackMBID from sqlalchemy.orm import joinedload class TrackRepository: """Repository for track data access.""" def __init__(self, db_session): self.db = db_session def get_by_id(self, track_id): """Get track by ID.""" return self.db.query(Track).filter_by(id=track_id).first() def get_by_gid(self, gid): """Get track by public GID.""" return self.db.query(Track).filter_by(gid=gid).first() def get_with_mbids(self, track_id): """Get track with all linked MBIDs.""" return self.db.query(Track).options( joinedload(Track.mbids) ).filter_by(id=track_id).first() def create(self): """Create new track.""" import uuid track = Track(gid=uuid.uuid4()) self.db.add(track) self.db.flush() return track def link_mbid(self, track_id, mbid): """Link track to MusicBrainz recording.""" link = TrackMBID(track_id=track_id, mbid=mbid) self.db.add(link) self.db.flush() return link ```