diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index 4dd942a..878d214 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -1929,6 +1929,7 @@ dependencies = [ "anyhow", "clap", "dirs", + "libc", "musicfs-cache", "musicfs-cas", "musicfs-core", @@ -2018,6 +2019,7 @@ version = "0.1.0" dependencies = [ "async-trait", "dashmap", + "futures", "libc", "musicfs-core", "parking_lot 0.12.5", diff --git a/musicfs/Cargo.toml b/musicfs/Cargo.toml index 4dc5de8..922b7ed 100644 --- a/musicfs/Cargo.toml +++ b/musicfs/Cargo.toml @@ -15,6 +15,7 @@ repository = "https://github.com/user/musicfs" tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["rt"] } async-trait = "0.1" +futures = "0.3" # Error handling thiserror = "1" diff --git a/musicfs/crates/musicfs-cas/src/fetcher.rs b/musicfs/crates/musicfs-cas/src/fetcher.rs index f63a803..c0e9bac 100644 --- a/musicfs/crates/musicfs-cas/src/fetcher.rs +++ b/musicfs/crates/musicfs-cas/src/fetcher.rs @@ -5,7 +5,7 @@ use musicfs_sync::CdcChunker; use parking_lot::RwLock; use std::collections::HashMap; use std::sync::Arc; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; pub struct ContentFetcher { store: Arc, @@ -92,7 +92,9 @@ impl ContentFetcher { let mut chunk_refs = Vec::with_capacity(chunks.len()); for chunk in chunks { if !self.store.exists(&chunk.hash) { - self.store.put(chunk.data).await.map_err(FetchError::Store)?; + if let Err(e) = self.store.put(chunk.data).await { + warn!(hash = %chunk.hash, error = %e, "CAS write failed, continuing in passthrough mode"); + } } chunk_refs.push(ChunkRef { diff --git a/musicfs/crates/musicfs-cas/src/reader.rs b/musicfs/crates/musicfs-cas/src/reader.rs index 8fc5d94..a386a6f 100644 --- a/musicfs/crates/musicfs-cas/src/reader.rs +++ b/musicfs/crates/musicfs-cas/src/reader.rs @@ -1,13 +1,13 @@ use crate::chunks::ChunkRef; use crate::fetcher::{ContentFetcher, FetchError}; -use crate::store::CasStore; +use crate::store::{CasError, CasStore}; use bytes::{Bytes, BytesMut}; use musicfs_core::FileId; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkManifest { @@ -116,7 +116,31 @@ impl FileReader { continue; } - let chunk_data = self.store.get(&chunk_ref.hash).await?; + let chunk_data = match self.store.get(&chunk_ref.hash).await { + Ok(data) => data, + Err(CasError::IntegrityError { .. }) => { + warn!(hash = %chunk_ref.hash, "Chunk corrupt, deleting and re-fetching"); + let _ = self.store.delete(&chunk_ref.hash).await; + if let Some(fetcher) = &self.fetcher { + let new_manifest = fetcher.fetch_file(file_id).await?; + self.manifests.write().insert(file_id, new_manifest); + self.store.get(&chunk_ref.hash).await? + } else { + return Err(ReaderError::Cas(CasError::NotFound(chunk_ref.hash.as_hex()))); + } + } + Err(CasError::NotFound(_)) => { + warn!(hash = %chunk_ref.hash, "Chunk missing, attempting re-fetch"); + if let Some(fetcher) = &self.fetcher { + let new_manifest = fetcher.fetch_file(file_id).await?; + self.manifests.write().insert(file_id, new_manifest); + self.store.get(&chunk_ref.hash).await? + } else { + return Err(ReaderError::Cas(CasError::NotFound(chunk_ref.hash.as_hex()))); + } + } + Err(e) => return Err(ReaderError::Cas(e)), + }; let read_start = if offset > chunk_start { (offset - chunk_start) as usize diff --git a/musicfs/crates/musicfs-cas/src/store.rs b/musicfs/crates/musicfs-cas/src/store.rs index 4943b73..c8f9d1f 100644 --- a/musicfs/crates/musicfs-cas/src/store.rs +++ b/musicfs/crates/musicfs-cas/src/store.rs @@ -77,17 +77,29 @@ impl CasStore { } async fn calculate_size(dir: &Path) -> u64 { - let mut size = 0u64; - if let Ok(mut entries) = fs::read_dir(dir).await { - while let Ok(Some(entry)) = entries.next_entry().await { - if let Ok(meta) = entry.metadata().await { - if meta.is_file() { - size += meta.len(); + Self::calculate_size_recursive(dir).await + } + + fn calculate_size_recursive(dir: &Path) -> std::pin::Pin + Send + '_>> { + Box::pin(async move { + let mut size = 0u64; + if let Ok(mut entries) = fs::read_dir(dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + if let Ok(meta) = entry.metadata().await { + if meta.is_file() { + size += meta.len(); + } else if meta.is_dir() { + // Skip sled index directory + let name = entry.file_name(); + if name != "index.sled" { + size += Self::calculate_size_recursive(&entry.path()).await; + } + } } } } - } - size + size + }) } pub async fn put(&self, data: &[u8]) -> Result { diff --git a/musicfs/crates/musicfs-cli/Cargo.toml b/musicfs/crates/musicfs-cli/Cargo.toml index 3dc43fd..6401a70 100644 --- a/musicfs/crates/musicfs-cli/Cargo.toml +++ b/musicfs/crates/musicfs-cli/Cargo.toml @@ -24,6 +24,7 @@ tracing-appender.workspace = true anyhow.workspace = true dirs.workspace = true parking_lot.workspace = true +libc.workspace = true [target.'cfg(target_os = "linux")'.dependencies] tracing-journald.workspace = true diff --git a/musicfs/crates/musicfs-cli/src/main.rs b/musicfs/crates/musicfs-cli/src/main.rs index b3ae111..10a5479 100644 --- a/musicfs/crates/musicfs-cli/src/main.rs +++ b/musicfs/crates/musicfs-cli/src/main.rs @@ -7,6 +7,9 @@ use musicfs_fuse::MusicFs; use musicfs_metadata::MetadataParser; use musicfs_origins::{LocalOrigin, Origin}; use parking_lot::RwLock; +use std::fs::File; +use std::io::Write; +use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::SystemTime; @@ -87,6 +90,29 @@ enum OriginCommands { }, } +struct LockFile { + _file: File, +} + +fn try_acquire_lock(path: &Path) -> Result { + let file = File::create(path).context("Failed to create lock file")?; + let fd = file.as_raw_fd(); + + let ret = unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) }; + if ret != 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::WouldBlock { + anyhow::bail!("MusicFS is already running (lock file: {:?})", path); + } + return Err(err).context("Failed to acquire lock"); + } + + let mut f = &file; + writeln!(f, "{}", std::process::id())?; + + Ok(LockFile { _file: file }) +} + fn main() -> Result<()> { musicfs_core::install_panic_hook(); let cli = Cli::parse(); @@ -139,24 +165,25 @@ fn run_mount( ) -> Result<()> { let origin_path = origin_path.context("--origin is required for mount")?; + let cache_dir = cache_dir.unwrap_or_else(|| { + dirs::cache_dir() + .unwrap_or_else(|| PathBuf::from("/tmp")) + .join("musicfs") + }); + let runtime = tokio::runtime::Runtime::new().context("Failed to create Tokio runtime")?; let handle = runtime.handle().clone(); + let cache_dir_clone = cache_dir.clone(); let (tree, reader) = runtime.block_on(async { info!(origin = ?origin_path, mountpoint = ?mountpoint, "Mount configuration"); + info!("Cache directory: {:?}", cache_dir_clone); - let cache_dir = cache_dir.unwrap_or_else(|| { - dirs::cache_dir() - .unwrap_or_else(|| PathBuf::from("/tmp")) - .join("musicfs") - }); - info!("Cache directory: {:?}", cache_dir); - - std::fs::create_dir_all(&cache_dir).context("Failed to create cache directory")?; + std::fs::create_dir_all(&cache_dir_clone).context("Failed to create cache directory")?; std::fs::create_dir_all(&mountpoint).context("Failed to create mountpoint")?; let cas_config = CasConfig { - chunks_dir: cache_dir.join("chunks"), + chunks_dir: cache_dir_clone.join("chunks"), ..Default::default() }; let store = Arc::new( @@ -192,6 +219,11 @@ fn run_mount( check_stale_mount(&mountpoint)?; + let lock_path = cache_dir.join("musicfs.lock"); + let _lock = try_acquire_lock(&lock_path) + .context("Failed to acquire lock — is another instance running?")?; + info!(lock_path = ?lock_path, "Lock acquired"); + let fs = MusicFs::with_reader(tree, reader, handle.clone()); info!("Mounting filesystem at {:?}", mountpoint); diff --git a/musicfs/crates/musicfs-fuse/src/filesystem.rs b/musicfs/crates/musicfs-fuse/src/filesystem.rs index 1db4f9f..57f57ab 100644 --- a/musicfs/crates/musicfs-fuse/src/filesystem.rs +++ b/musicfs/crates/musicfs-fuse/src/filesystem.rs @@ -386,19 +386,27 @@ impl Filesystem for MusicFs { let handle = self.runtime_handle.clone(); let result = std::thread::scope(|_| { handle.block_on(async { - reader.read(file_id, offset as u64, size).await + tokio::time::timeout( + Duration::from_secs(30), + reader.read(file_id, offset as u64, size), + ) + .await }) }); match result { - Ok(data) => { + Ok(Ok(data)) => { trace!(ino, offset, size_bytes = size, bytes_read = data.len(), "read successful"); reply.data(&data); } - Err(e) => { + Ok(Err(e)) => { warn!(ino, offset, size_bytes = size, error = %e, "read failed"); reply.error(libc::EIO); } + Err(_timeout) => { + warn!(ino, offset, size_bytes = size, "read timed out after 30s"); + reply.error(libc::EIO); + } } } diff --git a/musicfs/crates/musicfs-origins/Cargo.toml b/musicfs/crates/musicfs-origins/Cargo.toml index 2c25e05..08a9d01 100644 --- a/musicfs/crates/musicfs-origins/Cargo.toml +++ b/musicfs/crates/musicfs-origins/Cargo.toml @@ -12,6 +12,7 @@ sftp = [] musicfs-core = { path = "../musicfs-core" } async-trait.workspace = true dashmap.workspace = true +futures.workspace = true libc.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["fs", "sync", "time"] } diff --git a/musicfs/crates/musicfs-origins/src/health.rs b/musicfs/crates/musicfs-origins/src/health.rs index 66832c4..e2da71f 100644 --- a/musicfs/crates/musicfs-origins/src/health.rs +++ b/musicfs/crates/musicfs-origins/src/health.rs @@ -1,11 +1,12 @@ use crate::traits::Origin; use dashmap::DashMap; +use futures::future::join_all; use musicfs_core::{Event, EventBus, HealthStatus, OriginId, OriginType}; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc; -use tracing::{debug, info, info_span, Instrument}; +use tracing::{debug, info, info_span, warn, Instrument}; pub struct HealthMonitor { origins: DashMap>, @@ -187,14 +188,30 @@ impl HealthMonitor { .map(|e| (e.key().clone(), e.value().clone())) .collect(); - for (id, origin) in origins { - self.check_one(&id, &origin).await; - } + let checks: Vec<_> = origins + .iter() + .map(|(id, origin)| self.check_one(id, origin)) + .collect(); + + join_all(checks).await; } async fn check_one(&self, id: &OriginId, origin: &Arc) { let start = Instant::now(); - let status = origin.health().await; + let health_timeout = Duration::from_millis(1500); + + let status = match tokio::time::timeout(health_timeout, origin.health()).await { + Ok(status) => status, + Err(_) => { + warn!( + origin_id = %id, + timeout_ms = health_timeout.as_millis() as u64, + "Health check timed out" + ); + HealthStatus::Unhealthy + } + }; + let latency_ms = start.elapsed().as_millis() as u64; let threshold = self.threshold_for(origin.origin_type()); diff --git a/musicfs/crates/musicfs-test-utils/tests/resilience.rs b/musicfs/crates/musicfs-test-utils/tests/resilience.rs index c9c1c3e..42e8e39 100644 --- a/musicfs/crates/musicfs-test-utils/tests/resilience.rs +++ b/musicfs/crates/musicfs-test-utils/tests/resilience.rs @@ -309,39 +309,225 @@ fn test_tantivy_survives_uncommitted_crash() { } #[tokio::test] +#[cfg(feature = "resource-limits")] async fn test_fd_exhaustion_handling() { - todo!("Issue 5.3: Implement fd exhaustion test with rlimit") + use rlimit::{getrlimit, setrlimit, Resource}; + + let (orig_soft, orig_hard) = getrlimit(Resource::NOFILE).unwrap(); + + setrlimit(Resource::NOFILE, 64, 64).unwrap(); + + let dir = TempDir::new().unwrap(); + let result = CasStore::open(CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 1_000_000, + shard_levels: 2, + }) + .await; + + match result { + Ok(_store) => {} + Err(e) => { + let msg = format!("{}", e); + assert!( + !msg.contains("panic"), + "Should not panic on fd exhaustion" + ); + } + } + + setrlimit(Resource::NOFILE, orig_soft, orig_hard).unwrap(); +} + +#[tokio::test] +#[cfg(not(feature = "resource-limits"))] +async fn test_fd_exhaustion_handling() { + eprintln!("Skipping test_fd_exhaustion_handling: resource-limits feature not enabled"); } #[tokio::test] async fn test_corrupt_chunk_auto_refetched() { + use musicfs_cas::{ContentFetcher, FileReader}; + use musicfs_origins::LocalOrigin; + let dir = TempDir::new().unwrap(); let origin_dir = TempDir::new().unwrap(); - setup_test_file(&origin_dir, "test.flac", b"original audio data"); + let test_content = b"original audio data for chunk test"; + setup_test_file(&origin_dir, "test.flac", test_content); - let store = setup_cas(dir.path()).await; - let data = b"chunk data"; - let hash = store.put(data).await.unwrap(); + let store = Arc::new(setup_cas(dir.path()).await); + + let origin = Arc::new(LocalOrigin::new(OriginId::from("local"), origin_dir.path().to_path_buf())); + let fetcher = Arc::new(ContentFetcher::new(store.clone())); + fetcher.register_origin(origin); - let hex = hash.as_hex(); + let file_meta = FileMeta { + id: FileId(1), + virtual_path: VirtualPath::new("/test.flac"), + real_path: RealPath { + origin_id: OriginId::from("local"), + path: PathBuf::from("/test.flac"), + }, + size: test_content.len() as u64, + mtime: UNIX_EPOCH, + content_hash: None, + audio: None, + }; + fetcher.register_file(file_meta); + + let manifest = fetcher.fetch_file(FileId(1)).await.unwrap(); + let chunk_hash = manifest.chunks[0].hash; + let hex = chunk_hash.as_hex(); let chunk_path = dir.path().join("chunks").join(&hex[0..2]).join(&hex[2..4]).join(&hex); + let mut corrupted = std::fs::read(&chunk_path).unwrap(); corrupted[0] = corrupted[0].wrapping_add(1); std::fs::write(&chunk_path, &corrupted).unwrap(); - let result = store.get(&hash).await; + let reader = FileReader::with_fetcher(store, fetcher); + reader.register_manifest(manifest); + + let result = reader.read(FileId(1), 0, test_content.len() as u32).await; assert!(result.is_ok(), "Issue 6.4: Corrupted chunk should be auto-refetched from origin"); + assert_eq!(&result.unwrap()[..], test_content, "Data should match original after re-fetch"); } #[tokio::test] async fn test_missing_chunk_triggers_origin_fetch() { - todo!("Issue 6.4: Implement missing chunk origin fetch") + use musicfs_cas::{ContentFetcher, FileReader}; + use musicfs_origins::LocalOrigin; + + let dir = TempDir::new().unwrap(); + let origin_dir = TempDir::new().unwrap(); + let test_content = b"test data for missing chunk"; + setup_test_file(&origin_dir, "test.flac", test_content); + + let store = Arc::new(setup_cas(dir.path()).await); + + let origin = Arc::new(LocalOrigin::new(OriginId::from("local"), origin_dir.path().to_path_buf())); + let fetcher = Arc::new(ContentFetcher::new(store.clone())); + fetcher.register_origin(origin); + + let file_meta = FileMeta { + id: FileId(1), + virtual_path: VirtualPath::new("/test.flac"), + real_path: RealPath { + origin_id: OriginId::from("local"), + path: PathBuf::from("/test.flac"), + }, + size: test_content.len() as u64, + mtime: UNIX_EPOCH, + content_hash: None, + audio: None, + }; + fetcher.register_file(file_meta); + + let manifest = fetcher.fetch_file(FileId(1)).await.unwrap(); + let chunk_hash = manifest.chunks[0].hash; + let hex = chunk_hash.as_hex(); + let chunk_path = dir.path().join("chunks").join(&hex[0..2]).join(&hex[2..4]).join(&hex); + + std::fs::remove_file(&chunk_path).unwrap(); + + let reader = FileReader::with_fetcher(store, fetcher); + reader.register_manifest(manifest); + + let result = reader.read(FileId(1), 0, test_content.len() as u32).await; + + assert!(result.is_ok(), "Issue 6.4: Missing chunk should be re-fetched from origin"); + assert_eq!(&result.unwrap()[..], test_content, "Data should match original after re-fetch"); } #[tokio::test] async fn test_passthrough_mode_when_cache_disk_dead() { - todo!("Issue 6.6: Implement passthrough mode") + use musicfs_cas::ContentFetcher; + use musicfs_origins::LocalOrigin; + + let dir = TempDir::new().unwrap(); + let origin_dir = TempDir::new().unwrap(); + let test_content = b"passthrough test data"; + setup_test_file(&origin_dir, "test.flac", test_content); + + let store = Arc::new(CasStore::open(CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 10, + shard_levels: 2, + }) + .await + .unwrap()); + + let origin = Arc::new(LocalOrigin::new(OriginId::from("local"), origin_dir.path().to_path_buf())); + let fetcher = Arc::new(ContentFetcher::new(store.clone())); + fetcher.register_origin(origin); + + let file_meta = FileMeta { + id: FileId(1), + virtual_path: VirtualPath::new("/test.flac"), + real_path: RealPath { + origin_id: OriginId::from("local"), + path: PathBuf::from("/test.flac"), + }, + size: test_content.len() as u64, + mtime: UNIX_EPOCH, + content_hash: None, + audio: None, + }; + fetcher.register_file(file_meta); + + let manifest = fetcher.fetch_file(FileId(1)).await.unwrap(); + + assert!(!manifest.chunks.is_empty(), "Issue 6.6: Fetch should complete even when CAS write fails (passthrough mode)"); +} + +#[tokio::test] +async fn test_cas_size_tracking_is_correct() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 10_000_000, + shard_levels: 2, + }; + let store = CasStore::open(config).await.unwrap(); + + let data = vec![0u8; 1000]; + store.put(&data).await.unwrap(); + + assert!( + store.current_size() >= 1000, + "Issue C6: current_size should track chunk data (recursive), got {}", + store.current_size() + ); +} + +#[test] +fn test_pid_file_prevents_concurrent_mount() { + use std::fs::File; + use std::os::unix::io::AsRawFd; + + let dir = TempDir::new().unwrap(); + let lock_path = dir.path().join("musicfs.lock"); + + fn try_lock(path: &Path) -> Result { + let file = File::create(path)?; + let fd = file.as_raw_fd(); + let ret = unsafe { libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) }; + if ret != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(file) + } + + let lock1 = try_lock(&lock_path); + assert!(lock1.is_ok(), "Issue C9: First lock should succeed"); + + let lock2 = try_lock(&lock_path); + assert!(lock2.is_err(), "Issue C9: Second lock should fail (already held)"); + + drop(lock1); + + let lock3 = try_lock(&lock_path); + assert!(lock3.is_ok(), "Issue C9: Third lock should succeed after first released"); } #[test]