81df4790bf
Tests use real FUSE operations against mounted beetfs filesystem: - test_smoke: mount/unmount lifecycle - test_nested_bug: detects critical indentation bug (13 failures) - test_readdir: directory listing - test_read: metadata overlay verification - test_stat: file/directory attributes - test_write: metadata modification - test_error_handling: ENOENT, EOPNOTSUPP Also includes: - conftest.py with BeetFSTestCase base class and synthetic FLAC generator - e2e-test-plan.md with Oracle-reviewed test strategy - flake.nix updated with ffmpeg/flac for test fixtures Run: cd tests && nix develop ../ --command python -m unittest discover
297 lines
9.5 KiB
Python
297 lines
9.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Tests for file reading with metadata overlay.
|
|
|
|
This is the CORE FEATURE of beetfs:
|
|
- File headers contain metadata from beets database
|
|
- Audio data passes through unchanged from original file
|
|
"""
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import os
|
|
import unittest
|
|
from io import BytesIO
|
|
|
|
from conftest import BeetFSTestCase, is_ffmpeg_available, is_flac_available
|
|
|
|
|
|
class TestReadBasic(BeetFSTestCase):
    """Basic open/read checks against tracks exposed by the mount.

    Every test registers one track, mounts beetfs, locates the track by
    walking ``self.mount_dir`` and reads a few bytes from it.  A test is
    skipped rather than failed when no matching file is found in the mount.
    """

    def test_open_file(self):
        # A mounted track can be opened and its first KiB is non-empty bytes.
        _path, _item_id = self.add_test_track(
            filename='test.flac',
            artist='Test Artist',
            title='Test Track'
        )
        self.mount_beetfs()

        matches = self._find_track_path('Test Artist', 'Test Track')
        if not matches:
            self.skipTest("Could not find track in mounted filesystem")

        with open(matches[0], 'rb') as handle:
            head = handle.read(1024)

        self.assertIsInstance(head, bytes)
        self.assertTrue(len(head) > 0)

    def test_read_returns_bytes(self):
        # Reading in binary mode hands back a bytes object.
        self.add_test_track(filename='test.flac')
        self.mount_beetfs()

        candidates = self._find_any_track()
        if not candidates:
            self.skipTest("No tracks found")

        with open(candidates[0], 'rb') as handle:
            payload = handle.read(100)

        self.assertIsInstance(payload, bytes)

    def test_read_flac_magic_bytes(self):
        # The first four bytes of a mounted FLAC file are the stream marker.
        self.add_test_track(filename='test.flac')
        self.mount_beetfs()

        candidates = self._find_any_track()
        if not candidates:
            self.skipTest("No tracks found")

        with open(candidates[0], 'rb') as handle:
            marker = handle.read(4)

        self.assertEqual(marker, b'fLaC', "FLAC files should start with 'fLaC' magic bytes")

    def _find_track_path(self, artist, title):
        """Return paths of mounted files whose name contains ``title``.

        The match is case-insensitive and looks at the file name only.
        ``artist`` is accepted for call-site readability but is not used
        for filtering.
        """
        return [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(self.mount_dir)
            for name in names
            if title.lower() in name.lower()
        ]

    def _find_any_track(self):
        """Return the paths of all ``.flac`` files found under the mount."""
        return [
            os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(self.mount_dir)
            for name in names
            if name.endswith('.flac')
        ]
|
|
|
|
|
|
@unittest.skipUnless(is_ffmpeg_available() and is_flac_available(),
                     "ffmpeg and flac required for metadata overlay tests")
class TestMetadataOverlay(BeetFSTestCase):
    """
    Tests that verify the core metadata overlay functionality.

    The key insight: beetfs should return metadata from the DATABASE,
    not from the original FILE. This allows presenting different metadata
    than what's embedded in the file.
    """

    # Number of leading audio bytes compared by test_audio_data_passthrough.
    _AUDIO_SAMPLE_SIZE = 1000

    def setUp(self):
        super(TestMetadataOverlay, self).setUp()

        # mutagen is optional: record whether it can be imported so the
        # individual tests can skip themselves instead of erroring out.
        try:
            import mutagen.flac  # noqa: F401 -- availability probe only
            self.mutagen_available = True
        except ImportError:
            self.mutagen_available = False

    def test_overlay_artist_from_database(self):
        # The artist tag read through the mount must be the value stored in
        # the library, not the one the track was created with.
        if not self.mutagen_available:
            self.skipTest("mutagen required for metadata verification")

        import mutagen.flac

        file_path, item_id = self.add_test_track(
            filename='overlay_test.flac',
            artist='File Artist',
            title='Test Track',
            album='Test Album'
        )

        # Make the library disagree with the value used at creation time.
        self.library.update_item(item_id, artist='Database Artist')

        self.mount_beetfs()

        mounted_tracks = self._find_track_in_mount('Database Artist')
        if not mounted_tracks:
            self.skipTest("Track not found under 'Database Artist' - path uses DB metadata")

        with open(mounted_tracks[0], 'rb') as f:
            mounted_data = f.read()

        # Parse from memory so mutagen sees exactly the bytes served by the mount.
        mounted_flac = mutagen.flac.FLAC(BytesIO(mounted_data))

        artist_tag = mounted_flac.get('artist', [None])[0]
        self.assertEqual(artist_tag, 'Database Artist',
                         "Mounted file should have artist from database, not file")

    def test_overlay_title_from_database(self):
        # Same as the artist test, for the title tag.
        if not self.mutagen_available:
            self.skipTest("mutagen required for metadata verification")

        import mutagen.flac

        file_path, item_id = self.add_test_track(
            filename='title_test.flac',
            artist='Test Artist',
            title='File Title'
        )

        self.library.update_item(item_id, title='Database Title')

        self.mount_beetfs()

        mounted_tracks = self._find_any_track()
        if not mounted_tracks:
            self.skipTest("No tracks found")

        with open(mounted_tracks[0], 'rb') as f:
            mounted_data = f.read()

        mounted_flac = mutagen.flac.FLAC(BytesIO(mounted_data))

        title_tag = mounted_flac.get('title', [None])[0]
        self.assertEqual(title_tag, 'Database Title')

    def test_original_file_unchanged(self):
        # After a library update and a mount, the backing file on disk must
        # still carry the artist it was created with.
        # NOTE(review): the mount is never read from here; only the effect of
        # update_item() + mount_beetfs() on the backing file is checked.
        if not self.mutagen_available:
            self.skipTest("mutagen required for metadata verification")

        import mutagen.flac

        file_path, item_id = self.add_test_track(
            filename='original_test.flac',
            artist='Original Artist',
            title='Original Title'
        )

        self.library.update_item(item_id, artist='Modified Artist')

        self.mount_beetfs()

        original_flac = mutagen.flac.FLAC(file_path)
        original_artist = original_flac.get('artist', [None])[0]

        self.assertEqual(original_artist, 'Original Artist',
                         "Original file should remain unchanged")

    def test_audio_data_passthrough(self):
        # The first audio bytes served by the mount must equal the first
        # audio bytes of the backing file, regardless of how the metadata
        # section in front of them differs.
        file_path, _item_id = self.add_test_track(
            filename='audio_test.flac',
            artist='Test Artist'
        )

        with open(file_path, 'rb') as f:
            original_data = f.read()

        self.mount_beetfs()

        mounted_tracks = self._find_any_track()
        if not mounted_tracks:
            self.skipTest("No tracks found")

        with open(mounted_tracks[0], 'rb') as f:
            mounted_data = f.read()

        # Locate the audio by walking the metadata blocks rather than
        # searching for a frame sync pattern: the two sync bytes can also
        # occur inside metadata, which would misalign the comparison.
        original_audio_start = self._flac_audio_offset(original_data)
        if original_audio_start < 0:
            # Broken fixture: nothing meaningful can be compared.
            self.skipTest("Could not locate audio frames in the source FLAC file")

        mounted_audio_start = self._flac_audio_offset(mounted_data)
        # Fail loudly instead of silently passing without any comparison.
        self.assertGreaterEqual(
            mounted_audio_start, 0,
            "Mounted file has no parseable FLAC metadata followed by audio")

        sample = self._AUDIO_SAMPLE_SIZE
        original_audio = original_data[original_audio_start:original_audio_start + sample]
        mounted_audio = mounted_data[mounted_audio_start:mounted_audio_start + sample]

        self.assertEqual(original_audio, mounted_audio,
                         "Audio data should pass through unchanged")

    @staticmethod
    def _flac_audio_offset(data):
        """Return the offset of the first byte after the FLAC metadata.

        ``data`` is the raw content of a FLAC file.  After the 4-byte
        ``fLaC`` marker the stream holds a chain of metadata blocks, each
        introduced by a 4-byte header: one byte whose top bit flags the
        last block, followed by a 24-bit big-endian body length.  The audio
        frames begin right after the body of the block flagged as last.

        Returns -1 when ``data`` does not start with the marker, when the
        block chain is truncated, or when nothing follows the last block.
        """
        if data[:4] != b'fLaC':
            return -1

        total = len(data)
        pos = 4
        while pos + 4 <= total:
            # bytearray gives integer items on both Python 2 and Python 3.
            header = bytearray(data[pos:pos + 4])
            body_len = (header[1] << 16) | (header[2] << 8) | header[3]
            pos += 4 + body_len
            if pos >= total:
                # Block body runs past the data, or no audio follows it.
                return -1
            if header[0] & 0x80:
                return pos
        return -1

    def _find_track_in_mount(self, artist_name):
        """Find tracks under a specific artist directory.

        Returns the paths of all ``.flac`` files below
        ``<mount_dir>/<artist_name>``, or an empty list when that directory
        does not exist.
        """
        artist_dir = os.path.join(self.mount_dir, artist_name)
        if not os.path.isdir(artist_dir):
            return []

        return [
            os.path.join(root, name)
            for root, _dirs, files in os.walk(artist_dir)
            for name in files
            if name.endswith('.flac')
        ]

    def _find_any_track(self):
        """Find any track in mounted filesystem.

        Returns the paths of all ``.flac`` files below ``self.mount_dir``.
        """
        return [
            os.path.join(root, name)
            for root, _dirs, files in os.walk(self.mount_dir)
            for name in files
            if name.endswith('.flac')
        ]
|
|
|
|
|
|
class TestReadFullFile(BeetFSTestCase):
    """Whole-file, offset and sequential reads through the mount.

    Besides checking how many bytes come back, the offset and sequential
    tests compare the returned bytes with a full read of the same mounted
    file, so a read that returns data from the wrong position is detected.
    """

    def test_read_entire_file(self):
        # Reading to EOF returns data whose size is within 50% of the
        # backing file's size (the metadata section may differ in length).
        file_path, _item_id = self.add_test_track(filename='full_read.flac')

        original_size = os.path.getsize(file_path)

        self.mount_beetfs()

        mounted_tracks = self._find_any_track()
        if not mounted_tracks:
            self.skipTest("No tracks found")

        data = self._read_all(mounted_tracks[0])

        self.assertTrue(len(data) > 0)
        self.assertAlmostEqual(len(data), original_size, delta=original_size * 0.5)

    def test_read_with_offset(self):
        # A read after seek() returns the requested amount of data, and
        # that data is the slice of the file starting at the seek position.
        self.add_test_track(filename='offset_test.flac')
        self.mount_beetfs()

        mounted_tracks = self._find_any_track()
        if not mounted_tracks:
            self.skipTest("No tracks found")

        with open(mounted_tracks[0], 'rb') as f:
            f.seek(100)
            data = f.read(100)

        self.assertEqual(len(data), 100)

        whole = self._read_all(mounted_tracks[0])
        self.assertEqual(data, whole[100:200],
                         "Read after seek should return the bytes at that offset")

    def test_multiple_reads(self):
        # Consecutive reads on one handle return consecutive, non-overlapping
        # pieces of the file.
        self.add_test_track(filename='multi_read.flac')
        self.mount_beetfs()

        mounted_tracks = self._find_any_track()
        if not mounted_tracks:
            self.skipTest("No tracks found")

        with open(mounted_tracks[0], 'rb') as f:
            chunk1 = f.read(512)
            chunk2 = f.read(512)
            chunk3 = f.read(512)

        self.assertEqual(len(chunk1), 512)
        self.assertEqual(len(chunk2), 512)

        # chunk3 may legitimately be short if the file ends within it, so
        # its content is checked against the full read instead of its length.
        whole = self._read_all(mounted_tracks[0])
        self.assertEqual(chunk1 + chunk2 + chunk3, whole[:3 * 512],
                         "Sequential reads should return consecutive file data")

    @staticmethod
    def _read_all(path):
        """Return the complete content of ``path`` as bytes."""
        with open(path, 'rb') as f:
            return f.read()

    def _find_any_track(self):
        """Return the paths of all ``.flac`` files found under the mount."""
        return [
            os.path.join(root, name)
            for root, _dirs, files in os.walk(self.mount_dir)
            for name in files
            if name.endswith('.flac')
        ]
|
|
|
|
|
|
# Allow running this module directly as a script, in addition to running it
# through unittest discovery.
if __name__ == '__main__':
    unittest.main()
|