0ff2a17ab7
Implements phase-c-hardening.md to fix 6 RED resilience tests:
- D1/D2: Health check timeout (1.5s) + parallel execution via join_all
- C6: Recursive CAS calculate_size() to scan shard subdirectories
- C7: FUSE read timeout (30s) returns EIO instead of hanging
- 6.4: Auto-re-fetch corrupt/missing chunks from origin
- 6.6: Passthrough mode - continue even when CAS write fails
- C9: PID file with flock prevents concurrent mounts
- 5.3: fd exhaustion handling test

All 27 resilience tests now pass. Full test suite green.

Files changed:
- musicfs-origins/src/health.rs: timeout + join_all
- musicfs-origins/Cargo.toml: add futures dependency
- musicfs-cas/src/store.rs: recursive calculate_size
- musicfs-cas/src/reader.rs: auto-re-fetch on IntegrityError/NotFound
- musicfs-cas/src/fetcher.rs: passthrough fallback
- musicfs-fuse/src/filesystem.rs: 30s read timeout
- musicfs-cli/src/main.rs: PID file with flock
- musicfs-test-utils/tests/resilience.rs: updated tests
541 lines
16 KiB
Rust
541 lines
16 KiB
Rust
use anyhow::{Context, Result};
|
|
use clap::{Parser, Subcommand};
|
|
use musicfs_cache::TreeBuilder;
|
|
use musicfs_cas::{CasConfig, CasStore, ContentFetcher, FileReader};
|
|
use musicfs_core::{FileId, FileMeta, LoggingConfig, OriginId, RealPath, VirtualPath};
|
|
use musicfs_fuse::MusicFs;
|
|
use musicfs_metadata::MetadataParser;
|
|
use musicfs_origins::{LocalOrigin, Origin};
|
|
use parking_lot::RwLock;
|
|
use std::fs::File;
|
|
use std::io::Write;
|
|
use std::os::unix::io::AsRawFd;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
use std::time::SystemTime;
|
|
use tracing::{debug, info, warn};
|
|
use tracing_appender::non_blocking::WorkerGuard;
|
|
use tracing_subscriber::{fmt, prelude::*, EnvFilter, Layer};
|
|
|
|
#[derive(Parser)]
|
|
#[command(name = "musicfs")]
|
|
#[command(about = "Virtual FUSE filesystem for music libraries")]
|
|
struct Cli {
|
|
#[arg(short, long, default_value = "info", help = "Log level")]
|
|
log_level: String,
|
|
|
|
#[command(subcommand)]
|
|
command: Commands,
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum Commands {
|
|
Mount {
|
|
#[arg(short, long, help = "Config file path")]
|
|
config: Option<PathBuf>,
|
|
#[arg(help = "Mount point")]
|
|
mountpoint: PathBuf,
|
|
#[arg(short, long, help = "Source music directory")]
|
|
origin: Option<PathBuf>,
|
|
#[arg(short = 'd', long, help = "Cache directory")]
|
|
cache_dir: Option<PathBuf>,
|
|
},
|
|
Status,
|
|
Cache {
|
|
#[command(subcommand)]
|
|
command: CacheCommands,
|
|
},
|
|
Search {
|
|
query: String,
|
|
#[arg(short, long, default_value = "100")]
|
|
limit: u32,
|
|
},
|
|
Origin {
|
|
#[command(subcommand)]
|
|
command: OriginCommands,
|
|
},
|
|
Events {
|
|
#[arg(short, long, help = "Filter by event type")]
|
|
r#type: Option<String>,
|
|
},
|
|
Shutdown {
|
|
#[arg(short, long, default_value = "true")]
|
|
graceful: bool,
|
|
#[arg(short, long, default_value = "30")]
|
|
timeout: u32,
|
|
},
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum CacheCommands {
|
|
Stats,
|
|
Clear {
|
|
#[arg(help = "Origin to clear cache for")]
|
|
origin: Option<String>,
|
|
},
|
|
Prefetch {
|
|
#[arg(help = "Paths to prefetch")]
|
|
paths: Vec<String>,
|
|
},
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum OriginCommands {
|
|
List,
|
|
Health {
|
|
origin_id: String,
|
|
},
|
|
Rescan {
|
|
origin_id: String,
|
|
},
|
|
}
|
|
|
|
/// Guard returned by `try_acquire_lock`.
///
/// It only keeps the locked file handle open; the field is never read.
/// Closing the handle (when the guard is dropped) is what ends the
/// `flock`-based lock taken on it.
struct LockFile {
    /// Open handle of the lock file; held purely for its lifetime.
    _file: File,
}
|
|
|
|
fn try_acquire_lock(path: &Path) -> Result<LockFile> {
|
|
let file = File::create(path).context("Failed to create lock file")?;
|
|
let fd = file.as_raw_fd();
|
|
|
|
let ret = unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) };
|
|
if ret != 0 {
|
|
let err = std::io::Error::last_os_error();
|
|
if err.kind() == std::io::ErrorKind::WouldBlock {
|
|
anyhow::bail!("MusicFS is already running (lock file: {:?})", path);
|
|
}
|
|
return Err(err).context("Failed to acquire lock");
|
|
}
|
|
|
|
let mut f = &file;
|
|
writeln!(f, "{}", std::process::id())?;
|
|
|
|
Ok(LockFile { _file: file })
|
|
}
|
|
|
|
fn main() -> Result<()> {
|
|
musicfs_core::install_panic_hook();
|
|
let cli = Cli::parse();
|
|
|
|
match cli.command {
|
|
Commands::Mount {
|
|
config: _,
|
|
mountpoint,
|
|
origin,
|
|
cache_dir,
|
|
} => {
|
|
let log_config = LoggingConfig {
|
|
level: cli.log_level,
|
|
..Default::default()
|
|
};
|
|
let _guard = init_logging(&log_config)?;
|
|
run_mount(mountpoint, origin, cache_dir)
|
|
}
|
|
Commands::Status => {
|
|
init_basic_logging(&cli.log_level);
|
|
run_status()
|
|
}
|
|
Commands::Cache { command } => {
|
|
init_basic_logging(&cli.log_level);
|
|
run_cache(command)
|
|
}
|
|
Commands::Search { query, limit } => {
|
|
init_basic_logging(&cli.log_level);
|
|
run_search(&query, limit)
|
|
}
|
|
Commands::Origin { command } => {
|
|
init_basic_logging(&cli.log_level);
|
|
run_origin(command)
|
|
}
|
|
Commands::Events { r#type } => {
|
|
init_basic_logging(&cli.log_level);
|
|
run_events(r#type)
|
|
}
|
|
Commands::Shutdown { graceful, timeout } => {
|
|
init_basic_logging(&cli.log_level);
|
|
run_shutdown(graceful, timeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn run_mount(
|
|
mountpoint: PathBuf,
|
|
origin_path: Option<PathBuf>,
|
|
cache_dir: Option<PathBuf>,
|
|
) -> Result<()> {
|
|
let origin_path = origin_path.context("--origin is required for mount")?;
|
|
|
|
let cache_dir = cache_dir.unwrap_or_else(|| {
|
|
dirs::cache_dir()
|
|
.unwrap_or_else(|| PathBuf::from("/tmp"))
|
|
.join("musicfs")
|
|
});
|
|
|
|
let runtime = tokio::runtime::Runtime::new().context("Failed to create Tokio runtime")?;
|
|
let handle = runtime.handle().clone();
|
|
|
|
let cache_dir_clone = cache_dir.clone();
|
|
let (tree, reader) = runtime.block_on(async {
|
|
info!(origin = ?origin_path, mountpoint = ?mountpoint, "Mount configuration");
|
|
info!("Cache directory: {:?}", cache_dir_clone);
|
|
|
|
std::fs::create_dir_all(&cache_dir_clone).context("Failed to create cache directory")?;
|
|
std::fs::create_dir_all(&mountpoint).context("Failed to create mountpoint")?;
|
|
|
|
let cas_config = CasConfig {
|
|
chunks_dir: cache_dir_clone.join("chunks"),
|
|
..Default::default()
|
|
};
|
|
let store = Arc::new(
|
|
CasStore::open(cas_config)
|
|
.await
|
|
.context("Failed to open CAS store")?,
|
|
);
|
|
info!("CAS store initialized");
|
|
|
|
let origin_id = OriginId::from("local");
|
|
let origin = Arc::new(LocalOrigin::new(origin_id.clone(), origin_path.clone()));
|
|
info!("Origin registered: {}", origin.display_name());
|
|
|
|
let fetcher = Arc::new(ContentFetcher::new(store.clone()));
|
|
fetcher.register_origin(origin);
|
|
|
|
info!("Scanning music files...");
|
|
let files = scan_music_files(&origin_path, &origin_id).await?;
|
|
info!("Found {} music files", files.len());
|
|
|
|
let mut builder = TreeBuilder::new();
|
|
for file in &files {
|
|
builder.add_file(file);
|
|
fetcher.register_file(file.clone());
|
|
}
|
|
let tree = Arc::new(RwLock::new(builder.build()));
|
|
info!("Virtual tree built");
|
|
|
|
let reader = Arc::new(FileReader::with_fetcher(store, fetcher));
|
|
|
|
Ok::<_, anyhow::Error>((tree, reader))
|
|
})?;
|
|
|
|
check_stale_mount(&mountpoint)?;
|
|
|
|
let lock_path = cache_dir.join("musicfs.lock");
|
|
let _lock = try_acquire_lock(&lock_path)
|
|
.context("Failed to acquire lock — is another instance running?")?;
|
|
info!(lock_path = ?lock_path, "Lock acquired");
|
|
|
|
let fs = MusicFs::with_reader(tree, reader, handle.clone());
|
|
|
|
info!("Mounting filesystem at {:?}", mountpoint);
|
|
|
|
let session = fs
|
|
.spawn_mount(&mountpoint)
|
|
.context("Failed to mount filesystem")?;
|
|
|
|
#[cfg(target_os = "linux")]
|
|
{
|
|
if let Err(e) = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]) {
|
|
debug!("sd_notify not available (not running under systemd): {}", e);
|
|
}
|
|
}
|
|
info!("MusicFS ready, PID {}", std::process::id());
|
|
|
|
let shutdown_token = tokio_util::sync::CancellationToken::new();
|
|
|
|
runtime.block_on(async {
|
|
let mut sigterm =
|
|
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
|
|
let mut sigint =
|
|
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
|
|
|
|
tokio::select! {
|
|
_ = sigterm.recv() => {
|
|
info!("Received SIGTERM, shutting down");
|
|
}
|
|
_ = sigint.recv() => {
|
|
info!("Received SIGINT, shutting down");
|
|
}
|
|
}
|
|
|
|
info!("Beginning ordered shutdown");
|
|
shutdown_token.cancel();
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
|
|
|
info!("Background tasks stopped");
|
|
|
|
Ok::<_, anyhow::Error>(())
|
|
})?;
|
|
|
|
#[cfg(target_os = "linux")]
|
|
{
|
|
let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Stopping]);
|
|
}
|
|
info!("Unmounting filesystem");
|
|
drop(session);
|
|
info!("Shutdown complete");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn run_status() -> Result<()> {
|
|
println!("Status: Not connected to daemon");
|
|
println!("Hint: gRPC client integration pending");
|
|
Ok(())
|
|
}
|
|
|
|
fn run_cache(command: CacheCommands) -> Result<()> {
|
|
match command {
|
|
CacheCommands::Stats => {
|
|
println!("Cache stats: gRPC client integration pending");
|
|
}
|
|
CacheCommands::Clear { origin } => {
|
|
println!(
|
|
"Clearing cache for: {}",
|
|
origin.as_deref().unwrap_or("all")
|
|
);
|
|
println!("gRPC client integration pending");
|
|
}
|
|
CacheCommands::Prefetch { paths } => {
|
|
println!("Prefetching {} paths", paths.len());
|
|
println!("gRPC client integration pending");
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn run_search(query: &str, limit: u32) -> Result<()> {
|
|
println!("Searching for: {} (limit: {})", query, limit);
|
|
println!("gRPC client integration pending");
|
|
Ok(())
|
|
}
|
|
|
|
fn run_origin(command: OriginCommands) -> Result<()> {
|
|
match command {
|
|
OriginCommands::List => {
|
|
println!("Origins: gRPC client integration pending");
|
|
}
|
|
OriginCommands::Health { origin_id } => {
|
|
println!("Health for {}: gRPC client integration pending", origin_id);
|
|
}
|
|
OriginCommands::Rescan { origin_id } => {
|
|
println!("Rescanning {}: gRPC client integration pending", origin_id);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn run_events(event_type: Option<String>) -> Result<()> {
|
|
println!(
|
|
"Subscribing to events: {}",
|
|
event_type.as_deref().unwrap_or("all")
|
|
);
|
|
println!("gRPC client integration pending");
|
|
Ok(())
|
|
}
|
|
|
|
fn run_shutdown(graceful: bool, timeout: u32) -> Result<()> {
|
|
println!(
|
|
"Shutdown requested (graceful: {}, timeout: {}s)",
|
|
graceful, timeout
|
|
);
|
|
println!("gRPC client integration pending");
|
|
Ok(())
|
|
}
|
|
|
|
fn init_logging(config: &LoggingConfig) -> Result<WorkerGuard> {
|
|
std::fs::create_dir_all(&config.log_dir)?;
|
|
|
|
let file_appender = tracing_appender::rolling::daily(&config.log_dir, "musicfs.log");
|
|
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
|
|
|
|
let file_layer = if config.json_output {
|
|
fmt::layer()
|
|
.json()
|
|
.with_writer(non_blocking)
|
|
.with_ansi(false)
|
|
.boxed()
|
|
} else {
|
|
fmt::layer()
|
|
.with_writer(non_blocking)
|
|
.with_ansi(false)
|
|
.boxed()
|
|
};
|
|
|
|
let stderr_layer = fmt::layer().with_writer(std::io::stderr).compact();
|
|
|
|
let filter = EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| EnvFilter::new(&config.level));
|
|
|
|
let subscriber = tracing_subscriber::registry()
|
|
.with(filter)
|
|
.with(file_layer)
|
|
.with(stderr_layer);
|
|
|
|
#[cfg(target_os = "linux")]
|
|
let subscriber = {
|
|
let journald_layer = if config.journald {
|
|
tracing_journald::layer()
|
|
.ok()
|
|
.map(|l| l.with_syslog_identifier("musicfs".to_string()))
|
|
} else {
|
|
None
|
|
};
|
|
subscriber.with(journald_layer)
|
|
};
|
|
|
|
subscriber.init();
|
|
|
|
info!(version = env!("CARGO_PKG_VERSION"), "MusicFS starting");
|
|
Ok(guard)
|
|
}
|
|
|
|
fn init_basic_logging(level: &str) {
|
|
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level));
|
|
|
|
tracing_subscriber::registry()
|
|
.with(fmt::layer().compact())
|
|
.with(filter)
|
|
.init();
|
|
}
|
|
|
|
async fn scan_music_files(dir: &Path, origin_id: &OriginId) -> Result<Vec<FileMeta>> {
|
|
let parser = MetadataParser::new();
|
|
let mut files = Vec::new();
|
|
let mut file_id_counter = 1i64;
|
|
|
|
scan_dir_recursive(
|
|
dir,
|
|
dir,
|
|
origin_id,
|
|
&parser,
|
|
&mut files,
|
|
&mut file_id_counter,
|
|
)
|
|
.await?;
|
|
|
|
Ok(files)
|
|
}
|
|
|
|
async fn scan_dir_recursive(
|
|
base: &Path,
|
|
dir: &Path,
|
|
origin_id: &OriginId,
|
|
parser: &MetadataParser,
|
|
files: &mut Vec<FileMeta>,
|
|
id_counter: &mut i64,
|
|
) -> Result<()> {
|
|
let mut entries = tokio::fs::read_dir(dir).await?;
|
|
|
|
while let Some(entry) = entries.next_entry().await? {
|
|
let path = entry.path();
|
|
let metadata = entry.metadata().await?;
|
|
|
|
if metadata.is_dir() {
|
|
Box::pin(scan_dir_recursive(
|
|
base, &path, origin_id, parser, files, id_counter,
|
|
))
|
|
.await?;
|
|
} else if is_audio_file(&path) {
|
|
let relative_path = path.strip_prefix(base).unwrap_or(&path);
|
|
|
|
let audio_meta = match parser.parse_file(&path) {
|
|
Ok(meta) => Some(meta),
|
|
Err(e) => {
|
|
debug!("Failed to parse metadata for {:?}: {}", path, e);
|
|
None
|
|
}
|
|
};
|
|
|
|
let virtual_path = build_virtual_path(&path, audio_meta.as_ref());
|
|
|
|
let file_meta = FileMeta {
|
|
id: FileId(*id_counter),
|
|
virtual_path,
|
|
real_path: RealPath {
|
|
origin_id: origin_id.clone(),
|
|
path: PathBuf::from("/").join(relative_path),
|
|
},
|
|
size: metadata.len(),
|
|
mtime: metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH),
|
|
content_hash: None,
|
|
audio: audio_meta,
|
|
};
|
|
|
|
debug!(
|
|
"Found: {:?} -> {:?}",
|
|
file_meta.real_path.path, file_meta.virtual_path
|
|
);
|
|
files.push(file_meta);
|
|
*id_counter += 1;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns `true` when `path` has one of the recognised audio extensions
/// (`flac`, `mp3`, `ogg`, `wav`, `m4a`, `aac`, `opus`), compared
/// case-insensitively.
///
/// Paths without an extension, or whose extension is not valid UTF-8, are not
/// audio files. The comparison is done in place, so no string is allocated
/// per scanned entry.
fn is_audio_file(path: &Path) -> bool {
    const AUDIO_EXTENSIONS: [&str; 7] = ["flac", "mp3", "ogg", "wav", "m4a", "aac", "opus"];

    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => AUDIO_EXTENSIONS
            .iter()
            .any(|known| ext.eq_ignore_ascii_case(known)),
        None => false,
    }
}
|
|
|
|
fn build_virtual_path(path: &Path, audio: Option<&musicfs_core::AudioMeta>) -> VirtualPath {
|
|
if let Some(meta) = audio {
|
|
let artist = meta.artist.as_deref().unwrap_or("Unknown Artist");
|
|
let album = meta.album.as_deref().unwrap_or("Unknown Album");
|
|
let filename = path
|
|
.file_name()
|
|
.and_then(|n| n.to_str())
|
|
.unwrap_or("track");
|
|
|
|
VirtualPath::new(&format!(
|
|
"/{}/{}/{}",
|
|
sanitize(artist),
|
|
sanitize(album),
|
|
filename
|
|
))
|
|
} else {
|
|
let filename = path
|
|
.file_name()
|
|
.and_then(|n| n.to_str())
|
|
.unwrap_or("unknown");
|
|
VirtualPath::new(&format!("/Unknown Artist/Unknown Album/{}", filename))
|
|
}
|
|
}
|
|
|
|
/// Turns a tag value into a string that is safe as a single path component.
///
/// * `/ \ : * ? " < > |` and all control characters (including NUL and line
///   breaks) are replaced by `_`.
/// * Values that would come out empty or as the special names `.` / `..` are
///   replaced as well (`""` and `"."` become `_`, `".."` becomes `__`), so
///   the result never names the current or parent directory and is never
///   empty.
///
/// Every other character, including non-ASCII text, is kept unchanged.
fn sanitize(s: &str) -> String {
    let cleaned: String = s
        .chars()
        .map(|c| match c {
            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
            c if c.is_control() => '_',
            _ => c,
        })
        .collect();

    match cleaned.as_str() {
        "" | "." => String::from("_"),
        ".." => String::from("__"),
        _ => cleaned,
    }
}
|
|
|
|
fn check_stale_mount(mountpoint: &Path) -> Result<()> {
|
|
if let Ok(mounts) = std::fs::read_to_string("/proc/mounts") {
|
|
for line in mounts.lines() {
|
|
if line.contains(mountpoint.to_string_lossy().as_ref()) && line.contains("fuse") {
|
|
warn!(
|
|
"Stale FUSE mount detected at {:?}, attempting cleanup",
|
|
mountpoint
|
|
);
|
|
let status = std::process::Command::new("fusermount")
|
|
.args(["-uz", &mountpoint.to_string_lossy()])
|
|
.status();
|
|
match status {
|
|
Ok(s) if s.success() => info!("Stale mount cleaned up"),
|
|
Ok(s) => warn!("fusermount exited with: {}", s),
|
|
Err(e) => warn!("Failed to run fusermount: {}", e),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|