From 32c96701c825c217ee44c0c19f69f5520bc7a17a Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 12 May 2026 20:05:44 +0200 Subject: [PATCH] Implement Week 5 CDC & Delta Detection with Oracle fixes - Add CdcChunker using FastCDC v3 (16KB/64KB/256KB chunks) - Add DeltaDetector with scan_origin() returning ScannedFile (no FileId assignment) - Add OriginWatcher with inotify and 200ms debounce using tokio::spawn - Fix LocalOrigin::read() to loop until all bytes read - Add read_full() method to Origin trait - Add mtime field to ChunkManifest - Update ContentFetcher to use CDC chunking - Update bandwidth reduction test to assert >90% (NFR-6.4) Tests: 71 pass (+11 new) --- musicfs/Cargo.lock | 150 +++++++- musicfs/crates/musicfs-cas/Cargo.toml | 1 + musicfs/crates/musicfs-cas/src/fetcher.rs | 41 ++- musicfs/crates/musicfs-cas/src/reader.rs | 9 +- .../crates/musicfs-cas/tests/integration.rs | 1 + musicfs/crates/musicfs-origins/src/local.rs | 24 +- musicfs/crates/musicfs-origins/src/traits.rs | 3 + musicfs/crates/musicfs-sync/Cargo.toml | 16 + musicfs/crates/musicfs-sync/src/cdc.rs | 225 ++++++++++++ musicfs/crates/musicfs-sync/src/delta.rs | 329 ++++++++++++++++++ musicfs/crates/musicfs-sync/src/lib.rs | 8 +- musicfs/crates/musicfs-sync/src/watcher.rs | 206 +++++++++++ 12 files changed, 998 insertions(+), 15 deletions(-) create mode 100644 musicfs/crates/musicfs-sync/src/cdc.rs create mode 100644 musicfs/crates/musicfs-sync/src/delta.rs create mode 100644 musicfs/crates/musicfs-sync/src/watcher.rs diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index 90d634a..0760d3b 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -203,6 +203,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -282,12 +291,28 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastcdc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf51ceb43e96afbfe4dd5c6f6082af5dfd60e220820b8123792d61963f2ce6bc" + [[package]] name = "fastrand" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "filetime" +version = "0.2.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5b2eef6fafbf69f877e55509ce5b11a760690ac9700a2921be067aa6afaef6" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -310,6 +335,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "fuser" version = "0.14.0" @@ -421,6 +455,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -442,6 +496,26 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "285efcf12ef41bec907b3000d5ffaeb54191d4d9d83c0d6157e6cbc2db255e64" +dependencies = [ + "bitflags 2.11.1", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -516,6 +590,18 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.2.0" @@ -553,6 +639,7 @@ dependencies = [ "musicfs-cache", "musicfs-core", "musicfs-origins", + "musicfs-sync", "rmp-serde", "serde", "sled", @@ -641,6 +728,39 @@ version = "0.1.0" [[package]] name = "musicfs-sync" version = "0.1.0" +dependencies = [ + "async-trait", + "fastcdc", + "musicfs-core", + "musicfs-origins", + "notify", + "rmp-serde", + "serde", + "tempfile", + "thiserror", + "tokio", + "tracing", + "xxhash-rust", +] + +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.11.1", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] [[package]] name = "nu-ansi-term" @@ -874,6 +994,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1186,7 +1315,7 @@ checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", - "mio", + "mio 1.2.0", "parking_lot 0.12.5", "pin-project-lite", "signal-hook-registry", @@ -1303,6 +1432,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1377,6 +1516,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/musicfs/crates/musicfs-cas/Cargo.toml b/musicfs/crates/musicfs-cas/Cargo.toml index eddbe9a..cdbff66 100644 --- a/musicfs/crates/musicfs-cas/Cargo.toml +++ b/musicfs/crates/musicfs-cas/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] musicfs-core = { path = "../musicfs-core" } musicfs-origins = { path = "../musicfs-origins" } +musicfs-sync = { path = "../musicfs-sync" } tokio.workspace = true tracing.workspace = true serde.workspace = true diff --git a/musicfs/crates/musicfs-cas/src/fetcher.rs b/musicfs/crates/musicfs-cas/src/fetcher.rs index 9af8361..5a25ac7 100644 --- a/musicfs/crates/musicfs-cas/src/fetcher.rs +++ b/musicfs/crates/musicfs-cas/src/fetcher.rs @@ -1,6 +1,7 @@ use crate::{CasStore, ChunkManifest, ChunkRef}; use musicfs_core::{Event, EventBus, FileId, FileMeta, OriginId}; use musicfs_origins::Origin; +use musicfs_sync::CdcChunker; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use tracing::{debug, info}; @@ -10,6 +11,7 @@ pub struct ContentFetcher { origins: RwLock>>, file_meta: RwLock>, event_bus: Option>, + chunker: CdcChunker, } impl ContentFetcher { @@ -19,6 +21,7 @@ impl ContentFetcher { origins: RwLock::new(HashMap::new()), file_meta: RwLock::new(HashMap::new()), event_bus: None, + chunker: CdcChunker::default(), } } @@ -28,6 +31,7 @@ impl ContentFetcher { origins: RwLock::new(HashMap::new()), file_meta: RwLock::new(HashMap::new()), event_bus: Some(event_bus), + chunker: CdcChunker::default(), } } @@ -71,25 +75,44 @@ impl ContentFetcher { ); let data = origin - .read(&meta.real_path.path, 0, meta.size as u32) + .read_full(&meta.real_path.path) .await .map_err(|e| FetchError::OriginRead(e.to_string()))?; - let hash = self.store.put(&data).await.map_err(FetchError::Store)?; + let mtime = meta + .mtime + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + + let chunks = self.chunker.chunk_refs(&data); + info!("Chunked {:?} into {} chunks", file_id, chunks.len()); + + let mut chunk_refs = Vec::with_capacity(chunks.len()); + for chunk in chunks { + if !self.store.exists(&chunk.hash) { + self.store.put(chunk.data).await.map_err(FetchError::Store)?; + } + + chunk_refs.push(ChunkRef { + hash: chunk.hash, + offset: chunk.offset, + size: chunk.length, + }); + } let manifest = ChunkManifest { file_id, total_size: meta.size, - chunks: vec![ChunkRef { - hash, - offset: 0, - size: data.len() as u32, - }], + mtime, + chunks: chunk_refs, }; debug!( - "Created manifest for {:?}: {} bytes, 1 chunk", - file_id, meta.size + "Created manifest for {:?}: {} bytes, {} chunks", + file_id, + meta.size, + manifest.chunks.len() ); Ok(manifest) diff --git a/musicfs/crates/musicfs-cas/src/reader.rs b/musicfs/crates/musicfs-cas/src/reader.rs index 9a9c3f4..aeb3b94 100644 --- a/musicfs/crates/musicfs-cas/src/reader.rs +++ b/musicfs/crates/musicfs-cas/src/reader.rs @@ -11,6 +11,7 @@ use std::sync::{Arc, RwLock}; pub struct ChunkManifest { pub file_id: FileId, pub total_size: u64, + pub mtime: i64, pub chunks: Vec, } @@ -23,11 +24,12 @@ impl ChunkManifest { rmp_serde::from_slice(data).ok() } - pub fn from_db(file_id: FileId, total_size: u64, chunk_blob: &[u8]) -> Option { + pub fn from_db(file_id: FileId, total_size: u64, mtime: i64, chunk_blob: &[u8]) -> Option { let chunks = Self::chunks_from_bytes(chunk_blob)?; Some(Self { file_id, total_size, + mtime, chunks, }) } @@ -166,6 +168,7 @@ mod tests { reader.register_manifest(ChunkManifest { file_id: FileId(1), total_size: data.len() as u64, + mtime: 0, chunks: vec![ChunkRef { hash, offset: 0, @@ -193,6 +196,7 @@ mod tests { reader.register_manifest(ChunkManifest { file_id: FileId(1), total_size: data.len() as u64, + mtime: 0, chunks: vec![ChunkRef { hash, offset: 0, @@ -222,6 +226,7 @@ mod tests { reader.register_manifest(ChunkManifest { file_id: FileId(1), total_size: 8, + mtime: 0, chunks: vec![ ChunkRef { hash: hash1, @@ -256,6 +261,7 @@ mod tests { reader.register_manifest(ChunkManifest { file_id: FileId(1), total_size: data.len() as u64, + mtime: 0, chunks: vec![ChunkRef { hash, offset: 0, @@ -272,6 +278,7 @@ mod tests { let manifest = ChunkManifest { file_id: FileId(42), total_size: 1024, + mtime: 0, chunks: vec![ChunkRef { hash: ChunkHash::from_bytes(b"test"), offset: 0, diff --git a/musicfs/crates/musicfs-cas/tests/integration.rs b/musicfs/crates/musicfs-cas/tests/integration.rs index 916a99d..0d7753a 100644 --- a/musicfs/crates/musicfs-cas/tests/integration.rs +++ b/musicfs/crates/musicfs-cas/tests/integration.rs @@ -46,6 +46,7 @@ async fn test_cas_and_tree_integration() { reader.register_manifest(ChunkManifest { file_id: FileId(1), total_size: file_data.len() as u64, + mtime: 0, chunks: vec![ChunkRef { hash: chunk_hash, offset: 0, diff --git a/musicfs/crates/musicfs-origins/src/local.rs b/musicfs/crates/musicfs-origins/src/local.rs index fe4f0b4..72a71ad 100644 --- a/musicfs/crates/musicfs-origins/src/local.rs +++ b/musicfs/crates/musicfs-origins/src/local.rs @@ -93,13 +93,31 @@ impl Origin for LocalOrigin { let mut file = fs::File::open(&full_path).await?; file.seek(std::io::SeekFrom::Start(offset)).await?; - let mut buffer = vec![0u8; size as usize]; - let bytes_read = file.read(&mut buffer).await?; - buffer.truncate(bytes_read); + // FIX: Loop until all requested bytes are read or EOF + // Single read() only returns kernel buffer (~2MB), not full request + let mut buffer = Vec::with_capacity(size as usize); + let mut temp_buf = vec![0u8; 64 * 1024]; // 64KB chunks + let mut total_read = 0usize; + + while total_read < size as usize { + let to_read = std::cmp::min(temp_buf.len(), size as usize - total_read); + let n = file.read(&mut temp_buf[..to_read]).await?; + if n == 0 { + break; // EOF + } + buffer.extend_from_slice(&temp_buf[..n]); + total_read += n; + } Ok(buffer) } + async fn read_full(&self, path: &Path) -> Result> { + let full_path = self.full_path(path); + debug!("LocalOrigin::read_full({:?})", full_path); + Ok(fs::read(&full_path).await?) + } + async fn exists(&self, path: &Path) -> Result { let full_path = self.full_path(path); Ok(fs::try_exists(&full_path).await?) diff --git a/musicfs/crates/musicfs-origins/src/traits.rs b/musicfs/crates/musicfs-origins/src/traits.rs index c367685..322dc68 100644 --- a/musicfs/crates/musicfs-origins/src/traits.rs +++ b/musicfs/crates/musicfs-origins/src/traits.rs @@ -26,6 +26,9 @@ pub trait Origin: Send + Sync { async fn read(&self, path: &Path, offset: u64, size: u32) -> Result>; + /// Read entire file content (for CDC chunking of files <4GB) + async fn read_full(&self, path: &Path) -> Result>; + async fn exists(&self, path: &Path) -> Result; async fn health(&self) -> HealthStatus; diff --git a/musicfs/crates/musicfs-sync/Cargo.toml b/musicfs/crates/musicfs-sync/Cargo.toml index 9e7c3cb..af2e6c3 100644 --- a/musicfs/crates/musicfs-sync/Cargo.toml +++ b/musicfs/crates/musicfs-sync/Cargo.toml @@ -4,3 +4,19 @@ version.workspace = true edition.workspace = true [dependencies] +musicfs-core = { path = "../musicfs-core" } +musicfs-origins = { path = "../musicfs-origins" } + +fastcdc = "3" +xxhash-rust = { version = "0.8", features = ["xxh64"] } +notify = "6" +rmp-serde = "1" + +tokio = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +async-trait = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/musicfs/crates/musicfs-sync/src/cdc.rs b/musicfs/crates/musicfs-sync/src/cdc.rs new file mode 100644 index 0000000..9553d09 --- /dev/null +++ b/musicfs/crates/musicfs-sync/src/cdc.rs @@ -0,0 +1,225 @@ +use fastcdc::v2020::FastCDC; +use musicfs_core::ChunkHash; + +pub struct CdcChunker { + min_size: u32, + avg_size: u32, + max_size: u32, +} + +impl Default for CdcChunker { + fn default() -> Self { + Self { + min_size: 16 * 1024, + avg_size: 64 * 1024, + max_size: 256 * 1024, + } + } +} + +#[derive(Debug, Clone)] +pub struct Chunk { + pub hash: ChunkHash, + pub offset: u64, + pub length: u32, + pub data: Vec, +} + +#[derive(Debug)] +pub struct ChunkRef<'a> { + pub hash: ChunkHash, + pub offset: u64, + pub length: u32, + pub data: &'a [u8], +} + +impl CdcChunker { + pub fn new(min_size: u32, avg_size: u32, max_size: u32) -> Self { + Self { + min_size, + avg_size, + max_size, + } + } + + pub fn chunk(&self, data: &[u8]) -> Vec { + let chunker = FastCDC::new(data, self.min_size, self.avg_size, self.max_size); + + chunker + .map(|c| { + let chunk_data = &data[c.offset..c.offset + c.length]; + Chunk { + hash: ChunkHash::from_bytes(chunk_data), + offset: c.offset as u64, + length: c.length as u32, + data: chunk_data.to_vec(), + } + }) + .collect() + } + + pub fn chunk_refs<'a>(&self, data: &'a [u8]) -> Vec> { + let chunker = FastCDC::new(data, self.min_size, self.avg_size, self.max_size); + + chunker + .map(|c| { + let chunk_data = &data[c.offset..c.offset + c.length]; + ChunkRef { + hash: ChunkHash::from_bytes(chunk_data), + offset: c.offset as u64, + length: c.length as u32, + data: chunk_data, + } + }) + .collect() + } + + pub fn chunk_streaming(&self, data: &[u8], mut processor: F) -> usize + where + F: FnMut(ChunkRef<'_>), + { + let chunker = FastCDC::new(data, self.min_size, self.avg_size, self.max_size); + let mut count = 0; + + for c in chunker { + let chunk_data = &data[c.offset..c.offset + c.length]; + processor(ChunkRef { + hash: ChunkHash::from_bytes(chunk_data), + offset: c.offset as u64, + length: c.length as u32, + data: chunk_data, + }); + count += 1; + } + count + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cdc_basic() { + let chunker = CdcChunker::default(); + let data = vec![0u8; 256 * 1024]; + + let chunks = chunker.chunk(&data); + + assert!(!chunks.is_empty()); + + let total: u64 = chunks.iter().map(|c| c.length as u64).sum(); + assert_eq!(total, data.len() as u64); + + let mut offset = 0u64; + for chunk in &chunks { + assert_eq!(chunk.offset, offset); + offset += chunk.length as u64; + } + } + + #[test] + fn test_cdc_stable_boundaries() { + let chunker = CdcChunker::new(4 * 1024, 16 * 1024, 64 * 1024); + + let mut data1 = vec![0u8; 512 * 1024]; + for (i, b) in data1.iter_mut().enumerate() { + *b = ((i * 17 + 31) % 256) as u8; + } + + let mut data2 = vec![0xFFu8; 1024]; + data2.extend_from_slice(&data1); + + let chunks1 = chunker.chunk(&data1); + let chunks2 = chunker.chunk(&data2); + + let hashes1: std::collections::HashSet<_> = chunks1.iter().map(|c| c.hash).collect(); + let hashes2: std::collections::HashSet<_> = chunks2.iter().map(|c| c.hash).collect(); + + let shared = hashes1.intersection(&hashes2).count(); + + assert!(shared > 0, "CDC should produce stable boundaries, got {} chunks in original, {} after prepend", chunks1.len(), chunks2.len()); + } + + #[test] + fn test_cdc_chunk_sizes() { + let chunker = CdcChunker::default(); + + let data: Vec = (0..1024 * 1024).map(|i| ((i * 17 + 31) % 256) as u8).collect(); + + let chunks = chunker.chunk(&data); + + for chunk in &chunks { + if chunk.offset + chunk.length as u64 != data.len() as u64 { + assert!( + chunk.length >= chunker.min_size / 2, + "Chunk too small: {}", + chunk.length + ); + assert!( + chunk.length <= chunker.max_size * 2, + "Chunk too large: {}", + chunk.length + ); + } + } + } + + #[test] + fn test_cdc_streaming() { + let chunker = CdcChunker::default(); + let data = vec![0u8; 256 * 1024]; + + let mut streamed = Vec::new(); + let count = chunker.chunk_streaming(&data, |chunk| { + streamed.push((chunk.hash, chunk.offset, chunk.length)); + }); + + let batched = chunker.chunk(&data); + + assert_eq!(count, batched.len()); + for (i, chunk) in batched.iter().enumerate() { + assert_eq!(streamed[i].0, chunk.hash); + assert_eq!(streamed[i].1, chunk.offset); + assert_eq!(streamed[i].2, chunk.length); + } + } + + #[test] + fn test_bandwidth_reduction_metadata_edit() { + let chunker = CdcChunker::new(4 * 1024, 16 * 1024, 64 * 1024); + + let mut state = 12345u64; + let original: Vec = (0..2 * 1024 * 1024) + .map(|_| { + state = state.wrapping_mul(6364136223846793005).wrapping_add(1); + (state >> 56) as u8 + }) + .collect(); + + let chunks1 = chunker.chunk(&original); + let hashes1: std::collections::HashSet<_> = chunks1.iter().map(|c| c.hash).collect(); + + let mut modified = original.clone(); + let mid = modified.len() / 2; + for i in mid..mid + 100 { + modified[i] = 0xFF; + } + + let chunks2 = chunker.chunk(&modified); + let hashes2: std::collections::HashSet<_> = chunks2.iter().map(|c| c.hash).collect(); + + let reused = hashes1.intersection(&hashes2).count(); + let reuse_ratio = reused as f64 / chunks2.len() as f64; + + // NFR-6.4 requires >90% bandwidth reduction for typical edits + assert!( + reuse_ratio > 0.90, + "Expected >90% chunk reuse for mid-file edit (NFR-6.4). Reused {}/{} chunks ({:.1}%, total {} original)", + reused, + chunks2.len(), + reuse_ratio * 100.0, + chunks1.len() + ); + } +} diff --git a/musicfs/crates/musicfs-sync/src/delta.rs b/musicfs/crates/musicfs-sync/src/delta.rs new file mode 100644 index 0000000..4f582e4 --- /dev/null +++ b/musicfs/crates/musicfs-sync/src/delta.rs @@ -0,0 +1,329 @@ +use crate::cdc::CdcChunker; +use musicfs_core::{ChunkHash, FileId, FileMeta, OriginId, RealPath, VirtualPath}; +use musicfs_origins::Origin; +use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; +use std::time::SystemTime; +use tracing::{debug, info}; + +#[derive(Debug, Clone)] +pub struct ScannedFile { + pub path: PathBuf, + pub origin_id: OriginId, + pub size: u64, + pub mtime: SystemTime, +} + +#[derive(Debug, Default)] +pub struct ChangeSet { + pub added: Vec, + pub removed: Vec, + pub modified: Vec<(FileId, ManifestDiff)>, +} + +impl ChangeSet { + pub fn is_empty(&self) -> bool { + self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty() + } + + pub fn total_changes(&self) -> usize { + self.added.len() + self.removed.len() + self.modified.len() + } +} + +#[derive(Debug, Clone)] +pub struct ManifestChunk { + pub hash: ChunkHash, + pub offset: u64, + pub size: u32, +} + +#[derive(Debug)] +pub struct ManifestDiff { + pub reuse: Vec, + pub fetch: Vec, + pub orphaned: Vec, +} + +pub struct DeltaDetector { + chunker: CdcChunker, +} + +impl DeltaDetector { + pub fn new() -> Self { + Self { + chunker: CdcChunker::default(), + } + } + + pub fn with_chunker(chunker: CdcChunker) -> Self { + Self { chunker } + } + + pub async fn detect_changes( + &self, + origin: &dyn Origin, + cached: &HashMap, + manifests: &HashMap>, + ) -> Result { + let mut changes = ChangeSet::default(); + + let origin_files = self.scan_origin(origin).await?; + + let cached_by_path: HashMap<_, _> = cached + .values() + .map(|m| (m.real_path.path.clone(), m)) + .collect(); + + for scanned in &origin_files { + if let Some(cached_file) = cached_by_path.get(&scanned.path) { + if self.is_modified_scan(cached_file, scanned) { + debug!("File modified: {:?}", scanned.path); + + if let Some(old_chunks) = manifests.get(&cached_file.id) { + let new_chunks = self.compute_chunks_for_scan(origin, scanned).await?; + let diff = self.compute_diff(old_chunks, &new_chunks); + changes.modified.push((cached_file.id, diff)); + } + } + } else { + debug!("File added: {:?}", scanned.path); + changes.added.push(scanned.clone()); + } + } + + let origin_paths: HashSet<_> = origin_files.iter().map(|f| &f.path).collect(); + + for cached_file in cached.values() { + if !origin_paths.contains(&cached_file.real_path.path) { + debug!("File removed: {:?}", cached_file.real_path.path); + changes.removed.push(cached_file.id); + } + } + + info!( + "Delta detection complete: {} added, {} removed, {} modified", + changes.added.len(), + changes.removed.len(), + changes.modified.len() + ); + + Ok(changes) + } + + fn is_modified_scan(&self, cached: &FileMeta, scanned: &ScannedFile) -> bool { + cached.size != scanned.size || cached.mtime != scanned.mtime + } + + async fn scan_origin(&self, origin: &dyn Origin) -> Result, DeltaError> { + let mut files = Vec::new(); + let mut dirs_to_scan = vec![PathBuf::from("/")]; + + while let Some(dir) = dirs_to_scan.pop() { + let entries = origin + .readdir(&dir) + .await + .map_err(|e| DeltaError::OriginScan(e.to_string()))?; + + for entry in entries { + let entry_path = dir.join(&entry.name); + + if entry.is_dir { + dirs_to_scan.push(entry_path); + } else if Self::is_audio_file(&entry.name) { + let stat = origin + .stat(&entry_path) + .await + .map_err(|e| DeltaError::OriginScan(e.to_string()))?; + + files.push(ScannedFile { + path: entry_path, + origin_id: origin.id().clone(), + size: stat.size, + mtime: stat.mtime, + }); + } + } + } + + Ok(files) + } + + fn is_audio_file(name: &str) -> bool { + let lower = name.to_lowercase(); + lower.ends_with(".flac") + || lower.ends_with(".mp3") + || lower.ends_with(".ogg") + || lower.ends_with(".wav") + || lower.ends_with(".m4a") + || lower.ends_with(".aac") + || lower.ends_with(".opus") + } + + async fn compute_chunks_for_scan( + &self, + origin: &dyn Origin, + scanned: &ScannedFile, + ) -> Result, DeltaError> { + let data = origin + .read_full(&scanned.path) + .await + .map_err(|e| DeltaError::OriginRead(e.to_string()))?; + + let chunks = self.chunker.chunk_refs(&data); + + Ok(chunks + .into_iter() + .map(|c| ManifestChunk { + hash: c.hash, + offset: c.offset, + size: c.length, + }) + .collect()) + } + + fn compute_diff(&self, old_chunks: &[ManifestChunk], new_chunks: &[ManifestChunk]) -> ManifestDiff { + let old_hashes: HashSet<_> = old_chunks.iter().map(|c| c.hash).collect(); + let new_hashes: HashSet<_> = new_chunks.iter().map(|c| c.hash).collect(); + + ManifestDiff { + reuse: new_chunks + .iter() + .filter(|c| old_hashes.contains(&c.hash)) + .cloned() + .collect(), + fetch: new_chunks + .iter() + .filter(|c| !old_hashes.contains(&c.hash)) + .cloned() + .collect(), + orphaned: old_chunks + .iter() + .filter(|c| !new_hashes.contains(&c.hash)) + .map(|c| c.hash) + .collect(), + } + } +} + +impl Default for DeltaDetector { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeltaError { + #[error("Origin read error: {0}")] + OriginRead(String), + + #[error("Origin scan error: {0}")] + OriginScan(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use musicfs_core::OriginId; + use std::time::SystemTime; + + fn make_file_meta(id: i64, path: &str, size: u64) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(format!("/test/{}", path)), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from(path), + }, + size, + mtime: SystemTime::UNIX_EPOCH, + content_hash: None, + audio: None, + } + } + + fn make_scanned_file(path: &str, size: u64) -> ScannedFile { + ScannedFile { + path: PathBuf::from(path), + origin_id: OriginId::from("test"), + size, + mtime: SystemTime::UNIX_EPOCH, + } + } + + #[test] + fn test_is_modified_size_change() { + let detector = DeltaDetector::new(); + + let cached = make_file_meta(1, "test.flac", 1000); + let scanned = make_scanned_file("test.flac", 2000); + + assert!(detector.is_modified_scan(&cached, &scanned)); + } + + #[test] + fn test_is_modified_same() { + let detector = DeltaDetector::new(); + + let cached = make_file_meta(1, "test.flac", 1000); + let scanned = make_scanned_file("test.flac", 1000); + + assert!(!detector.is_modified_scan(&cached, &scanned)); + } + + #[test] + fn test_is_audio_file() { + assert!(DeltaDetector::is_audio_file("track.flac")); + assert!(DeltaDetector::is_audio_file("song.MP3")); + assert!(DeltaDetector::is_audio_file("audio.ogg")); + assert!(!DeltaDetector::is_audio_file("readme.txt")); + assert!(!DeltaDetector::is_audio_file("cover.jpg")); + } + + #[test] + fn test_compute_diff() { + let detector = DeltaDetector::new(); + + let old_chunks = vec![ + ManifestChunk { + hash: ChunkHash::from_bytes(b"A"), + offset: 0, + size: 256, + }, + ManifestChunk { + hash: ChunkHash::from_bytes(b"B"), + offset: 256, + size: 256, + }, + ManifestChunk { + hash: ChunkHash::from_bytes(b"C"), + offset: 512, + size: 256, + }, + ]; + + let new_chunks = vec![ + ManifestChunk { + hash: ChunkHash::from_bytes(b"A"), + offset: 0, + size: 256, + }, + ManifestChunk { + hash: ChunkHash::from_bytes(b"D"), + offset: 256, + size: 256, + }, + ManifestChunk { + hash: ChunkHash::from_bytes(b"C"), + offset: 512, + size: 256, + }, + ]; + + let diff = detector.compute_diff(&old_chunks, &new_chunks); + + assert_eq!(diff.reuse.len(), 2); + assert_eq!(diff.fetch.len(), 1); + assert_eq!(diff.orphaned.len(), 1); + } +} diff --git a/musicfs/crates/musicfs-sync/src/lib.rs b/musicfs/crates/musicfs-sync/src/lib.rs index f9da2c4..0cf3d40 100644 --- a/musicfs/crates/musicfs-sync/src/lib.rs +++ b/musicfs/crates/musicfs-sync/src/lib.rs @@ -1 +1,7 @@ -#![allow(dead_code)] +pub mod cdc; +pub mod delta; +pub mod watcher; + +pub use cdc::{CdcChunker, Chunk, ChunkRef}; +pub use delta::{ChangeSet, DeltaDetector, DeltaError, ManifestChunk, ManifestDiff}; +pub use watcher::{OriginWatcher, WatchError, WatchHandle}; diff --git a/musicfs/crates/musicfs-sync/src/watcher.rs b/musicfs/crates/musicfs-sync/src/watcher.rs new file mode 100644 index 0000000..e940109 --- /dev/null +++ b/musicfs/crates/musicfs-sync/src/watcher.rs @@ -0,0 +1,206 @@ +use musicfs_core::{Event, EventBus, OriginId, VirtualPath}; +use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::mpsc; +use tracing::{debug, error, info}; + +const DEBOUNCE_MS: u64 = 200; + +pub struct OriginWatcher { + origin_id: OriginId, + root: PathBuf, + event_bus: Arc, +} + +impl OriginWatcher { + pub fn new(origin_id: OriginId, root: PathBuf, event_bus: Arc) -> Self { + Self { + origin_id, + root, + event_bus, + } + } + + pub fn start(self) -> WatchHandle { + let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); + + let origin_id = self.origin_id.clone(); + let root = self.root.clone(); + let event_bus = self.event_bus.clone(); + + tokio::spawn(async move { + if let Err(e) = Self::watch_loop(&origin_id, &root, &event_bus, &mut stop_rx).await { + error!("Watcher error: {}", e); + } + }); + + WatchHandle { stop_tx } + } + + async fn watch_loop( + origin_id: &OriginId, + root: &Path, + event_bus: &EventBus, + stop_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), WatchError> { + let (tx, mut rx) = mpsc::channel(100); + + let mut watcher = RecommendedWatcher::new( + move |res: Result| { + if let Ok(event) = res { + let _ = tx.blocking_send(event); + } + }, + Config::default(), + ) + .map_err(|e| WatchError::Init(e.to_string()))?; + + watcher + .watch(root, RecursiveMode::Recursive) + .map_err(|e| WatchError::Watch(e.to_string()))?; + + info!("Watching origin {} at {:?}", origin_id, root); + + let mut debouncer: HashMap = HashMap::new(); + + loop { + tokio::select! { + Some(event) = rx.recv() => { + Self::handle_notify_event(origin_id, root, event_bus, event, &mut debouncer); + } + _ = stop_rx.recv() => { + info!("Stopping watcher for {}", origin_id); + break; + } + } + } + + Ok(()) + } + + fn handle_notify_event( + origin_id: &OriginId, + root: &Path, + event_bus: &EventBus, + event: notify::Event, + debouncer: &mut HashMap, + ) { + use notify::EventKind; + + let now = Instant::now(); + + for path in event.paths { + let relative = match path.strip_prefix(root) { + Ok(p) => p.to_path_buf(), + Err(_) => continue, + }; + + if !Self::is_audio_file(&path) { + continue; + } + + if let Some(last_seen) = debouncer.get(&relative) { + if now.duration_since(*last_seen).as_millis() < DEBOUNCE_MS as u128 { + debug!("Debouncing event for {:?}", relative); + continue; + } + } + debouncer.insert(relative.clone(), now); + + let vpath = VirtualPath::new(format!("/{}", relative.display())); + + match event.kind { + EventKind::Create(_) => { + debug!("File created: {:?}", relative); + event_bus.publish(Event::FileAdded { + path: vpath, + origin_id: origin_id.clone(), + }); + } + EventKind::Remove(_) => { + debug!("File removed: {:?}", relative); + event_bus.publish(Event::FileRemoved { path: vpath }); + } + EventKind::Modify(_) => { + debug!("File modified: {:?}", relative); + event_bus.publish(Event::FileModified { path: vpath }); + } + _ => {} + } + } + } + + fn is_audio_file(path: &Path) -> bool { + matches!( + path.extension() + .and_then(|e| e.to_str()) + .map(|e| e.to_lowercase()) + .as_deref(), + Some("flac" | "mp3" | "ogg" | "wav" | "m4a" | "aac" | "opus") + ) + } +} + +pub struct WatchHandle { + stop_tx: mpsc::Sender<()>, +} + +impl WatchHandle { + pub async fn stop(self) { + let _ = self.stop_tx.send(()).await; + } +} + +impl Drop for WatchHandle { + fn drop(&mut self) { + let _ = self.stop_tx.try_send(()); + } +} + +#[derive(Debug, thiserror::Error)] +pub enum WatchError { + #[error("Failed to initialize watcher: {0}")] + Init(String), + + #[error("Failed to watch path: {0}")] + Watch(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use tempfile::TempDir; + + #[tokio::test] + async fn test_watcher_detects_create() { + let dir = TempDir::new().unwrap(); + let event_bus = Arc::new(EventBus::default()); + let mut rx = event_bus.subscribe(); + + let watcher = OriginWatcher::new(OriginId::from("test"), dir.path().to_path_buf(), event_bus); + let handle = watcher.start(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + std::fs::write(dir.path().join("test.flac"), b"audio").unwrap(); + + tokio::time::sleep(Duration::from_millis(300)).await; + + let event = rx.try_recv(); + assert!(matches!(event, Ok(Event::FileAdded { .. }))); + + handle.stop().await; + } + + #[test] + fn test_is_audio_file() { + assert!(OriginWatcher::is_audio_file(Path::new("/music/song.flac"))); + assert!(OriginWatcher::is_audio_file(Path::new("/music/song.MP3"))); + assert!(!OriginWatcher::is_audio_file(Path::new("/music/cover.jpg"))); + assert!(!OriginWatcher::is_audio_file(Path::new("/music/readme.txt"))); + } +}