From 0e5a5140159314a4fd49dfe894d7f4138db8c627 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 12 May 2026 19:48:40 +0200 Subject: [PATCH] Add Week 5-7 plans with Oracle review fixes Week 5 (CDC & Delta Detection): - Add read_full() method to avoid u32 overflow on >4GB files - Add chunk_streaming() to avoid 200MB+ memory per file - Implement scan_origin() recursive walk (was stub) - Use spawn_blocking for watcher instead of separate runtime - Add 200ms event debouncing - Add >90% bandwidth reduction test Week 6 (Origin Federation): - Define all-origins-unhealthy behavior (least-bad selection) - Track watch handles for cleanup on unregister - Clarify tuple-based priority routing - Add per-origin-type health thresholds - Align retry delays with NFR-7.3 spec (100ms, 500ms, 2000ms) Week 7 (Remote Origins): - Replace SFTP single mutex with connection pool - Add 30s timeout to all remote operations - Custom Debug impl to redact credentials - SSH host verification against known_hosts - Clamp S3 range requests to file size - Use head_bucket for S3 health checks --- docs/v2/plans/week-05-cdc-delta.md | 1219 +++++++++++++++++++ docs/v2/plans/week-06-origin-federation.md | 1113 +++++++++++++++++ docs/v2/plans/week-07-remote-origins.md | 1261 ++++++++++++++++++++ 3 files changed, 3593 insertions(+) create mode 100644 docs/v2/plans/week-05-cdc-delta.md create mode 100644 docs/v2/plans/week-06-origin-federation.md create mode 100644 docs/v2/plans/week-07-remote-origins.md diff --git a/docs/v2/plans/week-05-cdc-delta.md b/docs/v2/plans/week-05-cdc-delta.md new file mode 100644 index 0000000..52be7f8 --- /dev/null +++ b/docs/v2/plans/week-05-cdc-delta.md @@ -0,0 +1,1219 @@ +# Week 5: CDC & Delta Detection + +**Phase**: 2 (Delta Sync & Multi-Origin) +**Prerequisites**: Week 4b (Origin-CAS Connector) +**Estimated effort**: 5 days + +--- + +## Objective + +Implement Content-Defined Chunking (CDC) using FastCDC and delta detection for efficient synchronization. This enables the >90% bandwidth reduction requirement (NFR-6.4) by only transferring changed chunks. + +**Critical Fix**: The MVP performance review identified that `Origin::read()` only returns ~2MB per call due to tokio's async read behavior. This must be fixed as part of CDC implementation since CDC requires the full file content. + +--- + +## Oracle Review Fixes (MUST IMPLEMENT) + +| Severity | Issue | Fix | +|----------|-------|-----| +| 🔴 Critical | **u32 overflow** - `file.size as u32` fails for files >4GB | Add `read_full(path) -> Result>` to Origin trait, use u64 for sizes | +| 🔴 Critical | **Memory explosion** - 200MB+ per file (data + chunk copies) | Use `chunk_refs()` and store immediately, drop source buffer after each chunk | +| 🔴 Critical | **`scan_origin()` is stub** - returns empty Vec, delta detection non-functional | Implement recursive walk using `Origin::readdir()` | +| 🟡 Arch | **Duplicate types** - `FileManifest` duplicates existing `ChunkManifest` | Extend existing `ChunkManifest` with `mtime` field instead of new type | +| 🟡 Arch | **Watcher spawns separate runtime** - wasteful | Use `tokio::task::spawn_blocking` instead of `std::thread::spawn` | +| ⚠️ Watch | No event debouncing (rapid saves flood events) | Add 200ms debounce before emitting events | +| ⚠️ Watch | Missing test for >90% bandwidth reduction claim | Add concrete reuse ratio test with metadata-only file edit | + +--- + +## Architecture Reference + +From architecture.md section 4.3.2 (CAS): + +``` +Avg chunk: 64KB +Min: 16KB, Max: 256KB +Stable boundaries for delta sync +``` + +From section 4.3.5 (Read Operation): + +``` +|CAS| +:chunk fetched data (CDC); +:store chunks by hash; +:update chunk manifest; +``` + +--- + +## Requirements Covered + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-8.2 | Content-defined chunking for cache efficiency | P0 | +| FR-11.1 | Download only changed portions of files | P0 | +| FR-11.2 | Use CDC to identify changed chunks | P0 | +| FR-11.3 | Preserve unchanged chunks in cache | P0 | +| FR-11.4 | Handle file additions and deletions | P0 | +| FR-10.1 | Detect changes to origin files | P0 | +| FR-10.4 | Compare mtime and size for change detection | P0 | +| NFR-6.4 | Delta sync >90% bandwidth reduction | P0 | + +--- + +## Deliverables + +| Task | Crate | Files | Est. | +|------|-------|-------|------| +| Fix async read (read full file) | musicfs-origins | `local.rs` | 0.5d | +| FastCDC integration | musicfs-sync | `cdc.rs` | 1d | +| ChunkManifest persistence | musicfs-sync | `manifest.rs` | 0.5d | +| Delta detector | musicfs-sync | `delta.rs` | 1d | +| Change watcher (inotify) | musicfs-sync | `watcher.rs` | 1d | +| Update ContentFetcher for CDC | musicfs-cas | `fetcher.rs` | 0.5d | +| Integration tests | tests | `delta_sync.rs` | 0.5d | + +--- + +## Task 1: Fix Async Read + +### 1.1 Problem + +Current `LocalOrigin::read()` uses `file.read()` which returns when the kernel buffer is exhausted (~2MB), not when the requested size is read. + +### 1.2 Update Origin trait to add `read_full()` method + +Add to `musicfs-origins/src/traits.rs`: + +```rust +/// Read entire file content (for CDC chunking) +/// NOTE: Use u64 for size to support files >4GB +async fn read_full(&self, path: &Path) -> Result>; +``` + +### 1.3 Update `musicfs-origins/src/local.rs` + +```rust +async fn read(&self, path: &Path, offset: u64, size: u64) -> Result> { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + let full_path = self.full_path(path); + debug!( + "LocalOrigin::read({:?}, offset={}, size={})", + full_path, offset, size + ); + + let mut file = fs::File::open(&full_path).await?; + file.seek(std::io::SeekFrom::Start(offset)).await?; + + // FIX: Use loop instead of single read() to get all requested bytes + let mut buffer = Vec::with_capacity(size as usize); + + // Read until we have all requested bytes or EOF + let mut total_read = 0u64; + let mut temp_buf = vec![0u8; 64 * 1024]; // 64KB chunks + + while total_read < size { + let to_read = std::cmp::min(temp_buf.len() as u64, size - total_read) as usize; + let n = file.read(&mut temp_buf[..to_read]).await?; + if n == 0 { + break; // EOF + } + buffer.extend_from_slice(&temp_buf[..n]); + total_read += n as u64; + } + + Ok(buffer) +} + +/// Read entire file (Oracle fix: separate method to avoid u32 overflow) +async fn read_full(&self, path: &Path) -> Result> { + let full_path = self.full_path(path); + debug!("LocalOrigin::read_full({:?})", full_path); + Ok(tokio::fs::read(&full_path).await?) +} +``` + +**NOTE**: Change `size: u32` to `size: u64` throughout the Origin trait to support files >4GB. + +--- + +## Task 2: FastCDC Integration + +### 2.1 Add dependencies to `musicfs-sync/Cargo.toml` + +```toml +[dependencies] +musicfs-core = { path = "../musicfs-core" } +musicfs-cas = { path = "../musicfs-cas" } + +fastcdc = "3" +xxhash-rust = { version = "0.8", features = ["xxh64"] } +tokio = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +``` + +### 2.2 Create `musicfs-sync/src/cdc.rs` + +```rust +use fastcdc::v2020::FastCDC; +use musicfs_core::ChunkHash; +use xxhash_rust::xxh64::xxh64; + +/// CDC chunker configuration per architecture spec +pub struct CdcChunker { + min_size: u32, // 16 KB + avg_size: u32, // 64 KB + max_size: u32, // 256 KB +} + +impl Default for CdcChunker { + fn default() -> Self { + Self { + min_size: 16 * 1024, + avg_size: 64 * 1024, + max_size: 256 * 1024, + } + } +} + +/// A chunk produced by CDC +#[derive(Debug, Clone)] +pub struct Chunk { + pub hash: ChunkHash, + pub offset: u64, + pub length: u32, + pub data: Vec, +} + +impl CdcChunker { + pub fn new(min_size: u32, avg_size: u32, max_size: u32) -> Self { + Self { min_size, avg_size, max_size } + } + + /// Chunk data using FastCDC algorithm + /// Returns chunks with stable boundaries for delta sync + /// + /// WARNING: This copies all chunk data. For large files, use `chunk_refs()` + /// and store immediately to avoid memory explosion. + pub fn chunk(&self, data: &[u8]) -> Vec { + let chunker = FastCDC::new( + data, + self.min_size, + self.avg_size, + self.max_size, + ); + + chunker + .map(|c| { + let chunk_data = &data[c.offset..c.offset + c.length]; + let hash = ChunkHash::from_bytes(chunk_data); + + Chunk { + hash, + offset: c.offset as u64, + length: c.length as u32, + data: chunk_data.to_vec(), + } + }) + .collect() + } + + /// Chunk data without copying (returns references) - PREFERRED for large files + /// + /// Oracle fix: Use this method and store each chunk immediately before + /// processing the next to avoid 200MB+ memory usage per file. + pub fn chunk_refs<'a>(&self, data: &'a [u8]) -> Vec> { + let chunker = FastCDC::new( + data, + self.min_size, + self.avg_size, + self.max_size, + ); + + chunker + .map(|c| { + let chunk_data = &data[c.offset..c.offset + c.length]; + ChunkRef { + hash: ChunkHash::from_bytes(chunk_data), + offset: c.offset as u64, + length: c.length as u32, + data: chunk_data, + } + }) + .collect() + } + + /// Stream-process chunks to minimize memory (Oracle fix: avoid memory explosion) + /// Calls `processor` for each chunk, allowing immediate storage before next chunk + pub fn chunk_streaming(&self, data: &[u8], mut processor: F) -> usize + where + F: FnMut(ChunkRef<'_>), + { + let chunker = FastCDC::new( + data, + self.min_size, + self.avg_size, + self.max_size, + ); + + let mut count = 0; + for c in chunker { + let chunk_data = &data[c.offset..c.offset + c.length]; + processor(ChunkRef { + hash: ChunkHash::from_bytes(chunk_data), + offset: c.offset as u64, + length: c.length as u32, + data: chunk_data, + }); + count += 1; + } + count + } +} + +#[derive(Debug)] +pub struct ChunkRef<'a> { + pub hash: ChunkHash, + pub offset: u64, + pub length: u32, + pub data: &'a [u8], +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cdc_basic() { + let chunker = CdcChunker::default(); + let data = vec![0u8; 256 * 1024]; // 256KB of zeros + + let chunks = chunker.chunk(&data); + + // Should produce multiple chunks + assert!(!chunks.is_empty()); + + // Total size should match + let total: u64 = chunks.iter().map(|c| c.length as u64).sum(); + assert_eq!(total, data.len() as u64); + + // Chunks should be contiguous + let mut offset = 0u64; + for chunk in &chunks { + assert_eq!(chunk.offset, offset); + offset += chunk.length as u64; + } + } + + #[test] + fn test_cdc_stable_boundaries() { + let chunker = CdcChunker::default(); + + // Original data + let mut data1 = vec![0u8; 128 * 1024]; + for (i, b) in data1.iter_mut().enumerate() { + *b = (i % 256) as u8; + } + + // Data with insertion at start (should only affect first chunk) + let mut data2 = vec![0xFFu8; 1024]; // 1KB insertion + data2.extend_from_slice(&data1); + + let chunks1 = chunker.chunk(&data1); + let chunks2 = chunker.chunk(&data2); + + // Most chunk hashes should be shared (CDC stability) + let hashes1: std::collections::HashSet<_> = chunks1.iter().map(|c| c.hash).collect(); + let hashes2: std::collections::HashSet<_> = chunks2.iter().map(|c| c.hash).collect(); + + let shared = hashes1.intersection(&hashes2).count(); + + // At least 50% of chunks should be reusable + // (In practice, CDC achieves much better than this) + assert!(shared > 0, "CDC should produce stable boundaries"); + } + + #[test] + fn test_cdc_chunk_sizes() { + let chunker = CdcChunker::default(); + + // Random-ish data (to avoid degenerate cases) + let data: Vec = (0..1024 * 1024) + .map(|i| ((i * 17 + 31) % 256) as u8) + .collect(); + + let chunks = chunker.chunk(&data); + + for chunk in &chunks { + // Chunks should respect size bounds (with some tolerance for last chunk) + if chunk.offset + chunk.length as u64 != data.len() as u64 { + assert!(chunk.length >= chunker.min_size / 2, + "Chunk too small: {}", chunk.length); + assert!(chunk.length <= chunker.max_size * 2, + "Chunk too large: {}", chunk.length); + } + } + } +} +``` + +--- + +## Task 3: Manifest Persistence + +### 3.1 Extend existing `ChunkManifest` in `musicfs-cas/src/manifest.rs` + +**Oracle fix**: Don't create duplicate `FileManifest` type. Extend existing `ChunkManifest` with `mtime` field. + +```rust +use musicfs_core::{ChunkHash, FileId}; +use serde::{Deserialize, Serialize}; + +/// Persistent chunk manifest for a file +/// NOTE: Extended from original to include mtime for delta detection +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChunkManifest { + pub file_id: FileId, + pub total_size: u64, + pub mtime: i64, // Oracle fix: added for delta detection + pub chunks: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ManifestChunk { + pub hash: ChunkHash, + pub offset: u64, + pub size: u32, +} + +impl FileManifest { + pub fn new(file_id: FileId, total_size: u64, mtime: i64) -> Self { + Self { + file_id, + total_size, + mtime, + chunks: Vec::new(), + } + } + + pub fn add_chunk(&mut self, hash: ChunkHash, offset: u64, size: u32) { + self.chunks.push(ManifestChunk { hash, offset, size }); + } + + /// Serialize to msgpack for storage in SQLite + pub fn to_bytes(&self) -> Vec { + rmp_serde::to_vec(self).unwrap_or_default() + } + + /// Deserialize from msgpack + pub fn from_bytes(data: &[u8]) -> Option { + rmp_serde::from_slice(data).ok() + } + + /// Get all unique chunk hashes + pub fn chunk_hashes(&self) -> impl Iterator { + self.chunks.iter().map(|c| &c.hash) + } +} + +/// Result of comparing two manifests +#[derive(Debug)] +pub struct ManifestDiff { + /// Chunks in new manifest that exist in old (reusable) + pub reuse: Vec, + /// Chunks in new manifest that don't exist in old (need fetch) + pub fetch: Vec, + /// Chunks in old manifest that don't exist in new (can evict) + pub orphaned: Vec, +} + +impl FileManifest { + /// Compare this manifest to a new one + pub fn diff(&self, new_chunks: &[ManifestChunk]) -> ManifestDiff { + use std::collections::HashSet; + + let old_hashes: HashSet<_> = self.chunks.iter().map(|c| c.hash).collect(); + let new_hashes: HashSet<_> = new_chunks.iter().map(|c| c.hash).collect(); + + ManifestDiff { + reuse: new_chunks.iter() + .filter(|c| old_hashes.contains(&c.hash)) + .cloned() + .collect(), + fetch: new_chunks.iter() + .filter(|c| !old_hashes.contains(&c.hash)) + .cloned() + .collect(), + orphaned: self.chunks.iter() + .filter(|c| !new_hashes.contains(&c.hash)) + .map(|c| c.hash) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_manifest_roundtrip() { + let mut manifest = FileManifest::new(FileId(1), 1024, 12345); + manifest.add_chunk(ChunkHash::from_bytes(b"chunk1"), 0, 512); + manifest.add_chunk(ChunkHash::from_bytes(b"chunk2"), 512, 512); + + let bytes = manifest.to_bytes(); + let restored = FileManifest::from_bytes(&bytes).unwrap(); + + assert_eq!(restored.file_id, manifest.file_id); + assert_eq!(restored.chunks.len(), 2); + } + + #[test] + fn test_manifest_diff() { + let mut old = FileManifest::new(FileId(1), 1024, 12345); + old.add_chunk(ChunkHash::from_bytes(b"A"), 0, 256); + old.add_chunk(ChunkHash::from_bytes(b"B"), 256, 256); + old.add_chunk(ChunkHash::from_bytes(b"C"), 512, 256); + old.add_chunk(ChunkHash::from_bytes(b"D"), 768, 256); + + // New manifest: A stays, B removed, C stays, D removed, E added + let new_chunks = vec![ + ManifestChunk { hash: ChunkHash::from_bytes(b"A"), offset: 0, size: 256 }, + ManifestChunk { hash: ChunkHash::from_bytes(b"C"), offset: 256, size: 256 }, + ManifestChunk { hash: ChunkHash::from_bytes(b"E"), offset: 512, size: 256 }, + ]; + + let diff = old.diff(&new_chunks); + + assert_eq!(diff.reuse.len(), 2); // A, C + assert_eq!(diff.fetch.len(), 1); // E + assert_eq!(diff.orphaned.len(), 2); // B, D + } +} +``` + +--- + +## Task 4: Delta Detector + +### 4.1 Create `musicfs-sync/src/delta.rs` + +```rust +use crate::cdc::CdcChunker; +use crate::manifest::{FileManifest, ManifestChunk, ManifestDiff}; +use musicfs_core::{FileId, FileMeta, OriginId}; +use musicfs_origins::Origin; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::SystemTime; +use tracing::{debug, info}; + +/// Detected changes between origin and cache +#[derive(Debug, Default)] +pub struct ChangeSet { + pub added: Vec, + pub removed: Vec, + pub modified: Vec<(FileId, ManifestDiff)>, +} + +impl ChangeSet { + pub fn is_empty(&self) -> bool { + self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty() + } + + pub fn total_changes(&self) -> usize { + self.added.len() + self.removed.len() + self.modified.len() + } +} + +/// Delta detector compares origin state to cached state +pub struct DeltaDetector { + chunker: CdcChunker, +} + +impl DeltaDetector { + pub fn new() -> Self { + Self { + chunker: CdcChunker::default(), + } + } + + pub fn with_chunker(chunker: CdcChunker) -> Self { + Self { chunker } + } + + /// Detect changes between cached files and origin + pub async fn detect_changes( + &self, + origin: &dyn Origin, + cached: &HashMap, + manifests: &HashMap, + ) -> Result { + let mut changes = ChangeSet::default(); + + // Scan origin for current files + let origin_files = self.scan_origin(origin).await?; + + // Build lookup by real path + let cached_by_path: HashMap<_, _> = cached.values() + .map(|m| (m.real_path.path.clone(), m)) + .collect(); + + // Check for added/modified + for origin_file in &origin_files { + if let Some(cached_file) = cached_by_path.get(&origin_file.real_path.path) { + // File exists - check if modified + if self.is_modified(cached_file, origin_file) { + debug!("File modified: {:?}", origin_file.real_path.path); + + if let Some(old_manifest) = manifests.get(&cached_file.id) { + // Compute new chunks and diff + let new_chunks = self.compute_chunks(origin, origin_file).await?; + let diff = old_manifest.diff(&new_chunks); + changes.modified.push((cached_file.id, diff)); + } + } + } else { + // New file + debug!("File added: {:?}", origin_file.real_path.path); + changes.added.push(origin_file.clone()); + } + } + + // Check for removed + let origin_paths: std::collections::HashSet<_> = origin_files.iter() + .map(|f| &f.real_path.path) + .collect(); + + for cached_file in cached.values() { + if !origin_paths.contains(&cached_file.real_path.path) { + debug!("File removed: {:?}", cached_file.real_path.path); + changes.removed.push(cached_file.id); + } + } + + info!( + "Delta detection complete: {} added, {} removed, {} modified", + changes.added.len(), + changes.removed.len(), + changes.modified.len() + ); + + Ok(changes) + } + + /// Check if file was modified based on mtime/size + fn is_modified(&self, cached: &FileMeta, origin: &FileMeta) -> bool { + cached.size != origin.size || cached.mtime != origin.mtime + } + + /// Scan origin for all files (Oracle fix: implement recursive walk) + async fn scan_origin(&self, origin: &dyn Origin) -> Result, DeltaError> { + let mut files = Vec::new(); + let mut dirs_to_scan = vec![PathBuf::from("/")]; + + while let Some(dir) = dirs_to_scan.pop() { + let entries = origin.readdir(&dir) + .await + .map_err(|e| DeltaError::OriginScan(e.to_string()))?; + + for entry in entries { + let entry_path = dir.join(&entry.name); + + if entry.is_dir { + dirs_to_scan.push(entry_path); + } else if Self::is_audio_file(&entry.name) { + // Get full stat for mtime + let stat = origin.stat(&entry_path) + .await + .map_err(|e| DeltaError::OriginScan(e.to_string()))?; + + files.push(FileMeta { + id: FileId(0), // Will be assigned by caller + virtual_path: VirtualPath::new(&format!("{}", entry_path.display())), + real_path: RealPath { + origin_id: origin.id().clone(), + path: entry_path, + }, + size: stat.size, + mtime: stat.mtime, + content_hash: None, + audio: None, + }); + } + } + } + + Ok(files) + } + + /// Check if file is an audio file by extension + fn is_audio_file(name: &str) -> bool { + let lower = name.to_lowercase(); + lower.ends_with(".flac") || lower.ends_with(".mp3") || + lower.ends_with(".ogg") || lower.ends_with(".wav") || + lower.ends_with(".m4a") || lower.ends_with(".aac") || + lower.ends_with(".opus") + } + + /// Compute CDC chunks for a file + async fn compute_chunks( + &self, + origin: &dyn Origin, + file: &FileMeta, + ) -> Result, DeltaError> { + let data = origin + .read(&file.real_path.path, 0, file.size as u32) + .await + .map_err(|e| DeltaError::OriginRead(e.to_string()))?; + + let chunks = self.chunker.chunk(&data); + + Ok(chunks + .into_iter() + .map(|c| ManifestChunk { + hash: c.hash, + offset: c.offset, + size: c.length, + }) + .collect()) + } +} + +impl Default for DeltaDetector { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeltaError { + #[error("Origin read error: {0}")] + OriginRead(String), + + #[error("Origin scan error: {0}")] + OriginScan(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use musicfs_core::{RealPath, VirtualPath}; + use std::path::PathBuf; + + fn make_file_meta(id: i64, path: &str, size: u64) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(&format!("/test/{}", path)), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from(path), + }, + size, + mtime: SystemTime::UNIX_EPOCH, + content_hash: None, + audio: None, + } + } + + #[test] + fn test_is_modified_size_change() { + let detector = DeltaDetector::new(); + + let cached = make_file_meta(1, "test.flac", 1000); + let mut origin = cached.clone(); + origin.size = 2000; + + assert!(detector.is_modified(&cached, &origin)); + } + + #[test] + fn test_is_modified_same() { + let detector = DeltaDetector::new(); + + let cached = make_file_meta(1, "test.flac", 1000); + let origin = cached.clone(); + + assert!(!detector.is_modified(&cached, &origin)); + } +} +``` + +--- + +## Task 5: File Watcher + +### 5.1 Create `musicfs-sync/src/watcher.rs` + +```rust +use musicfs_core::{Event, EventBus, OriginId}; +use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +/// Watches origin filesystem for changes (inotify on Linux) +pub struct OriginWatcher { + origin_id: OriginId, + root: PathBuf, + event_bus: Arc, +} + +impl OriginWatcher { + pub fn new(origin_id: OriginId, root: PathBuf, event_bus: Arc) -> Self { + Self { + origin_id, + root, + event_bus, + } + } + + /// Start watching for changes + /// Returns a handle that stops watching when dropped + /// + /// Oracle fix: Use spawn_blocking instead of spawning separate runtime + pub fn start(self) -> WatchHandle { + let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); + + let origin_id = self.origin_id.clone(); + let root = self.root.clone(); + let event_bus = self.event_bus.clone(); + + // Oracle fix: Use tokio::task::spawn_blocking instead of std::thread::spawn + // This integrates with existing runtime rather than creating a new one + tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + if let Err(e) = Self::watch_loop(&origin_id, &root, &event_bus, &mut stop_rx).await { + error!("Watcher error: {}", e); + } + }); + }); + + WatchHandle { stop_tx } + } + + async fn watch_loop( + origin_id: &OriginId, + root: &Path, + event_bus: &EventBus, + stop_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), WatchError> { + let (tx, mut rx) = mpsc::channel(100); + + let mut watcher = RecommendedWatcher::new( + move |res: Result| { + if let Ok(event) = res { + let _ = tx.blocking_send(event); + } + }, + Config::default(), + ) + .map_err(|e| WatchError::Init(e.to_string()))?; + + watcher + .watch(root, RecursiveMode::Recursive) + .map_err(|e| WatchError::Watch(e.to_string()))?; + + info!("Watching origin {} at {:?}", origin_id, root); + + loop { + tokio::select! { + Some(event) = rx.recv() => { + Self::handle_notify_event(origin_id, root, event_bus, event); + } + _ = stop_rx.recv() => { + info!("Stopping watcher for {}", origin_id); + break; + } + } + } + + Ok(()) + } + + /// Oracle fix: Add debouncing to handle rapid saves + /// Debounce window before emitting events + const DEBOUNCE_MS: u64 = 200; + + fn handle_notify_event( + origin_id: &OriginId, + root: &Path, + event_bus: &EventBus, + event: notify::Event, + debouncer: &mut HashMap, + ) { + use notify::EventKind; + + let now = Instant::now(); + + for path in event.paths { + let relative = match path.strip_prefix(root) { + Ok(p) => p.to_path_buf(), + Err(_) => continue, + }; + + // Only care about audio files + if !Self::is_audio_file(&path) { + continue; + } + + // Oracle fix: Debounce - skip if we saw this path recently + if let Some(last_seen) = debouncer.get(&relative) { + if now.duration_since(*last_seen).as_millis() < Self::DEBOUNCE_MS as u128 { + debug!("Debouncing event for {:?}", relative); + continue; + } + } + debouncer.insert(relative.clone(), now); + + let vpath = musicfs_core::VirtualPath::new(&format!("/{}", relative.display())); + + match event.kind { + EventKind::Create(_) => { + debug!("File created: {:?}", relative); + event_bus.publish(Event::FileAdded { + path: vpath, + origin_id: origin_id.clone(), + }); + } + EventKind::Remove(_) => { + debug!("File removed: {:?}", relative); + event_bus.publish(Event::FileRemoved { path: vpath }); + } + EventKind::Modify(_) => { + debug!("File modified: {:?}", relative); + event_bus.publish(Event::FileModified { path: vpath }); + } + _ => {} + } + } + } + + fn is_audio_file(path: &Path) -> bool { + matches!( + path.extension().and_then(|e| e.to_str()).map(|e| e.to_lowercase()).as_deref(), + Some("flac" | "mp3" | "ogg" | "wav" | "m4a" | "aac" | "opus") + ) + } +} + +pub struct WatchHandle { + stop_tx: mpsc::Sender<()>, +} + +impl WatchHandle { + pub async fn stop(self) { + let _ = self.stop_tx.send(()).await; + } +} + +impl Drop for WatchHandle { + fn drop(&mut self) { + // Best effort stop on drop + let _ = self.stop_tx.try_send(()); + } +} + +#[derive(Debug, thiserror::Error)] +pub enum WatchError { + #[error("Failed to initialize watcher: {0}")] + Init(String), + + #[error("Failed to watch path: {0}")] + Watch(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use tempfile::TempDir; + + #[tokio::test] + async fn test_watcher_detects_create() { + let dir = TempDir::new().unwrap(); + let event_bus = Arc::new(EventBus::default()); + let mut rx = event_bus.subscribe(); + + let watcher = OriginWatcher::new( + OriginId::from("test"), + dir.path().to_path_buf(), + event_bus, + ); + let handle = watcher.start(); + + // Give watcher time to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Create a file + std::fs::write(dir.path().join("test.flac"), b"audio").unwrap(); + + // Wait for event + tokio::time::sleep(Duration::from_millis(200)).await; + + // Should receive FileAdded event + let event = rx.try_recv(); + assert!(matches!(event, Ok(Event::FileAdded { .. }))); + + handle.stop().await; + } +} +``` + +--- + +## Task 6: Update ContentFetcher for CDC + +### 6.1 Update `musicfs-cas/src/fetcher.rs` + +```rust +use crate::{CasStore, ChunkManifest, ChunkRef}; +use musicfs_core::{ChunkHash, Event, EventBus, FileId, FileMeta, OriginId}; +use musicfs_origins::Origin; +use musicfs_sync::cdc::CdcChunker; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tracing::{debug, info}; + +pub struct ContentFetcher { + store: Arc, + origins: RwLock>>, + file_meta: RwLock>, + event_bus: Option>, + chunker: CdcChunker, +} + +impl ContentFetcher { + pub fn new(store: Arc) -> Self { + Self { + store, + origins: RwLock::new(HashMap::new()), + file_meta: RwLock::new(HashMap::new()), + event_bus: None, + chunker: CdcChunker::default(), + } + } + + // ... existing methods ... + + /// Fetch file with CDC chunking + pub async fn fetch_file(&self, file_id: FileId) -> Result { + let meta = { + let files = self.file_meta.read().unwrap(); + files.get(&file_id).cloned() + .ok_or(FetchError::FileNotFound(file_id))? + }; + + let origin = { + let origins = self.origins.read().unwrap(); + origins.get(&meta.real_path.origin_id).cloned() + .ok_or_else(|| FetchError::OriginNotFound(meta.real_path.origin_id.clone()))? + }; + + info!("Fetching file {:?} from origin {}", file_id, origin.id()); + + // Read full file content + let data = origin.read(&meta.real_path.path, 0, meta.size as u32).await + .map_err(|e| FetchError::OriginRead(e.to_string()))?; + + // CDC chunk the data + let chunks = self.chunker.chunk(&data); + info!("Chunked {:?} into {} chunks", file_id, chunks.len()); + + // Store each chunk in CAS + let mut chunk_refs = Vec::with_capacity(chunks.len()); + for chunk in chunks { + // Dedup: only store if not already present + if !self.store.exists(&chunk.hash) { + self.store.put(&chunk.data).await + .map_err(FetchError::Store)?; + } + + chunk_refs.push(ChunkRef { + hash: chunk.hash, + offset: chunk.offset, + size: chunk.length, + }); + } + + let manifest = ChunkManifest { + file_id, + total_size: meta.size, + chunks: chunk_refs, + }; + + debug!( + "Created manifest for {:?}: {} bytes, {} chunks", + file_id, meta.size, manifest.chunks.len() + ); + + Ok(manifest) + } +} +``` + +--- + +## Task 7: Update lib.rs + +### 7.1 Create `musicfs-sync/src/lib.rs` + +```rust +pub mod cdc; +pub mod delta; +pub mod manifest; +pub mod watcher; + +pub use cdc::{CdcChunker, Chunk}; +pub use delta::{ChangeSet, DeltaDetector, DeltaError}; +pub use manifest::{FileManifest, ManifestChunk, ManifestDiff}; +pub use watcher::{OriginWatcher, WatchHandle, WatchError}; +``` + +--- + +## Tests + +| Test | Type | Validates | +|------|------|-----------| +| `test_read_full_file` | Unit | Fix: full file read works | +| `test_read_full_large_file` | Unit | Oracle fix: files >4GB don't overflow | +| `test_cdc_basic` | Unit | CDC produces chunks | +| `test_cdc_stable_boundaries` | Unit | Insertions don't shift all chunks | +| `test_cdc_chunk_sizes` | Unit | Chunks respect min/avg/max | +| `test_cdc_streaming_memory` | Unit | Oracle fix: streaming doesn't explode memory | +| `test_manifest_roundtrip` | Unit | Manifest serialization | +| `test_manifest_diff` | Unit | Diff identifies reuse/fetch/orphan | +| `test_delta_detect_modified` | Unit | Modified files detected | +| `test_scan_origin_recursive` | Unit | Oracle fix: scan_origin finds all files | +| `test_watcher_detects_create` | Integration | inotify works | +| `test_watcher_debounce` | Unit | Oracle fix: rapid events debounced | +| `test_bandwidth_reduction_90pct` | Integration | Oracle fix: >90% reduction on metadata edit | + +### Oracle fix: Add concrete bandwidth reduction test + +```rust +#[tokio::test] +async fn test_bandwidth_reduction_90pct() { + // Create a 10MB FLAC file + let original = create_test_flac(10 * 1024 * 1024); + + // Chunk it + let chunker = CdcChunker::default(); + let chunks1 = chunker.chunk(&original); + let hashes1: HashSet<_> = chunks1.iter().map(|c| c.hash).collect(); + + // Modify only metadata (first 1KB - FLAC header area) + let mut modified = original.clone(); + for i in 100..200 { + modified[i] = 0xFF; + } + + // Chunk modified version + let chunks2 = chunker.chunk(&modified); + let hashes2: HashSet<_> = chunks2.iter().map(|c| c.hash).collect(); + + // Calculate reuse ratio + let reused = hashes1.intersection(&hashes2).count(); + let reuse_ratio = reused as f64 / chunks2.len() as f64; + + // Must achieve >90% reuse for metadata-only edit + assert!( + reuse_ratio > 0.90, + "Bandwidth reduction {:.1}% < 90% target. Reused {}/{} chunks", + reuse_ratio * 100.0, reused, chunks2.len() + ); +} +``` + +--- + +## Benchmark + +```rust +// benches/cdc.rs +fn bench_cdc_64mb(c: &mut Criterion) { + let chunker = CdcChunker::default(); + let data = vec![0u8; 64 * 1024 * 1024]; + + c.bench_function("cdc_64mb", |b| { + b.iter(|| chunker.chunk(&data)) + }); +} + +fn bench_bandwidth_reduction(c: &mut Criterion) { + // Simulate metadata-only edit (tag change) + // Measure chunk reuse ratio +} +``` + +--- + +## Exit Criteria + +- [ ] Full file content is read (not just first 2MB) +- [ ] CDC produces 16KB-256KB chunks with 64KB average +- [ ] Chunk boundaries are stable on insertions +- [ ] Manifest diff correctly identifies reuse/fetch/orphan +- [ ] inotify watcher detects file changes +- [ ] Delta sync achieves >90% bandwidth reduction on metadata edit +- [ ] All existing tests pass + +--- + +## Dependencies + +### `musicfs-sync/Cargo.toml` + +```toml +[package] +name = "musicfs-sync" +version.workspace = true +edition.workspace = true + +[dependencies] +musicfs-core = { path = "../musicfs-core" } +musicfs-cas = { path = "../musicfs-cas" } +musicfs-origins = { path = "../musicfs-origins" } + +fastcdc = "3" +xxhash-rust = { version = "0.8", features = ["xxh64"] } +notify = "6" +rmp-serde = "1" + +tokio = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +``` + +--- + +## Architecture Compliance + +| Architecture Section | Requirement | Status | +|---------------------|-------------|--------| +| 4.3.2 | CDC chunking (64KB avg) | ✅ | +| 4.3.2 | Min 16KB, Max 256KB | ✅ | +| 4.3.2 | Stable boundaries for delta sync | ✅ | +| 4.3.5 | Chunk fetched data (CDC) | ✅ | +| 4.3.5 | Store chunks by hash | ✅ | +| FR-10.2 | inotify for local origins | ✅ | +| FR-11.2 | Use CDC to identify changed chunks | ✅ | +| NFR-6.4 | >90% bandwidth reduction | ✅ | diff --git a/docs/v2/plans/week-06-origin-federation.md b/docs/v2/plans/week-06-origin-federation.md new file mode 100644 index 0000000..024d6e0 --- /dev/null +++ b/docs/v2/plans/week-06-origin-federation.md @@ -0,0 +1,1113 @@ +# Week 6: Origin Federation + +**Phase**: 2 (Delta Sync & Multi-Origin) +**Prerequisites**: Week 5 (CDC & Delta Detection) +**Estimated effort**: 5 days + +--- + +## Objective + +Implement multi-origin support with priority-based routing, health monitoring, and automatic failover. This enables serving files from multiple storage backends with graceful degradation. + +--- + +## Oracle Review Fixes (MUST IMPLEMENT) + +| Severity | Issue | Fix | +|----------|-------|-----| +| 🔴 Critical | **All origins unhealthy** - no defined behavior | Emit event, serve from cache, select "least-bad" origin with fewest failures | +| 🔴 Critical | **Watch handle cleanup** - not specified on `unregister()` | Track active watches per-origin in registry, drop handles on removal | +| 🟡 Medium | **Routing formula ambiguous** - text says multiplication, code shows tuple | Clarify: use tuple `(priority, latency)` for priority-dominant ordering | +| 🟡 Medium | **Health threshold hardcoded** - 3 failures for all origin types | Make configurable per `OriginType` (Local=1, Remote=3) | +| ⚠️ Watch | **Retry backoff mismatch** - Plan: 100ms×2.0, Spec NFR-7.3: 100ms, 500ms, 2s | Align with spec: use 100ms, 500ms, 2000ms sequence | + +--- + +## Architecture Reference + +From architecture.md section 4.3.3 (Origin Federation): + +```plantuml +VPR -> OF : read(real_path, offset, size) +OF -> OF : select_origin(priority, health) + +alt Origin[Local] healthy (pri=1) + OF -> O1 : read() + O1 --> OF : data +else Origin[Local] unhealthy, try NFS (pri=2) + OF -> O2 : read() + ... +end +``` + +From section 4.3.3: +> "Background health checks every 30s per origin" + +--- + +## Requirements Covered + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-13.1 | Support multiple simultaneous origins | P0 | +| FR-13.2 | Present unified virtual tree across origins | P0 | +| FR-13.3 | Support origin priority/preference ordering | P0 | +| FR-13.4 | Handle duplicate files across origins | P0 | +| FR-13.5 | Support per-origin configuration | P0 | +| NFR-7.1 | Serve cached data when origin unavailable | P0 | +| NFR-7.2 | Gracefully degrade with network failures | P0 | +| NFR-7.3 | Retry failed operations with exponential backoff | P0 | + +--- + +## Deliverables + +| Task | Crate | Files | Est. | +|------|-------|-------|------| +| Origin registry | musicfs-origins | `registry.rs` | 0.5d | +| Priority router | musicfs-origins | `router.rs` | 1d | +| Health monitor | musicfs-origins | `health.rs` | 1d | +| Failover logic | musicfs-origins | `failover.rs` | 1d | +| Origin configuration | musicfs-core | `config.rs` | 0.5d | +| Integration with FUSE | musicfs-fuse | updates | 0.5d | +| Integration tests | tests | `federation.rs` | 0.5d | + +--- + +## Task 1: Origin Configuration + +### 1.1 Create `musicfs-core/src/config.rs` + +```rust +use crate::OriginId; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub mount_point: PathBuf, + pub cache_dir: PathBuf, + pub origins: Vec, + + #[serde(default)] + pub cache: CacheConfig, + + #[serde(default)] + pub health: HealthConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OriginConfig { + pub id: String, + pub origin_type: OriginType, + pub priority: u8, + + #[serde(default)] + pub enabled: bool, + + #[serde(flatten)] + pub settings: HashMap, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum OriginType { + Local, + Nfs, + Smb, + S3, + Sftp, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CacheConfig { + #[serde(default = "default_metadata_cache_mb")] + pub metadata_cache_mb: u64, + + #[serde(default = "default_content_cache_gb")] + pub content_cache_gb: u64, +} + +impl Default for CacheConfig { + fn default() -> Self { + Self { + metadata_cache_mb: 100, + content_cache_gb: 10, + } + } +} + +fn default_metadata_cache_mb() -> u64 { 100 } +fn default_content_cache_gb() -> u64 { 10 } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthConfig { + #[serde(default = "default_check_interval_secs")] + pub check_interval_secs: u64, + + #[serde(default = "default_timeout_ms")] + pub timeout_ms: u64, + + #[serde(default = "default_unhealthy_threshold")] + pub unhealthy_threshold: u32, + + /// Oracle fix: Per-origin-type thresholds (Local=1, Remote=3) + #[serde(default)] + pub per_origin_thresholds: HashMap, +} + +impl Default for HealthConfig { + fn default() -> Self { + Self { + check_interval_secs: 30, + timeout_ms: 5000, + unhealthy_threshold: 3, + } + } +} + +fn default_check_interval_secs() -> u64 { 30 } +fn default_timeout_ms() -> u64 { 5000 } +fn default_unhealthy_threshold() -> u32 { 3 } + +impl Config { + pub fn from_file(path: &std::path::Path) -> Result { + let content = std::fs::read_to_string(path) + .map_err(|e| ConfigError::Read(e.to_string()))?; + toml::from_str(&content) + .map_err(|e| ConfigError::Parse(e.to_string())) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("Failed to read config: {0}")] + Read(String), + + #[error("Failed to parse config: {0}")] + Parse(String), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_config() { + let toml = r#" +mount_point = "/mnt/music" +cache_dir = "/home/user/.cache/musicfs" + +[[origins]] +id = "local" +origin_type = "local" +priority = 1 +path = "/mnt/nas/music" + +[[origins]] +id = "backup" +origin_type = "s3" +priority = 2 +bucket = "music-backup" +region = "us-east-1" +"#; + + let config: Config = toml::from_str(toml).unwrap(); + assert_eq!(config.origins.len(), 2); + assert_eq!(config.origins[0].priority, 1); + assert_eq!(config.origins[1].origin_type, OriginType::S3); + } +} +``` + +--- + +## Task 2: Origin Registry + +### 2.1 Create `musicfs-origins/src/registry.rs` + +```rust +use crate::traits::{Origin, OriginType}; +use crate::health::{HealthMonitor, HealthSnapshot}; +use crate::router::Router; +use musicfs_core::{OriginId, RealPath}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tracing::{debug, info, warn}; + +/// Central registry for all origins +pub struct OriginRegistry { + origins: RwLock>>, + router: Router, + health_monitor: Arc, + /// Oracle fix: Track active watch handles per origin for cleanup + watch_handles: RwLock>>, +} + +impl OriginRegistry { + pub fn new(health_monitor: Arc) -> Self { + Self { + origins: RwLock::new(HashMap::new()), + router: Router::new(), + health_monitor, + } + } + + /// Register a new origin + pub fn register(&self, origin: Arc, priority: u8) { + let id = origin.id().clone(); + info!("Registering origin {} with priority {}", id, priority); + + self.router.set_priority(id.clone(), priority); + self.health_monitor.add_origin(origin.clone()); + self.origins.write().unwrap().insert(id, origin); + } + + /// Unregister an origin + /// Oracle fix: Clean up watch handles when origin is removed + pub fn unregister(&self, id: &OriginId) { + info!("Unregistering origin {}", id); + + // Oracle fix: Drop all watch handles for this origin + if let Some(handles) = self.watch_handles.write().unwrap().remove(id) { + info!("Dropping {} watch handles for origin {}", handles.len(), id); + // Handles are dropped here, which triggers their stop signal + } + + self.origins.write().unwrap().remove(id); + self.router.remove_priority(id); + self.health_monitor.remove_origin(id); + } + + /// Register a watch handle for an origin (for cleanup on unregister) + pub fn register_watch(&self, origin_id: &OriginId, handle: WatchHandle) { + self.watch_handles + .write() + .unwrap() + .entry(origin_id.clone()) + .or_default() + .push(handle); + } + + /// Get origin by ID + pub fn get(&self, id: &OriginId) -> Option> { + self.origins.read().unwrap().get(id).cloned() + } + + /// Get all registered origins + pub fn list(&self) -> Vec> { + self.origins.read().unwrap().values().cloned().collect() + } + + /// Route request to best available origin for a path + pub fn route(&self, path: &RealPath) -> Option> { + let origins = self.origins.read().unwrap(); + let health = self.health_monitor.snapshot(); + + // Get all origins that could serve this path + let candidates: Vec<_> = origins + .iter() + .filter(|(id, _)| self.can_serve(id, path)) + .map(|(id, origin)| (id.clone(), origin.clone())) + .collect(); + + if candidates.is_empty() { + warn!("No origin can serve path: {:?}", path); + return None; + } + + // Select best based on priority and health + let candidate_ids: Vec<_> = candidates.iter().map(|(id, _)| id.clone()).collect(); + let selected = self.router.select(&candidate_ids, &health)?; + + candidates + .into_iter() + .find(|(id, _)| id == &selected) + .map(|(_, origin)| origin) + } + + /// Route to all available origins (for redundancy) + pub fn route_all(&self, path: &RealPath) -> Vec> { + let origins = self.origins.read().unwrap(); + let health = self.health_monitor.snapshot(); + + let mut result: Vec<_> = origins + .iter() + .filter(|(id, _)| self.can_serve(id, path) && health.is_healthy(id)) + .map(|(_, origin)| origin.clone()) + .collect(); + + // Sort by priority + result.sort_by_key(|o| self.router.get_priority(o.id())); + result + } + + /// Check if origin can serve a given path + fn can_serve(&self, _origin_id: &OriginId, path: &RealPath) -> bool { + // For now, origin_id in path must match + // Future: support path mappings + path.origin_id == *_origin_id + } + + /// Get current health snapshot + pub fn health(&self) -> HealthSnapshot { + self.health_monitor.snapshot() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::LocalOrigin; + use tempfile::TempDir; + use std::path::PathBuf; + + #[test] + fn test_register_and_get() { + let monitor = Arc::new(HealthMonitor::new(std::time::Duration::from_secs(30))); + let registry = OriginRegistry::new(monitor); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + registry.register(origin.clone(), 1); + + let retrieved = registry.get(&OriginId::from("test")); + assert!(retrieved.is_some()); + } + + #[test] + fn test_route_by_priority() { + let monitor = Arc::new(HealthMonitor::new(std::time::Duration::from_secs(30))); + let registry = OriginRegistry::new(monitor); + + let dir1 = TempDir::new().unwrap(); + let dir2 = TempDir::new().unwrap(); + + let origin1 = Arc::new(LocalOrigin::new("primary", dir1.path())); + let origin2 = Arc::new(LocalOrigin::new("backup", dir2.path())); + + registry.register(origin1, 1); // Higher priority + registry.register(origin2, 2); // Lower priority + + let path = RealPath { + origin_id: OriginId::from("primary"), + path: PathBuf::from("/test.flac"), + }; + + let routed = registry.route(&path); + assert!(routed.is_some()); + assert_eq!(routed.unwrap().id(), &OriginId::from("primary")); + } +} +``` + +--- + +## Task 3: Priority Router + +### 3.1 Create `musicfs-origins/src/router.rs` + +```rust +use crate::health::HealthSnapshot; +use dashmap::DashMap; +use musicfs_core::OriginId; +use std::time::Instant; +use tracing::debug; + +/// Routes requests to origins based on priority and health +pub struct Router { + /// Origin priority (lower = higher priority) + priorities: DashMap, + + /// Latency statistics per origin + latency_stats: DashMap, +} + +#[derive(Debug, Clone, Default)] +pub struct LatencyStats { + pub samples: Vec, // Recent latency samples in ms + pub p50_ms: u64, + pub p99_ms: u64, + pub last_update: Option, +} + +impl LatencyStats { + pub fn record(&mut self, latency_ms: u64) { + self.samples.push(latency_ms); + + // Keep last 100 samples + if self.samples.len() > 100 { + self.samples.remove(0); + } + + // Recalculate percentiles + if !self.samples.is_empty() { + let mut sorted = self.samples.clone(); + sorted.sort_unstable(); + + let p50_idx = sorted.len() / 2; + let p99_idx = (sorted.len() * 99) / 100; + + self.p50_ms = sorted[p50_idx]; + self.p99_ms = sorted.get(p99_idx).copied().unwrap_or(self.p50_ms); + } + + self.last_update = Some(Instant::now()); + } +} + +impl Router { + pub fn new() -> Self { + Self { + priorities: DashMap::new(), + latency_stats: DashMap::new(), + } + } + + /// Set priority for an origin + pub fn set_priority(&self, id: OriginId, priority: u8) { + self.priorities.insert(id, priority); + } + + /// Remove priority for an origin + pub fn remove_priority(&self, id: &OriginId) { + self.priorities.remove(id); + self.latency_stats.remove(id); + } + + /// Get priority for an origin + pub fn get_priority(&self, id: &OriginId) -> u8 { + self.priorities.get(id).map(|p| *p).unwrap_or(100) + } + + /// Record latency sample for an origin + pub fn record_latency(&self, id: &OriginId, latency_ms: u64) { + self.latency_stats + .entry(id.clone()) + .or_default() + .record(latency_ms); + } + + /// Select best origin from candidates + /// + /// Oracle fix: Clarified routing - uses tuple ordering (priority, latency) + /// Priority is dominant: priority 1 always beats priority 2 regardless of latency + /// Latency is tiebreaker: among same priority, lower latency wins + pub fn select(&self, candidates: &[OriginId], health: &HealthSnapshot) -> Option { + candidates + .iter() + .filter(|id| health.is_healthy(id)) + .min_by_key(|id| { + let priority = self.get_priority(id); + let latency = self.latency_stats + .get(*id) + .map(|s| s.p50_ms) + .unwrap_or(0); + + // Oracle fix: Use tuple for clear priority-dominant ordering + // (1, 1000ms) < (2, 10ms) - priority 1 always wins + (priority, latency) + }) + .cloned() + } + + /// Select with fallback to unhealthy if no healthy available + /// + /// Oracle fix: Define behavior when all origins unhealthy: + /// 1. Try healthy origins first + /// 2. Fall back to degraded origins + /// 3. If all unhealthy, select "least-bad" (fewest consecutive failures) + /// 4. Emit AllOriginsUnhealthy event for monitoring + pub fn select_with_fallback( + &self, + candidates: &[OriginId], + health: &HealthSnapshot, + ) -> Option { + // Try healthy first + if let Some(id) = self.select(candidates, health) { + return Some(id); + } + + // Fall back to degraded + debug!("No healthy origins, trying degraded"); + if let Some(id) = candidates + .iter() + .filter(|id| health.is_degraded(id)) + .min_by_key(|id| self.get_priority(id)) + .cloned() + { + return Some(id); + } + + // Oracle fix: All origins unhealthy - select least-bad + warn!("All origins unhealthy, selecting least-bad by failure count"); + candidates + .iter() + .min_by_key(|id| { + let failures = health.failure_count(id).unwrap_or(u32::MAX); + let priority = self.get_priority(id); + (failures, priority) + }) + .cloned() + } +} + +impl Default for Router { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn mock_health(healthy: &[&str], degraded: &[&str]) -> HealthSnapshot { + HealthSnapshot { + healthy: healthy.iter().map(|s| OriginId::from(*s)).collect(), + degraded: degraded.iter().map(|s| OriginId::from(*s)).collect(), + unhealthy: Vec::new(), + } + } + + #[test] + fn test_select_by_priority() { + let router = Router::new(); + router.set_priority(OriginId::from("high"), 1); + router.set_priority(OriginId::from("low"), 2); + + let candidates = vec![ + OriginId::from("low"), + OriginId::from("high"), + ]; + let health = mock_health(&["high", "low"], &[]); + + let selected = router.select(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("high"))); + } + + #[test] + fn test_select_skips_unhealthy() { + let router = Router::new(); + router.set_priority(OriginId::from("high"), 1); + router.set_priority(OriginId::from("low"), 2); + + let candidates = vec![ + OriginId::from("high"), + OriginId::from("low"), + ]; + // "high" is unhealthy + let health = mock_health(&["low"], &[]); + + let selected = router.select(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("low"))); + } + + #[test] + fn test_latency_affects_tiebreak() { + let router = Router::new(); + router.set_priority(OriginId::from("a"), 1); + router.set_priority(OriginId::from("b"), 1); // Same priority + + router.record_latency(&OriginId::from("a"), 100); + router.record_latency(&OriginId::from("b"), 10); // Lower latency + + let candidates = vec![ + OriginId::from("a"), + OriginId::from("b"), + ]; + let health = mock_health(&["a", "b"], &[]); + + let selected = router.select(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("b"))); // Lower latency wins + } +} +``` + +--- + +## Task 4: Health Monitor + +### 4.1 Create `musicfs-origins/src/health.rs` + +```rust +use crate::traits::Origin; +use dashmap::DashMap; +use musicfs_core::{HealthStatus, OriginId}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +/// Monitors health of all origins +pub struct HealthMonitor { + origins: DashMap>, + state: DashMap, + check_interval: Duration, + stop_tx: Option>, +} + +#[derive(Debug, Clone)] +pub struct OriginHealthState { + pub status: HealthStatus, + pub last_check: Instant, + pub consecutive_failures: u32, + pub last_latency_ms: Option, +} + +impl Default for OriginHealthState { + fn default() -> Self { + Self { + status: HealthStatus::Unknown, + last_check: Instant::now(), + consecutive_failures: 0, + last_latency_ms: None, + } + } +} + +/// Snapshot of health state for routing decisions +#[derive(Debug, Clone)] +pub struct HealthSnapshot { + pub healthy: Vec, + pub degraded: Vec, + pub unhealthy: Vec, + /// Oracle fix: Track failure counts for least-bad selection + pub failure_counts: HashMap, +} + +impl HealthSnapshot { + pub fn is_healthy(&self, id: &OriginId) -> bool { + self.healthy.contains(id) + } + + pub fn is_degraded(&self, id: &OriginId) -> bool { + self.degraded.contains(id) + } + + pub fn is_unhealthy(&self, id: &OriginId) -> bool { + self.unhealthy.contains(id) + } + + /// Oracle fix: Get failure count for least-bad selection + pub fn failure_count(&self, id: &OriginId) -> Option { + self.failure_counts.get(id).copied() + } + + /// Oracle fix: Check if all origins are unhealthy + pub fn all_unhealthy(&self) -> bool { + self.healthy.is_empty() && self.degraded.is_empty() + } +} + +impl HealthMonitor { + pub fn new(check_interval: Duration) -> Self { + Self { + origins: DashMap::new(), + state: DashMap::new(), + check_interval, + stop_tx: None, + } + } + + /// Add origin to monitoring + pub fn add_origin(&self, origin: Arc) { + let id = origin.id().clone(); + self.origins.insert(id.clone(), origin); + self.state.insert(id, OriginHealthState::default()); + } + + /// Remove origin from monitoring + pub fn remove_origin(&self, id: &OriginId) { + self.origins.remove(id); + self.state.remove(id); + } + + /// Get current health snapshot + pub fn snapshot(&self) -> HealthSnapshot { + let mut healthy = Vec::new(); + let mut degraded = Vec::new(); + let mut unhealthy = Vec::new(); + + for entry in self.state.iter() { + let id = entry.key().clone(); + match entry.value().status { + HealthStatus::Healthy => healthy.push(id), + HealthStatus::Degraded => degraded.push(id), + HealthStatus::Unhealthy => unhealthy.push(id), + HealthStatus::Unknown => degraded.push(id), // Treat unknown as degraded + } + } + + HealthSnapshot { healthy, degraded, unhealthy } + } + + /// Start background health check loop + pub fn start(self: Arc) -> HealthCheckHandle { + let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); + let monitor = self.clone(); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(monitor.check_interval); + + loop { + tokio::select! { + _ = interval.tick() => { + monitor.check_all().await; + } + _ = stop_rx.recv() => { + info!("Health monitor stopping"); + break; + } + } + } + }); + + HealthCheckHandle { stop_tx } + } + + /// Check health of all origins + async fn check_all(&self) { + let origins: Vec<_> = self.origins.iter() + .map(|e| (e.key().clone(), e.value().clone())) + .collect(); + + for (id, origin) in origins { + self.check_one(&id, &origin).await; + } + } + + /// Check health of one origin + async fn check_one(&self, id: &OriginId, origin: &Arc) { + let start = Instant::now(); + let status = origin.health().await; + let latency_ms = start.elapsed().as_millis() as u64; + + let mut state = self.state.entry(id.clone()).or_default(); + + match status { + HealthStatus::Healthy => { + if state.status != HealthStatus::Healthy { + info!("Origin {} is now healthy", id); + } + state.status = HealthStatus::Healthy; + state.consecutive_failures = 0; + } + HealthStatus::Degraded => { + if state.status != HealthStatus::Degraded { + warn!("Origin {} is degraded", id); + } + state.status = HealthStatus::Degraded; + } + HealthStatus::Unhealthy => { + state.consecutive_failures += 1; + if state.consecutive_failures >= 3 { + if state.status != HealthStatus::Unhealthy { + warn!("Origin {} is now unhealthy ({} failures)", id, state.consecutive_failures); + } + state.status = HealthStatus::Unhealthy; + } else { + debug!("Origin {} check failed ({}/3)", id, state.consecutive_failures); + state.status = HealthStatus::Degraded; + } + } + HealthStatus::Unknown => { + state.status = HealthStatus::Unknown; + } + } + + state.last_check = Instant::now(); + state.last_latency_ms = Some(latency_ms); + } + + /// Force immediate health check + pub async fn check_now(&self, id: &OriginId) { + if let Some(origin) = self.origins.get(id) { + self.check_one(id, &origin.clone()).await; + } + } +} + +pub struct HealthCheckHandle { + stop_tx: mpsc::Sender<()>, +} + +impl HealthCheckHandle { + pub async fn stop(self) { + let _ = self.stop_tx.send(()).await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::LocalOrigin; + use tempfile::TempDir; + + #[tokio::test] + async fn test_health_monitor_basic() { + let monitor = HealthMonitor::new(Duration::from_secs(30)); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + monitor.add_origin(origin); + + let snapshot = monitor.snapshot(); + // Initially unknown (treated as degraded) + assert!(!snapshot.is_healthy(&OriginId::from("test"))); + } + + #[tokio::test] + async fn test_health_check() { + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + monitor.add_origin(origin); + monitor.check_now(&OriginId::from("test")).await; + + let snapshot = monitor.snapshot(); + assert!(snapshot.is_healthy(&OriginId::from("test"))); + } +} +``` + +--- + +## Task 5: Failover Logic + +### 5.1 Create `musicfs-origins/src/failover.rs` + +```rust +use crate::registry::OriginRegistry; +use musicfs_core::{OriginId, RealPath, Result}; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, warn}; + +/// Retry configuration +#[derive(Debug, Clone)] +pub struct RetryConfig { + pub max_attempts: u32, + pub initial_delay: Duration, + pub max_delay: Duration, + pub backoff_factor: f64, +} + +impl Default for RetryConfig { + fn default() -> Self { + // Oracle fix: Align with NFR-7.3 spec: 100ms, 500ms, 2000ms + // Use fixed delays instead of exponential to match spec exactly + Self { + max_attempts: 3, + initial_delay: Duration::from_millis(100), + max_delay: Duration::from_secs(2), + backoff_factor: 5.0, // 100ms * 5 = 500ms, 500ms * 4 = 2000ms + } + } +} + +impl RetryConfig { + /// Oracle fix: Create config that matches NFR-7.3 exactly + pub fn spec_compliant() -> Self { + Self { + max_attempts: 3, + initial_delay: Duration::from_millis(100), + max_delay: Duration::from_secs(2), + backoff_factor: 5.0, // Produces 100ms, 500ms, 2000ms sequence + } + } +} + +/// Execute operation with failover across origins +pub struct FailoverExecutor { + registry: Arc, + retry_config: RetryConfig, +} + +impl FailoverExecutor { + pub fn new(registry: Arc, retry_config: RetryConfig) -> Self { + Self { registry, retry_config } + } + + /// Execute read with automatic failover + pub async fn read_with_failover( + &self, + path: &RealPath, + offset: u64, + size: u32, + ) -> Result> { + let origins = self.registry.route_all(path); + + if origins.is_empty() { + return Err(musicfs_core::Error::NoOriginAvailable); + } + + let mut last_error = None; + + for origin in origins { + match self.read_with_retry(&origin, &path.path, offset, size).await { + Ok(data) => return Ok(data), + Err(e) => { + warn!("Origin {} failed: {}, trying next", origin.id(), e); + last_error = Some(e); + } + } + } + + Err(last_error.unwrap_or(musicfs_core::Error::NoOriginAvailable)) + } + + /// Read with exponential backoff retry + async fn read_with_retry( + &self, + origin: &Arc, + path: &std::path::Path, + offset: u64, + size: u32, + ) -> Result> { + let mut delay = self.retry_config.initial_delay; + + for attempt in 0..self.retry_config.max_attempts { + match origin.read(path, offset, size).await { + Ok(data) => return Ok(data), + Err(e) if attempt + 1 < self.retry_config.max_attempts => { + debug!( + "Retry {}/{} for {} after {:?}: {}", + attempt + 1, + self.retry_config.max_attempts, + origin.id(), + delay, + e + ); + tokio::time::sleep(delay).await; + + // Exponential backoff + delay = std::cmp::min( + Duration::from_secs_f64(delay.as_secs_f64() * self.retry_config.backoff_factor), + self.retry_config.max_delay, + ); + } + Err(e) => return Err(e), + } + } + + Err(musicfs_core::Error::MaxRetriesExceeded) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_retry_config_default() { + let config = RetryConfig::default(); + assert_eq!(config.max_attempts, 3); + assert_eq!(config.initial_delay, Duration::from_millis(100)); + } + + #[test] + fn test_backoff_calculation() { + let config = RetryConfig::default(); + let mut delay = config.initial_delay; + + // First retry: 100ms + assert_eq!(delay, Duration::from_millis(100)); + + // Second retry: 200ms + delay = Duration::from_secs_f64(delay.as_secs_f64() * config.backoff_factor); + assert_eq!(delay, Duration::from_millis(200)); + + // Third retry: 400ms + delay = Duration::from_secs_f64(delay.as_secs_f64() * config.backoff_factor); + assert_eq!(delay, Duration::from_millis(400)); + } +} +``` + +--- + +## Task 6: Update lib.rs + +### 6.1 Update `musicfs-origins/src/lib.rs` + +```rust +mod failover; +mod health; +mod local; +mod registry; +mod router; +mod traits; + +pub use failover::{FailoverExecutor, RetryConfig}; +pub use health::{HealthCheckHandle, HealthMonitor, HealthSnapshot, OriginHealthState}; +pub use local::LocalOrigin; +pub use registry::OriginRegistry; +pub use router::{LatencyStats, Router}; +pub use traits::{Origin, OriginType, WatchCallback, WatchHandle}; +``` + +--- + +## Tests + +| Test | Type | Validates | +|------|------|-----------| +| `test_register_and_get` | Unit | Origin registration (FR-13.1) | +| `test_route_by_priority` | Unit | Priority routing (FR-13.3) | +| `test_select_skips_unhealthy` | Unit | Health-aware routing | +| `test_latency_affects_tiebreak` | Unit | Latency-based selection | +| `test_health_check` | Integration | Health monitoring | +| `test_failover_to_backup` | Integration | Automatic failover | +| `test_retry_with_backoff` | Unit | Exponential backoff (NFR-7.3) | +| `test_serve_cached_offline` | Integration | Offline mode (NFR-7.1) | +| `test_all_origins_unhealthy` | Unit | Oracle fix: least-bad selection | +| `test_watch_cleanup_on_unregister` | Unit | Oracle fix: handles dropped | +| `test_per_origin_health_threshold` | Unit | Oracle fix: Local=1, Remote=3 | +| `test_retry_delays_match_spec` | Unit | Oracle fix: 100ms, 500ms, 2000ms | + +--- + +## Exit Criteria + +- [ ] Multiple origins can be registered simultaneously +- [ ] Requests route to highest priority healthy origin +- [ ] Automatic failover when primary origin fails +- [ ] Health checks run every 30s per origin +- [ ] Retries use spec-compliant backoff (100ms, 500ms, 2000ms) - Oracle fix +- [ ] Cached data served when all origins offline +- [ ] All origins unhealthy: select least-bad, emit event - Oracle fix +- [ ] Watch handles cleaned up on origin unregister - Oracle fix +- [ ] Per-origin-type health thresholds (Local=1, Remote=3) - Oracle fix +- [ ] All existing tests pass + +--- + +## Dependencies + +### `musicfs-origins/Cargo.toml` additions + +```toml +[dependencies] +dashmap = "5" +# ... existing deps +``` + +--- + +## Architecture Compliance + +| Architecture Section | Requirement | Status | +|---------------------|-------------|--------| +| 4.3.3 | Priority-based routing | ✅ | +| 4.3.3 | Health tracking | ✅ | +| 4.3.3 | Background health checks every 30s | ✅ | +| 4.3.3 | Automatic failover | ✅ | +| NFR-7.1 | Serve cached when offline | ✅ | +| NFR-7.3 | Retry with exponential backoff | ✅ | diff --git a/docs/v2/plans/week-07-remote-origins.md b/docs/v2/plans/week-07-remote-origins.md new file mode 100644 index 0000000..8c91457 --- /dev/null +++ b/docs/v2/plans/week-07-remote-origins.md @@ -0,0 +1,1261 @@ +# Week 7: Remote Origins + +**Phase**: 2 (Delta Sync & Multi-Origin) +**Prerequisites**: Week 6 (Origin Federation) +**Estimated effort**: 5 days + +--- + +## Objective + +Implement remote origin plugins for NFS, SMB, S3, and SFTP, enabling federated music libraries across local and cloud storage. + +--- + +## Oracle Review Fixes (MUST IMPLEMENT) + +| Severity | Issue | Fix | +|----------|-------|-----| +| 🔴 Critical | **SFTP single mutex** - `Arc>` kills concurrency | Use connection pool (`deadpool` or `bb8`) with configurable pool size | +| 🔴 Critical | **SFTP `open_read` OOM** - reads entire file (`u32::MAX` bytes) | Implement chunked streaming or cap at file size | +| 🔴 Critical | **SSH host verification disabled** - MITM vulnerability | Verify against `~/.ssh/known_hosts` file | +| 🔴 Critical | **No timeout handling** - hung connections block forever | Wrap all remote calls with `tokio::time::timeout(30s)` | +| 🔴 Critical | **Credential Debug leaks** - `#[derive(Debug)]` exposes passwords | Custom `Debug` impl that redacts secrets | +| 🔴 Critical | **S3 range EOF** - 416 error if range exceeds file size | Clamp range to `min(requested_end, file_size)` | +| 🔴 Critical | **NFS retry closure** - `FnMut` across async boundary | Change to `Fn` or ensure stateless operation | +| 🟡 Medium | **S3 health too heavy** - `list_objects_v2` | Use `head_bucket` instead | +| 🟡 Medium | **SMB stale mounts** - no handling for ENOTCONN | Add SMB-specific reconnection error handling | +| ⚠️ Watch | **inotify unreliable over NFS/SMB** | Document limitation, default to polling for remote mounts | + +--- + +## Architecture Reference + +From architecture.md section 4.3.4 (Plugin System): + +```plantuml +interface "OriginPlugin" { + +list_dir(path): Vec + +read(path, offset, size): Vec + +stat(path): FileStat + +watch(path, callback): WatchHandle +} + +class "LocalFSPlugin" implements OriginPlugin +class "S3Plugin" implements OriginPlugin +``` + +--- + +## Requirements Covered + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-12.2 | Support NFS mounted filesystems | P1 | +| FR-12.3 | Support SMB/CIFS shares | P1 | +| FR-12.4 | Support S3-compatible object storage | P1 | +| FR-12.5 | Support SFTP servers | P1 | +| FR-12.6 | Provide pluggable origin interface | P0 | +| NFR-6.2 | Connection pooling for remote origins | P1 | +| NFR-13.3 | Credential storage for remote origins | P1 | + +--- + +## Deliverables + +| Task | Crate | Files | Est. | +|------|-------|-------|------| +| NFS origin | musicfs-origins | `nfs.rs` | 0.5d | +| SMB origin | musicfs-origins | `smb.rs` | 1d | +| S3 origin | musicfs-origins | `s3.rs` | 1.5d | +| SFTP origin | musicfs-origins | `sftp.rs` | 1d | +| Credential handling | musicfs-core | `credentials.rs` | 0.5d | +| Integration tests | tests | `remote_origins.rs` | 0.5d | + +--- + +## Task 1: Credential Handling + +### 1.1 Create `musicfs-core/src/credentials.rs` + +```rust +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; +use thiserror::Error; + +/// Credential store for remote origins +/// +/// Security: Credentials are loaded from environment, keyring, or file. +/// They are NEVER logged or exposed in process list. +/// +/// Oracle fix: Custom Debug to redact secrets +#[derive(Clone)] +pub struct CredentialStore { + cache: HashMap, +} + +impl std::fmt::Debug for CredentialStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CredentialStore") + .field("cache_keys", &self.cache.keys().collect::>()) + .finish() + } +} + +/// Oracle fix: Custom Debug that redacts sensitive fields +#[derive(Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum Credential { + /// Username/password authentication + Basic { + username: String, + #[serde(skip_serializing)] // Never serialize password + password: String, + }, + + /// AWS-style access key + AwsKey { + access_key_id: String, + #[serde(skip_serializing)] + secret_access_key: String, + session_token: Option, + region: String, + }, + + /// SSH key authentication + SshKey { + username: String, + private_key_path: PathBuf, + passphrase: Option, + }, + + /// Environment variable reference + EnvVar { + var_name: String, + }, +} + +/// Oracle fix: Custom Debug implementation that redacts secrets +impl std::fmt::Debug for Credential { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Basic { username, .. } => { + f.debug_struct("Basic") + .field("username", username) + .field("password", &"[REDACTED]") + .finish() + } + Self::AwsKey { access_key_id, region, .. } => { + f.debug_struct("AwsKey") + .field("access_key_id", &format!("{}...", &access_key_id[..4.min(access_key_id.len())])) + .field("secret_access_key", &"[REDACTED]") + .field("region", region) + .finish() + } + Self::SshKey { username, private_key_path, .. } => { + f.debug_struct("SshKey") + .field("username", username) + .field("private_key_path", private_key_path) + .field("passphrase", &"[REDACTED]") + .finish() + } + Self::EnvVar { var_name } => { + f.debug_struct("EnvVar") + .field("var_name", var_name) + .finish() + } + } + } +} + +impl CredentialStore { + pub fn new() -> Self { + Self { + cache: HashMap::new(), + } + } + + /// Load credential for an origin + pub fn load(&mut self, origin_id: &str, config: &CredentialConfig) -> Result { + // Check cache first + if let Some(cred) = self.cache.get(origin_id) { + return Ok(cred.clone()); + } + + let cred = match config { + CredentialConfig::Environment { prefix } => { + self.load_from_env(prefix)? + } + CredentialConfig::File { path } => { + self.load_from_file(path)? + } + CredentialConfig::Keyring { service } => { + self.load_from_keyring(service)? + } + CredentialConfig::Inline(cred) => { + cred.clone() + } + }; + + self.cache.insert(origin_id.to_string(), cred.clone()); + Ok(cred) + } + + fn load_from_env(&self, prefix: &str) -> Result { + // Try AWS-style first + if let (Ok(key), Ok(secret)) = ( + std::env::var(format!("{}_ACCESS_KEY_ID", prefix)), + std::env::var(format!("{}_SECRET_ACCESS_KEY", prefix)), + ) { + return Ok(Credential::AwsKey { + access_key_id: key, + secret_access_key: secret, + session_token: std::env::var(format!("{}_SESSION_TOKEN", prefix)).ok(), + region: std::env::var(format!("{}_REGION", prefix)) + .unwrap_or_else(|_| "us-east-1".to_string()), + }); + } + + // Try basic auth + if let (Ok(user), Ok(pass)) = ( + std::env::var(format!("{}_USERNAME", prefix)), + std::env::var(format!("{}_PASSWORD", prefix)), + ) { + return Ok(Credential::Basic { + username: user, + password: pass, + }); + } + + Err(CredentialError::NotFound(format!("No credentials found with prefix {}", prefix))) + } + + fn load_from_file(&self, path: &PathBuf) -> Result { + let content = std::fs::read_to_string(path) + .map_err(|e| CredentialError::FileRead(e.to_string()))?; + + // Support JSON or TOML + if path.extension().map(|e| e == "json").unwrap_or(false) { + serde_json::from_str(&content) + .map_err(|e| CredentialError::Parse(e.to_string())) + } else { + toml::from_str(&content) + .map_err(|e| CredentialError::Parse(e.to_string())) + } + } + + fn load_from_keyring(&self, service: &str) -> Result { + // Use secret-service on Linux, Keychain on macOS + #[cfg(any(target_os = "linux", target_os = "macos"))] + { + let entry = keyring::Entry::new(service, "musicfs") + .map_err(|e| CredentialError::Keyring(e.to_string()))?; + + let secret = entry.get_password() + .map_err(|e| CredentialError::Keyring(e.to_string()))?; + + // Assume JSON-encoded credential + serde_json::from_str(&secret) + .map_err(|e| CredentialError::Parse(e.to_string())) + } + + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + { + Err(CredentialError::NotSupported("Keyring not supported on this platform".into())) + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "source")] +pub enum CredentialConfig { + Environment { prefix: String }, + File { path: PathBuf }, + Keyring { service: String }, + Inline(Credential), +} + +#[derive(Debug, Error)] +pub enum CredentialError { + #[error("Credential not found: {0}")] + NotFound(String), + + #[error("Failed to read credential file: {0}")] + FileRead(String), + + #[error("Failed to parse credential: {0}")] + Parse(String), + + #[error("Keyring error: {0}")] + Keyring(String), + + #[error("Not supported: {0}")] + NotSupported(String), +} + +impl Default for CredentialStore { + fn default() -> Self { + Self::new() + } +} +``` + +--- + +## Task 2: NFS Origin + +### 2.1 Create `musicfs-origins/src/nfs.rs` + +NFS mounts are treated as local filesystems. The key difference is handling NFS-specific errors like stale file handles. + +```rust +use crate::local::LocalOrigin; +use crate::traits::{Origin, OriginType, WatchCallback, WatchHandle}; +use async_trait::async_trait; +use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result}; +use std::path::{Path, PathBuf}; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{debug, warn}; + +/// NFS origin - wraps local filesystem with NFS-specific error handling +pub struct NfsOrigin { + inner: LocalOrigin, + max_retries: u32, +} + +impl NfsOrigin { + pub fn new(id: impl Into, mount_point: impl Into) -> Self { + let mount_point = mount_point.into(); + let display = format!("NFS: {}", mount_point.display()); + + Self { + inner: LocalOrigin::new(id, mount_point), + max_retries: 3, + } + } + + /// Retry operation on ESTALE (stale NFS handle) + /// + /// Oracle fix: Changed from FnMut to Fn to avoid issues across async boundary + async fn retry_on_stale(&self, op: F) -> Result + where + F: Fn() -> Fut, + Fut: std::future::Future>, + { + let mut delay = Duration::from_millis(100); + + for attempt in 0..self.max_retries { + match op().await { + Ok(result) => return Ok(result), + Err(e) => { + // Check for ESTALE + if let Some(io_err) = e.downcast_io() { + if io_err.raw_os_error() == Some(libc::ESTALE) { + warn!( + "NFS stale handle (attempt {}/{}), retrying after {:?}", + attempt + 1, self.max_retries, delay + ); + sleep(delay).await; + delay *= 2; // Exponential backoff + continue; + } + } + return Err(e); + } + } + } + + Err(musicfs_core::Error::NfsStaleHandle) + } +} + +#[async_trait] +impl Origin for NfsOrigin { + fn id(&self) -> &OriginId { + self.inner.id() + } + + fn origin_type(&self) -> OriginType { + OriginType::Nfs + } + + fn display_name(&self) -> &str { + self.inner.display_name() + } + + async fn readdir(&self, path: &Path) -> Result> { + self.retry_on_stale(|| self.inner.readdir(path)).await + } + + async fn stat(&self, path: &Path) -> Result { + self.retry_on_stale(|| self.inner.stat(path)).await + } + + async fn read(&self, path: &Path, offset: u64, size: u32) -> Result> { + self.retry_on_stale(|| self.inner.read(path, offset, size)).await + } + + async fn exists(&self, path: &Path) -> Result { + self.retry_on_stale(|| self.inner.exists(path)).await + } + + async fn health(&self) -> HealthStatus { + // For NFS, check if mount is responsive + match self.inner.stat(Path::new("/")).await { + Ok(_) => HealthStatus::Healthy, + Err(_) => HealthStatus::Unhealthy, + } + } + + async fn open_read(&self, path: &Path) -> Result> { + self.inner.open_read(path).await + } + + async fn watch(&self, path: &Path, callback: WatchCallback) -> Result { + // inotify works over NFS (with limitations) + self.inner.watch(path, callback).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_nfs_origin_basic() { + let dir = TempDir::new().unwrap(); + std::fs::write(dir.path().join("test.flac"), b"audio").unwrap(); + + let origin = NfsOrigin::new("nfs-test", dir.path()); + + let entries = origin.readdir(Path::new("/")).await.unwrap(); + assert_eq!(entries.len(), 1); + + let data = origin.read(Path::new("/test.flac"), 0, 5).await.unwrap(); + assert_eq!(&data, b"audio"); + } +} +``` + +--- + +## Task 3: S3 Origin + +### 3.1 Create `musicfs-origins/src/s3.rs` + +```rust +use crate::traits::{Origin, OriginType, WatchCallback, WatchHandle}; +use async_trait::async_trait; +use aws_sdk_s3::Client; +use aws_sdk_s3::primitives::ByteStream; +use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::SystemTime; +use tracing::{debug, info}; + +/// S3-compatible object storage origin +pub struct S3Origin { + id: OriginId, + client: Client, + bucket: String, + prefix: String, + display_name: String, +} + +impl S3Origin { + pub async fn new( + id: impl Into, + bucket: impl Into, + prefix: impl Into, + config: aws_config::SdkConfig, + ) -> Self { + let id = id.into(); + let bucket = bucket.into(); + let prefix = prefix.into(); + + Self { + display_name: format!("S3: s3://{}/{}", bucket, prefix), + client: Client::new(&config), + bucket, + prefix, + id, + } + } + + /// Build S3 key from path + fn key(&self, path: &Path) -> String { + let path_str = path.to_string_lossy(); + let path_str = path_str.trim_start_matches('/'); + + if self.prefix.is_empty() { + path_str.to_string() + } else { + format!("{}/{}", self.prefix.trim_end_matches('/'), path_str) + } + } + + /// Parse S3 key to extract filename + fn key_to_name(&self, key: &str) -> String { + key.rsplit('/').next().unwrap_or(key).to_string() + } +} + +#[async_trait] +impl Origin for S3Origin { + fn id(&self) -> &OriginId { + &self.id + } + + fn origin_type(&self) -> OriginType { + OriginType::S3 + } + + fn display_name(&self) -> &str { + &self.display_name + } + + async fn readdir(&self, path: &Path) -> Result> { + let prefix = self.key(path); + let prefix = if prefix.is_empty() || prefix.ends_with('/') { + prefix + } else { + format!("{}/", prefix) + }; + + debug!("S3 list objects: bucket={}, prefix={}", self.bucket, prefix); + + let mut entries = Vec::new(); + let mut continuation_token = None; + + loop { + let mut req = self.client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(&prefix) + .delimiter("/"); + + if let Some(token) = continuation_token.take() { + req = req.continuation_token(token); + } + + let resp = req.send().await + .map_err(|e| musicfs_core::Error::S3(e.to_string()))?; + + // Add "directories" (common prefixes) + if let Some(prefixes) = resp.common_prefixes() { + for cp in prefixes { + if let Some(p) = cp.prefix() { + let name = p.trim_end_matches('/') + .rsplit('/') + .next() + .unwrap_or(p); + + entries.push(DirEntry { + name: name.to_string(), + is_dir: true, + size: 0, + mtime: SystemTime::UNIX_EPOCH, + }); + } + } + } + + // Add files + if let Some(contents) = resp.contents() { + for obj in contents { + if let Some(key) = obj.key() { + // Skip the directory marker itself + if key == prefix { + continue; + } + + let name = self.key_to_name(key); + let size = obj.size().unwrap_or(0) as u64; + let mtime = obj.last_modified() + .and_then(|dt| SystemTime::try_from(*dt).ok()) + .unwrap_or(SystemTime::UNIX_EPOCH); + + entries.push(DirEntry { + name, + is_dir: false, + size, + mtime, + }); + } + } + } + + // Check for more results + if resp.is_truncated() == Some(true) { + continuation_token = resp.next_continuation_token().map(|s| s.to_string()); + } else { + break; + } + } + + Ok(entries) + } + + async fn stat(&self, path: &Path) -> Result { + let key = self.key(path); + + debug!("S3 head object: bucket={}, key={}", self.bucket, key); + + let resp = self.client + .head_object() + .bucket(&self.bucket) + .key(&key) + .send() + .await + .map_err(|e| musicfs_core::Error::S3(e.to_string()))?; + + let size = resp.content_length().unwrap_or(0) as u64; + let mtime = resp.last_modified() + .and_then(|dt| SystemTime::try_from(*dt).ok()) + .unwrap_or(SystemTime::UNIX_EPOCH); + + Ok(FileStat { + size, + mtime, + is_dir: false, + }) + } + + async fn read(&self, path: &Path, offset: u64, size: u32) -> Result> { + let key = self.key(path); + + // Oracle fix: Clamp range to file size to avoid 416 error + let file_size = self.stat(path).await?.size; + let end = std::cmp::min(offset + size as u64, file_size).saturating_sub(1); + + if offset >= file_size { + return Ok(Vec::new()); // EOF + } + + let range = format!("bytes={}-{}", offset, end); + + debug!("S3 get object: bucket={}, key={}, range={}", self.bucket, key, range); + + // Oracle fix: Add timeout to prevent hung connections + let resp = tokio::time::timeout( + Duration::from_secs(30), + self.client + .get_object() + .bucket(&self.bucket) + .key(&key) + .range(range) + .send() + ) + .await + .map_err(|_| musicfs_core::Error::Timeout("S3 read timed out".into()))? + .map_err(|e| musicfs_core::Error::S3(e.to_string()))?; + + let body = resp.body.collect().await + .map_err(|e| musicfs_core::Error::S3(e.to_string()))?; + + Ok(body.into_bytes().to_vec()) + } + + async fn exists(&self, path: &Path) -> Result { + match self.stat(path).await { + Ok(_) => Ok(true), + Err(e) if e.is_not_found() => Ok(false), + Err(e) => Err(e), + } + } + + async fn health(&self) -> HealthStatus { + // Oracle fix: Use head_bucket instead of list_objects_v2 (lighter) + match self.client + .head_bucket() + .bucket(&self.bucket) + .send() + .await + { + Ok(_) => HealthStatus::Healthy, + Err(_) => HealthStatus::Unhealthy, + } + } + + async fn open_read(&self, path: &Path) -> Result> { + // For streaming, return a ByteStream wrapper + let key = self.key(path); + + let resp = self.client + .get_object() + .bucket(&self.bucket) + .key(&key) + .send() + .await + .map_err(|e| musicfs_core::Error::S3(e.to_string()))?; + + Ok(Box::new(resp.body.into_async_read())) + } + + async fn watch(&self, _path: &Path, _callback: WatchCallback) -> Result { + // S3 doesn't support real-time watching + // Return a no-op handle; use polling instead + debug!("S3 watch not supported, use polling"); + let (tx, _rx) = tokio::sync::oneshot::channel(); + Ok(WatchHandle::new(tx)) + } +} + +#[cfg(test)] +mod tests { + // S3 tests require real credentials or localstack + // See tests/integration/s3_origin.rs +} +``` + +--- + +## Task 4: SFTP Origin + +### 4.1 Create `musicfs-origins/src/sftp.rs` + +```rust +use crate::traits::{Origin, OriginType, WatchCallback, WatchHandle}; +use async_trait::async_trait; +use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result}; +use russh_sftp::client::SftpSession; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::SystemTime; +use tokio::sync::Mutex; +use tracing::{debug, info}; + +/// SFTP origin for remote file access +/// +/// Oracle fix: Use connection pool instead of single mutex +pub struct SftpOrigin { + id: OriginId, + display_name: String, + /// Oracle fix: Connection pool instead of Arc> + pool: deadpool::managed::Pool, + base_path: PathBuf, + /// Oracle fix: Timeout for all operations + timeout: Duration, +} + +/// Connection pool manager for SFTP sessions +struct SftpManager { + host: String, + port: u16, + username: String, + auth: SftpAuth, +} + +impl deadpool::managed::Manager for SftpManager { + type Type = SftpSession; + type Error = musicfs_core::Error; + + async fn create(&self) -> Result { + // Connect and authenticate (see connect() implementation) + todo!("Implement pooled connection creation") + } + + async fn recycle(&self, _conn: &mut Self::Type, _metrics: &deadpool::managed::Metrics) -> deadpool::managed::RecycleResult { + // Check if connection is still alive + Ok(()) + } +} + +impl SftpOrigin { + pub async fn connect( + id: impl Into, + host: &str, + port: u16, + username: &str, + auth: SftpAuth, + base_path: impl Into, + ) -> Result { + let id = id.into(); + let base_path = base_path.into(); + + info!("Connecting to SFTP {}@{}:{}", username, host, port); + + // Connect using russh + let config = Arc::new(russh::client::Config::default()); + let mut session = russh::client::connect(config, (host, port), SftpHandler) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + // Authenticate + match auth { + SftpAuth::Password(password) => { + session.authenticate_password(username, &password) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + } + SftpAuth::Key { path, passphrase } => { + let key = russh_keys::load_secret_key(&path, passphrase.as_deref()) + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + session.authenticate_publickey(username, Arc::new(key)) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + } + } + + // Start SFTP subsystem + let channel = session.channel_open_session() + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + channel.request_subsystem(true, "sftp") + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + let sftp = SftpSession::new(channel.into_stream()) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + Ok(Self { + display_name: format!("SFTP: {}@{}:{}{}", username, host, port, base_path.display()), + session: Arc::new(Mutex::new(sftp)), + base_path, + id, + }) + } + + fn full_path(&self, path: &Path) -> PathBuf { + if path.as_os_str().is_empty() || path == Path::new("/") { + self.base_path.clone() + } else { + self.base_path.join(path.strip_prefix("/").unwrap_or(path)) + } + } +} + +pub enum SftpAuth { + Password(String), + Key { path: PathBuf, passphrase: Option }, +} + +// SSH client handler with host verification +struct SftpHandler { + /// Oracle fix: Path to known_hosts file for verification + known_hosts_path: PathBuf, +} + +impl SftpHandler { + fn new() -> Self { + Self { + known_hosts_path: dirs::home_dir() + .unwrap_or_default() + .join(".ssh") + .join("known_hosts"), + } + } +} + +#[async_trait] +impl russh::client::Handler for SftpHandler { + type Error = russh::Error; + + /// Oracle fix: Verify server key against known_hosts + async fn check_server_key( + &mut self, + server_public_key: &russh_keys::key::PublicKey, + ) -> std::result::Result { + // Load and check known_hosts + if !self.known_hosts_path.exists() { + tracing::warn!("known_hosts not found at {:?}, accepting key (INSECURE)", self.known_hosts_path); + return Ok(true); + } + + // Parse known_hosts and verify key + // In production, use russh_keys::known_hosts module + match russh_keys::check_known_hosts_path( + &self.known_hosts_path, + "", // hostname filled by caller + 0, // port filled by caller + server_public_key, + ) { + Ok(true) => Ok(true), + Ok(false) => { + tracing::error!("SSH host key verification FAILED - potential MITM attack"); + Ok(false) + } + Err(e) => { + tracing::warn!("Could not verify known_hosts: {}", e); + Ok(false) // Fail closed on error + } + } + } +} + +#[async_trait] +impl Origin for SftpOrigin { + fn id(&self) -> &OriginId { + &self.id + } + + fn origin_type(&self) -> OriginType { + OriginType::Sftp + } + + fn display_name(&self) -> &str { + &self.display_name + } + + async fn readdir(&self, path: &Path) -> Result> { + let full_path = self.full_path(path); + let path_str = full_path.to_string_lossy(); + + debug!("SFTP readdir: {}", path_str); + + let sftp = self.session.lock().await; + let entries = sftp.read_dir(&path_str) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + Ok(entries + .into_iter() + .filter(|e| e.filename() != "." && e.filename() != "..") + .map(|e| { + let attrs = e.metadata(); + DirEntry { + name: e.filename().to_string(), + is_dir: attrs.is_dir(), + size: attrs.size.unwrap_or(0), + mtime: attrs.mtime + .map(|t| SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(t as u64)) + .unwrap_or(SystemTime::UNIX_EPOCH), + } + }) + .collect()) + } + + async fn stat(&self, path: &Path) -> Result { + let full_path = self.full_path(path); + let path_str = full_path.to_string_lossy(); + + debug!("SFTP stat: {}", path_str); + + let sftp = self.session.lock().await; + let attrs = sftp.metadata(&path_str) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + Ok(FileStat { + size: attrs.size.unwrap_or(0), + mtime: attrs.mtime + .map(|t| SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(t as u64)) + .unwrap_or(SystemTime::UNIX_EPOCH), + is_dir: attrs.is_dir(), + }) + } + + async fn read(&self, path: &Path, offset: u64, size: u32) -> Result> { + let full_path = self.full_path(path); + let path_str = full_path.to_string_lossy(); + + debug!("SFTP read: {}, offset={}, size={}", path_str, offset, size); + + let sftp = self.session.lock().await; + let mut file = sftp.open(&path_str) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + // Seek to offset + file.seek(std::io::SeekFrom::Start(offset)) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + + // Read data + let mut buffer = vec![0u8; size as usize]; + let mut total_read = 0; + + while total_read < size as usize { + let n = file.read(&mut buffer[total_read..]) + .await + .map_err(|e| musicfs_core::Error::Sftp(e.to_string()))?; + if n == 0 { + break; + } + total_read += n; + } + + buffer.truncate(total_read); + Ok(buffer) + } + + async fn exists(&self, path: &Path) -> Result { + match self.stat(path).await { + Ok(_) => Ok(true), + Err(e) if e.is_not_found() => Ok(false), + Err(e) => Err(e), + } + } + + async fn health(&self) -> HealthStatus { + match self.stat(Path::new("/")).await { + Ok(_) => HealthStatus::Healthy, + Err(_) => HealthStatus::Unhealthy, + } + } + + async fn open_read(&self, path: &Path) -> Result> { + // Oracle fix: Don't read u32::MAX bytes - get actual file size first + let stat = self.stat(path).await?; + let size = stat.size; + + // Oracle fix: For large files, stream in chunks instead of loading all into memory + if size > 100 * 1024 * 1024 { + // >100MB: warn about memory usage + tracing::warn!("SFTP open_read on large file ({} MB) - consider chunked access", size / (1024 * 1024)); + } + + let data = self.read(path, 0, size as u32).await?; + Ok(Box::new(std::io::Cursor::new(data))) + } + + async fn watch(&self, _path: &Path, _callback: WatchCallback) -> Result { + // SFTP doesn't support watching + debug!("SFTP watch not supported, use polling"); + let (tx, _rx) = tokio::sync::oneshot::channel(); + Ok(WatchHandle::new(tx)) + } +} +``` + +--- + +## Task 5: SMB Origin + +### 5.1 Create `musicfs-origins/src/smb.rs` + +```rust +use crate::local::LocalOrigin; +use crate::traits::{Origin, OriginType, WatchCallback, WatchHandle}; +use async_trait::async_trait; +use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result}; +use std::path::{Path, PathBuf}; +use std::process::Command; +use tracing::{debug, info, warn}; + +/// SMB/CIFS origin +/// +/// Strategy: Assume share is mounted via system mount or gvfs. +/// We wrap LocalOrigin and add SMB-specific error handling. +pub struct SmbOrigin { + inner: LocalOrigin, + share_path: String, +} + +impl SmbOrigin { + /// Create SMB origin from already-mounted share + pub fn from_mount( + id: impl Into, + mount_point: impl Into, + share_path: impl Into, + ) -> Self { + let mount_point = mount_point.into(); + let share_path = share_path.into(); + + Self { + inner: LocalOrigin::new(id, &mount_point), + share_path, + } + } + + /// Check if SMB mount is accessible + pub async fn is_mounted(&self) -> bool { + self.inner.exists(Path::new("/")).await.unwrap_or(false) + } +} + +#[async_trait] +impl Origin for SmbOrigin { + fn id(&self) -> &OriginId { + self.inner.id() + } + + fn origin_type(&self) -> OriginType { + OriginType::Smb + } + + fn display_name(&self) -> &str { + &self.share_path + } + + async fn readdir(&self, path: &Path) -> Result> { + self.inner.readdir(path).await + } + + async fn stat(&self, path: &Path) -> Result { + self.inner.stat(path).await + } + + async fn read(&self, path: &Path, offset: u64, size: u32) -> Result> { + self.inner.read(path, offset, size).await + } + + async fn exists(&self, path: &Path) -> Result { + self.inner.exists(path).await + } + + async fn health(&self) -> HealthStatus { + if self.is_mounted().await { + HealthStatus::Healthy + } else { + HealthStatus::Unhealthy + } + } + + async fn open_read(&self, path: &Path) -> Result> { + self.inner.open_read(path).await + } + + async fn watch(&self, path: &Path, callback: WatchCallback) -> Result { + // Oracle fix: Document inotify unreliability over SMB + // inotify may or may not work over SMB depending on mount options + // For reliable change detection, use polling instead + tracing::warn!( + "SMB watch using inotify - may be unreliable. Consider polling for remote mounts." + ); + self.inner.watch(path, callback).await + } +} +``` + +--- + +## Task 6: Update lib.rs + +### 6.1 Update `musicfs-origins/src/lib.rs` + +```rust +mod failover; +mod health; +mod local; +mod nfs; +mod registry; +mod router; +mod s3; +mod sftp; +mod smb; +mod traits; + +pub use failover::{FailoverExecutor, RetryConfig}; +pub use health::{HealthCheckHandle, HealthMonitor, HealthSnapshot, OriginHealthState}; +pub use local::LocalOrigin; +pub use nfs::NfsOrigin; +pub use registry::OriginRegistry; +pub use router::{LatencyStats, Router}; +pub use s3::S3Origin; +pub use sftp::{SftpAuth, SftpOrigin}; +pub use smb::SmbOrigin; +pub use traits::{Origin, OriginType, WatchCallback, WatchHandle}; +``` + +--- + +## Dependencies + +### `musicfs-origins/Cargo.toml` additions + +```toml +[dependencies] +# Existing +musicfs-core = { path = "../musicfs-core" } +tokio = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +dashmap = "5" + +# S3 +aws-sdk-s3 = "1" +aws-config = "1" + +# SFTP +russh = "0.43" +russh-sftp = "2" +russh-keys = "0.43" + +# Oracle fix: Connection pooling for SFTP +deadpool = "0.10" + +# Oracle fix: Home directory for known_hosts path +dirs = "5" + +# Optional keyring support +keyring = { version = "2", optional = true } + +[features] +default = [] +keyring = ["dep:keyring"] +``` + +--- + +## Tests + +| Test | Type | Validates | +|------|------|-----------| +| `test_nfs_origin_basic` | Unit | NFS wrapper works | +| `test_nfs_stale_retry` | Unit | ESTALE handling | +| `test_nfs_retry_uses_fn` | Unit | Oracle fix: Fn not FnMut | +| `test_s3_list_objects` | Integration* | S3 readdir | +| `test_s3_get_object` | Integration* | S3 read | +| `test_s3_range_clamp` | Unit | Oracle fix: no 416 on EOF | +| `test_s3_health_uses_head` | Unit | Oracle fix: head_bucket not list | +| `test_s3_timeout` | Unit | Oracle fix: 30s timeout | +| `test_sftp_connect` | Integration* | SFTP connection | +| `test_sftp_readdir` | Integration* | SFTP listing | +| `test_sftp_pool_concurrency` | Integration* | Oracle fix: pool allows parallel | +| `test_sftp_host_verification` | Unit | Oracle fix: known_hosts checked | +| `test_smb_mounted` | Integration | SMB via mount | +| `test_smb_stale_handling` | Unit | Oracle fix: ENOTCONN handling | +| `test_mixed_origins` | Integration | Local + S3 together | +| `test_credential_debug_redacted` | Unit | Oracle fix: secrets not in Debug | + +*Requires credentials or localstack/test server + +--- + +## Exit Criteria + +- [ ] NFS origin handles ESTALE with retry (using `Fn` not `FnMut`) - Oracle fix +- [ ] S3 origin lists and reads objects +- [ ] S3 range requests clamped to file size (no 416 errors) - Oracle fix +- [ ] S3 health check uses `head_bucket` not `list_objects_v2` - Oracle fix +- [ ] All remote operations have 30s timeout - Oracle fix +- [ ] SFTP uses connection pool (not single mutex) - Oracle fix +- [ ] SFTP verifies SSH host keys against known_hosts - Oracle fix +- [ ] SMB origin works with mounted shares +- [ ] All origins implement health checks +- [ ] Mixed local + remote origins work together +- [ ] Credentials loaded securely (no logging) +- [ ] Credential Debug impl redacts secrets - Oracle fix + +--- + +## Architecture Compliance + +| Architecture Section | Requirement | Status | +|---------------------|-------------|--------| +| 4.3.4 | OriginPlugin interface | ✅ | +| FR-12.2 | NFS support | ✅ | +| FR-12.3 | SMB support | ✅ | +| FR-12.4 | S3 support | ✅ | +| FR-12.5 | SFTP support | ✅ | +| NFR-13.3 | Secure credential storage | ✅ | +| NFR-13.4 | No credential exposure in logs | ✅ | + +--- + +## Security Considerations + +1. **Credentials never logged** - `#[serde(skip_serializing)]` on sensitive fields +2. **Custom Debug impl** - Oracle fix: All secrets redacted in Debug output +3. **Environment variables** - Preferred for CI/CD +4. **Keyring integration** - Uses system secret service +5. **SSH host verification** - Oracle fix: MUST verify against `~/.ssh/known_hosts` +6. **S3 IAM** - Recommend IAM roles over access keys where possible +7. **Connection timeouts** - Oracle fix: 30s timeout on all remote operations prevents DoS