diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock
index 3787df5..847279b 100644
--- a/musicfs/Cargo.lock
+++ b/musicfs/Cargo.lock
@@ -432,6 +432,7 @@ dependencies = [
  "hex",
  "musicfs-cache",
  "musicfs-core",
+ "musicfs-origins",
  "rmp-serde",
  "serde",
  "sled",
diff --git a/musicfs/crates/musicfs-cas/Cargo.toml b/musicfs/crates/musicfs-cas/Cargo.toml
index 8d484bc..eddbe9a 100644
--- a/musicfs/crates/musicfs-cas/Cargo.toml
+++ b/musicfs/crates/musicfs-cas/Cargo.toml
@@ -5,6 +5,7 @@ edition.workspace = true
 
 [dependencies]
 musicfs-core = { path = "../musicfs-core" }
+musicfs-origins = { path = "../musicfs-origins" }
 tokio.workspace = true
 tracing.workspace = true
 serde.workspace = true
diff --git a/musicfs/crates/musicfs-cas/src/fetcher.rs b/musicfs/crates/musicfs-cas/src/fetcher.rs
new file mode 100644
index 0000000..9af8361
--- /dev/null
+++ b/musicfs/crates/musicfs-cas/src/fetcher.rs
@@ -0,0 +1,329 @@
+//! On-demand fetching of file content from origins into the CAS store.
+
+use crate::{CasStore, ChunkManifest, ChunkRef};
+use musicfs_core::{Event, EventBus, FileId, FileMeta, OriginId};
+use musicfs_origins::Origin;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+use tracing::{debug, info};
+
+/// Pulls whole files from registered origins into the CAS store on demand.
+pub struct ContentFetcher {
+    store: Arc<CasStore>,
+    origins: RwLock<HashMap<OriginId, Arc<dyn Origin>>>,
+    file_meta: RwLock<HashMap<FileId, FileMeta>>,
+    event_bus: Option<Arc<EventBus>>,
+}
+
+impl ContentFetcher {
+    /// Creates a fetcher backed by `store` that publishes no events.
+    pub fn new(store: Arc<CasStore>) -> Self {
+        Self {
+            store,
+            origins: RwLock::new(HashMap::new()),
+            file_meta: RwLock::new(HashMap::new()),
+            event_bus: None,
+        }
+    }
+
+    /// Creates a fetcher backed by `store` that publishes events on `event_bus`.
+    pub fn with_event_bus(store: Arc<CasStore>, event_bus: Arc<EventBus>) -> Self {
+        Self {
+            store,
+            origins: RwLock::new(HashMap::new()),
+            file_meta: RwLock::new(HashMap::new()),
+            event_bus: Some(event_bus),
+        }
+    }
+
+    /// Registers `origin` under its own id, replacing any origin with that id.
+    pub fn register_origin(&self, origin: Arc<dyn Origin>) {
+        let id = origin.id().clone();
+        self.origins.write().unwrap().insert(id, origin);
+    }
+
+    /// Registers metadata for one file, replacing any entry with the same id.
+    pub fn register_file(&self, meta: FileMeta) {
+        self.file_meta.write().unwrap().insert(meta.id, meta);
+    }
+
+    /// Registers metadata for many files under a single write lock.
+    pub fn register_files(&self, files: impl IntoIterator<Item = FileMeta>) {
+        let mut map = self.file_meta.write().unwrap();
+        for meta in files {
+            map.insert(meta.id, meta);
+        }
+    }
+
+    /// Reads the whole file from its origin, stores it in the CAS as a single
+    /// chunk and returns the resulting manifest.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`FetchError::FileNotFound`] or [`FetchError::OriginNotFound`] when
+    /// the file or its origin is not registered, [`FetchError::FileTooLarge`] when
+    /// the file does not fit in a single `u32`-sized chunk,
+    /// [`FetchError::OriginRead`] when the origin read fails and
+    /// [`FetchError::Store`] when the CAS write fails.
+    pub async fn fetch_file(&self, file_id: FileId) -> Result<ChunkManifest, FetchError> {
+        let meta = {
+            let files = self.file_meta.read().unwrap();
+            files
+                .get(&file_id)
+                .cloned()
+                .ok_or(FetchError::FileNotFound(file_id))?
+        };
+
+        let origin = {
+            let origins = self.origins.read().unwrap();
+            origins
+                .get(&meta.real_path.origin_id)
+                .cloned()
+                .ok_or_else(|| FetchError::OriginNotFound(meta.real_path.origin_id.clone()))?
+        };
+
+        // Both the origin read length and `ChunkRef::size` are `u32`; a plain `as`
+        // cast would silently truncate files of 4 GiB or more.
+        let read_len = u32::try_from(meta.size)
+            .map_err(|_| FetchError::FileTooLarge(file_id, meta.size))?;
+
+        info!(
+            "Fetching file {:?} from origin {}",
+            file_id,
+            origin.id()
+        );
+
+        let data = origin
+            .read(&meta.real_path.path, 0, read_len)
+            .await
+            .map_err(|e| FetchError::OriginRead(e.to_string()))?;
+
+        // Describe the bytes actually stored so the manifest stays consistent with
+        // its single chunk even if the origin returned fewer bytes than registered.
+        let chunk_size = u32::try_from(data.len())
+            .map_err(|_| FetchError::FileTooLarge(file_id, data.len() as u64))?;
+
+        let hash = self.store.put(&data).await.map_err(FetchError::Store)?;
+
+        let manifest = ChunkManifest {
+            file_id,
+            total_size: u64::from(chunk_size),
+            chunks: vec![ChunkRef {
+                hash,
+                offset: 0,
+                size: chunk_size,
+            }],
+        };
+
+        debug!(
+            "Created manifest for {:?}: {} bytes, 1 chunk",
+            file_id, manifest.total_size
+        );
+
+        Ok(manifest)
+    }
+
+    /// Ensures the file's content is in the CAS and returns its manifest.
+    ///
+    /// Currently this always delegates to [`Self::fetch_file`].
+    pub async fn ensure_cached(&self, file_id: FileId) -> Result<ChunkManifest, FetchError> {
+        self.fetch_file(file_id).await
+    }
+
+    /// Returns a copy of the registered metadata for `file_id`, if any.
+    pub fn get_file_meta(&self, file_id: FileId) -> Option<FileMeta> {
+        self.file_meta.read().unwrap().get(&file_id).cloned()
+    }
+
+    /// Publishes an [`Event::FileAccessed`] for `meta` when an event bus is attached.
+    pub fn emit_access_event(&self, meta: &FileMeta, offset: u64, size: u32) {
+        if let Some(bus) = &self.event_bus {
+            bus.publish(Event::FileAccessed {
+                path: meta.virtual_path.clone(),
+                origin_id: meta.real_path.origin_id.clone(),
+                offset,
+                size,
+            });
+        }
+    }
+}
+
+/// Errors produced while fetching file content from an origin.
+#[derive(Debug, thiserror::Error)]
+pub enum FetchError {
+    #[error("File not found: {0:?}")]
+    FileNotFound(FileId),
+
+    #[error("Origin not found: {0}")]
+    OriginNotFound(OriginId),
+
+    #[error("File {0:?} is too large to fetch as a single chunk ({1} bytes)")]
+    FileTooLarge(FileId, u64),
+
+    #[error("Origin read error: {0}")]
+    OriginRead(String),
+
+    #[error("Store error: {0}")]
+    Store(#[from] crate::CasError),
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::CasConfig;
+    use musicfs_core::{RealPath, VirtualPath};
+    use musicfs_origins::LocalOrigin;
+    use std::path::PathBuf;
+    use std::time::SystemTime;
+    use tempfile::TempDir;
+
+    #[tokio::test]
+    async fn test_fetch_file() {
+        let cas_dir = TempDir::new().unwrap();
+        let origin_dir = TempDir::new().unwrap();
+
+        std::fs::write(origin_dir.path().join("test.flac"), b"fake audio data").unwrap();
+
+        let config = CasConfig {
+            chunks_dir: cas_dir.path().join("chunks"),
+            ..Default::default()
+        };
+        let store = Arc::new(CasStore::open(config).await.unwrap());
+        let fetcher = ContentFetcher::new(store.clone());
+
+        let origin = Arc::new(LocalOrigin::new("local", origin_dir.path()));
+        fetcher.register_origin(origin);
+
+        let meta = FileMeta {
+            id: FileId(1),
+            virtual_path: VirtualPath::new("/Artist/Album/test.flac"),
+            real_path: RealPath {
+                origin_id: OriginId::from("local"),
+                path: PathBuf::from("/test.flac"),
+            },
+            size: 15,
+            mtime: SystemTime::now(),
+            content_hash: None,
+            audio: None,
+        };
+        fetcher.register_file(meta);
+
+        let manifest = fetcher.fetch_file(FileId(1)).await.unwrap();
+        assert_eq!(manifest.total_size, 15);
+        assert_eq!(manifest.chunks.len(), 1);
+
+        let data = store.get(&manifest.chunks[0].hash).await.unwrap();
+        assert_eq!(&data[..], b"fake audio data");
+    }
+
+    #[tokio::test]
+    async fn test_fetch_file_not_found() {
+        let cas_dir = TempDir::new().unwrap();
+        let config = CasConfig {
+            chunks_dir: cas_dir.path().join("chunks"),
+            ..Default::default()
+        };
+        let store = Arc::new(CasStore::open(config).await.unwrap());
+        let fetcher = ContentFetcher::new(store);
+
+        let result = fetcher.fetch_file(FileId(999)).await;
+        assert!(matches!(result, Err(FetchError::FileNotFound(_))));
+    }
+
+    #[tokio::test]
+    async fn test_fetch_emits_event() {
+        let cas_dir = TempDir::new().unwrap();
+        let origin_dir = TempDir::new().unwrap();
+        std::fs::write(origin_dir.path().join("test.flac"), b"audio").unwrap();
+
+        let config = CasConfig {
+            chunks_dir: cas_dir.path().join("chunks"),
+            ..Default::default()
+        };
+        let store = Arc::new(CasStore::open(config).await.unwrap());
+        let event_bus = Arc::new(EventBus::default());
+        let mut rx = event_bus.subscribe();
+
+        let fetcher = ContentFetcher::with_event_bus(store, event_bus);
+        let origin = Arc::new(LocalOrigin::new("local", origin_dir.path()));
+        fetcher.register_origin(origin);
+
+        let meta = FileMeta {
+            id: FileId(1),
+            virtual_path: VirtualPath::new("/Artist/test.flac"),
+            real_path: RealPath {
+                origin_id: OriginId::from("local"),
+                path: PathBuf::from("/test.flac"),
+            },
+            size: 5,
+            mtime: SystemTime::now(),
+            content_hash: None,
+            audio: None,
+        };
+        fetcher.register_file(meta.clone());
+
+        fetcher.emit_access_event(&meta, 0, 5);
+
+        let event = rx.try_recv().unwrap();
+        assert!(matches!(event, Event::FileAccessed { .. }));
+    }
+
+    #[tokio::test]
+    async fn test_fetch_origin_not_found() {
+        let cas_dir = TempDir::new().unwrap();
+        let config = CasConfig {
+            chunks_dir: cas_dir.path().join("chunks"),
+            ..Default::default()
+        };
+        let store = Arc::new(CasStore::open(config).await.unwrap());
+        let fetcher = ContentFetcher::new(store);
+
+        let meta = FileMeta {
+            id: FileId(1),
+            virtual_path: VirtualPath::new("/test.flac"),
+            real_path: RealPath {
+                origin_id: OriginId::from("nonexistent"),
+                path: PathBuf::from("/test.flac"),
+            },
+            size: 100,
+            mtime: SystemTime::now(),
+            content_hash: None,
+            audio: None,
+        };
+        fetcher.register_file(meta);
+
+        let result = fetcher.fetch_file(FileId(1)).await;
+        assert!(matches!(result, Err(FetchError::OriginNotFound(_))));
+    }
+
+    #[tokio::test]
+    async fn test_fetch_file_too_large() {
+        let cas_dir = TempDir::new().unwrap();
+        let origin_dir = TempDir::new().unwrap();
+        let config = CasConfig {
+            chunks_dir: cas_dir.path().join("chunks"),
+            ..Default::default()
+        };
+        let store = Arc::new(CasStore::open(config).await.unwrap());
+        let fetcher = ContentFetcher::new(store);
+        let origin = Arc::new(LocalOrigin::new("local", origin_dir.path()));
+        fetcher.register_origin(origin);
+
+        let meta = FileMeta {
+            id: FileId(1),
+            virtual_path: VirtualPath::new("/huge.flac"),
+            real_path: RealPath {
+                origin_id: OriginId::from("local"),
+                path: PathBuf::from("/huge.flac"),
+            },
+            size: u64::from(u32::MAX) + 1,
+            mtime: SystemTime::now(),
+            content_hash: None,
+            audio: None,
+        };
+        fetcher.register_file(meta);
+
+        let result = fetcher.fetch_file(FileId(1)).await;
+        assert!(matches!(result, Err(FetchError::FileTooLarge(..))));
+    }
+}
diff --git a/musicfs/crates/musicfs-cas/src/lib.rs b/musicfs/crates/musicfs-cas/src/lib.rs
index 235a10f..81ec6ee 100644
--- a/musicfs/crates/musicfs-cas/src/lib.rs
+++ b/musicfs/crates/musicfs-cas/src/lib.rs
@@ -1,7 +1,9 @@
 mod chunks;
+mod fetcher;
 mod reader;
 mod store;
 
 pub use chunks::{ChunkLocation, ChunkRef};
+pub use fetcher::{ContentFetcher, FetchError};
 pub use reader::{ChunkManifest, FileReader, ReaderError};
 pub use store::{CasConfig, CasError, CasStore, DedupStats};
diff --git a/musicfs/crates/musicfs-cas/src/reader.rs b/musicfs/crates/musicfs-cas/src/reader.rs
index ce5e673..9a9c3f4 100644
--- a/musicfs/crates/musicfs-cas/src/reader.rs
+++ b/musicfs/crates/musicfs-cas/src/reader.rs
@@ -1,10 +1,11 @@
 use crate::chunks::ChunkRef;
+use crate::fetcher::{ContentFetcher, FetchError};
 use crate::store::CasStore;
 use bytes::{Bytes, BytesMut};
 use musicfs_core::FileId;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ChunkManifest {
@@ -33,14 +34,25 @@ impl ChunkManifest {
 }
 
 pub struct FileReader {
-    store: std::sync::Arc<CasStore>,
+    store: Arc<CasStore>,
+    fetcher: Option<Arc<ContentFetcher>>,
     manifests: RwLock<HashMap<FileId, ChunkManifest>>,
 }
 
 impl FileReader {
-    pub fn new(store: std::sync::Arc<CasStore>) -> Self {
+    pub fn new(store: Arc<CasStore>) -> Self {
         Self {
             store,
+            fetcher: None,
+            manifests: RwLock::new(HashMap::new()),
+        }
+    }
+
+    /// Creates a reader that fetches missing manifests on demand through `fetcher`.
+    pub fn with_fetcher(store: Arc<CasStore>, fetcher: Arc<ContentFetcher>) -> Self {
+        Self {
+            store,
+            fetcher: Some(fetcher),
             manifests: RwLock::new(HashMap::new()),
         }
     }
@@ -50,19 +62,42 @@ impl FileReader {
         manifests.insert(manifest.file_id, manifest);
     }
 
+    /// Returns the cached manifest for `file_id`, fetching and caching it through
+    /// the configured fetcher on a miss.
+    async fn get_or_fetch_manifest(&self, file_id: FileId) -> Result<ChunkManifest, ReaderError> {
+        {
+            let manifests = self.manifests.read().unwrap();
+            if let Some(m) = manifests.get(&file_id) {
+                return Ok(m.clone());
+            }
+        }
+
+        let Some(fetcher) = &self.fetcher else {
+            return Err(ReaderError::ManifestNotFound(file_id));
+        };
+
+        let manifest = fetcher.ensure_cached(file_id).await?;
+        self.manifests
+            .write()
+            .unwrap()
+            .insert(file_id, manifest.clone());
+        Ok(manifest)
+    }
+
     pub async fn read(
         &self,
         file_id: FileId,
         offset: u64,
         size: u32,
     ) -> Result<Bytes, ReaderError> {
-        let manifest = {
-            let manifests = self.manifests.read().unwrap();
-            manifests
-                .get(&file_id)
-                .cloned()
-                .ok_or(ReaderError::ManifestNotFound(file_id))?
-        };
+        let manifest = self.get_or_fetch_manifest(file_id).await?;
+
+        // Report the access on the event bus when the fetcher knows this file.
+        if let Some(fetcher) = &self.fetcher {
+            if let Some(meta) = fetcher.get_file_meta(file_id) {
+                fetcher.emit_access_event(&meta, offset, size);
+            }
+        }
 
         if offset >= manifest.total_size {
             return Ok(Bytes::new());
@@ -105,6 +140,9 @@ pub enum ReaderError {
     #[error("Manifest not found for file {0:?}")]
     ManifestNotFound(FileId),
 
+    #[error("Fetch error: {0}")]
+    Fetch(#[from] FetchError),
+
     #[error("CAS error: {0}")]
     Cas(#[from] crate::store::CasError),
 }
@@ -123,7 +161,7 @@ mod tests {
             chunks_dir: dir.path().join("chunks"),
             ..Default::default()
         };
-        let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
+        let store = Arc::new(CasStore::open(config).await.unwrap());
 
         let data = b"Hello, World!";
         let hash = store.put(data).await.unwrap();
@@ -150,7 +188,7 @@ mod tests {
             chunks_dir: dir.path().join("chunks"),
             ..Default::default()
         };
-        let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
+        let store = Arc::new(CasStore::open(config).await.unwrap());
 
         let data = b"ABCDEFGHIJ";
         let hash = store.put(data).await.unwrap();
@@ -177,7 +215,7 @@ mod tests {
             chunks_dir: dir.path().join("chunks"),
             ..Default::default()
         };
-        let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
+        let store = Arc::new(CasStore::open(config).await.unwrap());
 
         let chunk1 = b"AAAA";
         let chunk2 = b"BBBB";
@@ -213,7 +251,7 @@ mod tests {
             chunks_dir: dir.path().join("chunks"),
             ..Default::default()
         };
-        let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
+        let store = Arc::new(CasStore::open(config).await.unwrap());
 
         let data = b"short";
         let hash = store.put(data).await.unwrap();
diff --git a/musicfs/crates/musicfs-cas/tests/integration.rs b/musicfs/crates/musicfs-cas/tests/integration.rs
index 4ecd660..916a99d 100644
--- a/musicfs/crates/musicfs-cas/tests/integration.rs
+++ b/musicfs/crates/musicfs-cas/tests/integration.rs
@@ -1,6 +1,7 @@
 use musicfs_cache::TreeBuilder;
-use musicfs_cas::{CasConfig, CasStore, ChunkManifest, ChunkRef, FileReader};
+use musicfs_cas::{CasConfig, CasStore, ChunkManifest, ChunkRef, ContentFetcher, FileReader};
 use musicfs_core::{FileId, FileMeta, OriginId, RealPath, VirtualPath};
+use musicfs_origins::LocalOrigin;
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
 use std::time::SystemTime;
@@ -98,3 +99,98 @@ async fn test_deduplication() {
     assert_eq!(hash1, hash2);
     assert_eq!(size_after_first, size_after_second);
 }
+
+#[tokio::test]
+async fn test_fetcher_cache_miss_flow() {
+    let origin_dir = TempDir::new().unwrap();
+    let cas_dir = TempDir::new().unwrap();
+
+    let test_content = b"This is audio content that will be fetched on cache miss";
+    let test_file_path = origin_dir.path().join("test.flac");
+    std::fs::write(&test_file_path, test_content).unwrap();
+
+    let config = CasConfig {
+        chunks_dir: cas_dir.path().join("chunks"),
+        ..Default::default()
+    };
+    let store = Arc::new(CasStore::open(config).await.unwrap());
+
+    let origin_id = OriginId::from("test-origin");
+    let origin = Arc::new(LocalOrigin::new(origin_id.clone(), origin_dir.path().to_path_buf()));
+
+    let fetcher = ContentFetcher::new(store.clone());
+    fetcher.register_origin(origin);
+
+    let file_id = FileId(42);
+    let file_meta = FileMeta {
+        id: file_id,
+        virtual_path: VirtualPath::new("/Artist/Album/test.flac"),
+        real_path: RealPath {
+            origin_id,
+            path: PathBuf::from("/test.flac"),
+        },
+        size: test_content.len() as u64,
+        mtime: SystemTime::now(),
+        content_hash: None,
+        audio: None,
+    };
+    fetcher.register_file(file_meta);
+
+    let manifest = fetcher.fetch_file(file_id).await.unwrap();
+
+    assert_eq!(manifest.file_id, file_id);
+    assert_eq!(manifest.total_size, test_content.len() as u64);
+    assert_eq!(manifest.chunks.len(), 1);
+
+    let chunk_data = store.get(&manifest.chunks[0].hash).await.unwrap();
+    assert_eq!(&chunk_data[..], test_content);
+}
+
+#[tokio::test]
+async fn test_reader_with_fetcher_integration() {
+    let origin_dir = TempDir::new().unwrap();
+    let cas_dir = TempDir::new().unwrap();
+
+    let test_content = b"Audio file content for reader integration test";
+    let test_file_path = origin_dir.path().join("song.flac");
+    std::fs::write(&test_file_path, test_content).unwrap();
+
+    let config = CasConfig {
+        chunks_dir: cas_dir.path().join("chunks"),
+        ..Default::default()
+    };
+    let store = Arc::new(CasStore::open(config).await.unwrap());
+
+    let origin_id = OriginId::from("local");
+    let origin = Arc::new(LocalOrigin::new(origin_id.clone(), origin_dir.path().to_path_buf()));
+
+    let fetcher = ContentFetcher::new(store.clone());
+    fetcher.register_origin(origin);
+
+    let file_id = FileId(100);
+    let file_meta = FileMeta {
+        id: file_id,
+        virtual_path: VirtualPath::new("/Test/song.flac"),
+        real_path: RealPath {
+            origin_id,
+            path: PathBuf::from("/song.flac"),
+        },
+        size: test_content.len() as u64,
+        mtime: SystemTime::now(),
+        content_hash: None,
+        audio: None,
+    };
+    fetcher.register_file(file_meta);
+
+    let reader = FileReader::with_fetcher(store, Arc::new(fetcher));
+
+    let result = reader
+        .read(file_id, 0, test_content.len() as u32)
+        .await
+        .unwrap();
+
+    assert_eq!(&result[..], test_content);
+
+    let result2 = reader.read(file_id, 0, 10).await.unwrap();
+    assert_eq!(&result2[..], &test_content[..10]);
+}