Implement Week 5 CDC & Delta Detection with Oracle fixes

- Add CdcChunker using the fastcdc 3.x crate (v2020 implementation; 16KB min / 64KB avg / 256KB max chunks)
- Add DeltaDetector with scan_origin() returning ScannedFile (no FileId assignment)
- Add OriginWatcher built on the notify crate (inotify backend on Linux) with a 200ms debounce, run via tokio::spawn
- Fix LocalOrigin::read() to loop until all bytes read
- Add read_full() method to Origin trait
- Add mtime field to ChunkManifest
- Update ContentFetcher to use CDC chunking
- Update bandwidth reduction test to assert >90% (NFR-6.4)

Tests: 71 pass (+11 new)
This commit is contained in:
Alexander
2026-05-12 20:05:44 +02:00
parent 0e5a514015
commit 32c96701c8
12 changed files with 998 additions and 15 deletions
+149 -1
View File
@@ -203,6 +203,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@@ -282,12 +291,28 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastcdc"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf51ceb43e96afbfe4dd5c6f6082af5dfd60e220820b8123792d61963f2ce6bc"
[[package]]
name = "fastrand"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "filetime"
version = "0.2.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d5b2eef6fafbf69f877e55509ce5b11a760690ac9700a2921be067aa6afaef6"
dependencies = [
"cfg-if",
"libc",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
@@ -310,6 +335,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "fuser"
version = "0.14.0"
@@ -421,6 +455,26 @@ dependencies = [
"serde_core",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.13"
@@ -442,6 +496,26 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682"
[[package]]
name = "kqueue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "285efcf12ef41bec907b3000d5ffaeb54191d4d9d83c0d6157e6cbc2db255e64"
dependencies = [
"bitflags 2.11.1",
"libc",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -516,6 +590,18 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.48.0",
]
[[package]]
name = "mio"
version = "1.2.0"
@@ -553,6 +639,7 @@ dependencies = [
"musicfs-cache",
"musicfs-core",
"musicfs-origins",
"musicfs-sync",
"rmp-serde",
"serde",
"sled",
@@ -641,6 +728,39 @@ version = "0.1.0"
[[package]]
name = "musicfs-sync"
version = "0.1.0"
dependencies = [
"async-trait",
"fastcdc",
"musicfs-core",
"musicfs-origins",
"notify",
"rmp-serde",
"serde",
"tempfile",
"thiserror",
"tokio",
"tracing",
"xxhash-rust",
]
[[package]]
name = "notify"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.11.1",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio 0.8.11",
"walkdir",
"windows-sys 0.48.0",
]
[[package]]
name = "nu-ansi-term"
@@ -874,6 +994,15 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -1186,7 +1315,7 @@ checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe"
dependencies = [
"bytes",
"libc",
"mio",
"mio 1.2.0",
"parking_lot 0.12.5",
"pin-project-lite",
"signal-hook-registry",
@@ -1303,6 +1432,16 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
@@ -1377,6 +1516,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
+1
View File
@@ -6,6 +6,7 @@ edition.workspace = true
[dependencies]
musicfs-core = { path = "../musicfs-core" }
musicfs-origins = { path = "../musicfs-origins" }
musicfs-sync = { path = "../musicfs-sync" }
tokio.workspace = true
tracing.workspace = true
serde.workspace = true
+32 -9
View File
@@ -1,6 +1,7 @@
use crate::{CasStore, ChunkManifest, ChunkRef};
use musicfs_core::{Event, EventBus, FileId, FileMeta, OriginId};
use musicfs_origins::Origin;
use musicfs_sync::CdcChunker;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::{debug, info};
@@ -10,6 +11,7 @@ pub struct ContentFetcher {
origins: RwLock<HashMap<OriginId, Arc<dyn Origin>>>,
file_meta: RwLock<HashMap<FileId, FileMeta>>,
event_bus: Option<Arc<EventBus>>,
chunker: CdcChunker,
}
impl ContentFetcher {
@@ -19,6 +21,7 @@ impl ContentFetcher {
origins: RwLock::new(HashMap::new()),
file_meta: RwLock::new(HashMap::new()),
event_bus: None,
chunker: CdcChunker::default(),
}
}
@@ -28,6 +31,7 @@ impl ContentFetcher {
origins: RwLock::new(HashMap::new()),
file_meta: RwLock::new(HashMap::new()),
event_bus: Some(event_bus),
chunker: CdcChunker::default(),
}
}
@@ -71,25 +75,44 @@ impl ContentFetcher {
);
let data = origin
.read(&meta.real_path.path, 0, meta.size as u32)
.read_full(&meta.real_path.path)
.await
.map_err(|e| FetchError::OriginRead(e.to_string()))?;
let hash = self.store.put(&data).await.map_err(FetchError::Store)?;
let mtime = meta
.mtime
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let chunks = self.chunker.chunk_refs(&data);
info!("Chunked {:?} into {} chunks", file_id, chunks.len());
let mut chunk_refs = Vec::with_capacity(chunks.len());
for chunk in chunks {
if !self.store.exists(&chunk.hash) {
self.store.put(chunk.data).await.map_err(FetchError::Store)?;
}
chunk_refs.push(ChunkRef {
hash: chunk.hash,
offset: chunk.offset,
size: chunk.length,
});
}
let manifest = ChunkManifest {
file_id,
total_size: meta.size,
chunks: vec![ChunkRef {
hash,
offset: 0,
size: data.len() as u32,
}],
mtime,
chunks: chunk_refs,
};
debug!(
"Created manifest for {:?}: {} bytes, 1 chunk",
file_id, meta.size
"Created manifest for {:?}: {} bytes, {} chunks",
file_id,
meta.size,
manifest.chunks.len()
);
Ok(manifest)
+8 -1
View File
@@ -11,6 +11,7 @@ use std::sync::{Arc, RwLock};
pub struct ChunkManifest {
pub file_id: FileId,
pub total_size: u64,
pub mtime: i64,
pub chunks: Vec<ChunkRef>,
}
@@ -23,11 +24,12 @@ impl ChunkManifest {
rmp_serde::from_slice(data).ok()
}
pub fn from_db(file_id: FileId, total_size: u64, chunk_blob: &[u8]) -> Option<Self> {
pub fn from_db(file_id: FileId, total_size: u64, mtime: i64, chunk_blob: &[u8]) -> Option<Self> {
let chunks = Self::chunks_from_bytes(chunk_blob)?;
Some(Self {
file_id,
total_size,
mtime,
chunks,
})
}
@@ -166,6 +168,7 @@ mod tests {
reader.register_manifest(ChunkManifest {
file_id: FileId(1),
total_size: data.len() as u64,
mtime: 0,
chunks: vec![ChunkRef {
hash,
offset: 0,
@@ -193,6 +196,7 @@ mod tests {
reader.register_manifest(ChunkManifest {
file_id: FileId(1),
total_size: data.len() as u64,
mtime: 0,
chunks: vec![ChunkRef {
hash,
offset: 0,
@@ -222,6 +226,7 @@ mod tests {
reader.register_manifest(ChunkManifest {
file_id: FileId(1),
total_size: 8,
mtime: 0,
chunks: vec![
ChunkRef {
hash: hash1,
@@ -256,6 +261,7 @@ mod tests {
reader.register_manifest(ChunkManifest {
file_id: FileId(1),
total_size: data.len() as u64,
mtime: 0,
chunks: vec![ChunkRef {
hash,
offset: 0,
@@ -272,6 +278,7 @@ mod tests {
let manifest = ChunkManifest {
file_id: FileId(42),
total_size: 1024,
mtime: 0,
chunks: vec![ChunkRef {
hash: ChunkHash::from_bytes(b"test"),
offset: 0,
@@ -46,6 +46,7 @@ async fn test_cas_and_tree_integration() {
reader.register_manifest(ChunkManifest {
file_id: FileId(1),
total_size: file_data.len() as u64,
mtime: 0,
chunks: vec![ChunkRef {
hash: chunk_hash,
offset: 0,
+21 -3
View File
@@ -93,13 +93,31 @@ impl Origin for LocalOrigin {
let mut file = fs::File::open(&full_path).await?;
file.seek(std::io::SeekFrom::Start(offset)).await?;
let mut buffer = vec![0u8; size as usize];
let bytes_read = file.read(&mut buffer).await?;
buffer.truncate(bytes_read);
// FIX: Loop until all requested bytes are read or EOF
// Single read() only returns kernel buffer (~2MB), not full request
let mut buffer = Vec::with_capacity(size as usize);
let mut temp_buf = vec![0u8; 64 * 1024]; // 64KB chunks
let mut total_read = 0usize;
while total_read < size as usize {
let to_read = std::cmp::min(temp_buf.len(), size as usize - total_read);
let n = file.read(&mut temp_buf[..to_read]).await?;
if n == 0 {
break; // EOF
}
buffer.extend_from_slice(&temp_buf[..n]);
total_read += n;
}
Ok(buffer)
}
async fn read_full(&self, path: &Path) -> Result<Vec<u8>> {
let full_path = self.full_path(path);
debug!("LocalOrigin::read_full({:?})", full_path);
Ok(fs::read(&full_path).await?)
}
async fn exists(&self, path: &Path) -> Result<bool> {
let full_path = self.full_path(path);
Ok(fs::try_exists(&full_path).await?)
@@ -26,6 +26,9 @@ pub trait Origin: Send + Sync {
async fn read(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>>;
/// Read entire file content (for CDC chunking of files <4GB)
async fn read_full(&self, path: &Path) -> Result<Vec<u8>>;
async fn exists(&self, path: &Path) -> Result<bool>;
async fn health(&self) -> HealthStatus;
+16
View File
@@ -4,3 +4,19 @@ version.workspace = true
edition.workspace = true
[dependencies]
musicfs-core = { path = "../musicfs-core" }
musicfs-origins = { path = "../musicfs-origins" }
fastcdc = "3"
xxhash-rust = { version = "0.8", features = ["xxh64"] }
notify = "6"
rmp-serde = "1"
tokio = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
serde = { workspace = true }
async-trait = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }
+225
View File
@@ -0,0 +1,225 @@
use fastcdc::v2020::FastCDC;
use musicfs_core::ChunkHash;
/// Content-defined chunker that wraps `FastCDC` with a fixed size configuration.
///
/// The three sizes are forwarded unchanged to `FastCDC::new` every time data is
/// chunked.
pub struct CdcChunker {
    /// Minimum chunk size handed to `FastCDC::new`.
    min_size: u32,
    /// Average (target) chunk size handed to `FastCDC::new`.
    avg_size: u32,
    /// Maximum chunk size handed to `FastCDC::new`.
    max_size: u32,
}
impl Default for CdcChunker {
    /// Builds the default configuration: 16 KiB minimum, 64 KiB average and
    /// 256 KiB maximum chunk size.
    fn default() -> Self {
        const KIB: u32 = 1024;

        Self {
            min_size: 16 * KIB,
            avg_size: 64 * KIB,
            max_size: 256 * KIB,
        }
    }
}
/// An owned chunk produced by [`CdcChunker::chunk`].
#[derive(Debug, Clone)]
pub struct Chunk {
    /// Hash computed over this chunk's bytes via `ChunkHash::from_bytes`.
    pub hash: ChunkHash,
    /// Byte offset of the chunk within the input that was chunked.
    pub offset: u64,
    /// Length of the chunk in bytes.
    pub length: u32,
    /// Owned copy of the chunk's bytes.
    pub data: Vec<u8>,
}
/// A borrowed chunk produced by [`CdcChunker::chunk_refs`] and
/// [`CdcChunker::chunk_streaming`]; `data` points into the caller's input buffer
/// instead of copying it.
#[derive(Debug)]
pub struct ChunkRef<'a> {
    /// Hash computed over this chunk's bytes via `ChunkHash::from_bytes`.
    pub hash: ChunkHash,
    /// Byte offset of the chunk within the input that was chunked.
    pub offset: u64,
    /// Length of the chunk in bytes.
    pub length: u32,
    /// Slice of the input buffer covering exactly this chunk.
    pub data: &'a [u8],
}
impl CdcChunker {
pub fn new(min_size: u32, avg_size: u32, max_size: u32) -> Self {
Self {
min_size,
avg_size,
max_size,
}
}
pub fn chunk(&self, data: &[u8]) -> Vec<Chunk> {
let chunker = FastCDC::new(data, self.min_size, self.avg_size, self.max_size);
chunker
.map(|c| {
let chunk_data = &data[c.offset..c.offset + c.length];
Chunk {
hash: ChunkHash::from_bytes(chunk_data),
offset: c.offset as u64,
length: c.length as u32,
data: chunk_data.to_vec(),
}
})
.collect()
}
pub fn chunk_refs<'a>(&self, data: &'a [u8]) -> Vec<ChunkRef<'a>> {
let chunker = FastCDC::new(data, self.min_size, self.avg_size, self.max_size);
chunker
.map(|c| {
let chunk_data = &data[c.offset..c.offset + c.length];
ChunkRef {
hash: ChunkHash::from_bytes(chunk_data),
offset: c.offset as u64,
length: c.length as u32,
data: chunk_data,
}
})
.collect()
}
pub fn chunk_streaming<F>(&self, data: &[u8], mut processor: F) -> usize
where
F: FnMut(ChunkRef<'_>),
{
let chunker = FastCDC::new(data, self.min_size, self.avg_size, self.max_size);
let mut count = 0;
for c in chunker {
let chunk_data = &data[c.offset..c.offset + c.length];
processor(ChunkRef {
hash: ChunkHash::from_bytes(chunk_data),
offset: c.offset as u64,
length: c.length as u32,
data: chunk_data,
});
count += 1;
}
count
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Chunks must be non-empty, contiguous from offset 0, and cover the input.
    #[test]
    fn test_cdc_basic() {
        let data = vec![0u8; 256 * 1024];
        let chunks = CdcChunker::default().chunk(&data);

        assert!(!chunks.is_empty());

        let covered = chunks.iter().fold(0u64, |sum, c| sum + c.length as u64);
        assert_eq!(covered, data.len() as u64);

        let mut expected_offset = 0u64;
        chunks.iter().for_each(|c| {
            assert_eq!(c.offset, expected_offset);
            expected_offset += c.length as u64;
        });
    }

    /// Prepending bytes must leave at least one chunk hash unchanged.
    #[test]
    fn test_cdc_stable_boundaries() {
        let chunker = CdcChunker::new(4 * 1024, 16 * 1024, 64 * 1024);

        let base: Vec<u8> = (0..512 * 1024usize)
            .map(|i| ((i * 17 + 31) % 256) as u8)
            .collect();

        // Same content, shifted by a 1 KiB prefix of 0xFF bytes.
        let mut shifted = Vec::with_capacity(1024 + base.len());
        shifted.resize(1024, 0xFFu8);
        shifted.extend_from_slice(&base);

        let before = chunker.chunk(&base);
        let after = chunker.chunk(&shifted);

        let hashes_before: HashSet<_> = before.iter().map(|c| c.hash).collect();
        let hashes_after: HashSet<_> = after.iter().map(|c| c.hash).collect();

        let shared = hashes_before
            .iter()
            .filter(|h| hashes_after.contains(*h))
            .count();
        assert!(shared > 0, "CDC should produce stable boundaries, got {} chunks in original, {} after prepend", before.len(), after.len());
    }

    /// Every chunk except the final one must stay within loose size bounds.
    #[test]
    fn test_cdc_chunk_sizes() {
        let chunker = CdcChunker::default();
        let data: Vec<u8> = (0..1024 * 1024).map(|i| ((i * 17 + 31) % 256) as u8).collect();
        let end = data.len() as u64;

        let non_final = chunker
            .chunk(&data)
            .into_iter()
            .filter(|c| c.offset + c.length as u64 != end);

        for chunk in non_final {
            assert!(
                chunk.length >= chunker.min_size / 2,
                "Chunk too small: {}",
                chunk.length
            );
            assert!(
                chunk.length <= chunker.max_size * 2,
                "Chunk too large: {}",
                chunk.length
            );
        }
    }

    /// Streaming and batched chunking must report the same chunks in order.
    #[test]
    fn test_cdc_streaming() {
        let chunker = CdcChunker::default();
        let data = vec![0u8; 256 * 1024];

        let mut streamed = Vec::new();
        let count = chunker.chunk_streaming(&data, |chunk| {
            streamed.push((chunk.hash, chunk.offset, chunk.length));
        });

        let batched = chunker.chunk(&data);
        assert_eq!(count, batched.len());

        let expected: Vec<_> = batched
            .iter()
            .map(|c| (c.hash, c.offset, c.length))
            .collect();
        assert_eq!(streamed, expected);
    }

    /// A 100-byte edit in the middle of a 2 MiB file must reuse >90% of chunks.
    #[test]
    fn test_bandwidth_reduction_metadata_edit() {
        let chunker = CdcChunker::new(4 * 1024, 16 * 1024, 64 * 1024);

        // Deterministic pseudo-random payload from a simple LCG.
        let mut state = 12345u64;
        let mut original = Vec::with_capacity(2 * 1024 * 1024);
        for _ in 0..2 * 1024 * 1024 {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            original.push((state >> 56) as u8);
        }

        let chunks1 = chunker.chunk(&original);
        let hashes1: HashSet<_> = chunks1.iter().map(|c| c.hash).collect();

        let mut modified = original.clone();
        let mid = modified.len() / 2;
        modified[mid..mid + 100].fill(0xFF);

        let chunks2 = chunker.chunk(&modified);
        let hashes2: HashSet<_> = chunks2.iter().map(|c| c.hash).collect();

        let reused = hashes1.iter().filter(|h| hashes2.contains(*h)).count();
        let reuse_ratio = reused as f64 / chunks2.len() as f64;

        // NFR-6.4 requires >90% bandwidth reduction for typical edits
        assert!(
            reuse_ratio > 0.90,
            "Expected >90% chunk reuse for mid-file edit (NFR-6.4). Reused {}/{} chunks ({:.1}%, total {} original)",
            reused,
            chunks2.len(),
            reuse_ratio * 100.0,
            chunks1.len()
        );
    }
}
+329
View File
@@ -0,0 +1,329 @@
use crate::cdc::CdcChunker;
use musicfs_core::{ChunkHash, FileId, FileMeta, OriginId, RealPath, VirtualPath};
use musicfs_origins::Origin;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::time::SystemTime;
use tracing::{debug, info};
/// A file observed while scanning an origin.
///
/// Carries only what the scan itself reports; it has no `FileId`.
#[derive(Debug, Clone)]
pub struct ScannedFile {
    /// Path of the file as built during the scan (directory joined with entry name).
    pub path: PathBuf,
    /// Identifier of the origin the file was found on.
    pub origin_id: OriginId,
    /// Size reported by the origin's `stat`.
    pub size: u64,
    /// Modification time reported by the origin's `stat`.
    pub mtime: SystemTime,
}
/// Result of comparing an origin scan against the cached file metadata.
#[derive(Debug, Default)]
pub struct ChangeSet {
    /// Files present on the origin but not in the cache.
    pub added: Vec<ScannedFile>,
    /// Cached files whose path was not seen in the origin scan.
    pub removed: Vec<FileId>,
    /// Cached files whose size or mtime changed, with the chunk-level diff.
    pub modified: Vec<(FileId, ManifestDiff)>,
}
impl ChangeSet {
    /// Returns `true` when no file was added, removed or modified.
    pub fn is_empty(&self) -> bool {
        self.total_changes() == 0
    }

    /// Returns the combined number of added, removed and modified entries.
    pub fn total_changes(&self) -> usize {
        [self.added.len(), self.removed.len(), self.modified.len()]
            .iter()
            .sum()
    }
}
/// One chunk entry of a file manifest as used for diffing: hash plus position.
#[derive(Debug, Clone)]
pub struct ManifestChunk {
    /// Content hash of the chunk.
    pub hash: ChunkHash,
    /// Byte offset of the chunk within the file.
    pub offset: u64,
    /// Size of the chunk in bytes.
    pub size: u32,
}
/// Chunk-level difference between a file's old and new manifest.
#[derive(Debug)]
pub struct ManifestDiff {
    /// New chunks whose hash already appears in the old manifest.
    pub reuse: Vec<ManifestChunk>,
    /// New chunks whose hash does not appear in the old manifest.
    pub fetch: Vec<ManifestChunk>,
    /// Hashes from the old manifest that no longer appear in the new one.
    pub orphaned: Vec<ChunkHash>,
}
/// Detects added, removed and modified files by comparing an origin scan with
/// cached metadata, and diffs modified files at chunk level.
pub struct DeltaDetector {
    /// Chunker used to re-chunk modified files before diffing.
    chunker: CdcChunker,
}
impl DeltaDetector {
    /// Creates a detector that chunks with [`CdcChunker::default`].
    pub fn new() -> Self {
        Self {
            chunker: CdcChunker::default(),
        }
    }

    /// Creates a detector that chunks with the supplied `chunker`.
    pub fn with_chunker(chunker: CdcChunker) -> Self {
        Self { chunker }
    }

    /// Scans `origin` and classifies every audio file as added, removed or modified
    /// relative to `cached`.
    ///
    /// * `cached` — metadata of the files currently known, keyed by id; files are
    ///   matched to scan results by `real_path.path`.
    /// * `manifests` — previously stored chunk lists, keyed by file id. A modified
    ///   file is only re-read and diffed when it has an entry here.
    ///
    /// # Errors
    ///
    /// Returns [`DeltaError::OriginScan`] when listing or stat-ing fails and
    /// [`DeltaError::OriginRead`] when a modified file cannot be read.
    pub async fn detect_changes(
        &self,
        origin: &dyn Origin,
        cached: &HashMap<FileId, FileMeta>,
        manifests: &HashMap<FileId, Vec<ManifestChunk>>,
    ) -> Result<ChangeSet, DeltaError> {
        let mut changes = ChangeSet::default();

        let origin_files = self.scan_origin(origin).await?;

        // Borrow the cached paths instead of cloning one PathBuf per cached file.
        let cached_by_path: HashMap<&PathBuf, &FileMeta> = cached
            .values()
            .map(|meta| (&meta.real_path.path, meta))
            .collect();

        for scanned in &origin_files {
            let cached_file = match cached_by_path.get(&scanned.path) {
                Some(meta) => *meta,
                None => {
                    debug!("File added: {:?}", scanned.path);
                    changes.added.push(scanned.clone());
                    continue;
                }
            };

            if !self.is_modified_scan(cached_file, scanned) {
                continue;
            }
            debug!("File modified: {:?}", scanned.path);

            match manifests.get(&cached_file.id) {
                Some(old_chunks) => {
                    let new_chunks = self.compute_chunks_for_scan(origin, scanned).await?;
                    let diff = self.compute_diff(old_chunks, &new_chunks);
                    changes.modified.push((cached_file.id, diff));
                }
                // Without an old manifest there is nothing to diff against; make
                // the skip visible instead of dropping the change silently.
                None => debug!(
                    "File modified but no manifest is cached, skipping diff: {:?}",
                    scanned.path
                ),
            }
        }

        let origin_paths: HashSet<_> = origin_files.iter().map(|f| &f.path).collect();
        for cached_file in cached.values() {
            if !origin_paths.contains(&cached_file.real_path.path) {
                debug!("File removed: {:?}", cached_file.real_path.path);
                changes.removed.push(cached_file.id);
            }
        }

        info!(
            "Delta detection complete: {} added, {} removed, {} modified",
            changes.added.len(),
            changes.removed.len(),
            changes.modified.len()
        );

        Ok(changes)
    }

    /// Returns `true` when the scanned size or mtime differs from the cached one.
    // NOTE(review): mtimes are compared for exact `SystemTime` equality; if cached
    // values are persisted at a coarser resolution than the origin reports, every
    // file would compare as modified — confirm.
    fn is_modified_scan(&self, cached: &FileMeta, scanned: &ScannedFile) -> bool {
        cached.size != scanned.size || cached.mtime != scanned.mtime
    }

    /// Walks the origin's directory tree starting at `/` and returns every audio
    /// file together with its size and mtime.
    // NOTE(review): scanned paths are therefore rooted at `/`; cached
    // `real_path.path` values must use the same form to match — confirm.
    async fn scan_origin(&self, origin: &dyn Origin) -> Result<Vec<ScannedFile>, DeltaError> {
        let mut files = Vec::new();
        // Explicit work list instead of recursion (async fns cannot recurse directly).
        let mut pending_dirs = vec![PathBuf::from("/")];

        while let Some(dir) = pending_dirs.pop() {
            let entries = origin
                .readdir(&dir)
                .await
                .map_err(|e| DeltaError::OriginScan(e.to_string()))?;

            for entry in entries {
                let entry_path = dir.join(&entry.name);

                if entry.is_dir {
                    pending_dirs.push(entry_path);
                } else if Self::is_audio_file(&entry.name) {
                    let stat = origin
                        .stat(&entry_path)
                        .await
                        .map_err(|e| DeltaError::OriginScan(e.to_string()))?;

                    files.push(ScannedFile {
                        path: entry_path,
                        origin_id: origin.id().clone(),
                        size: stat.size,
                        mtime: stat.mtime,
                    });
                }
            }
        }

        Ok(files)
    }

    /// Returns `true` when `name` has one of the supported audio extensions,
    /// compared case-insensitively.
    ///
    /// The extension is taken with `Path::extension`, so a name that consists only
    /// of a dot and an extension (e.g. `.flac`) is not treated as audio.
    fn is_audio_file(name: &str) -> bool {
        const AUDIO_EXTENSIONS: [&str; 7] = ["flac", "mp3", "ogg", "wav", "m4a", "aac", "opus"];

        std::path::Path::new(name)
            .extension()
            .and_then(|ext| ext.to_str())
            .map_or(false, |ext| {
                AUDIO_EXTENSIONS
                    .iter()
                    .any(|known| ext.eq_ignore_ascii_case(known))
            })
    }

    /// Reads the whole file from `origin` and returns its chunk list.
    async fn compute_chunks_for_scan(
        &self,
        origin: &dyn Origin,
        scanned: &ScannedFile,
    ) -> Result<Vec<ManifestChunk>, DeltaError> {
        let data = origin
            .read_full(&scanned.path)
            .await
            .map_err(|e| DeltaError::OriginRead(e.to_string()))?;

        Ok(self
            .chunker
            .chunk_refs(&data)
            .into_iter()
            .map(|c| ManifestChunk {
                hash: c.hash,
                offset: c.offset,
                size: c.length,
            })
            .collect())
    }

    /// Compares two chunk lists by hash.
    ///
    /// `reuse` and `fetch` keep one entry per new chunk, in manifest order.
    /// `orphaned` lists each hash that disappeared exactly once, even when the old
    /// manifest contained that chunk several times.
    fn compute_diff(&self, old_chunks: &[ManifestChunk], new_chunks: &[ManifestChunk]) -> ManifestDiff {
        let old_hashes: HashSet<_> = old_chunks.iter().map(|c| c.hash).collect();
        let new_hashes: HashSet<_> = new_chunks.iter().map(|c| c.hash).collect();

        let (reuse, fetch): (Vec<_>, Vec<_>) = new_chunks
            .iter()
            .cloned()
            .partition(|c| old_hashes.contains(&c.hash));

        // `insert` returns false for a hash that was already reported, which
        // filters out repeats while preserving first-seen order.
        let mut reported = HashSet::new();
        let orphaned = old_chunks
            .iter()
            .map(|c| c.hash)
            .filter(|hash| !new_hashes.contains(hash) && reported.insert(*hash))
            .collect();

        ManifestDiff {
            reuse,
            fetch,
            orphaned,
        }
    }
}
impl Default for DeltaDetector {
fn default() -> Self {
Self::new()
}
}
/// Errors produced during delta detection; each variant carries the underlying
/// origin error rendered as a string.
#[derive(Debug, thiserror::Error)]
pub enum DeltaError {
    /// Reading a file's content from the origin failed.
    #[error("Origin read error: {0}")]
    OriginRead(String),
    /// Listing a directory or stat-ing a file on the origin failed.
    #[error("Origin scan error: {0}")]
    OriginScan(String),
}
#[cfg(test)]
mod tests {
    use super::*;
    use musicfs_core::OriginId;
    use std::time::SystemTime;

    /// Cached metadata fixture with a fixed origin and an epoch mtime.
    fn make_file_meta(id: i64, path: &str, size: u64) -> FileMeta {
        let real_path = RealPath {
            origin_id: OriginId::from("test"),
            path: PathBuf::from(path),
        };

        FileMeta {
            id: FileId(id),
            virtual_path: VirtualPath::new(format!("/test/{}", path)),
            real_path,
            size,
            mtime: SystemTime::UNIX_EPOCH,
            content_hash: None,
            audio: None,
        }
    }

    /// Scan-result fixture with a fixed origin and an epoch mtime.
    fn make_scanned_file(path: &str, size: u64) -> ScannedFile {
        ScannedFile {
            path: PathBuf::from(path),
            origin_id: OriginId::from("test"),
            size,
            mtime: SystemTime::UNIX_EPOCH,
        }
    }

    #[test]
    fn test_is_modified_size_change() {
        let cached = make_file_meta(1, "test.flac", 1000);
        let scanned = make_scanned_file("test.flac", 2000);

        assert!(DeltaDetector::new().is_modified_scan(&cached, &scanned));
    }

    #[test]
    fn test_is_modified_same() {
        let cached = make_file_meta(1, "test.flac", 1000);
        let scanned = make_scanned_file("test.flac", 1000);

        assert!(!DeltaDetector::new().is_modified_scan(&cached, &scanned));
    }

    #[test]
    fn test_is_audio_file() {
        for name in ["track.flac", "song.MP3", "audio.ogg"] {
            assert!(DeltaDetector::is_audio_file(name));
        }
        for name in ["readme.txt", "cover.jpg"] {
            assert!(!DeltaDetector::is_audio_file(name));
        }
    }

    #[test]
    fn test_compute_diff() {
        /// 256-byte manifest chunk whose hash is derived from `tag`.
        fn chunk(tag: &[u8], offset: u64) -> ManifestChunk {
            ManifestChunk {
                hash: ChunkHash::from_bytes(tag),
                offset,
                size: 256,
            }
        }

        // Only the middle chunk changes: B is replaced by D.
        let old_chunks = vec![chunk(b"A", 0), chunk(b"B", 256), chunk(b"C", 512)];
        let new_chunks = vec![chunk(b"A", 0), chunk(b"D", 256), chunk(b"C", 512)];

        let diff = DeltaDetector::new().compute_diff(&old_chunks, &new_chunks);

        assert_eq!(diff.reuse.len(), 2);
        assert_eq!(diff.fetch.len(), 1);
        assert_eq!(diff.orphaned.len(), 1);
    }
}
+7 -1
View File
@@ -1 +1,7 @@
#![allow(dead_code)]
pub mod cdc;
pub mod delta;
pub mod watcher;
pub use cdc::{CdcChunker, Chunk, ChunkRef};
pub use delta::{ChangeSet, DeltaDetector, DeltaError, ManifestChunk, ManifestDiff};
pub use watcher::{OriginWatcher, WatchError, WatchHandle};
+206
View File
@@ -0,0 +1,206 @@
use musicfs_core::{Event, EventBus, OriginId, VirtualPath};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, error, info};
/// Debounce window in milliseconds: a further event for the same path within this
/// window of the last accepted one is dropped.
const DEBOUNCE_MS: u64 = 200;
/// Watches an origin's root directory and publishes file events to an [`EventBus`].
pub struct OriginWatcher {
    /// Origin id attached to published `FileAdded` events and used in log lines.
    origin_id: OriginId,
    /// Directory that is watched recursively; event paths are made relative to it.
    root: PathBuf,
    /// Bus on which file events are published.
    event_bus: Arc<EventBus>,
}
impl OriginWatcher {
pub fn new(origin_id: OriginId, root: PathBuf, event_bus: Arc<EventBus>) -> Self {
Self {
origin_id,
root,
event_bus,
}
}
pub fn start(self) -> WatchHandle {
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
let origin_id = self.origin_id.clone();
let root = self.root.clone();
let event_bus = self.event_bus.clone();
tokio::spawn(async move {
if let Err(e) = Self::watch_loop(&origin_id, &root, &event_bus, &mut stop_rx).await {
error!("Watcher error: {}", e);
}
});
WatchHandle { stop_tx }
}
async fn watch_loop(
origin_id: &OriginId,
root: &Path,
event_bus: &EventBus,
stop_rx: &mut mpsc::Receiver<()>,
) -> Result<(), WatchError> {
let (tx, mut rx) = mpsc::channel(100);
let mut watcher = RecommendedWatcher::new(
move |res: Result<notify::Event, notify::Error>| {
if let Ok(event) = res {
let _ = tx.blocking_send(event);
}
},
Config::default(),
)
.map_err(|e| WatchError::Init(e.to_string()))?;
watcher
.watch(root, RecursiveMode::Recursive)
.map_err(|e| WatchError::Watch(e.to_string()))?;
info!("Watching origin {} at {:?}", origin_id, root);
let mut debouncer: HashMap<PathBuf, Instant> = HashMap::new();
loop {
tokio::select! {
Some(event) = rx.recv() => {
Self::handle_notify_event(origin_id, root, event_bus, event, &mut debouncer);
}
_ = stop_rx.recv() => {
info!("Stopping watcher for {}", origin_id);
break;
}
}
}
Ok(())
}
fn handle_notify_event(
origin_id: &OriginId,
root: &Path,
event_bus: &EventBus,
event: notify::Event,
debouncer: &mut HashMap<PathBuf, Instant>,
) {
use notify::EventKind;
let now = Instant::now();
for path in event.paths {
let relative = match path.strip_prefix(root) {
Ok(p) => p.to_path_buf(),
Err(_) => continue,
};
if !Self::is_audio_file(&path) {
continue;
}
if let Some(last_seen) = debouncer.get(&relative) {
if now.duration_since(*last_seen).as_millis() < DEBOUNCE_MS as u128 {
debug!("Debouncing event for {:?}", relative);
continue;
}
}
debouncer.insert(relative.clone(), now);
let vpath = VirtualPath::new(format!("/{}", relative.display()));
match event.kind {
EventKind::Create(_) => {
debug!("File created: {:?}", relative);
event_bus.publish(Event::FileAdded {
path: vpath,
origin_id: origin_id.clone(),
});
}
EventKind::Remove(_) => {
debug!("File removed: {:?}", relative);
event_bus.publish(Event::FileRemoved { path: vpath });
}
EventKind::Modify(_) => {
debug!("File modified: {:?}", relative);
event_bus.publish(Event::FileModified { path: vpath });
}
_ => {}
}
}
}
fn is_audio_file(path: &Path) -> bool {
matches!(
path.extension()
.and_then(|e| e.to_str())
.map(|e| e.to_lowercase())
.as_deref(),
Some("flac" | "mp3" | "ogg" | "wav" | "m4a" | "aac" | "opus")
)
}
}
/// Handle returned by [`OriginWatcher::start`]; stopping or dropping it signals the
/// watch loop to exit.
pub struct WatchHandle {
    /// Sender side of the stop channel read by the watch loop.
    stop_tx: mpsc::Sender<()>,
}
impl WatchHandle {
    /// Sends the stop signal to the watch loop, waiting for channel capacity if
    /// needed. A send failure (the loop has already exited) is ignored.
    pub async fn stop(self) {
        self.stop_tx.send(()).await.ok();
    }
}
impl Drop for WatchHandle {
    /// Makes a best-effort, non-blocking attempt to signal the watch loop to stop;
    /// a full or closed channel is ignored.
    fn drop(&mut self) {
        self.stop_tx.try_send(()).ok();
    }
}
/// Errors raised while setting up the filesystem watcher; each variant carries the
/// underlying error rendered as a string.
#[derive(Debug, thiserror::Error)]
pub enum WatchError {
    /// Creating the `RecommendedWatcher` failed.
    #[error("Failed to initialize watcher: {0}")]
    Init(String),
    /// Registering the root path with the watcher failed.
    #[error("Failed to watch path: {0}")]
    Watch(String),
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Creating an audio file in the watched directory must publish `FileAdded`
    /// as the first event on the bus.
    // NOTE(review): relies on fixed sleeps (100 ms for watcher start-up, 300 ms for
    // event delivery), so it may be flaky on a slow or heavily loaded machine.
    #[tokio::test]
    async fn test_watcher_detects_create() {
        let dir = TempDir::new().unwrap();
        let event_bus = Arc::new(EventBus::default());
        // Subscribe before starting so no event can be missed.
        let mut rx = event_bus.subscribe();

        let watcher = OriginWatcher::new(OriginId::from("test"), dir.path().to_path_buf(), event_bus);
        let handle = watcher.start();

        // Give the spawned task time to register the watch.
        tokio::time::sleep(Duration::from_millis(100)).await;

        std::fs::write(dir.path().join("test.flac"), b"audio").unwrap();

        // Give the event time to reach the bus.
        tokio::time::sleep(Duration::from_millis(300)).await;

        let event = rx.try_recv();
        assert!(matches!(event, Ok(Event::FileAdded { .. })));

        handle.stop().await;
    }

    /// Extension matching is case-insensitive and rejects non-audio files.
    #[test]
    fn test_is_audio_file() {
        assert!(OriginWatcher::is_audio_file(Path::new("/music/song.flac")));
        assert!(OriginWatcher::is_audio_file(Path::new("/music/song.MP3")));
        assert!(!OriginWatcher::is_audio_file(Path::new("/music/cover.jpg")));
        assert!(!OriginWatcher::is_audio_file(Path::new("/music/readme.txt")));
    }
}