From e575276b6f7799c14978ffa187d03f51a9db53e5 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 12 May 2026 18:55:58 +0200 Subject: [PATCH] Add Week 4b plan: Origin-CAS connector for cache-miss handling - Create week-04b-origin-connector.md with ContentFetcher design - Update development-plan.md: Phase 1 now includes Week 4b - Update architecture.md: Phase 1 table includes Week 4b - Plan includes EventBus integration per FR-18.1 (Oracle-verified) --- docs/v2/architecture.md | 5 +- docs/v2/development-plan.md | 53 ++- docs/v2/plans/week-04b-origin-connector.md | 501 +++++++++++++++++++++ 3 files changed, 555 insertions(+), 4 deletions(-) create mode 100644 docs/v2/plans/week-04b-origin-connector.md diff --git a/docs/v2/architecture.md b/docs/v2/architecture.md index a59c9ec..56f6767 100644 --- a/docs/v2/architecture.md +++ b/docs/v2/architecture.md @@ -1005,7 +1005,7 @@ and better fit for systems programming. ## 7. Implementation Plan -### 7.1 Phase 1: MVP (4 weeks) +### 7.1 Phase 1: MVP (4.5 weeks) **Goal:** Basic functional filesystem with single origin. @@ -1014,7 +1014,8 @@ and better fit for systems programming. | 1 | Project setup, FUSE skeleton, local origin plugin | | 2 | Metadata extraction (symphonia), SQLite schema | | 3 | Virtual path resolver, tree cache, basic readdir/stat/read | -| 4 | CAS implementation, chunk caching, integration tests | +| 4 | CAS implementation, chunk caching, LRU eviction | +| 4b | Origin→CAS connector (ContentFetcher), cache-miss handling | **Exit Criteria:** - Mount and browse local music library diff --git a/docs/v2/development-plan.md b/docs/v2/development-plan.md index 2df81ca..a10beb6 100644 --- a/docs/v2/development-plan.md +++ b/docs/v2/development-plan.md @@ -64,12 +64,14 @@ musicfs/ --- -## 2. Phase 1: MVP (Weeks 1-4) +## 2. Phase 1: MVP (Weeks 1-4b) **Goal**: Basic functional filesystem with single local origin. 
**Requirements Covered**: FR-1, FR-2, FR-3, FR-4, FR-5, FR-6, FR-7, FR-8, FR-9, FR-18, NFR-1.1-1.7 +**Note**: Week 4b bridges Origin→CAS data flow (cache-miss handling) required for actual file reads. + --- ### Week 1: Foundation @@ -523,8 +525,55 @@ impl LruEviction { - [ ] Chunks stored in CAS with deduplication - [ ] Cache size limit enforced via eviction -- [ ] Audio playback works through mounted filesystem - [ ] Cache persists across daemon restarts + +**Note**: Audio playback requires Week 4b (Origin→CAS connector). + +--- + +### Week 4b: Origin-CAS Connector + +**Detailed plan**: See `plans/week-04b-origin-connector.md` + +#### Summary + +Bridges the gap between Origin (source files) and CAS (chunk cache). Without this, FUSE read() cannot return actual file content. + +#### Deliverables + +| Task | Crate | Files | Requirements | +|------|-------|-------|--------------| +| ContentFetcher | musicfs-cas | `fetcher.rs` | FR-3.2 | +| Cache-miss handling | musicfs-cas | `reader.rs` | FR-3.2 | +| FUSE integration | musicfs-fuse | `filesystem.rs` | FR-3.1-3.2 | + +#### Key Components + +```rust +pub struct ContentFetcher { + store: Arc<CasStore>, + origins: HashMap<OriginId, Arc<dyn Origin>>, + file_meta: HashMap<FileId, FileMeta>, +} + +impl ContentFetcher { + /// Fetch file from origin, store in CAS, return manifest + pub async fn fetch_file(&self, file_id: FileId) -> Result<ChunkManifest, FetchError>; +} +``` + +#### Tests + +| Test | Type | Validates | +|------|------|-----------| +| `test_fetch_file` | Unit | Origin → CAS works | +| `test_reader_cache_miss` | Unit | Fetcher called on miss | +| `test_e2e_cat_file` | E2E | `cat` returns content | + +#### Exit Criteria + +- [ ] `cat /mnt/musicfs/Artist/Album/track.flac` returns actual data +- [ ] Audio playback works through mounted filesystem - [ ] All Phase 1 requirements pass acceptance tests --- diff --git a/docs/v2/plans/week-04b-origin-connector.md b/docs/v2/plans/week-04b-origin-connector.md new file mode 100644 index 0000000..de2b0f2 --- /dev/null +++ 
b/docs/v2/plans/week-04b-origin-connector.md @@ -0,0 +1,501 @@ +# Week 4b: Origin-CAS Connector + +**Phase**: 1 (MVP) +**Prerequisites**: Week 4 (CAS & Chunk Caching) +**Estimated effort**: 1 day + +--- + +## Objective + +Bridge the gap between Origin (source files) and CAS (chunk cache) to enable actual file reads through FUSE. This implements the "cache miss" flow from architecture section 4.3.5. + +**Problem**: Week 4 implemented CAS storage and FileReader, but there's no code that: +1. Detects when requested chunks aren't cached +2. Fetches data from Origin +3. Stores chunks in CAS +4. Creates ChunkManifest for the file + +**Solution**: Create `ContentFetcher` that orchestrates Origin → CAS data flow on cache miss. + +--- + +## Architecture Reference + +From architecture.md section 4.3.5 (Read Operation Activity): + +``` +|CAS| +:compute chunk range for [offset, offset+size]; +if (all chunks cached?) then (yes) + :read from local chunk files; +else (no) + |OriginFederation| + :select healthy origin by priority; + :fetch missing byte range; + |CAS| + :chunk fetched data (CDC); + :store chunks by hash; + :update chunk manifest; +endif +``` + +--- + +## Deliverables + +| Task | Crate | Files | Done | +|------|-------|-------|------| +| ContentFetcher implementation | musicfs-cas | `fetcher.rs` | [ ] | +| FileId → FileMeta resolver | musicfs-cas | `fetcher.rs` | [ ] | +| Update FileReader for cache-miss | musicfs-cas | `reader.rs` | [ ] | +| Update FUSE with fetcher | musicfs-fuse | `filesystem.rs` | [ ] | +| E2E test: cat file through FUSE | tests | `integration.rs` | [ ] | + +--- + +## Task 1: ContentFetcher + +### 1.1 Create `musicfs-cas/src/fetcher.rs` + +```rust +use crate::{CasStore, ChunkManifest, ChunkRef}; +use musicfs_core::{Event, EventBus, FileId, FileMeta, OriginId, RealPath}; +use musicfs_origins::Origin; +use std::collections::HashMap; +use std::path::Path; +use std::sync::{Arc, RwLock}; +use tracing::{debug, info}; + +pub struct ContentFetcher { + 
store: Arc<CasStore>, + origins: RwLock<HashMap<OriginId, Arc<dyn Origin>>>, + file_meta: RwLock<HashMap<FileId, FileMeta>>, + event_bus: Option<Arc<EventBus>>, +} + +impl ContentFetcher { + pub fn new(store: Arc<CasStore>) -> Self { + Self { + store, + origins: RwLock::new(HashMap::new()), + file_meta: RwLock::new(HashMap::new()), + event_bus: None, + } + } + + pub fn with_event_bus(store: Arc<CasStore>, event_bus: Arc<EventBus>) -> Self { + Self { + store, + origins: RwLock::new(HashMap::new()), + file_meta: RwLock::new(HashMap::new()), + event_bus: Some(event_bus), + } + } + + pub fn register_origin(&self, origin: Arc<dyn Origin>) { + let id = origin.id().clone(); + self.origins.write().unwrap().insert(id, origin); + } + + pub fn register_file(&self, meta: FileMeta) { + self.file_meta.write().unwrap().insert(meta.id, meta); + } + + pub fn register_files(&self, files: impl IntoIterator<Item = FileMeta>) { + let mut map = self.file_meta.write().unwrap(); + for meta in files { + map.insert(meta.id, meta); + } + } + + pub async fn fetch_file(&self, file_id: FileId) -> Result<ChunkManifest, FetchError> { + let meta = { + let files = self.file_meta.read().unwrap(); + files.get(&file_id).cloned() + .ok_or(FetchError::FileNotFound(file_id))? + }; + + let origin = { + let origins = self.origins.read().unwrap(); + origins.get(&meta.real_path.origin_id).cloned() + .ok_or_else(|| FetchError::OriginNotFound(meta.real_path.origin_id.clone()))? 
+ }; + + info!("Fetching file {:?} from origin {}", file_id, origin.id()); + + let data = origin.read(&meta.real_path.path, 0, meta.size as u32).await + .map_err(|e| FetchError::OriginRead(e.to_string()))?; + + let hash = self.store.put(&data).await + .map_err(FetchError::Store)?; + + let manifest = ChunkManifest { + file_id, + total_size: meta.size, + chunks: vec![ChunkRef { + hash, + offset: 0, + size: data.len() as u32, + }], + }; + + debug!("Created manifest for {:?}: {} bytes, 1 chunk", file_id, meta.size); + + Ok(manifest) + } + + pub fn emit_access_event(&self, meta: &FileMeta, offset: u64, size: u32) { + if let Some(bus) = &self.event_bus { + bus.publish(Event::FileAccessed { + path: meta.virtual_path.clone(), + origin_id: meta.real_path.origin_id.clone(), + offset, + size, + }); + } + } + + pub async fn ensure_cached(&self, file_id: FileId) -> Result<ChunkManifest, FetchError> { + self.fetch_file(file_id).await + } + + pub fn get_file_meta(&self, file_id: FileId) -> Option<FileMeta> { + self.file_meta.read().unwrap().get(&file_id).cloned() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum FetchError { + #[error("File not found: {0:?}")] + FileNotFound(FileId), + + #[error("Origin not found: {0}")] + OriginNotFound(OriginId), + + #[error("Origin read error: {0}")] + OriginRead(String), + + #[error("Store error: {0}")] + Store(#[from] crate::CasError), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::CasConfig; + use musicfs_core::VirtualPath; + use musicfs_origins::LocalOrigin; + use std::path::PathBuf; + use std::time::SystemTime; + use tempfile::TempDir; + + #[tokio::test] + async fn test_fetch_file() { + let cas_dir = TempDir::new().unwrap(); + let origin_dir = TempDir::new().unwrap(); + + std::fs::write(origin_dir.path().join("test.flac"), b"fake audio data").unwrap(); + + let config = CasConfig { + chunks_dir: cas_dir.path().join("chunks"), + ..Default::default() + }; + let store = Arc::new(CasStore::open(config).await.unwrap()); + let fetcher = 
ContentFetcher::new(store.clone()); + + let origin = Arc::new(LocalOrigin::new("local", origin_dir.path())); + fetcher.register_origin(origin); + + let meta = FileMeta { + id: FileId(1), + virtual_path: VirtualPath::new("/Artist/Album/test.flac"), + real_path: RealPath { + origin_id: OriginId::from("local"), + path: PathBuf::from("/test.flac"), + }, + size: 15, + mtime: SystemTime::now(), + content_hash: None, + audio: None, + }; + fetcher.register_file(meta); + + let manifest = fetcher.fetch_file(FileId(1)).await.unwrap(); + assert_eq!(manifest.total_size, 15); + assert_eq!(manifest.chunks.len(), 1); + + let data = store.get(&manifest.chunks[0].hash).await.unwrap(); + assert_eq!(&data[..], b"fake audio data"); + } + + #[tokio::test] + async fn test_fetch_file_not_found() { + let cas_dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: cas_dir.path().join("chunks"), + ..Default::default() + }; + let store = Arc::new(CasStore::open(config).await.unwrap()); + let fetcher = ContentFetcher::new(store); + + let result = fetcher.fetch_file(FileId(999)).await; + assert!(matches!(result, Err(FetchError::FileNotFound(_)))); + } + + #[tokio::test] + async fn test_fetch_emits_event() { + let cas_dir = TempDir::new().unwrap(); + let origin_dir = TempDir::new().unwrap(); + std::fs::write(origin_dir.path().join("test.flac"), b"audio").unwrap(); + + let config = CasConfig { + chunks_dir: cas_dir.path().join("chunks"), + ..Default::default() + }; + let store = Arc::new(CasStore::open(config).await.unwrap()); + let event_bus = Arc::new(EventBus::default()); + let mut rx = event_bus.subscribe(); + + let fetcher = ContentFetcher::with_event_bus(store, event_bus); + let origin = Arc::new(LocalOrigin::new("local", origin_dir.path())); + fetcher.register_origin(origin); + + let meta = FileMeta { + id: FileId(1), + virtual_path: VirtualPath::new("/Artist/test.flac"), + real_path: RealPath { + origin_id: OriginId::from("local"), + path: PathBuf::from("/test.flac"), + }, 
+ size: 5, + mtime: SystemTime::now(), + content_hash: None, + audio: None, + }; + fetcher.register_file(meta.clone()); + + fetcher.emit_access_event(&meta, 0, 5); + + let event = rx.try_recv().unwrap(); + assert!(matches!(event, Event::FileAccessed { .. })); + } +} +``` + +--- + +## Task 2: Update FileReader + +### 2.1 Update `musicfs-cas/src/reader.rs` + +Add fetcher integration for cache-miss handling: + +```rust +use crate::fetcher::{ContentFetcher, FetchError}; + +pub struct FileReader { + store: Arc<CasStore>, + fetcher: Option<Arc<ContentFetcher>>, + manifests: RwLock<HashMap<FileId, ChunkManifest>>, +} + +impl FileReader { + pub fn new(store: Arc<CasStore>) -> Self { + Self { + store, + fetcher: None, + manifests: RwLock::new(HashMap::new()), + } + } + + pub fn with_fetcher(store: Arc<CasStore>, fetcher: Arc<ContentFetcher>) -> Self { + Self { + store, + fetcher: Some(fetcher), + manifests: RwLock::new(HashMap::new()), + } + } + + pub async fn read( + &self, + file_id: FileId, + offset: u64, + size: u32, + ) -> Result { + let manifest = self.get_or_fetch_manifest(file_id).await?; + + if let Some(fetcher) = &self.fetcher { + if let Some(meta) = fetcher.get_file_meta(file_id) { + fetcher.emit_access_event(&meta, offset, size); + } + } + + // ... 
rest of read logic unchanged + } + + async fn get_or_fetch_manifest(&self, file_id: FileId) -> Result<ChunkManifest, ReaderError> { + { + let manifests = self.manifests.read().unwrap(); + if let Some(m) = manifests.get(&file_id) { + return Ok(m.clone()); + } + } + + let Some(fetcher) = &self.fetcher else { + return Err(ReaderError::ManifestNotFound(file_id)); + }; + + let manifest = fetcher.ensure_cached(file_id).await + .map_err(ReaderError::Fetch)?; + + self.manifests.write().unwrap().insert(file_id, manifest.clone()); + Ok(manifest) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ReaderError { + #[error("Manifest not found for file {0:?}")] + ManifestNotFound(FileId), + + #[error("Fetch error: {0}")] + Fetch(#[from] FetchError), + + #[error("CAS error: {0}")] + Cas(#[from] crate::CasError), +} +``` + +--- + +## Task 3: Update lib.rs + +### 3.1 Update `musicfs-cas/src/lib.rs` + +```rust +mod chunks; +mod fetcher; +mod reader; +mod store; + +pub use chunks::{ChunkLocation, ChunkRef}; +pub use fetcher::{ContentFetcher, FetchError}; +pub use reader::{ChunkManifest, FileReader, ReaderError}; +pub use store::{CasConfig, CasError, CasStore, DedupStats}; +``` + +--- + +## Task 4: Update Cargo.toml + +### 4.1 Update `musicfs-cas/Cargo.toml` + +```toml +[dependencies] +musicfs-core = { path = "../musicfs-core" } +musicfs-origins = { path = "../musicfs-origins" } +# ... 
rest unchanged +``` + +--- + +## Task 5: Update FUSE Integration + +### 5.1 Update `musicfs-fuse/src/filesystem.rs` + +```rust +use musicfs_cas::{ContentFetcher, FileReader}; + +pub struct MusicFs { + tree: Arc>, + reader: Option>, + fetcher: Option>, + uid: u32, + gid: u32, +} + +impl MusicFs { + pub fn with_content_access( + tree: Arc>, + reader: Arc, + fetcher: Arc, + ) -> Self { + Self { + tree, + reader: Some(reader), + fetcher: Some(fetcher), + uid: unsafe { libc::getuid() }, + gid: unsafe { libc::getgid() }, + } + } +} +``` + +--- + +## Tests + +| Test | Type | Validates | +|------|------|-----------| +| `test_fetch_file` | Unit | Origin → CAS fetch works | +| `test_fetch_file_not_found` | Unit | Missing file error | +| `test_fetch_emits_event` | Unit | FileAccessed event emitted (FR-18.1) | +| `test_reader_with_fetcher` | Unit | Cache-miss triggers fetch | +| `test_e2e_cat_file` | Integration | `cat` returns file content | + +--- + +## Exit Criteria + +- [ ] `ContentFetcher` fetches from Origin and stores in CAS +- [ ] `FileReader` calls fetcher on cache miss +- [ ] File metadata (FileId → FileMeta) is resolvable +- [ ] `cat /mnt/musicfs/Artist/Album/track.flac` returns actual audio data +- [ ] All existing tests still pass + +--- + +## Dependencies + +### `musicfs-cas/Cargo.toml` + +```toml +[dependencies] +musicfs-origins = { path = "../musicfs-origins" } +``` + +--- + +## Implementation Notes + +1. **Week 4 treated whole files as single chunks** - this continues that approach +2. **CDC chunking deferred to Week 5** - fetcher will be updated then +3. **No OriginFederation yet** - single origin lookup for MVP +4. **FileMeta registration** - caller must register files before they can be fetched +5. **EventBus integration** - emits `FileAccessed` event per FR-18.1 (P0) +6. 
**Full file fetch** - currently fetches entire file on cache miss; byte-range optimization deferred + +## Architecture Compliance + +| Architecture Section | Requirement | Status | +|---------------------|-------------|--------| +| 4.3.5 | Cache miss → fetch from origin | ✅ | +| 4.3.5 | Store chunks by hash | ✅ | +| 4.3.5 | Update chunk manifest | ✅ | +| 4.3.5 | Emit FileAccessed event | ✅ | +| 4.3.3 | OriginFederation (multi-origin) | ⏳ Deferred | +| 4.3.5 | Byte-range fetch | ⏳ Deferred | +| 4.3.5 | CDC chunking | ⏳ Week 5 | + +--- + +## Next Steps + +After this, the MVP is complete: +- Mount filesystem +- Browse virtual tree (Artist/Album/Track) +- Read actual file content through FUSE +- Audio playback works + +Week 5 adds CDC chunking for efficient delta sync.