From ffbb238633e30b0d3fdce23ca0e7aa20ec83f191 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 12 May 2026 18:43:39 +0200 Subject: [PATCH] Implement Week 4 CAS store with chunk deduplication and LRU eviction - Add musicfs-cas crate: CasStore, ChunkHash, FileReader, ChunkManifest - Add LruEviction policy to musicfs-cache for cache size management - Integrate FileReader into FUSE filesystem for actual file reads - Use xxHash64 for content hashing, sled for index, msgpack serialization - Default cache path: ~/.cache/musicfs/chunks/ with 256 subdirs sharding - 20 new tests (14 CAS unit + 3 integration + 3 eviction), 54 total --- docs/v2/plans/week-04-cas-caching.md | 1089 +++++++++++++++++ musicfs/Cargo.lock | 155 ++- musicfs/Cargo.toml | 6 + musicfs/crates/musicfs-cache/Cargo.toml | 1 + musicfs/crates/musicfs-cache/src/eviction.rs | 155 +++ musicfs/crates/musicfs-cache/src/lib.rs | 2 + musicfs/crates/musicfs-cas/Cargo.toml | 15 + musicfs/crates/musicfs-cas/src/chunks.rs | 45 + musicfs/crates/musicfs-cas/src/lib.rs | 8 +- musicfs/crates/musicfs-cas/src/reader.rs | 253 ++++ musicfs/crates/musicfs-cas/src/store.rs | 324 +++++ .../crates/musicfs-cas/tests/integration.rs | 100 ++ musicfs/crates/musicfs-core/src/types.rs | 22 +- musicfs/crates/musicfs-fuse/Cargo.toml | 1 + musicfs/crates/musicfs-fuse/src/filesystem.rs | 42 +- 15 files changed, 2204 insertions(+), 14 deletions(-) create mode 100644 docs/v2/plans/week-04-cas-caching.md create mode 100644 musicfs/crates/musicfs-cache/src/eviction.rs create mode 100644 musicfs/crates/musicfs-cas/src/chunks.rs create mode 100644 musicfs/crates/musicfs-cas/src/reader.rs create mode 100644 musicfs/crates/musicfs-cas/src/store.rs create mode 100644 musicfs/crates/musicfs-cas/tests/integration.rs diff --git a/docs/v2/plans/week-04-cas-caching.md b/docs/v2/plans/week-04-cas-caching.md new file mode 100644 index 0000000..47ff19b --- /dev/null +++ b/docs/v2/plans/week-04-cas-caching.md @@ -0,0 +1,1089 @@ +# Week 4: CAS & Chunk Caching + +**Phase**: 1 (MVP) +**Prerequisites**: Week 3 (Virtual Tree & Basic Ops) +**Estimated effort**: 5 days + +--- + +## Objective + +Implement Content-Addressable Storage (CAS) for chunk deduplication, cache eviction with LRU policy, and connect to FUSE read operations to enable actual file playback. + +**Note**: Week 4 treats whole files as single chunks for simplicity. Week 5 adds CDC (Content-Defined Chunking) via FastCDC for efficient delta sync (FR-8.2, FR-11.2). + +--- + +## Deliverables + +| Task | Crate | Files | Done | +|------|-------|-------|------| +| CAS store implementation | musicfs-cas | `lib.rs`, `store.rs` | [ ] | +| Chunk storage | musicfs-cas | `chunks.rs` | [ ] | +| Cache eviction (LRU) | musicfs-cache | `eviction.rs` | [ ] | +| FUSE read integration | musicfs-fuse | `filesystem.rs` | [ ] | +| Integration tests | tests/integration | `basic_mount.rs` | [ ] | + +--- + +## Task 1: CAS Store + +### 1.1 Update `musicfs-cas/Cargo.toml` + +```toml +[package] +name = "musicfs-cas" +version.workspace = true +edition.workspace = true + +[dependencies] +musicfs-core = { path = "../musicfs-core" } +tokio.workspace = true +tracing.workspace = true +serde.workspace = true +sled = "0.34" +xxhash-rust = { version = "0.8", features = ["xxh64"] } +bytes = "1" +rmp-serde = "1" # msgpack per architecture 4.3.6 +hex = "0.4" +dirs = "5" # For ~/.cache resolution +thiserror.workspace = true +``` + +### 1.2 Create `musicfs-cas/src/lib.rs` + +```rust +mod store; +mod chunks; + +pub use store::{CasStore, CasConfig, CasError, DedupStats}; +pub use chunks::{ChunkHash, ChunkLocation, ChunkRef}; +``` + +### 1.3 Create `musicfs-cas/src/chunks.rs` + +```rust +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +/// Chunk hash (xxHash64, 8 bytes) per architecture 8.3 +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ChunkHash(pub [u8; 8]); + +impl ChunkHash { + pub fn from_bytes(bytes: &[u8]) -> Self { + let hash = xxhash_rust::xxh64::xxh64(bytes, 0); + Self(hash.to_le_bytes()) + } + + pub fn as_hex(&self) -> String { + hex::encode(self.0) + } + + pub fn from_hex(s: &str) -> Option { + let bytes = hex::decode(s).ok()?; + if bytes.len() != 8 { + return None; + } + let mut arr = [0u8; 8]; + arr.copy_from_slice(&bytes); + Some(Self(arr)) + } +} + +impl std::fmt::Display for ChunkHash { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_hex()) + } +} + +/// Location of a chunk in storage +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkLocation { + pub path: PathBuf, + pub size: u32, +} + +/// Reference to a chunk within a file (per architecture 4.3.6 chunk_manifest format) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkRef { + pub hash: ChunkHash, + pub offset: u64, + pub size: u32, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_chunk_hash_from_bytes() { + let data = b"hello world"; + let hash = ChunkHash::from_bytes(data); + assert_eq!(hash.as_hex().len(), 16); + } + + #[test] + fn test_chunk_hash_deterministic() { + let data = b"test data"; + let hash1 = ChunkHash::from_bytes(data); + let hash2 = ChunkHash::from_bytes(data); + assert_eq!(hash1, hash2); + } + + #[test] + fn test_chunk_hash_hex_roundtrip() { + let data = b"roundtrip test"; + let hash = ChunkHash::from_bytes(data); + let hex = hash.as_hex(); + let restored = ChunkHash::from_hex(&hex).unwrap(); + assert_eq!(hash, restored); + } +} +``` + +### 1.4 Create `musicfs-cas/src/store.rs` + +```rust +use crate::chunks::{ChunkHash, ChunkLocation}; +use bytes::Bytes; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use tokio::fs; +use tracing::{debug, warn}; + +/// CAS configuration +#[derive(Debug, Clone)] +pub struct CasConfig { + /// Root directory for chunk storage + pub chunks_dir: PathBuf, + /// Maximum cache size in bytes (FR-8.2) + pub max_size: u64, + /// Number of subdirectory levels (for filesystem performance) + pub shard_levels: u8, +} + +impl Default for CasConfig { + fn default() -> Self { + // Per architecture 4.3.2: ~/.cache/musicfs/chunks/ + let cache_dir = dirs::cache_dir() + .unwrap_or_else(|| PathBuf::from(".cache")) + .join("musicfs") + .join("chunks"); + + Self { + chunks_dir: cache_dir, + max_size: 10 * 1024 * 1024 * 1024, // 10 GB per NFR-5.2 + shard_levels: 2, // 256 subdirs per architecture 4.3.2 + } + } +} + +/// Content-Addressable Storage (FR-20.1-20.4) +pub struct CasStore { + config: CasConfig, + index: sled::Db, + current_size: AtomicU64, +} + +impl CasStore { + pub async fn open(config: CasConfig) -> Result { + fs::create_dir_all(&config.chunks_dir).await?; + + let index_path = config.chunks_dir.join("index.sled"); + let index = sled::open(&index_path)?; + + let current_size = Self::calculate_size(&config.chunks_dir).await; + + Ok(Self { + config, + index, + current_size: AtomicU64::new(current_size), + }) + } + + async fn calculate_size(dir: &Path) -> u64 { + let mut size = 0u64; + if let Ok(mut entries) = fs::read_dir(dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + if let Ok(meta) = entry.metadata().await { + if meta.is_file() { + size += meta.len(); + } + } + } + } + size + } + + /// Store chunk, returns hash (FR-20.1) + /// Deduplicates automatically - same content = same hash (FR-20.2) + pub async fn put(&self, data: &[u8]) -> Result { + let hash = ChunkHash::from_bytes(data); + let path = self.chunk_path(&hash); + + if path.exists() { + debug!("Chunk {} already exists (dedup)", hash); + return Ok(hash); + } + + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + fs::write(&path, data).await?; + + let location = ChunkLocation { + path: path.clone(), + size: data.len() as u32, + }; + // Use msgpack per architecture 4.3.6 + self.index.insert( + hash.0.as_slice(), + rmp_serde::to_vec(&location).unwrap(), + )?; + + self.current_size.fetch_add(data.len() as u64, Ordering::SeqCst); + + debug!("Stored chunk {} ({} bytes)", hash, data.len()); + Ok(hash) + } + + /// Retrieve chunk by hash (FR-20.1) + pub async fn get(&self, hash: &ChunkHash) -> Result { + let path = self.chunk_path(hash); + + if !path.exists() { + return Err(CasError::NotFound(hash.as_hex())); + } + + let data = fs::read(&path).await?; + + if self.config.max_size > 0 { + self.verify_integrity(hash, &data)?; + } + + Ok(Bytes::from(data)) + } + + /// Check if chunk exists (for dedup check) + pub fn exists(&self, hash: &ChunkHash) -> bool { + self.chunk_path(hash).exists() + } + + /// Verify chunk integrity (FR-20.4) + fn verify_integrity(&self, expected: &ChunkHash, data: &[u8]) -> Result<(), CasError> { + let actual = ChunkHash::from_bytes(data); + if actual != *expected { + warn!("Chunk integrity failure: expected {}, got {}", expected, actual); + return Err(CasError::IntegrityError { + expected: expected.as_hex(), + actual: actual.as_hex(), + }); + } + Ok(()) + } + + /// Get path for a chunk hash (sharded for filesystem performance) + fn chunk_path(&self, hash: &ChunkHash) -> PathBuf { + let hex = hash.as_hex(); + let mut path = self.config.chunks_dir.clone(); + + for i in 0..self.config.shard_levels as usize { + let start = i * 2; + let end = start + 2; + if end <= hex.len() { + path = path.join(&hex[start..end]); + } + } + + path.join(&hex) + } + + /// Delete a chunk + pub async fn delete(&self, hash: &ChunkHash) -> Result<(), CasError> { + let path = self.chunk_path(hash); + + if path.exists() { + let meta = fs::metadata(&path).await?; + fs::remove_file(&path).await?; + self.index.remove(hash.0.as_slice())?; + self.current_size.fetch_sub(meta.len(), Ordering::SeqCst); + debug!("Deleted chunk {}", hash); + } + + Ok(()) + } + + /// Get current cache size + pub fn current_size(&self) -> u64 { + self.current_size.load(Ordering::SeqCst) + } + + /// Get maximum cache size + pub fn max_size(&self) -> u64 { + self.config.max_size + } + + /// List all chunk hashes + pub fn list_chunks(&self) -> impl Iterator + '_ { + self.index.iter().filter_map(|r| { + r.ok().and_then(|(k, _)| { + if k.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(&k); + Some(ChunkHash(arr)) + } else { + None + } + }) + }) + } + + /// Get deduplication statistics (FR-20.3) + pub fn dedup_stats(&self) -> DedupStats { + let chunks_stored = self.index.len() as u64; + let size_bytes = self.current_size(); + + DedupStats { + chunks_stored, + chunks_unique: chunks_stored, // All stored chunks are unique by definition + size_bytes, + size_limit_bytes: self.config.max_size, + } + } +} + +/// Deduplication statistics (FR-20.3) +#[derive(Debug, Clone)] +pub struct DedupStats { + pub chunks_stored: u64, + pub chunks_unique: u64, + pub size_bytes: u64, + pub size_limit_bytes: u64, +} + +impl DedupStats { + /// Calculate dedup ratio (space saved) + pub fn dedup_ratio(&self) -> f64 { + if self.chunks_stored == 0 { + 0.0 + } else { + 1.0 - (self.chunks_unique as f64 / self.chunks_stored as f64) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CasError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Sled error: {0}")] + Sled(#[from] sled::Error), + + #[error("Chunk not found: {0}")] + NotFound(String), + + #[error("Integrity error: expected {expected}, got {actual}")] + IntegrityError { expected: String, actual: String }, +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + async fn test_store() -> (CasStore, TempDir) { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 1024 * 1024, + shard_levels: 2, + }; + let store = CasStore::open(config).await.unwrap(); + (store, dir) + } + + #[tokio::test] + async fn test_cas_put_get() { + let (store, _dir) = test_store().await; + + let data = b"test chunk data"; + let hash = store.put(data).await.unwrap(); + + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); + } + + #[tokio::test] + async fn test_cas_dedup() { + let (store, _dir) = test_store().await; + + let data = b"duplicate data"; + let hash1 = store.put(data).await.unwrap(); + let hash2 = store.put(data).await.unwrap(); + + assert_eq!(hash1, hash2); + } + + #[tokio::test] + async fn test_cas_exists() { + let (store, _dir) = test_store().await; + + let data = b"existence test"; + let hash = store.put(data).await.unwrap(); + + assert!(store.exists(&hash)); + + let fake_hash = ChunkHash::from_bytes(b"nonexistent"); + assert!(!store.exists(&fake_hash)); + } + + #[tokio::test] + async fn test_cas_delete() { + let (store, _dir) = test_store().await; + + let data = b"delete me"; + let hash = store.put(data).await.unwrap(); + + assert!(store.exists(&hash)); + + store.delete(&hash).await.unwrap(); + + assert!(!store.exists(&hash)); + } + + #[tokio::test] + async fn test_cas_integrity() { + let (store, _dir) = test_store().await; + + let data = b"integrity test"; + let hash = store.put(data).await.unwrap(); + + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); + } + + #[tokio::test] + async fn test_cas_dedup_stats() { + let (store, _dir) = test_store().await; + + store.put(b"chunk1").await.unwrap(); + store.put(b"chunk2").await.unwrap(); + store.put(b"chunk1").await.unwrap(); // Duplicate + + let stats = store.dedup_stats(); + assert_eq!(stats.chunks_stored, 2); // Only 2 unique + assert_eq!(stats.chunks_unique, 2); + } +} +``` + +--- + +## Task 2: Cache Eviction + +### 2.1 Add to `musicfs-cache/src/lib.rs` + +```rust +mod eviction; +pub use eviction::{LruEviction, EvictionPolicy}; +``` + +### 2.2 Create `musicfs-cache/src/eviction.rs` + +```rust +use musicfs_cas::{CasStore, ChunkHash}; +use std::collections::BTreeMap; +use std::sync::RwLock; +use std::time::Instant; +use tracing::{debug, info}; + +/// Eviction policy trait +pub trait EvictionPolicy: Send + Sync { + fn record_access(&self, hash: ChunkHash); + fn select_victims(&self, count: usize) -> Vec; + fn remove(&self, hash: &ChunkHash); +} + +/// LRU eviction policy (FR-8.2) +pub struct LruEviction { + access_times: RwLock>, + hash_to_time: RwLock>, +} + +impl LruEviction { + pub fn new() -> Self { + Self { + access_times: RwLock::new(BTreeMap::new()), + hash_to_time: RwLock::new(std::collections::HashMap::new()), + } + } + + /// Evict chunks until under target size + pub async fn evict_to_target( + &self, + store: &CasStore, + target_size: u64, + ) -> Result { + let mut bytes_freed = 0u64; + + while store.current_size() > target_size { + let victims = self.select_victims(10); + + if victims.is_empty() { + break; + } + + for hash in victims { + if let Ok(data) = store.get(&hash).await { + bytes_freed += data.len() as u64; + store.delete(&hash).await?; + self.remove(&hash); + } + } + } + + if bytes_freed > 0 { + info!("Evicted {} bytes from cache", bytes_freed); + } + + Ok(bytes_freed) + } +} + +impl Default for LruEviction { + fn default() -> Self { + Self::new() + } +} + +impl EvictionPolicy for LruEviction { + fn record_access(&self, hash: ChunkHash) { + let now = Instant::now(); + let mut times = self.access_times.write().unwrap(); + let mut h2t = self.hash_to_time.write().unwrap(); + + if let Some(old_time) = h2t.remove(&hash) { + times.remove(&old_time); + } + + times.insert(now, hash); + h2t.insert(hash, now); + } + + fn select_victims(&self, count: usize) -> Vec { + let times = self.access_times.read().unwrap(); + times.values().take(count).copied().collect() + } + + fn remove(&self, hash: &ChunkHash) { + let mut times = self.access_times.write().unwrap(); + let mut h2t = self.hash_to_time.write().unwrap(); + + if let Some(time) = h2t.remove(hash) { + times.remove(&time); + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum EvictionError { + #[error("CAS error: {0}")] + Cas(#[from] musicfs_cas::CasError), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lru_access_order() { + let lru = LruEviction::new(); + + let h1 = ChunkHash::from_bytes(b"chunk1"); + let h2 = ChunkHash::from_bytes(b"chunk2"); + let h3 = ChunkHash::from_bytes(b"chunk3"); + + lru.record_access(h1); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h2); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h3); + + let victims = lru.select_victims(2); + assert_eq!(victims.len(), 2); + assert_eq!(victims[0], h1); + assert_eq!(victims[1], h2); + } + + #[test] + fn test_lru_reaccess_updates_order() { + let lru = LruEviction::new(); + + let h1 = ChunkHash::from_bytes(b"chunk1"); + let h2 = ChunkHash::from_bytes(b"chunk2"); + + lru.record_access(h1); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h2); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h1); + + let victims = lru.select_victims(1); + assert_eq!(victims[0], h2); + } + + #[test] + fn test_lru_remove() { + let lru = LruEviction::new(); + + let h1 = ChunkHash::from_bytes(b"chunk1"); + let h2 = ChunkHash::from_bytes(b"chunk2"); + + lru.record_access(h1); + lru.record_access(h2); + lru.remove(&h1); + + let victims = lru.select_victims(10); + assert_eq!(victims.len(), 1); + assert_eq!(victims[0], h2); + } +} +``` + +--- + +## Task 3: File Reader Integration + +### 3.1 Create `musicfs-cas/src/reader.rs` + +```rust +use crate::{ChunkHash, ChunkRef, CasStore}; +use bytes::{Bytes, BytesMut}; +use musicfs_core::FileId; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::RwLock; + +/// Chunk manifest for a file (per architecture 4.3.6) +/// Stored as msgpack BLOB in SQLite files.chunk_manifest column +/// Format: [(chunk_hash, offset, size), ...] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkManifest { + pub file_id: FileId, + pub total_size: u64, + pub chunks: Vec, +} + +impl ChunkManifest { + /// Serialize chunks to msgpack for database storage (architecture 4.3.6) + pub fn chunks_to_bytes(&self) -> Vec { + rmp_serde::to_vec(&self.chunks).unwrap() + } + + /// Deserialize chunks from database BLOB + pub fn chunks_from_bytes(data: &[u8]) -> Option> { + rmp_serde::from_slice(data).ok() + } + + /// Create manifest from database fields + pub fn from_db(file_id: FileId, total_size: u64, chunk_blob: &[u8]) -> Option { + let chunks = Self::chunks_from_bytes(chunk_blob)?; + Some(Self { file_id, total_size, chunks }) + } +} + +/// File reader using CAS chunks +pub struct FileReader { + store: std::sync::Arc, + manifests: RwLock>, +} + +impl FileReader { + pub fn new(store: std::sync::Arc) -> Self { + Self { + store, + manifests: RwLock::new(HashMap::new()), + } + } + + /// Register a file's chunk manifest + pub fn register_manifest(&self, manifest: ChunkManifest) { + let mut manifests = self.manifests.write().unwrap(); + manifests.insert(manifest.file_id, manifest); + } + + /// Read bytes from a file at offset + pub async fn read( + &self, + file_id: FileId, + offset: u64, + size: u32, + ) -> Result { + let manifest = { + let manifests = self.manifests.read().unwrap(); + manifests.get(&file_id).cloned() + .ok_or(ReaderError::ManifestNotFound(file_id))? + }; + + if offset >= manifest.total_size { + return Ok(Bytes::new()); + } + + let end = std::cmp::min(offset + size as u64, manifest.total_size); + let mut result = BytesMut::with_capacity((end - offset) as usize); + + for chunk_ref in &manifest.chunks { + let chunk_start = chunk_ref.offset; + let chunk_end = chunk_ref.offset + chunk_ref.size as u64; + + if chunk_end <= offset || chunk_start >= end { + continue; + } + + let chunk_data = self.store.get(&chunk_ref.hash).await?; + + let read_start = if offset > chunk_start { + (offset - chunk_start) as usize + } else { + 0 + }; + + let read_end = if end < chunk_end { + (end - chunk_start) as usize + } else { + chunk_ref.size as usize + }; + + result.extend_from_slice(&chunk_data[read_start..read_end]); + } + + Ok(result.freeze()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ReaderError { + #[error("Manifest not found for file {0:?}")] + ManifestNotFound(FileId), + + #[error("CAS error: {0}")] + Cas(#[from] crate::CasError), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::CasConfig; + use tempfile::TempDir; + + #[tokio::test] + async fn test_file_reader_simple() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = std::sync::Arc::new(CasStore::open(config).await.unwrap()); + + let data = b"Hello, World!"; + let hash = store.put(data).await.unwrap(); + + let reader = FileReader::new(store); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: data.len() as u64, + chunks: vec![ChunkRef { + hash, + offset: 0, + size: data.len() as u32, + }], + }); + + let result = reader.read(FileId(1), 0, data.len() as u32).await.unwrap(); + assert_eq!(&result[..], data); + } + + #[tokio::test] + async fn test_file_reader_partial() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = std::sync::Arc::new(CasStore::open(config).await.unwrap()); + + let data = b"ABCDEFGHIJ"; + let hash = store.put(data).await.unwrap(); + + let reader = FileReader::new(store); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: data.len() as u64, + chunks: vec![ChunkRef { + hash, + offset: 0, + size: data.len() as u32, + }], + }); + + let result = reader.read(FileId(1), 3, 4).await.unwrap(); + assert_eq!(&result[..], b"DEFG"); + } +} +``` + +### 3.2 Update `musicfs-cas/src/lib.rs` + +```rust +mod store; +mod chunks; +mod reader; + +pub use store::{CasStore, CasConfig, CasError, DedupStats}; +pub use chunks::{ChunkHash, ChunkLocation, ChunkRef}; +pub use reader::{FileReader, ChunkManifest, ReaderError}; +``` + +--- + +## Task 4: FUSE Read Integration + +### 4.1 Update `musicfs-fuse/Cargo.toml` + +```toml +[dependencies] +musicfs-core = { path = "../musicfs-core" } +musicfs-cache = { path = "../musicfs-cache" } +musicfs-cas = { path = "../musicfs-cas" } +musicfs-origins = { path = "../musicfs-origins" } +# ... rest of dependencies +``` + +### 4.2 Update `musicfs-fuse/src/filesystem.rs` read method + +Replace the placeholder `read` implementation: + +```rust +use musicfs_cas::{FileReader, ChunkManifest}; + +pub struct MusicFs { + tree: Arc>, + reader: Arc, + uid: u32, + gid: u32, +} + +impl MusicFs { + pub fn new( + tree: Arc>, + reader: Arc, + ) -> Self { + Self { + tree, + reader, + uid: unsafe { libc::getuid() }, + gid: unsafe { libc::getgid() }, + } + } +} + +// In Filesystem impl: +fn read( + &mut self, + _req: &Request, + ino: u64, + _fh: u64, + offset: i64, + size: u32, + _flags: i32, + _lock_owner: Option, + reply: ReplyData, +) { + debug!("read(ino={}, offset={}, size={})", ino, offset, size); + + let file_id = { + let tree = self.tree.read().unwrap(); + if let Some(VirtualNode::File(file)) = tree.get(ino) { + file.file_id + } else { + reply.error(libc::ENOENT); + return; + } + }; + + // Use tokio runtime for async read + let reader = self.reader.clone(); + let result = tokio::runtime::Handle::current().block_on(async { + reader.read(file_id, offset as u64, size).await + }); + + match result { + Ok(data) => reply.data(&data), + Err(e) => { + warn!("Read error: {}", e); + reply.error(libc::EIO); + } + } +} +``` + +--- + +## Task 5: Integration Tests + +### 5.1 Create `tests/integration/basic_mount.rs` + +```rust +use musicfs_cache::{TreeBuilder, VirtualTree}; +use musicfs_cas::{CasStore, CasConfig, FileReader, ChunkManifest, ChunkRef}; +use musicfs_core::{FileId, FileMeta, OriginId, RealPath, VirtualPath}; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; +use std::time::SystemTime; +use tempfile::TempDir; + +fn make_file_meta(id: i64, vpath: &str, size: u64) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(vpath), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from("/test"), + }, + size, + mtime: SystemTime::now(), + content_hash: None, + audio: None, + } +} + +#[tokio::test] +async fn test_cas_and_tree_integration() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = Arc::new(CasStore::open(config).await.unwrap()); + + let file_data = b"This is test audio file content for testing."; + let chunk_hash = store.put(file_data).await.unwrap(); + + let mut builder = TreeBuilder::new(); + builder.add_file(&make_file_meta(1, "/Artist/Album/Track.flac", file_data.len() as u64)); + let tree = Arc::new(RwLock::new(builder.build())); + + let reader = Arc::new(FileReader::new(store.clone())); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: file_data.len() as u64, + chunks: vec![ChunkRef { + hash: chunk_hash, + offset: 0, + size: file_data.len() as u32, + }], + }); + + let result = reader.read(FileId(1), 0, file_data.len() as u32).await.unwrap(); + assert_eq!(&result[..], file_data); +} + +#[tokio::test] +async fn test_cache_persistence() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + + let data = b"persistent data"; + let hash = { + let store = CasStore::open(config.clone()).await.unwrap(); + store.put(data).await.unwrap() + }; + + let store = CasStore::open(config).await.unwrap(); + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); +} + +#[tokio::test] +async fn test_deduplication() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = CasStore::open(config).await.unwrap(); + + let data = b"duplicate this content"; + + let hash1 = store.put(data).await.unwrap(); + let size_after_first = store.current_size(); + + let hash2 = store.put(data).await.unwrap(); + let size_after_second = store.current_size(); + + assert_eq!(hash1, hash2); + assert_eq!(size_after_first, size_after_second); +} +``` + +--- + +## Tests + +| Test | Type | Validates | +|------|------|-----------| +| `test_cas_put_get` | Unit | Basic store/retrieve (FR-20.1) | +| `test_cas_dedup` | Unit | Same content → same hash (FR-20.2) | +| `test_cas_dedup_stats` | Unit | Dedup statistics reported (FR-20.3) | +| `test_cas_integrity` | Unit | Verify chunk hash (FR-20.4) | +| `test_lru_access_order` | Unit | LRU ordering correct | +| `test_lru_reaccess_updates_order` | Unit | Re-access moves to end | +| `test_cache_eviction` | Unit | LRU eviction works (FR-8.4) | +| `test_cache_persistence` | Integration | Survives restart (FR-8.4) | +| `test_file_reader_simple` | Unit | Full file read | +| `test_file_reader_partial` | Unit | Offset/size read | +| `test_cas_and_tree_integration` | Integration | End-to-end read | +| `test_deduplication` | Integration | Dedup saves space | + +--- + +## Exit Criteria + +- [ ] Chunks stored in CAS with deduplication (FR-20.1, FR-20.2) +- [ ] Deduplication statistics reported via `dedup_stats()` (FR-20.3) +- [ ] Chunk integrity verified on read (FR-20.4) +- [ ] Cache size limit enforced via LRU eviction (FR-8.4) +- [ ] Cache persists across daemon restarts (FR-8.4) +- [ ] FUSE `read()` returns actual file content +- [ ] Audio playback works through mounted filesystem +- [ ] All Phase 1 requirements pass acceptance tests + +--- + +## Dependencies to Add + +### Workspace `Cargo.toml` + +```toml +[workspace.dependencies] +# ... existing ... +sled = "0.34" +xxhash-rust = { version = "0.8", features = ["xxh64"] } +bytes = "1" +rmp-serde = "1" # msgpack per architecture 4.3.6 +hex = "0.4" +dirs = "5" # For ~/.cache resolution +tempfile = "3" +``` + +--- + +## Next Week + +Week 5 will implement CDC chunking and delta detection for efficient synchronization. diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index b6a2bc5..3787df5 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -113,6 +113,27 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -135,7 +156,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -208,6 +229,17 @@ dependencies = [ "byteorder", ] +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "getrandom" version = "0.4.2" @@ -317,6 +349,15 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libredox" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" +dependencies = [ + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.28.0" @@ -363,13 +404,14 @@ checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] name = "musicfs-cache" version = "0.1.0" dependencies = [ + "musicfs-cas", "musicfs-core", "rmp-serde", "rusqlite", @@ -384,6 +426,21 @@ dependencies = [ [[package]] name = "musicfs-cas" version = "0.1.0" +dependencies = [ + "bytes", + "dirs", + "hex", + "musicfs-cache", + "musicfs-core", + "rmp-serde", + "serde", + "sled", + "tempfile", + "thiserror", + "tokio", + "tracing", + "xxhash-rust", +] [[package]] name = "musicfs-cli" @@ -407,6 +464,7 @@ dependencies = [ "fuser", "libc", "musicfs-cache", + "musicfs-cas", "musicfs-core", "tokio", "tracing", @@ -464,6 +522,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "page_size" version = "0.6.0" @@ -586,6 +650,17 @@ dependencies = [ "bitflags 2.11.1", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror", +] + [[package]] name = "rmp" version = "0.8.15" @@ -629,7 +704,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -732,7 +807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -886,10 +961,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -926,7 +1001,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1081,6 +1156,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -1090,6 +1174,63 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/musicfs/Cargo.toml b/musicfs/Cargo.toml index ac880dc..9fa740e 100644 --- a/musicfs/Cargo.toml +++ b/musicfs/Cargo.toml @@ -44,5 +44,11 @@ symphonia = { version = "0.5", default-features = false, features = [ "aac", "alac", "flac", "mp3", "ogg", "vorbis", "wav" ] } +# Bytes handling +bytes = "1" + +# Platform directories +dirs = "5" + # Testing tempfile = "3" diff --git a/musicfs/crates/musicfs-cache/Cargo.toml b/musicfs/crates/musicfs-cache/Cargo.toml index bf945bf..d4fe15c 100644 --- a/musicfs/crates/musicfs-cache/Cargo.toml +++ b/musicfs/crates/musicfs-cache/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] musicfs-core = { path = "../musicfs-core" } +musicfs-cas = { path = "../musicfs-cas" } rusqlite = { workspace = true, features = ["bundled"] } sled.workspace = true tokio.workspace = true diff --git a/musicfs/crates/musicfs-cache/src/eviction.rs b/musicfs/crates/musicfs-cache/src/eviction.rs new file mode 100644 index 0000000..6a2487d --- /dev/null +++ b/musicfs/crates/musicfs-cache/src/eviction.rs @@ -0,0 +1,155 @@ +use musicfs_cas::CasStore; +use musicfs_core::ChunkHash; +use std::collections::BTreeMap; +use std::sync::RwLock; +use std::time::Instant; +use tracing::info; + +pub trait EvictionPolicy: Send + Sync { + fn record_access(&self, hash: ChunkHash); + fn select_victims(&self, count: usize) -> Vec; + fn remove(&self, hash: &ChunkHash); +} + +pub struct LruEviction { + access_times: RwLock>, + hash_to_time: RwLock>, +} + +impl LruEviction { + pub fn new() -> Self { + Self { + access_times: RwLock::new(BTreeMap::new()), + hash_to_time: RwLock::new(std::collections::HashMap::new()), + } + } + + pub async fn evict_to_target( + &self, + store: &CasStore, + target_size: u64, + ) -> Result { + let mut bytes_freed = 0u64; + + while store.current_size() > target_size { + let victims = self.select_victims(10); + + if victims.is_empty() { + break; + } + + for hash in victims { + if let Ok(data) = store.get(&hash).await { + bytes_freed += data.len() as u64; + store.delete(&hash).await?; + self.remove(&hash); + } + } + } + + if bytes_freed > 0 { + info!("Evicted {} bytes from cache", bytes_freed); + } + + Ok(bytes_freed) + } +} + +impl Default for LruEviction { + fn default() -> Self { + Self::new() + } +} + +impl EvictionPolicy for LruEviction { + fn record_access(&self, hash: ChunkHash) { + let now = Instant::now(); + let mut times = self.access_times.write().unwrap(); + let mut h2t = self.hash_to_time.write().unwrap(); + + if let Some(old_time) = h2t.remove(&hash) { + times.remove(&old_time); + } + + times.insert(now, hash); + h2t.insert(hash, now); + } + + fn select_victims(&self, count: usize) -> Vec { + let times = self.access_times.read().unwrap(); + times.values().take(count).copied().collect() + } + + fn remove(&self, hash: &ChunkHash) { + let mut times = self.access_times.write().unwrap(); + let mut h2t = self.hash_to_time.write().unwrap(); + + if let Some(time) = h2t.remove(hash) { + times.remove(&time); + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum EvictionError { + #[error("CAS error: {0}")] + Cas(#[from] musicfs_cas::CasError), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lru_access_order() { + let lru = LruEviction::new(); + + let h1 = ChunkHash::from_bytes(b"chunk1"); + let h2 = ChunkHash::from_bytes(b"chunk2"); + let h3 = ChunkHash::from_bytes(b"chunk3"); + + lru.record_access(h1); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h2); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h3); + + let victims = lru.select_victims(2); + assert_eq!(victims.len(), 2); + assert_eq!(victims[0], h1); + assert_eq!(victims[1], h2); + } + + #[test] + fn test_lru_reaccess_updates_order() { + let lru = LruEviction::new(); + + let h1 = ChunkHash::from_bytes(b"chunk1"); + let h2 = ChunkHash::from_bytes(b"chunk2"); + + lru.record_access(h1); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h2); + std::thread::sleep(std::time::Duration::from_millis(1)); + lru.record_access(h1); + + let victims = lru.select_victims(1); + assert_eq!(victims[0], h2); + } + + #[test] + fn test_lru_remove() { + let lru = LruEviction::new(); + + let h1 = ChunkHash::from_bytes(b"chunk1"); + let h2 = ChunkHash::from_bytes(b"chunk2"); + + lru.record_access(h1); + lru.record_access(h2); + lru.remove(&h1); + + let victims = lru.select_victims(10); + assert_eq!(victims.len(), 1); + assert_eq!(victims[0], h2); + } +} diff --git a/musicfs/crates/musicfs-cache/src/lib.rs b/musicfs/crates/musicfs-cache/src/lib.rs index 4456134..1a73920 100644 --- a/musicfs/crates/musicfs-cache/src/lib.rs +++ b/musicfs/crates/musicfs-cache/src/lib.rs @@ -1,8 +1,10 @@ mod db; +mod eviction; mod metadata; mod tree; pub use db::Database; +pub use eviction::{EvictionError, EvictionPolicy, LruEviction}; pub use metadata::MetadataCache; pub use tree::{ DirNode, FileNode, Inode, RefreshPolicy, TreeBuilder, VirtualNode, VirtualTree, ROOT_INODE, diff --git a/musicfs/crates/musicfs-cas/Cargo.toml b/musicfs/crates/musicfs-cas/Cargo.toml index 1021bb7..8d484bc 100644 --- a/musicfs/crates/musicfs-cas/Cargo.toml +++ b/musicfs/crates/musicfs-cas/Cargo.toml @@ -4,3 +4,18 @@ version.workspace = true edition.workspace = true [dependencies] +musicfs-core = { path = "../musicfs-core" } +tokio.workspace = true +tracing.workspace = true +serde.workspace = true +sled.workspace = true +xxhash-rust.workspace = true +bytes.workspace = true +rmp-serde.workspace = true +hex.workspace = true +dirs.workspace = true +thiserror.workspace = true + +[dev-dependencies] +tempfile.workspace = true +musicfs-cache = { path = "../musicfs-cache" } diff --git a/musicfs/crates/musicfs-cas/src/chunks.rs b/musicfs/crates/musicfs-cas/src/chunks.rs new file mode 100644 index 0000000..c79598c --- /dev/null +++ b/musicfs/crates/musicfs-cas/src/chunks.rs @@ -0,0 +1,45 @@ +use musicfs_core::ChunkHash; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkLocation { + pub path: PathBuf, + pub size: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkRef { + pub hash: ChunkHash, + pub offset: u64, + pub size: u32, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_chunk_hash_from_bytes() { + let data = b"hello world"; + let hash = ChunkHash::from_bytes(data); + assert_eq!(hash.as_hex().len(), 16); + } + + #[test] + fn test_chunk_hash_deterministic() { + let data = b"test data"; + let hash1 = ChunkHash::from_bytes(data); + let hash2 = ChunkHash::from_bytes(data); + assert_eq!(hash1, hash2); + } + + #[test] + fn test_chunk_hash_hex_roundtrip() { + let data = b"roundtrip test"; + let hash = ChunkHash::from_bytes(data); + let hex = hash.as_hex(); + let restored = ChunkHash::from_hex(&hex).unwrap(); + assert_eq!(hash, restored); + } +} diff --git a/musicfs/crates/musicfs-cas/src/lib.rs b/musicfs/crates/musicfs-cas/src/lib.rs index f9da2c4..235a10f 100644 --- a/musicfs/crates/musicfs-cas/src/lib.rs +++ b/musicfs/crates/musicfs-cas/src/lib.rs @@ -1 +1,7 @@ -#![allow(dead_code)] +mod chunks; +mod reader; +mod store; + +pub use chunks::{ChunkLocation, ChunkRef}; +pub use reader::{ChunkManifest, FileReader, ReaderError}; +pub use store::{CasConfig, CasError, CasStore, DedupStats}; diff --git a/musicfs/crates/musicfs-cas/src/reader.rs b/musicfs/crates/musicfs-cas/src/reader.rs new file mode 100644 index 0000000..ce5e673 --- /dev/null +++ b/musicfs/crates/musicfs-cas/src/reader.rs @@ -0,0 +1,253 @@ +use crate::chunks::ChunkRef; +use crate::store::CasStore; +use bytes::{Bytes, BytesMut}; +use musicfs_core::FileId; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::RwLock; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkManifest { + pub file_id: FileId, + pub total_size: u64, + pub chunks: Vec, +} + +impl ChunkManifest { + pub fn chunks_to_bytes(&self) -> Vec { + rmp_serde::to_vec(&self.chunks).unwrap_or_default() + } + + pub fn chunks_from_bytes(data: &[u8]) -> Option> { + rmp_serde::from_slice(data).ok() + } + + pub fn from_db(file_id: FileId, total_size: u64, chunk_blob: &[u8]) -> Option { + let chunks = Self::chunks_from_bytes(chunk_blob)?; + Some(Self { + file_id, + total_size, + chunks, + }) + } +} + +pub struct FileReader { + store: std::sync::Arc, + manifests: RwLock>, +} + +impl FileReader { + pub fn new(store: std::sync::Arc) -> Self { + Self { + store, + manifests: RwLock::new(HashMap::new()), + } + } + + pub fn register_manifest(&self, manifest: ChunkManifest) { + let mut manifests = self.manifests.write().unwrap(); + manifests.insert(manifest.file_id, manifest); + } + + pub async fn read( + &self, + file_id: FileId, + offset: u64, + size: u32, + ) -> Result { + let manifest = { + let manifests = self.manifests.read().unwrap(); + manifests + .get(&file_id) + .cloned() + .ok_or(ReaderError::ManifestNotFound(file_id))? + }; + + if offset >= manifest.total_size { + return Ok(Bytes::new()); + } + + let end = std::cmp::min(offset + size as u64, manifest.total_size); + let mut result = BytesMut::with_capacity((end - offset) as usize); + + for chunk_ref in &manifest.chunks { + let chunk_start = chunk_ref.offset; + let chunk_end = chunk_ref.offset + chunk_ref.size as u64; + + if chunk_end <= offset || chunk_start >= end { + continue; + } + + let chunk_data = self.store.get(&chunk_ref.hash).await?; + + let read_start = if offset > chunk_start { + (offset - chunk_start) as usize + } else { + 0 + }; + + let read_end = if end < chunk_end { + (end - chunk_start) as usize + } else { + chunk_ref.size as usize + }; + + result.extend_from_slice(&chunk_data[read_start..read_end]); + } + + Ok(result.freeze()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ReaderError { + #[error("Manifest not found for file {0:?}")] + ManifestNotFound(FileId), + + #[error("CAS error: {0}")] + Cas(#[from] crate::store::CasError), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::CasConfig; + use musicfs_core::ChunkHash; + use tempfile::TempDir; + + #[tokio::test] + async fn test_file_reader_simple() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = std::sync::Arc::new(CasStore::open(config).await.unwrap()); + + let data = b"Hello, World!"; + let hash = store.put(data).await.unwrap(); + + let reader = FileReader::new(store); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: data.len() as u64, + chunks: vec![ChunkRef { + hash, + offset: 0, + size: data.len() as u32, + }], + }); + + let result = reader.read(FileId(1), 0, data.len() as u32).await.unwrap(); + assert_eq!(&result[..], data); + } + + #[tokio::test] + async fn test_file_reader_partial() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = std::sync::Arc::new(CasStore::open(config).await.unwrap()); + + let data = b"ABCDEFGHIJ"; + let hash = store.put(data).await.unwrap(); + + let reader = FileReader::new(store); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: data.len() as u64, + chunks: vec![ChunkRef { + hash, + offset: 0, + size: data.len() as u32, + }], + }); + + let result = reader.read(FileId(1), 3, 4).await.unwrap(); + assert_eq!(&result[..], b"DEFG"); + } + + #[tokio::test] + async fn test_file_reader_multi_chunk() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = std::sync::Arc::new(CasStore::open(config).await.unwrap()); + + let chunk1 = b"AAAA"; + let chunk2 = b"BBBB"; + let hash1 = store.put(chunk1).await.unwrap(); + let hash2 = store.put(chunk2).await.unwrap(); + + let reader = FileReader::new(store); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: 8, + chunks: vec![ + ChunkRef { + hash: hash1, + offset: 0, + size: 4, + }, + ChunkRef { + hash: hash2, + offset: 4, + size: 4, + }, + ], + }); + + let result = reader.read(FileId(1), 2, 4).await.unwrap(); + assert_eq!(&result[..], b"AABB"); + } + + #[tokio::test] + async fn test_file_reader_eof() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = std::sync::Arc::new(CasStore::open(config).await.unwrap()); + + let data = b"short"; + let hash = store.put(data).await.unwrap(); + + let reader = FileReader::new(store); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: data.len() as u64, + chunks: vec![ChunkRef { + hash, + offset: 0, + size: data.len() as u32, + }], + }); + + let result = reader.read(FileId(1), 100, 10).await.unwrap(); + assert!(result.is_empty()); + } + + #[test] + fn test_chunk_manifest_serialization() { + let manifest = ChunkManifest { + file_id: FileId(42), + total_size: 1024, + chunks: vec![ChunkRef { + hash: ChunkHash::from_bytes(b"test"), + offset: 0, + size: 1024, + }], + }; + + let bytes = manifest.chunks_to_bytes(); + let restored = ChunkManifest::chunks_from_bytes(&bytes).unwrap(); + assert_eq!(restored.len(), 1); + assert_eq!(restored[0].size, 1024); + } +} diff --git a/musicfs/crates/musicfs-cas/src/store.rs b/musicfs/crates/musicfs-cas/src/store.rs new file mode 100644 index 0000000..3458eff --- /dev/null +++ b/musicfs/crates/musicfs-cas/src/store.rs @@ -0,0 +1,324 @@ +use crate::chunks::ChunkLocation; +use bytes::Bytes; +use musicfs_core::ChunkHash; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use tokio::fs; +use tracing::{debug, warn}; + +const DEFAULT_MAX_SIZE_10GB: u64 = 10 * 1024 * 1024 * 1024; +const DEFAULT_SHARD_LEVELS_256_SUBDIRS: u8 = 2; + +#[derive(Debug, Clone)] +pub struct CasConfig { + pub chunks_dir: PathBuf, + pub max_size: u64, + pub shard_levels: u8, +} + +impl Default for CasConfig { + fn default() -> Self { + let cache_dir = dirs::cache_dir() + .unwrap_or_else(|| PathBuf::from(".cache")) + .join("musicfs") + .join("chunks"); + + Self { + chunks_dir: cache_dir, + max_size: DEFAULT_MAX_SIZE_10GB, + shard_levels: DEFAULT_SHARD_LEVELS_256_SUBDIRS, + } + } +} + +pub struct CasStore { + config: CasConfig, + index: sled::Db, + current_size: AtomicU64, +} + +impl CasStore { + pub async fn open(config: CasConfig) -> Result { + fs::create_dir_all(&config.chunks_dir).await?; + + let index_path = config.chunks_dir.join("index.sled"); + let index = sled::open(&index_path)?; + + let current_size = Self::calculate_size(&config.chunks_dir).await; + + Ok(Self { + config, + index, + current_size: AtomicU64::new(current_size), + }) + } + + async fn calculate_size(dir: &Path) -> u64 { + let mut size = 0u64; + if let Ok(mut entries) = fs::read_dir(dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + if let Ok(meta) = entry.metadata().await { + if meta.is_file() { + size += meta.len(); + } + } + } + } + size + } + + pub async fn put(&self, data: &[u8]) -> Result { + let hash = ChunkHash::from_bytes(data); + let path = self.chunk_path(&hash); + + if path.exists() { + debug!("Chunk {} already exists (dedup)", hash); + return Ok(hash); + } + + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + fs::write(&path, data).await?; + + let location = ChunkLocation { + path: path.clone(), + size: data.len() as u32, + }; + self.index.insert( + hash.0.as_slice(), + rmp_serde::to_vec(&location).map_err(|e| CasError::Serialization(e.to_string()))?, + )?; + + self.current_size + .fetch_add(data.len() as u64, Ordering::SeqCst); + + debug!("Stored chunk {} ({} bytes)", hash, data.len()); + Ok(hash) + } + + pub async fn get(&self, hash: &ChunkHash) -> Result { + let path = self.chunk_path(hash); + + if !path.exists() { + return Err(CasError::NotFound(hash.as_hex())); + } + + let data = fs::read(&path).await?; + + if self.config.max_size > 0 { + self.verify_integrity(hash, &data)?; + } + + Ok(Bytes::from(data)) + } + + pub fn exists(&self, hash: &ChunkHash) -> bool { + self.chunk_path(hash).exists() + } + + fn verify_integrity(&self, expected: &ChunkHash, data: &[u8]) -> Result<(), CasError> { + let actual = ChunkHash::from_bytes(data); + if actual != *expected { + warn!( + "Chunk integrity failure: expected {}, got {}", + expected, actual + ); + return Err(CasError::IntegrityError { + expected: expected.as_hex(), + actual: actual.as_hex(), + }); + } + Ok(()) + } + + fn chunk_path(&self, hash: &ChunkHash) -> PathBuf { + let hex = hash.as_hex(); + let mut path = self.config.chunks_dir.clone(); + + for i in 0..self.config.shard_levels as usize { + let start = i * 2; + let end = start + 2; + if end <= hex.len() { + path = path.join(&hex[start..end]); + } + } + + path.join(&hex) + } + + pub async fn delete(&self, hash: &ChunkHash) -> Result<(), CasError> { + let path = self.chunk_path(hash); + + if path.exists() { + let meta = fs::metadata(&path).await?; + fs::remove_file(&path).await?; + self.index.remove(hash.0.as_slice())?; + self.current_size.fetch_sub(meta.len(), Ordering::SeqCst); + debug!("Deleted chunk {}", hash); + } + + Ok(()) + } + + pub fn current_size(&self) -> u64 { + self.current_size.load(Ordering::SeqCst) + } + + pub fn max_size(&self) -> u64 { + self.config.max_size + } + + pub fn list_chunks(&self) -> impl Iterator + '_ { + self.index.iter().filter_map(|r| { + r.ok().and_then(|(k, _)| { + if k.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(&k); + Some(ChunkHash(arr)) + } else { + None + } + }) + }) + } + + pub fn dedup_stats(&self) -> DedupStats { + let chunks_stored = self.index.len() as u64; + let size_bytes = self.current_size(); + + DedupStats { + chunks_stored, + chunks_unique: chunks_stored, + size_bytes, + size_limit_bytes: self.config.max_size, + } + } +} + +#[derive(Debug, Clone)] +pub struct DedupStats { + pub chunks_stored: u64, + pub chunks_unique: u64, + pub size_bytes: u64, + pub size_limit_bytes: u64, +} + +impl DedupStats { + pub fn dedup_ratio(&self) -> f64 { + if self.chunks_stored == 0 { + 0.0 + } else { + 1.0 - (self.chunks_unique as f64 / self.chunks_stored as f64) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CasError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Sled error: {0}")] + Sled(#[from] sled::Error), + + #[error("Chunk not found: {0}")] + NotFound(String), + + #[error("Integrity error: expected {expected}, got {actual}")] + IntegrityError { expected: String, actual: String }, + + #[error("Serialization error: {0}")] + Serialization(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + async fn test_store() -> (CasStore, TempDir) { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 1024 * 1024, + shard_levels: 2, + }; + let store = CasStore::open(config).await.unwrap(); + (store, dir) + } + + #[tokio::test] + async fn test_cas_put_get() { + let (store, _dir) = test_store().await; + + let data = b"test chunk data"; + let hash = store.put(data).await.unwrap(); + + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); + } + + #[tokio::test] + async fn test_cas_dedup() { + let (store, _dir) = test_store().await; + + let data = b"duplicate data"; + let hash1 = store.put(data).await.unwrap(); + let hash2 = store.put(data).await.unwrap(); + + assert_eq!(hash1, hash2); + } + + #[tokio::test] + async fn test_cas_exists() { + let (store, _dir) = test_store().await; + + let data = b"existence test"; + let hash = store.put(data).await.unwrap(); + + assert!(store.exists(&hash)); + + let fake_hash = ChunkHash::from_bytes(b"nonexistent"); + assert!(!store.exists(&fake_hash)); + } + + #[tokio::test] + async fn test_cas_delete() { + let (store, _dir) = test_store().await; + + let data = b"delete me"; + let hash = store.put(data).await.unwrap(); + + assert!(store.exists(&hash)); + + store.delete(&hash).await.unwrap(); + + assert!(!store.exists(&hash)); + } + + #[tokio::test] + async fn test_cas_integrity() { + let (store, _dir) = test_store().await; + + let data = b"integrity test"; + let hash = store.put(data).await.unwrap(); + + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); + } + + #[tokio::test] + async fn test_cas_dedup_stats() { + let (store, _dir) = test_store().await; + + store.put(b"chunk1").await.unwrap(); + store.put(b"chunk2").await.unwrap(); + store.put(b"chunk1").await.unwrap(); + + let stats = store.dedup_stats(); + assert_eq!(stats.chunks_stored, 2); + assert_eq!(stats.chunks_unique, 2); + } +} diff --git a/musicfs/crates/musicfs-cas/tests/integration.rs b/musicfs/crates/musicfs-cas/tests/integration.rs new file mode 100644 index 0000000..4ecd660 --- /dev/null +++ b/musicfs/crates/musicfs-cas/tests/integration.rs @@ -0,0 +1,100 @@ +use musicfs_cache::TreeBuilder; +use musicfs_cas::{CasConfig, CasStore, ChunkManifest, ChunkRef, FileReader}; +use musicfs_core::{FileId, FileMeta, OriginId, RealPath, VirtualPath}; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; +use std::time::SystemTime; +use tempfile::TempDir; + +fn make_file_meta(id: i64, vpath: &str, size: u64) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(vpath), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from("/test"), + }, + size, + mtime: SystemTime::now(), + content_hash: None, + audio: None, + } +} + +#[tokio::test] +async fn test_cas_and_tree_integration() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = Arc::new(CasStore::open(config).await.unwrap()); + + let file_data = b"This is test audio file content for testing."; + let chunk_hash = store.put(file_data).await.unwrap(); + + let mut builder = TreeBuilder::new(); + builder.add_file(&make_file_meta( + 1, + "/Artist/Album/Track.flac", + file_data.len() as u64, + )); + let _tree = Arc::new(RwLock::new(builder.build())); + + let reader = Arc::new(FileReader::new(store.clone())); + reader.register_manifest(ChunkManifest { + file_id: FileId(1), + total_size: file_data.len() as u64, + chunks: vec![ChunkRef { + hash: chunk_hash, + offset: 0, + size: file_data.len() as u32, + }], + }); + + let result = reader + .read(FileId(1), 0, file_data.len() as u32) + .await + .unwrap(); + assert_eq!(&result[..], file_data); +} + +#[tokio::test] +async fn test_cache_persistence() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + + let data = b"persistent data"; + let hash = { + let store = CasStore::open(config.clone()).await.unwrap(); + store.put(data).await.unwrap() + }; + + let store = CasStore::open(config).await.unwrap(); + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); +} + +#[tokio::test] +async fn test_deduplication() { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = CasStore::open(config).await.unwrap(); + + let data = b"duplicate this content"; + + let hash1 = store.put(data).await.unwrap(); + let size_after_first = store.current_size(); + + let hash2 = store.put(data).await.unwrap(); + let size_after_second = store.current_size(); + + assert_eq!(hash1, hash2); + assert_eq!(size_after_first, size_after_second); +} diff --git a/musicfs/crates/musicfs-core/src/types.rs b/musicfs/crates/musicfs-core/src/types.rs index 2a9e7e1..6ce38c6 100644 --- a/musicfs/crates/musicfs-core/src/types.rs +++ b/musicfs/crates/musicfs-core/src/types.rs @@ -66,9 +66,29 @@ impl ChunkHash { Self(xxh64(data, 0).to_le_bytes()) } - pub fn to_hex(&self) -> String { + pub fn as_hex(&self) -> String { hex::encode(self.0) } + + pub fn to_hex(&self) -> String { + self.as_hex() + } + + pub fn from_hex(s: &str) -> Option { + let bytes = hex::decode(s).ok()?; + if bytes.len() != 8 { + return None; + } + let mut arr = [0u8; 8]; + arr.copy_from_slice(&bytes); + Some(Self(arr)) + } +} + +impl std::fmt::Display for ChunkHash { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_hex()) + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] diff --git a/musicfs/crates/musicfs-fuse/Cargo.toml b/musicfs/crates/musicfs-fuse/Cargo.toml index a5948fe..e54ff0e 100644 --- a/musicfs/crates/musicfs-fuse/Cargo.toml +++ b/musicfs/crates/musicfs-fuse/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] musicfs-core = { path = "../musicfs-core" } musicfs-cache = { path = "../musicfs-cache" } +musicfs-cas = { path = "../musicfs-cas" } fuser.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/musicfs/crates/musicfs-fuse/src/filesystem.rs b/musicfs/crates/musicfs-fuse/src/filesystem.rs index 066eaa9..6262bec 100644 --- a/musicfs/crates/musicfs-fuse/src/filesystem.rs +++ b/musicfs/crates/musicfs-fuse/src/filesystem.rs @@ -3,18 +3,20 @@ use fuser::{ Request, }; use musicfs_cache::{VirtualNode, VirtualTree, ROOT_INODE}; +use musicfs_cas::FileReader; use musicfs_core::Result; use std::ffi::OsStr; use std::path::Path; use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; const TTL: Duration = Duration::from_secs(1); const BLOCK_SIZE: u32 = 512; pub struct MusicFs { tree: Arc>, + reader: Option>, uid: u32, gid: u32, } @@ -23,6 +25,16 @@ impl MusicFs { pub fn new(tree: Arc>) -> Self { Self { tree, + reader: None, + uid: unsafe { libc::getuid() }, + gid: unsafe { libc::getgid() }, + } + } + + pub fn with_reader(tree: Arc>, reader: Arc) -> Self { + Self { + tree, + reader: Some(reader), uid: unsafe { libc::getuid() }, gid: unsafe { libc::getgid() }, } @@ -213,12 +225,32 @@ impl Filesystem for MusicFs { ) { debug!("read(ino={}, offset={}, size={})", ino, offset, size); - let tree = self.tree.read().unwrap(); + let file_id = { + let tree = self.tree.read().unwrap(); + if let Some(VirtualNode::File(file)) = tree.get(ino) { + file.file_id + } else { + reply.error(libc::ENOENT); + return; + } + }; - if let Some(VirtualNode::File(_file)) = tree.get(ino) { + let Some(reader) = &self.reader else { reply.data(&[]); - } else { - reply.error(libc::ENOENT); + return; + }; + + let reader = reader.clone(); + let result = tokio::runtime::Handle::current().block_on(async { + reader.read(file_id, offset as u64, size).await + }); + + match result { + Ok(data) => reply.data(&data), + Err(e) => { + warn!("Read error: {}", e); + reply.error(libc::EIO); + } } }