81df4790bf
Tests use real FUSE operations against mounted beetfs filesystem: - test_smoke: mount/unmount lifecycle - test_nested_bug: detects critical indentation bug (13 failures) - test_readdir: directory listing - test_read: metadata overlay verification - test_stat: file/directory attributes - test_write: metadata modification - test_error_handling: ENOENT, EOPNOTSUPP Also includes: - conftest.py with BeetFSTestCase base class and synthetic FLAC generator - e2e-test-plan.md with Oracle-reviewed test strategy - flake.nix updated with ffmpeg/flac for test fixtures Run: cd tests && nix develop ../ --command python -m unittest discover
162 lines
4.6 KiB
Python
162 lines
4.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import os
|
|
import unittest
|
|
from io import BytesIO
|
|
|
|
from conftest import BeetFSTestCase, is_ffmpeg_available, is_flac_available
|
|
|
|
|
|
@unittest.skipUnless(is_ffmpeg_available() and is_flac_available(),
|
|
"ffmpeg and flac required for write tests")
|
|
class TestWriteMetadata(BeetFSTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestWriteMetadata, self).setUp()
|
|
try:
|
|
import mutagen.flac
|
|
self.mutagen = mutagen.flac
|
|
except ImportError:
|
|
self.mutagen = None
|
|
|
|
def test_write_opens_without_error(self):
|
|
self.add_test_track(filename='write_test.flac')
|
|
self.mount_beetfs()
|
|
|
|
tracks = self._find_any_track()
|
|
if not tracks:
|
|
self.skipTest("No tracks found")
|
|
|
|
try:
|
|
with open(tracks[0], 'r+b') as f:
|
|
pass
|
|
except IOError:
|
|
self.skipTest("Write not supported")
|
|
|
|
def test_write_to_header_updates_db(self):
|
|
if not self.mutagen:
|
|
self.skipTest("mutagen required")
|
|
|
|
file_path, item_id = self.add_test_track(
|
|
filename='db_write_test.flac',
|
|
artist='Original Artist',
|
|
title='Original Title'
|
|
)
|
|
|
|
self.mount_beetfs()
|
|
|
|
tracks = self._find_any_track()
|
|
if not tracks:
|
|
self.skipTest("No tracks found")
|
|
|
|
with open(tracks[0], 'rb') as f:
|
|
original_data = f.read()
|
|
|
|
flac = self.mutagen.FLAC(BytesIO(original_data))
|
|
flac['artist'] = ['Modified Artist']
|
|
|
|
output = BytesIO()
|
|
flac.save(output)
|
|
modified_data = output.getvalue()
|
|
|
|
try:
|
|
with open(tracks[0], 'r+b') as f:
|
|
f.write(modified_data[:1024])
|
|
except IOError:
|
|
self.skipTest("Write not supported")
|
|
|
|
item = self.library.get_item(item_id)
|
|
if item:
|
|
self.assertIn('Modified', str(item))
|
|
|
|
def test_original_file_unchanged_after_write(self):
|
|
if not self.mutagen:
|
|
self.skipTest("mutagen required")
|
|
|
|
file_path, item_id = self.add_test_track(
|
|
filename='unchanged_test.flac',
|
|
artist='Original Artist'
|
|
)
|
|
|
|
with open(file_path, 'rb') as f:
|
|
original_content = f.read()
|
|
|
|
self.mount_beetfs()
|
|
|
|
tracks = self._find_any_track()
|
|
if tracks:
|
|
try:
|
|
with open(tracks[0], 'r+b') as f:
|
|
f.write(b'\x00' * 100)
|
|
except IOError:
|
|
pass
|
|
|
|
with open(file_path, 'rb') as f:
|
|
after_content = f.read()
|
|
|
|
self.assertEqual(original_content, after_content)
|
|
|
|
def _find_any_track(self):
|
|
results = []
|
|
for root, dirs, files in os.walk(self.mount_dir):
|
|
for f in files:
|
|
if f.endswith('.flac'):
|
|
results.append(os.path.join(root, f))
|
|
return results
|
|
|
|
|
|
class TestWriteAudioDiscarded(BeetFSTestCase):
|
|
|
|
def test_write_to_audio_region_no_error(self):
|
|
file_path, item_id = self.add_test_track(filename='audio_write.flac')
|
|
|
|
self.mount_beetfs()
|
|
|
|
tracks = self._find_any_track()
|
|
if not tracks:
|
|
self.skipTest("No tracks found")
|
|
|
|
try:
|
|
with open(tracks[0], 'r+b') as f:
|
|
f.seek(10000)
|
|
f.write(b'\x00' * 100)
|
|
except IOError:
|
|
pass
|
|
|
|
def test_audio_unchanged_after_write_attempt(self):
|
|
file_path, item_id = self.add_test_track(filename='audio_unchanged.flac')
|
|
|
|
with open(file_path, 'rb') as f:
|
|
f.seek(10000)
|
|
original_audio = f.read(100)
|
|
|
|
self.mount_beetfs()
|
|
|
|
tracks = self._find_any_track()
|
|
if tracks:
|
|
try:
|
|
with open(tracks[0], 'r+b') as f:
|
|
f.seek(10000)
|
|
f.write(b'\xff' * 100)
|
|
except IOError:
|
|
pass
|
|
|
|
with open(file_path, 'rb') as f:
|
|
f.seek(10000)
|
|
after_audio = f.read(100)
|
|
|
|
self.assertEqual(original_audio, after_audio)
|
|
|
|
def _find_any_track(self):
|
|
results = []
|
|
for root, dirs, files in os.walk(self.mount_dir):
|
|
for f in files:
|
|
if f.endswith('.flac'):
|
|
results.append(os.path.join(root, f))
|
|
return results
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|