Move the files around

This commit is contained in:
Alexander
2026-05-13 20:34:14 +02:00
parent 90e9683076
commit 305d027c8b
113 changed files with 650 additions and 3569 deletions
View File
-580
View File
@@ -1,580 +0,0 @@
# -*- coding: utf-8 -*-
"""
beetfs E2E Test Infrastructure
Base test class, fixtures, and utilities for testing the real beetfs FUSE filesystem.
Python 2.7 compatible - no mocks, real filesystem operations.
"""
from __future__ import print_function, unicode_literals
import unittest
import subprocess
import tempfile
import shutil
import os
import sys
import time
import threading
import sqlite3
# Add parent directory to path for imports
BEETFS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, BEETFS_ROOT)
sys.path.insert(0, os.path.join(BEETFS_ROOT, 'beetsplug'))
# Test constants
MOUNT_TIMEOUT = 30 # seconds
TEARDOWN_TIMEOUT = 5 # seconds
SYNTHETIC_FLAC_DURATION = 3 # seconds (smaller = faster tests)
# Real music files location (from qBittorrent container)
REAL_MUSIC_PATH = '/home/fujin/.local/share/docker/volumes/containers_downloads/_data/Metallica - 72 Seasons (2023) [FLAC] 88'
def is_fuse_available():
"""Check if FUSE is available on the system"""
try:
with open(os.devnull, 'w') as devnull:
subprocess.check_call(['which', 'fusermount'],
stdout=devnull, stderr=devnull)
return True
except (subprocess.CalledProcessError, OSError):
return False
def is_ffmpeg_available():
"""Check if ffmpeg is available for synthetic FLAC generation"""
try:
with open(os.devnull, 'w') as devnull:
subprocess.check_call(['which', 'ffmpeg'],
stdout=devnull, stderr=devnull)
return True
except (subprocess.CalledProcessError, OSError):
return False
def is_flac_available():
"""Check if flac encoder is available"""
try:
with open(os.devnull, 'w') as devnull:
subprocess.check_call(['which', 'flac'],
stdout=devnull, stderr=devnull)
return True
except (subprocess.CalledProcessError, OSError):
return False
def create_synthetic_flac(output_path, duration_sec=SYNTHETIC_FLAC_DURATION,
artist='Test Artist', title='Test Track',
album='Test Album', year='2024', track='01',
genre='Test Genre'):
"""
Create a minimal FLAC file with known metadata.
Uses ffmpeg to generate silence, then flac to encode with tags.
Result is ~100-500KB depending on duration.
Args:
output_path: Where to write the FLAC file
duration_sec: Duration in seconds (default 3)
artist, title, album, year, track, genre: Metadata tags
Returns:
Path to created FLAC file
Raises:
subprocess.CalledProcessError: If ffmpeg or flac fails
RuntimeError: If tools not available
"""
if not is_ffmpeg_available():
raise RuntimeError("ffmpeg not available - install it or use nix develop")
if not is_flac_available():
raise RuntimeError("flac not available - install it or use nix develop")
# Create temp WAV file
wav_fd, wav_path = tempfile.mkstemp(suffix='.wav')
os.close(wav_fd)
try:
# Generate silence WAV using ffmpeg
with open(os.devnull, 'w') as devnull:
subprocess.check_call([
'ffmpeg', '-f', 'lavfi', '-i',
'anullsrc=r=44100:cl=stereo',
'-t', str(duration_sec),
'-y', wav_path
], stdout=devnull, stderr=devnull)
# Convert to FLAC with metadata tags
with open(os.devnull, 'w') as devnull:
subprocess.check_call([
'flac', '--best', '--silent',
'-T', 'ARTIST={}'.format(artist),
'-T', 'TITLE={}'.format(title),
'-T', 'ALBUM={}'.format(album),
'-T', 'DATE={}'.format(year),
'-T', 'TRACKNUMBER={}'.format(track),
'-T', 'GENRE={}'.format(genre),
'-o', output_path,
wav_path
], stdout=devnull, stderr=devnull)
return output_path
finally:
# Cleanup temp WAV
if os.path.exists(wav_path):
os.unlink(wav_path)
class BeetsLibraryFixture(object):
"""
Creates an isolated beets library for testing.
Sets up:
- Temp directory for library files
- SQLite database with test items
- Config pointing to the test library
"""
def __init__(self, temp_dir=None):
self.temp_dir = temp_dir or tempfile.mkdtemp(prefix='beetfs_lib_')
self.db_path = os.path.join(self.temp_dir, 'library.db')
self.music_dir = os.path.join(self.temp_dir, 'music')
self.config_path = os.path.join(self.temp_dir, 'config.yaml')
os.makedirs(self.music_dir)
self._create_config()
self._create_database()
def _create_config(self):
"""Create minimal beets config"""
config_content = """
directory: {music_dir}
library: {db_path}
import:
copy: no
move: no
write: no
plugins: []
""".format(music_dir=self.music_dir, db_path=self.db_path)
with open(self.config_path, 'w') as f:
f.write(config_content)
def _create_database(self):
"""Create empty beets SQLite database with correct schema"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create items table (simplified schema matching beets)
cursor.execute('''
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY,
path BLOB NOT NULL,
album_id INTEGER,
title TEXT,
artist TEXT,
album TEXT,
genre TEXT,
year INTEGER,
track INTEGER,
disc INTEGER,
length REAL,
bitrate INTEGER,
format TEXT,
samplerate INTEGER,
bitdepth INTEGER,
channels INTEGER,
mtime REAL,
added REAL
)
''')
# Create albums table
cursor.execute('''
CREATE TABLE IF NOT EXISTS albums (
id INTEGER PRIMARY KEY,
artpath BLOB,
added REAL,
albumartist TEXT,
album TEXT,
genre TEXT,
year INTEGER
)
''')
conn.commit()
conn.close()
def add_item(self, path, title='Test Track', artist='Test Artist',
album='Test Album', genre='Test Genre', year=2024,
track=1, format='flac'):
"""
Add an item to the test library database.
Args:
path: Absolute path to the audio file
title, artist, album, genre, year, track: Metadata
format: Audio format (flac, mp3, etc.)
Returns:
Item ID
"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO items (path, title, artist, album, genre, year, track, format, mtime, added)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (path, title, artist, album, genre, year, track, format,
time.time(), time.time()))
item_id = cursor.lastrowid
conn.commit()
conn.close()
return item_id
def get_item(self, item_id):
"""Get item by ID"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT * FROM items WHERE id = ?', (item_id,))
row = cursor.fetchone()
conn.close()
return row
def update_item(self, item_id, **kwargs):
"""Update item metadata"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
set_clause = ', '.join('{} = ?'.format(k) for k in kwargs.keys())
values = list(kwargs.values()) + [item_id]
cursor.execute(
'UPDATE items SET {} WHERE id = ?'.format(set_clause),
values
)
conn.commit()
conn.close()
def cleanup(self):
"""Remove all temp files"""
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir, ignore_errors=True)
class BeetFSTestCase(unittest.TestCase):
"""
Base test class for beetfs e2e tests.
Provides:
- FUSE availability checking
- Mount/unmount helpers with timeout
- Beets library fixture management
- Cleanup on test failure
Python 2.7 compatible (no subprocess.timeout, etc.)
"""
@classmethod
def setUpClass(cls):
"""Check FUSE availability once for all tests"""
if not is_fuse_available():
raise unittest.SkipTest("fusermount not available - FUSE required for e2e tests")
def setUp(self):
"""Create fresh temp directory and library for each test"""
self.temp_dir = tempfile.mkdtemp(prefix='beetfs_test_')
self.mount_dir = os.path.join(self.temp_dir, 'mount')
os.makedirs(self.mount_dir)
self.library = BeetsLibraryFixture(
temp_dir=os.path.join(self.temp_dir, 'library')
)
self.fs_process = None
self._timeout_timer = None
def tearDown(self):
"""Cleanup: unmount filesystem and remove temp files"""
# Cancel any pending timeout
if self._timeout_timer:
self._timeout_timer.cancel()
# Unmount if mounted
if self.fs_process and os.path.ismount(self.mount_dir):
self._unmount()
# Kill process if still running
if self.fs_process and self.fs_process.poll() is None:
self.fs_process.terminate()
self._wait_for_process_exit(TEARDOWN_TIMEOUT)
if self.fs_process.poll() is None:
self.fs_process.kill()
# Cleanup library
if hasattr(self, 'library'):
self.library.cleanup()
# Remove temp directory
if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir, ignore_errors=True)
def mount_beetfs(self, foreground=True):
"""
Mount beetfs filesystem.
Args:
foreground: Run in foreground mode (required for testing)
Raises:
AssertionError: If mount fails or times out
"""
# Build command to run beetfs
# We need to invoke beets with our test config and library
env = os.environ.copy()
env['BEETSDIR'] = self.library.temp_dir
mount_script = '''
import sys
sys.path.insert(0, "{beetfs_root}")
sys.path.insert(0, "{beetsplug}")
import os
import re
os.environ["BEETSDIR"] = "{beetsdir}"
from beets import config
from beets.library import Library
config.read(user=False)
config["directory"] = "{music_dir}"
config["library"] = "{db_path}"
lib = Library("{db_path}")
import beetFs
import fuse
fuse.fuse_python_api = (0, 2)
beetFs.library = lib
beetFs.structure_depth = 4
beetFs.structure_split = [0, 1, 2, 3]
beetFs.directory_structure = beetFs.FSNode({{}}, {{}})
for item in lib.items():
mapping = beetFs.template_mapping(lib, item)
path_str = beetFs.PATH_FORMAT
for key, val in mapping.items():
if val is not None:
clean_val = re.sub(r"[\\\\/:]|^\\.", "_", unicode(val))
path_str = path_str.replace("$" + key, clean_val)
elements = path_str.split("/")
sub_elements = elements[0:beetFs.structure_depth-1]
for level in range(len(sub_elements)):
level_subbed = sub_elements[0:level+1]
beetFs.directory_structure.adddir(sub_elements, level_subbed[level])
beetFs.directory_structure.addfile(
sub_elements,
elements[beetFs.structure_depth-1],
item.id
)
fs = beetFs.beetFileSystem(
version="%prog " + fuse.__version__,
usage="beetfs test mount",
dash_s_do="setsingle"
)
fs.parser.add_option(mountopt="root", metavar="PATH", default="{music_dir}",
help="music library root path")
fs.parse(args=["{mount_dir}"], errex=1)
fs.flags = 0
fs.multithreaded = False
fs.fuse_args.setmod("foreground")
fs.fuse_args.add("fsname=beetfs")
fs.fuse_args.add("nonempty")
fs.lib = lib
fs.main()
'''.format(
beetfs_root=BEETFS_ROOT,
beetsplug=os.path.join(BEETFS_ROOT, 'beetsplug'),
beetsdir=self.library.temp_dir,
music_dir=self.library.music_dir,
db_path=self.library.db_path,
mount_dir=self.mount_dir
)
cmd = [sys.executable, '-c', mount_script]
# Start process
self.fs_process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Setup timeout using threading.Timer (Python 2.7 compatible)
self._timeout_timer = threading.Timer(
MOUNT_TIMEOUT,
self._mount_timeout_handler
)
self._timeout_timer.start()
# Wait for mount
try:
self._wait_for_mount()
finally:
self._timeout_timer.cancel()
def _mount_timeout_handler(self):
"""Called if mount times out"""
if self.fs_process and self.fs_process.poll() is None:
self.fs_process.kill()
def _wait_for_mount(self):
"""Poll until filesystem is mounted"""
start = time.time()
while time.time() - start < MOUNT_TIMEOUT:
if os.path.ismount(self.mount_dir):
return
# Check if process died
if self.fs_process.poll() is not None:
stdout, stderr = self.fs_process.communicate()
self.fail(
"beetfs process terminated prematurely with code {}.\n"
"stdout: {}\nstderr: {}".format(
self.fs_process.returncode,
stdout.decode('utf-8', errors='replace'),
stderr.decode('utf-8', errors='replace')
)
)
time.sleep(0.1)
self.fail("Mount timeout after {} seconds".format(MOUNT_TIMEOUT))
def _unmount(self):
"""Unmount the filesystem using fusermount"""
with open(os.devnull, 'w') as devnull:
# Use -z for lazy unmount (handles busy mounts)
subprocess.call(
['fusermount', '-z', '-u', self.mount_dir],
stdout=devnull,
stderr=devnull
)
def _wait_for_process_exit(self, timeout):
"""Wait for process to exit (Python 2.7 compatible)"""
start = time.time()
while time.time() - start < timeout:
if self.fs_process.poll() is not None:
return True
time.sleep(0.1)
return False
def create_test_flac(self, filename='test.flac', **metadata):
"""
Create a synthetic FLAC in the library music directory.
Args:
filename: Name for the file
**metadata: Passed to create_synthetic_flac()
Returns:
Absolute path to created file
"""
output_path = os.path.join(self.library.music_dir, filename)
create_synthetic_flac(output_path, **metadata)
return output_path
def add_test_track(self, filename='test.flac', **metadata):
"""
Create a synthetic FLAC and add it to the library.
Args:
filename: Name for the file
**metadata: Metadata for both file and database
Returns:
Tuple of (file_path, item_id)
"""
# Default metadata
defaults = {
'artist': 'Test Artist',
'title': 'Test Track',
'album': 'Test Album',
'genre': 'Test Genre',
'year': '2024',
'track': '01'
}
defaults.update(metadata)
# Create the file
file_path = self.create_test_flac(filename, **defaults)
# Add to database (convert year and track to int for DB)
item_id = self.library.add_item(
path=file_path,
title=defaults['title'],
artist=defaults['artist'],
album=defaults['album'],
genre=defaults['genre'],
year=int(defaults['year']),
track=int(defaults['track']),
format='flac'
)
return file_path, item_id
class RealMusicTestCase(BeetFSTestCase):
"""
Test case that uses real music files from qBittorrent.
Skipped if real music not available or E2E env var not set.
"""
@classmethod
def setUpClass(cls):
super(RealMusicTestCase, cls).setUpClass()
# Check if E2E tests are enabled
if not os.environ.get('E2E'):
raise unittest.SkipTest("E2E tests disabled - set E2E=1 to enable")
# Check if real music is available
if not os.path.isdir(REAL_MUSIC_PATH):
raise unittest.SkipTest(
"Real music not available at {}".format(REAL_MUSIC_PATH)
)
def get_real_flac_files(self):
"""Get list of real FLAC files"""
files = []
for f in os.listdir(REAL_MUSIC_PATH):
if f.endswith('.flac'):
files.append(os.path.join(REAL_MUSIC_PATH, f))
return sorted(files)
# Export test utilities
__all__ = [
'BeetFSTestCase',
'RealMusicTestCase',
'BeetsLibraryFixture',
'create_synthetic_flac',
'is_fuse_available',
'is_ffmpeg_available',
'is_flac_available',
'BEETFS_ROOT',
'REAL_MUSIC_PATH',
'MOUNT_TIMEOUT',
]
+90
View File
@@ -0,0 +1,90 @@
use std::process::Command;
#[test]
#[ignore]
fn test_mpv_playback() {
let mountpoint = setup_test_mount();
let output = Command::new("mpv")
.args([
"--no-video",
"--no-audio",
"--length=2",
"--msg-level=all=debug",
&format!("{}/Artist/Album/01 - Track.flac", mountpoint),
])
.output()
.expect("mpv must be installed");
assert!(
output.status.success(),
"mpv playback failed: {:?}",
output
);
}
#[test]
#[ignore]
fn test_vlc_playback() {
let mountpoint = setup_test_mount();
let output = Command::new("cvlc")
.args([
"--play-and-exit",
"--run-time=2",
&format!("{}/Artist/Album/", mountpoint),
])
.output()
.expect("vlc must be installed");
assert!(output.status.success(), "VLC playback failed");
}
#[test]
#[ignore]
fn test_file_manager_operations() {
let mountpoint = setup_test_mount();
let entries: Vec<_> = std::fs::read_dir(&mountpoint)
.expect("read_dir failed")
.collect();
assert!(!entries.is_empty(), "mountpoint should have entries");
for entry in entries {
let entry = entry.expect("entry should be valid");
let metadata = entry.metadata().expect("metadata should work");
assert!(metadata.is_dir() || metadata.is_file());
}
}
#[test]
#[ignore]
fn test_concurrent_player_access() {
let mountpoint = setup_test_mount();
let handles: Vec<_> = (0..3)
.map(|i| {
let mp = mountpoint.clone();
std::thread::spawn(move || {
Command::new("mpv")
.args([
"--no-video",
"--no-audio",
"--length=1",
&format!("{}/Artist/Album/0{} - Track.flac", mp, i + 1),
])
.output()
})
})
.collect();
for handle in handles {
let output = handle.join().unwrap().expect("mpv should run");
assert!(output.status.success());
}
}
fn setup_test_mount() -> String {
std::env::var("MUSICFS_TEST_MOUNT").unwrap_or_else(|_| "/tmp/musicfs-test".to_string())
}
+40
View File
@@ -0,0 +1,40 @@
services:
toxiproxy:
image: ghcr.io/shopify/toxiproxy:2.9.0
ports:
- "8474:8474"
- "20000-20010:20000-20010"
healthcheck:
test: ["CMD", "/toxiproxy-cli", "list"]
interval: 5s
timeout: 3s
retries: 3
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: test
MINIO_ROOT_PASSWORD: testtest123
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 5s
timeout: 3s
retries: 3
volumes:
- minio-data:/data
sftp:
image: atmoz/sftp:latest
ports:
- "2222:22"
command: test:test:::music
volumes:
- sftp-data:/home/test/music
volumes:
minio-data:
sftp-data:
-161
View File
@@ -1,161 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import errno
import os
import unittest
from conftest import BeetFSTestCase
class TestENOENT(BeetFSTestCase):
def test_stat_nonexistent_file(self):
self.mount_beetfs()
nonexistent = os.path.join(self.mount_dir, 'does_not_exist.flac')
with self.assertRaises(OSError) as ctx:
os.stat(nonexistent)
self.assertEqual(ctx.exception.errno, errno.ENOENT)
def test_open_nonexistent_file(self):
self.mount_beetfs()
nonexistent = os.path.join(self.mount_dir, 'does_not_exist.flac')
with self.assertRaises(IOError) as ctx:
open(nonexistent, 'rb')
self.assertEqual(ctx.exception.errno, errno.ENOENT)
def test_listdir_nonexistent_directory(self):
self.mount_beetfs()
nonexistent = os.path.join(self.mount_dir, 'Nonexistent Artist')
with self.assertRaises(OSError) as ctx:
os.listdir(nonexistent)
self.assertEqual(ctx.exception.errno, errno.ENOENT)
def test_stat_deeply_nested_nonexistent(self):
self.mount_beetfs()
nonexistent = os.path.join(
self.mount_dir, 'Artist', 'Album', 'Track', 'Nested', 'Path'
)
with self.assertRaises(OSError) as ctx:
os.stat(nonexistent)
self.assertEqual(ctx.exception.errno, errno.ENOENT)
class TestEOPNOTSUPP(BeetFSTestCase):
def test_mkdir_not_supported(self):
self.mount_beetfs()
new_dir = os.path.join(self.mount_dir, 'new_directory')
with self.assertRaises(OSError) as ctx:
os.mkdir(new_dir)
self.assertIn(ctx.exception.errno, [errno.EOPNOTSUPP, errno.EACCES, errno.EROFS])
def test_unlink_not_supported(self):
self.add_test_track(filename='delete_test.flac')
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
with self.assertRaises(OSError) as ctx:
os.unlink(tracks[0])
self.assertIn(ctx.exception.errno, [errno.EOPNOTSUPP, errno.EACCES, errno.EROFS])
def test_rmdir_not_supported(self):
self.add_test_track(filename='rmdir_test.flac', artist='Rmdir Artist')
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Rmdir Artist')
if not os.path.isdir(artist_dir):
self.skipTest("Artist directory not found")
with self.assertRaises(OSError) as ctx:
os.rmdir(artist_dir)
self.assertIn(ctx.exception.errno, [errno.EOPNOTSUPP, errno.EACCES, errno.EROFS, errno.ENOTEMPTY])
def test_rename_not_supported(self):
self.add_test_track(filename='rename_test.flac')
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
new_path = tracks[0] + '.renamed'
with self.assertRaises(OSError) as ctx:
os.rename(tracks[0], new_path)
self.assertIn(ctx.exception.errno, [errno.EOPNOTSUPP, errno.EACCES, errno.EROFS])
def test_symlink_not_supported(self):
self.mount_beetfs()
link_path = os.path.join(self.mount_dir, 'test_link')
with self.assertRaises(OSError) as ctx:
os.symlink('/tmp', link_path)
self.assertIn(ctx.exception.errno, [errno.EOPNOTSUPP, errno.EACCES, errno.EROFS])
def test_chmod_not_supported(self):
self.add_test_track(filename='chmod_test.flac')
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
with self.assertRaises(OSError) as ctx:
os.chmod(tracks[0], 0o777)
self.assertIn(ctx.exception.errno, [errno.EOPNOTSUPP, errno.EACCES, errno.EROFS])
def _find_any_track(self):
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
class TestEdgeCaseErrors(BeetFSTestCase):
def test_access_empty_path(self):
self.mount_beetfs()
with self.assertRaises((OSError, IOError)):
os.stat(os.path.join(self.mount_dir, ''))
def test_open_directory_as_file(self):
self.add_test_track(filename='test.flac', artist='Dir Artist')
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Dir Artist')
if os.path.isdir(artist_dir):
with self.assertRaises((OSError, IOError)):
with open(artist_dir, 'rb') as f:
f.read()
if __name__ == '__main__':
unittest.main()
-141
View File
@@ -1,141 +0,0 @@
# -*- coding: utf-8 -*-
"""
Tests to verify the nested methods bug in beetFs.py.
Oracle discovered that lines 758-1144 are indented inside access() method,
making readdir, open, read, write, etc. unreachable as class methods.
These tests will fail until the bug is fixed.
"""
from __future__ import print_function, unicode_literals
import os
import sys
import unittest
from conftest import BEETFS_ROOT
sys.path.insert(0, os.path.join(BEETFS_ROOT, 'beetsplug'))
class TestNestedMethodsBug(unittest.TestCase):
"""Verify FUSE methods exist at class level (not nested inside access)."""
@classmethod
def setUpClass(cls):
try:
from beetFs import beetFileSystem
cls.fs_class = beetFileSystem
except ImportError as e:
raise unittest.SkipTest("Cannot import beetFs: {}".format(e))
def test_readdir_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'readdir'),
"readdir should be a class method, not nested inside access()"
)
def test_open_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'open'),
"open should be a class method, not nested inside access()"
)
def test_read_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'read'),
"read should be a class method, not nested inside access()"
)
def test_write_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'write'),
"write should be a class method, not nested inside access()"
)
def test_release_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'release'),
"release should be a class method, not nested inside access()"
)
def test_opendir_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'opendir'),
"opendir should be a class method, not nested inside access()"
)
def test_releasedir_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'releasedir'),
"releasedir should be a class method, not nested inside access()"
)
def test_mkdir_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'mkdir'),
"mkdir should be a class method, not nested inside access()"
)
def test_unlink_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'unlink'),
"unlink should be a class method, not nested inside access()"
)
def test_truncate_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'truncate'),
"truncate should be a class method, not nested inside access()"
)
def test_fgetattr_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'fgetattr'),
"fgetattr should be a class method, not nested inside access()"
)
def test_flush_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'flush'),
"flush should be a class method, not nested inside access()"
)
def test_fsync_is_class_method(self):
self.assertTrue(
hasattr(self.fs_class, 'fsync'),
"fsync should be a class method, not nested inside access()"
)
class TestMethodsCallable(unittest.TestCase):
"""Verify methods can be called (not just exist)."""
@classmethod
def setUpClass(cls):
try:
from beetFs import beetFileSystem
cls.fs_class = beetFileSystem
except ImportError as e:
raise unittest.SkipTest("Cannot import beetFs: {}".format(e))
def test_readdir_callable(self):
method = getattr(self.fs_class, 'readdir', None)
if method is None:
self.skipTest("readdir not found - nested bug not fixed yet")
self.assertTrue(callable(method))
def test_open_callable(self):
method = getattr(self.fs_class, 'open', None)
if method is None:
self.skipTest("open not found - nested bug not fixed yet")
self.assertTrue(callable(method))
def test_read_callable(self):
method = getattr(self.fs_class, 'read', None)
if method is None:
self.skipTest("read not found - nested bug not fixed yet")
self.assertTrue(callable(method))
if __name__ == '__main__':
unittest.main()
-296
View File
@@ -1,296 +0,0 @@
# -*- coding: utf-8 -*-
"""
Tests for file reading with metadata overlay.
This is the CORE FEATURE of beetfs:
- File headers contain metadata from beets database
- Audio data passes through unchanged from original file
"""
from __future__ import print_function, unicode_literals
import os
import unittest
from io import BytesIO
from conftest import BeetFSTestCase, is_ffmpeg_available, is_flac_available
class TestReadBasic(BeetFSTestCase):
def test_open_file(self):
file_path, item_id = self.add_test_track(
filename='test.flac',
artist='Test Artist',
title='Test Track'
)
self.mount_beetfs()
entries = self._find_track_path('Test Artist', 'Test Track')
if not entries:
self.skipTest("Could not find track in mounted filesystem")
track_path = entries[0]
with open(track_path, 'rb') as f:
data = f.read(1024)
self.assertIsInstance(data, bytes)
self.assertTrue(len(data) > 0)
def test_read_returns_bytes(self):
self.add_test_track(filename='test.flac')
self.mount_beetfs()
entries = self._find_any_track()
if not entries:
self.skipTest("No tracks found")
with open(entries[0], 'rb') as f:
data = f.read(100)
self.assertIsInstance(data, bytes)
def test_read_flac_magic_bytes(self):
self.add_test_track(filename='test.flac')
self.mount_beetfs()
entries = self._find_any_track()
if not entries:
self.skipTest("No tracks found")
with open(entries[0], 'rb') as f:
magic = f.read(4)
self.assertEqual(magic, b'fLaC', "FLAC files should start with 'fLaC' magic bytes")
def _find_track_path(self, artist, title):
"""Find track path in mounted filesystem."""
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if title.lower() in f.lower():
results.append(os.path.join(root, f))
return results
def _find_any_track(self):
"""Find any track in mounted filesystem."""
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
@unittest.skipUnless(is_ffmpeg_available() and is_flac_available(),
"ffmpeg and flac required for metadata overlay tests")
class TestMetadataOverlay(BeetFSTestCase):
"""
Tests that verify the core metadata overlay functionality.
The key insight: beetfs should return metadata from the DATABASE,
not from the original FILE. This allows presenting different metadata
than what's embedded in the file.
"""
def setUp(self):
super(TestMetadataOverlay, self).setUp()
try:
import mutagen.flac
self.mutagen_available = True
except ImportError:
self.mutagen_available = False
def test_overlay_artist_from_database(self):
if not self.mutagen_available:
self.skipTest("mutagen required for metadata verification")
import mutagen.flac
file_path, item_id = self.add_test_track(
filename='overlay_test.flac',
artist='File Artist',
title='Test Track',
album='Test Album'
)
self.library.update_item(item_id, artist='Database Artist')
self.mount_beetfs()
mounted_tracks = self._find_track_in_mount('Database Artist')
if not mounted_tracks:
self.skipTest("Track not found under 'Database Artist' - path uses DB metadata")
with open(mounted_tracks[0], 'rb') as f:
mounted_data = f.read()
mounted_flac = mutagen.flac.FLAC(BytesIO(mounted_data))
artist_tag = mounted_flac.get('artist', [None])[0]
self.assertEqual(artist_tag, 'Database Artist',
"Mounted file should have artist from database, not file")
def test_overlay_title_from_database(self):
if not self.mutagen_available:
self.skipTest("mutagen required for metadata verification")
import mutagen.flac
file_path, item_id = self.add_test_track(
filename='title_test.flac',
artist='Test Artist',
title='File Title'
)
self.library.update_item(item_id, title='Database Title')
self.mount_beetfs()
mounted_tracks = self._find_any_track()
if not mounted_tracks:
self.skipTest("No tracks found")
with open(mounted_tracks[0], 'rb') as f:
mounted_data = f.read()
mounted_flac = mutagen.flac.FLAC(BytesIO(mounted_data))
title_tag = mounted_flac.get('title', [None])[0]
self.assertEqual(title_tag, 'Database Title')
def test_original_file_unchanged(self):
if not self.mutagen_available:
self.skipTest("mutagen required for metadata verification")
import mutagen.flac
file_path, item_id = self.add_test_track(
filename='original_test.flac',
artist='Original Artist',
title='Original Title'
)
self.library.update_item(item_id, artist='Modified Artist')
self.mount_beetfs()
original_flac = mutagen.flac.FLAC(file_path)
original_artist = original_flac.get('artist', [None])[0]
self.assertEqual(original_artist, 'Original Artist',
"Original file should remain unchanged")
def test_audio_data_passthrough(self):
file_path, item_id = self.add_test_track(
filename='audio_test.flac',
artist='Test Artist'
)
with open(file_path, 'rb') as f:
original_data = f.read()
self.mount_beetfs()
mounted_tracks = self._find_any_track()
if not mounted_tracks:
self.skipTest("No tracks found")
with open(mounted_tracks[0], 'rb') as f:
mounted_data = f.read()
original_audio_start = original_data.find(b'\xff\xf8')
mounted_audio_start = mounted_data.find(b'\xff\xf8')
if original_audio_start > 0 and mounted_audio_start > 0:
original_audio = original_data[original_audio_start:original_audio_start + 1000]
mounted_audio = mounted_data[mounted_audio_start:mounted_audio_start + 1000]
self.assertEqual(original_audio, mounted_audio,
"Audio data should pass through unchanged")
def _find_track_in_mount(self, artist_name):
"""Find tracks under a specific artist directory."""
results = []
artist_dir = os.path.join(self.mount_dir, artist_name)
if not os.path.isdir(artist_dir):
return results
for root, dirs, files in os.walk(artist_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
def _find_any_track(self):
"""Find any track in mounted filesystem."""
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
class TestReadFullFile(BeetFSTestCase):
def test_read_entire_file(self):
file_path, item_id = self.add_test_track(filename='full_read.flac')
original_size = os.path.getsize(file_path)
self.mount_beetfs()
mounted_tracks = self._find_any_track()
if not mounted_tracks:
self.skipTest("No tracks found")
with open(mounted_tracks[0], 'rb') as f:
data = f.read()
self.assertTrue(len(data) > 0)
self.assertAlmostEqual(len(data), original_size, delta=original_size * 0.5)
def test_read_with_offset(self):
self.add_test_track(filename='offset_test.flac')
self.mount_beetfs()
mounted_tracks = self._find_any_track()
if not mounted_tracks:
self.skipTest("No tracks found")
with open(mounted_tracks[0], 'rb') as f:
f.seek(100)
data = f.read(100)
self.assertEqual(len(data), 100)
def test_multiple_reads(self):
self.add_test_track(filename='multi_read.flac')
self.mount_beetfs()
mounted_tracks = self._find_any_track()
if not mounted_tracks:
self.skipTest("No tracks found")
with open(mounted_tracks[0], 'rb') as f:
chunk1 = f.read(512)
chunk2 = f.read(512)
chunk3 = f.read(512)
self.assertEqual(len(chunk1), 512)
self.assertEqual(len(chunk2), 512)
def _find_any_track(self):
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
if __name__ == '__main__':
unittest.main()
-179
View File
@@ -1,179 +0,0 @@
# -*- coding: utf-8 -*-
"""Tests for directory listing operations."""
from __future__ import print_function, unicode_literals
import os
import unittest
from conftest import BeetFSTestCase
class TestReaddirEmpty(BeetFSTestCase):
def test_list_empty_root(self):
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertEqual(entries, [])
def test_list_root_returns_list(self):
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertIsInstance(entries, list)
class TestReaddirWithContent(BeetFSTestCase):
def setUp(self):
super(TestReaddirWithContent, self).setUp()
self.add_test_track(
filename='track01.flac',
artist='Pink Floyd',
title='Comfortably Numb',
album='The Wall',
year='1979',
track='01'
)
self.add_test_track(
filename='track02.flac',
artist='Pink Floyd',
title='Another Brick in the Wall',
album='The Wall',
year='1979',
track='02'
)
self.add_test_track(
filename='track03.flac',
artist='Led Zeppelin',
title='Stairway to Heaven',
album='Led Zeppelin IV',
year='1971',
track='01'
)
def test_list_root_shows_artists(self):
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertIn('Pink Floyd', entries)
self.assertIn('Led Zeppelin', entries)
def test_list_artist_shows_albums(self):
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Pink Floyd')
entries = os.listdir(artist_dir)
self.assertTrue(any('The Wall' in e for e in entries))
def test_list_album_shows_tracks(self):
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Pink Floyd')
albums = os.listdir(artist_dir)
wall_album = [a for a in albums if 'The Wall' in a][0]
album_dir = os.path.join(artist_dir, wall_album)
tracks = os.listdir(album_dir)
self.assertEqual(len(tracks), 2)
def test_path_format_includes_year(self):
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Pink Floyd')
albums = os.listdir(artist_dir)
wall_album = [a for a in albums if 'The Wall' in a][0]
self.assertIn('1979', wall_album)
def test_path_format_includes_format(self):
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Pink Floyd')
albums = os.listdir(artist_dir)
wall_album = [a for a in albums if 'The Wall' in a][0]
self.assertIn('FLAC', wall_album.upper())
def test_track_filename_includes_number(self):
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Pink Floyd')
albums = os.listdir(artist_dir)
wall_album = [a for a in albums if 'The Wall' in a][0]
album_dir = os.path.join(artist_dir, wall_album)
tracks = os.listdir(album_dir)
self.assertTrue(any(t.startswith('01') or t.startswith('1') for t in tracks))
class TestReaddirUnicode(BeetFSTestCase):
def test_unicode_artist_name(self):
self.add_test_track(
filename='bjork.flac',
artist='Björk',
title='Hyperballad',
album='Post'
)
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertTrue(
any('Bj' in e for e in entries),
"Unicode artist name should appear in listing"
)
def test_unicode_album_name(self):
self.add_test_track(
filename='sigur.flac',
artist='Sigur Ros',
title='Hoppipolla',
album='Takk...'
)
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Sigur Ros')
if os.path.isdir(artist_dir):
albums = os.listdir(artist_dir)
self.assertTrue(len(albums) > 0)
class TestReaddirSpecialChars(BeetFSTestCase):
def test_artist_with_ampersand(self):
self.add_test_track(
filename='guns.flac',
artist="Guns N' Roses",
title='Welcome to the Jungle',
album='Appetite for Destruction'
)
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertTrue(len(entries) > 0)
def test_title_with_question_mark(self):
self.add_test_track(
filename='who.flac',
artist='The Who',
title='Who Are You?',
album='Who Are You'
)
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertIn('The Who', entries)
if __name__ == '__main__':
unittest.main()
-83
View File
@@ -1,83 +0,0 @@
# -*- coding: utf-8 -*-
"""Smoke tests for beetfs mount/unmount lifecycle."""
from __future__ import print_function, unicode_literals
import os
import unittest
from conftest import BeetFSTestCase, is_fuse_available
class TestMountLifecycle(BeetFSTestCase):
def test_fuse_available(self):
self.assertTrue(is_fuse_available())
def test_mount_empty_library(self):
self.mount_beetfs()
self.assertTrue(os.path.ismount(self.mount_dir))
def test_unmount_clean(self):
self.mount_beetfs()
self.assertTrue(os.path.ismount(self.mount_dir))
self._unmount()
self._wait_for_process_exit(5)
self.assertFalse(os.path.ismount(self.mount_dir))
def test_mount_with_single_track(self):
self.add_test_track(
filename='track01.flac',
artist='Smoke Test Artist',
title='Smoke Test Track',
album='Smoke Test Album'
)
self.mount_beetfs()
self.assertTrue(os.path.ismount(self.mount_dir))
def test_mount_directory_accessible(self):
self.mount_beetfs()
entries = os.listdir(self.mount_dir)
self.assertIsInstance(entries, list)
def test_temp_directory_created(self):
self.assertTrue(os.path.isdir(self.temp_dir))
self.assertTrue(os.path.isdir(self.mount_dir))
def test_library_fixture_created(self):
self.assertTrue(os.path.isfile(self.library.db_path))
self.assertTrue(os.path.isdir(self.library.music_dir))
class TestMountWithContent(BeetFSTestCase):
def test_mount_with_multiple_tracks(self):
for i in range(3):
self.add_test_track(
filename='track{:02d}.flac'.format(i + 1),
artist='Test Artist',
title='Track {}'.format(i + 1),
album='Test Album',
track=str(i + 1)
)
self.mount_beetfs()
self.assertTrue(os.path.ismount(self.mount_dir))
def test_mount_with_unicode_metadata(self):
self.add_test_track(
filename='unicode_track.flac',
artist='Tëst Ärtîst',
title='Ünïcödé Träck',
album='Spëcîäl Chäräctërs'
)
self.mount_beetfs()
self.assertTrue(os.path.ismount(self.mount_dir))
if __name__ == '__main__':
unittest.main()
-138
View File
@@ -1,138 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import os
import stat
import unittest
from conftest import BeetFSTestCase
class TestStatFile(BeetFSTestCase):
def setUp(self):
super(TestStatFile, self).setUp()
self.file_path, self.item_id = self.add_test_track(
filename='stat_test.flac',
artist='Test Artist',
title='Test Track',
album='Test Album'
)
def test_stat_returns_stat_result(self):
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
result = os.stat(tracks[0])
self.assertTrue(hasattr(result, 'st_mode'))
self.assertTrue(hasattr(result, 'st_size'))
def test_stat_file_mode(self):
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
result = os.stat(tracks[0])
self.assertTrue(stat.S_ISREG(result.st_mode))
def test_stat_file_size_positive(self):
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
result = os.stat(tracks[0])
self.assertGreater(result.st_size, 0)
def test_stat_file_size_matches_original(self):
original_size = os.path.getsize(self.file_path)
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
result = os.stat(tracks[0])
self.assertAlmostEqual(result.st_size, original_size, delta=original_size * 0.5)
def _find_any_track(self):
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
class TestStatDirectory(BeetFSTestCase):
def test_stat_root(self):
self.mount_beetfs()
result = os.stat(self.mount_dir)
self.assertTrue(stat.S_ISDIR(result.st_mode))
def test_stat_artist_directory(self):
self.add_test_track(
filename='test.flac',
artist='Stat Test Artist'
)
self.mount_beetfs()
artist_dir = os.path.join(self.mount_dir, 'Stat Test Artist')
if os.path.isdir(artist_dir):
result = os.stat(artist_dir)
self.assertTrue(stat.S_ISDIR(result.st_mode))
class TestStatfs(BeetFSTestCase):
def test_statvfs_returns_result(self):
self.mount_beetfs()
result = os.statvfs(self.mount_dir)
self.assertTrue(hasattr(result, 'f_bsize'))
self.assertTrue(hasattr(result, 'f_blocks'))
class TestAccess(BeetFSTestCase):
def test_access_root_readable(self):
self.mount_beetfs()
self.assertTrue(os.access(self.mount_dir, os.R_OK))
def test_access_root_executable(self):
self.mount_beetfs()
self.assertTrue(os.access(self.mount_dir, os.X_OK))
def test_access_file_readable(self):
self.add_test_track(filename='access_test.flac')
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
self.assertTrue(os.access(tracks[0], os.R_OK))
def _find_any_track(self):
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
if __name__ == '__main__':
unittest.main()
-161
View File
@@ -1,161 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import os
import unittest
from io import BytesIO
from conftest import BeetFSTestCase, is_ffmpeg_available, is_flac_available
@unittest.skipUnless(is_ffmpeg_available() and is_flac_available(),
"ffmpeg and flac required for write tests")
class TestWriteMetadata(BeetFSTestCase):
def setUp(self):
super(TestWriteMetadata, self).setUp()
try:
import mutagen.flac
self.mutagen = mutagen.flac
except ImportError:
self.mutagen = None
def test_write_opens_without_error(self):
self.add_test_track(filename='write_test.flac')
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
try:
with open(tracks[0], 'r+b') as f:
pass
except IOError:
self.skipTest("Write not supported")
def test_write_to_header_updates_db(self):
if not self.mutagen:
self.skipTest("mutagen required")
file_path, item_id = self.add_test_track(
filename='db_write_test.flac',
artist='Original Artist',
title='Original Title'
)
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
with open(tracks[0], 'rb') as f:
original_data = f.read()
flac = self.mutagen.FLAC(BytesIO(original_data))
flac['artist'] = ['Modified Artist']
output = BytesIO()
flac.save(output)
modified_data = output.getvalue()
try:
with open(tracks[0], 'r+b') as f:
f.write(modified_data[:1024])
except IOError:
self.skipTest("Write not supported")
item = self.library.get_item(item_id)
if item:
self.assertIn('Modified', str(item))
def test_original_file_unchanged_after_write(self):
if not self.mutagen:
self.skipTest("mutagen required")
file_path, item_id = self.add_test_track(
filename='unchanged_test.flac',
artist='Original Artist'
)
with open(file_path, 'rb') as f:
original_content = f.read()
self.mount_beetfs()
tracks = self._find_any_track()
if tracks:
try:
with open(tracks[0], 'r+b') as f:
f.write(b'\x00' * 100)
except IOError:
pass
with open(file_path, 'rb') as f:
after_content = f.read()
self.assertEqual(original_content, after_content)
def _find_any_track(self):
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
class TestWriteAudioDiscarded(BeetFSTestCase):
def test_write_to_audio_region_no_error(self):
file_path, item_id = self.add_test_track(filename='audio_write.flac')
self.mount_beetfs()
tracks = self._find_any_track()
if not tracks:
self.skipTest("No tracks found")
try:
with open(tracks[0], 'r+b') as f:
f.seek(10000)
f.write(b'\x00' * 100)
except IOError:
pass
def test_audio_unchanged_after_write_attempt(self):
file_path, item_id = self.add_test_track(filename='audio_unchanged.flac')
with open(file_path, 'rb') as f:
f.seek(10000)
original_audio = f.read(100)
self.mount_beetfs()
tracks = self._find_any_track()
if tracks:
try:
with open(tracks[0], 'r+b') as f:
f.seek(10000)
f.write(b'\xff' * 100)
except IOError:
pass
with open(file_path, 'rb') as f:
f.seek(10000)
after_audio = f.read(100)
self.assertEqual(original_audio, after_audio)
def _find_any_track(self):
results = []
for root, dirs, files in os.walk(self.mount_dir):
for f in files:
if f.endswith('.flac'):
results.append(os.path.join(root, f))
return results
if __name__ == '__main__':
unittest.main()