Implement Week 4b Origin-CAS connector for cache-miss handling

- Add ContentFetcher bridging Origin→CAS on cache miss
- Integrate fetcher into FileReader via with_fetcher() constructor
- Add get_or_fetch_manifest() for lazy manifest loading
- Emit FileAccessed events per FR-18.1 via EventBus
- Add 2 integration tests for e2e fetch flow
- Test count: 60 (was 54)
This commit is contained in:
Alexander
2026-05-12 19:04:48 +02:00
parent e575276b6f
commit c46750b1ec
6 changed files with 410 additions and 15 deletions
+1
View File
@@ -432,6 +432,7 @@ dependencies = [
"hex",
"musicfs-cache",
"musicfs-core",
"musicfs-origins",
"rmp-serde",
"serde",
"sled",
+1
View File
@@ -5,6 +5,7 @@ edition.workspace = true
[dependencies]
musicfs-core = { path = "../musicfs-core" }
musicfs-origins = { path = "../musicfs-origins" }
tokio.workspace = true
tracing.workspace = true
serde.workspace = true
+261
View File
@@ -0,0 +1,261 @@
use crate::{CasStore, ChunkManifest, ChunkRef};
use musicfs_core::{Event, EventBus, FileId, FileMeta, OriginId};
use musicfs_origins::Origin;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::{debug, info};
pub struct ContentFetcher {
store: Arc<CasStore>,
origins: RwLock<HashMap<OriginId, Arc<dyn Origin>>>,
file_meta: RwLock<HashMap<FileId, FileMeta>>,
event_bus: Option<Arc<EventBus>>,
}
impl ContentFetcher {
pub fn new(store: Arc<CasStore>) -> Self {
Self {
store,
origins: RwLock::new(HashMap::new()),
file_meta: RwLock::new(HashMap::new()),
event_bus: None,
}
}
pub fn with_event_bus(store: Arc<CasStore>, event_bus: Arc<EventBus>) -> Self {
Self {
store,
origins: RwLock::new(HashMap::new()),
file_meta: RwLock::new(HashMap::new()),
event_bus: Some(event_bus),
}
}
pub fn register_origin(&self, origin: Arc<dyn Origin>) {
let id = origin.id().clone();
self.origins.write().unwrap().insert(id, origin);
}
pub fn register_file(&self, meta: FileMeta) {
self.file_meta.write().unwrap().insert(meta.id, meta);
}
pub fn register_files(&self, files: impl IntoIterator<Item = FileMeta>) {
let mut map = self.file_meta.write().unwrap();
for meta in files {
map.insert(meta.id, meta);
}
}
pub async fn fetch_file(&self, file_id: FileId) -> Result<ChunkManifest, FetchError> {
let meta = {
let files = self.file_meta.read().unwrap();
files
.get(&file_id)
.cloned()
.ok_or(FetchError::FileNotFound(file_id))?
};
let origin = {
let origins = self.origins.read().unwrap();
origins
.get(&meta.real_path.origin_id)
.cloned()
.ok_or_else(|| FetchError::OriginNotFound(meta.real_path.origin_id.clone()))?
};
info!(
"Fetching file {:?} from origin {}",
file_id,
origin.id()
);
let data = origin
.read(&meta.real_path.path, 0, meta.size as u32)
.await
.map_err(|e| FetchError::OriginRead(e.to_string()))?;
let hash = self.store.put(&data).await.map_err(FetchError::Store)?;
let manifest = ChunkManifest {
file_id,
total_size: meta.size,
chunks: vec![ChunkRef {
hash,
offset: 0,
size: data.len() as u32,
}],
};
debug!(
"Created manifest for {:?}: {} bytes, 1 chunk",
file_id, meta.size
);
Ok(manifest)
}
pub async fn ensure_cached(&self, file_id: FileId) -> Result<ChunkManifest, FetchError> {
self.fetch_file(file_id).await
}
pub fn get_file_meta(&self, file_id: FileId) -> Option<FileMeta> {
self.file_meta.read().unwrap().get(&file_id).cloned()
}
pub fn emit_access_event(&self, meta: &FileMeta, offset: u64, size: u32) {
if let Some(bus) = &self.event_bus {
bus.publish(Event::FileAccessed {
path: meta.virtual_path.clone(),
origin_id: meta.real_path.origin_id.clone(),
offset,
size,
});
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum FetchError {
#[error("File not found: {0:?}")]
FileNotFound(FileId),
#[error("Origin not found: {0}")]
OriginNotFound(OriginId),
#[error("Origin read error: {0}")]
OriginRead(String),
#[error("Store error: {0}")]
Store(#[from] crate::CasError),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CasConfig;
use musicfs_core::{RealPath, VirtualPath};
use musicfs_origins::LocalOrigin;
use std::path::PathBuf;
use std::time::SystemTime;
use tempfile::TempDir;
#[tokio::test]
async fn test_fetch_file() {
let cas_dir = TempDir::new().unwrap();
let origin_dir = TempDir::new().unwrap();
std::fs::write(origin_dir.path().join("test.flac"), b"fake audio data").unwrap();
let config = CasConfig {
chunks_dir: cas_dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(CasStore::open(config).await.unwrap());
let fetcher = ContentFetcher::new(store.clone());
let origin = Arc::new(LocalOrigin::new("local", origin_dir.path()));
fetcher.register_origin(origin);
let meta = FileMeta {
id: FileId(1),
virtual_path: VirtualPath::new("/Artist/Album/test.flac"),
real_path: RealPath {
origin_id: OriginId::from("local"),
path: PathBuf::from("/test.flac"),
},
size: 15,
mtime: SystemTime::now(),
content_hash: None,
audio: None,
};
fetcher.register_file(meta);
let manifest = fetcher.fetch_file(FileId(1)).await.unwrap();
assert_eq!(manifest.total_size, 15);
assert_eq!(manifest.chunks.len(), 1);
let data = store.get(&manifest.chunks[0].hash).await.unwrap();
assert_eq!(&data[..], b"fake audio data");
}
#[tokio::test]
async fn test_fetch_file_not_found() {
let cas_dir = TempDir::new().unwrap();
let config = CasConfig {
chunks_dir: cas_dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(CasStore::open(config).await.unwrap());
let fetcher = ContentFetcher::new(store);
let result = fetcher.fetch_file(FileId(999)).await;
assert!(matches!(result, Err(FetchError::FileNotFound(_))));
}
#[tokio::test]
async fn test_fetch_emits_event() {
let cas_dir = TempDir::new().unwrap();
let origin_dir = TempDir::new().unwrap();
std::fs::write(origin_dir.path().join("test.flac"), b"audio").unwrap();
let config = CasConfig {
chunks_dir: cas_dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(CasStore::open(config).await.unwrap());
let event_bus = Arc::new(EventBus::default());
let mut rx = event_bus.subscribe();
let fetcher = ContentFetcher::with_event_bus(store, event_bus);
let origin = Arc::new(LocalOrigin::new("local", origin_dir.path()));
fetcher.register_origin(origin);
let meta = FileMeta {
id: FileId(1),
virtual_path: VirtualPath::new("/Artist/test.flac"),
real_path: RealPath {
origin_id: OriginId::from("local"),
path: PathBuf::from("/test.flac"),
},
size: 5,
mtime: SystemTime::now(),
content_hash: None,
audio: None,
};
fetcher.register_file(meta.clone());
fetcher.emit_access_event(&meta, 0, 5);
let event = rx.try_recv().unwrap();
assert!(matches!(event, Event::FileAccessed { .. }));
}
#[tokio::test]
async fn test_fetch_origin_not_found() {
let cas_dir = TempDir::new().unwrap();
let config = CasConfig {
chunks_dir: cas_dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(CasStore::open(config).await.unwrap());
let fetcher = ContentFetcher::new(store);
let meta = FileMeta {
id: FileId(1),
virtual_path: VirtualPath::new("/test.flac"),
real_path: RealPath {
origin_id: OriginId::from("nonexistent"),
path: PathBuf::from("/test.flac"),
},
size: 100,
mtime: SystemTime::now(),
content_hash: None,
audio: None,
};
fetcher.register_file(meta);
let result = fetcher.fetch_file(FileId(1)).await;
assert!(matches!(result, Err(FetchError::OriginNotFound(_))));
}
}
+2
View File
@@ -1,7 +1,9 @@
mod chunks;
mod fetcher;
mod reader;
mod store;
pub use chunks::{ChunkLocation, ChunkRef};
pub use fetcher::{ContentFetcher, FetchError};
pub use reader::{ChunkManifest, FileReader, ReaderError};
pub use store::{CasConfig, CasError, CasStore, DedupStats};
+48 -14
View File
@@ -1,10 +1,11 @@
use crate::chunks::ChunkRef;
use crate::fetcher::{ContentFetcher, FetchError};
use crate::store::CasStore;
use bytes::{Bytes, BytesMut};
use musicfs_core::FileId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkManifest {
@@ -33,14 +34,24 @@ impl ChunkManifest {
}
pub struct FileReader {
store: std::sync::Arc<CasStore>,
store: Arc<CasStore>,
fetcher: Option<Arc<ContentFetcher>>,
manifests: RwLock<HashMap<FileId, ChunkManifest>>,
}
impl FileReader {
pub fn new(store: std::sync::Arc<CasStore>) -> Self {
pub fn new(store: Arc<CasStore>) -> Self {
Self {
store,
fetcher: None,
manifests: RwLock::new(HashMap::new()),
}
}
pub fn with_fetcher(store: Arc<CasStore>, fetcher: Arc<ContentFetcher>) -> Self {
Self {
store,
fetcher: Some(fetcher),
manifests: RwLock::new(HashMap::new()),
}
}
@@ -50,19 +61,39 @@ impl FileReader {
manifests.insert(manifest.file_id, manifest);
}
async fn get_or_fetch_manifest(&self, file_id: FileId) -> Result<ChunkManifest, ReaderError> {
{
let manifests = self.manifests.read().unwrap();
if let Some(m) = manifests.get(&file_id) {
return Ok(m.clone());
}
}
let Some(fetcher) = &self.fetcher else {
return Err(ReaderError::ManifestNotFound(file_id));
};
let manifest = fetcher.ensure_cached(file_id).await?;
self.manifests
.write()
.unwrap()
.insert(file_id, manifest.clone());
Ok(manifest)
}
pub async fn read(
&self,
file_id: FileId,
offset: u64,
size: u32,
) -> Result<Bytes, ReaderError> {
let manifest = {
let manifests = self.manifests.read().unwrap();
manifests
.get(&file_id)
.cloned()
.ok_or(ReaderError::ManifestNotFound(file_id))?
};
let manifest = self.get_or_fetch_manifest(file_id).await?;
if let Some(fetcher) = &self.fetcher {
if let Some(meta) = fetcher.get_file_meta(file_id) {
fetcher.emit_access_event(&meta, offset, size);
}
}
if offset >= manifest.total_size {
return Ok(Bytes::new());
@@ -105,6 +136,9 @@ pub enum ReaderError {
#[error("Manifest not found for file {0:?}")]
ManifestNotFound(FileId),
#[error("Fetch error: {0}")]
Fetch(#[from] FetchError),
#[error("CAS error: {0}")]
Cas(#[from] crate::store::CasError),
}
@@ -123,7 +157,7 @@ mod tests {
chunks_dir: dir.path().join("chunks"),
..Default::default()
};
let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
let store = Arc::new(CasStore::open(config).await.unwrap());
let data = b"Hello, World!";
let hash = store.put(data).await.unwrap();
@@ -150,7 +184,7 @@ mod tests {
chunks_dir: dir.path().join("chunks"),
..Default::default()
};
let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
let store = Arc::new(CasStore::open(config).await.unwrap());
let data = b"ABCDEFGHIJ";
let hash = store.put(data).await.unwrap();
@@ -177,7 +211,7 @@ mod tests {
chunks_dir: dir.path().join("chunks"),
..Default::default()
};
let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
let store = Arc::new(CasStore::open(config).await.unwrap());
let chunk1 = b"AAAA";
let chunk2 = b"BBBB";
@@ -213,7 +247,7 @@ mod tests {
chunks_dir: dir.path().join("chunks"),
..Default::default()
};
let store = std::sync::Arc::new(CasStore::open(config).await.unwrap());
let store = Arc::new(CasStore::open(config).await.unwrap());
let data = b"short";
let hash = store.put(data).await.unwrap();
@@ -1,6 +1,7 @@
use musicfs_cache::TreeBuilder;
use musicfs_cas::{CasConfig, CasStore, ChunkManifest, ChunkRef, FileReader};
use musicfs_cas::{CasConfig, CasStore, ChunkManifest, ChunkRef, ContentFetcher, FileReader};
use musicfs_core::{FileId, FileMeta, OriginId, RealPath, VirtualPath};
use musicfs_origins::LocalOrigin;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
@@ -98,3 +99,98 @@ async fn test_deduplication() {
assert_eq!(hash1, hash2);
assert_eq!(size_after_first, size_after_second);
}
#[tokio::test]
async fn test_fetcher_cache_miss_flow() {
let origin_dir = TempDir::new().unwrap();
let cas_dir = TempDir::new().unwrap();
let test_content = b"This is audio content that will be fetched on cache miss";
let test_file_path = origin_dir.path().join("test.flac");
std::fs::write(&test_file_path, test_content).unwrap();
let config = CasConfig {
chunks_dir: cas_dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(CasStore::open(config).await.unwrap());
let origin_id = OriginId::from("test-origin");
let origin = Arc::new(LocalOrigin::new(origin_id.clone(), origin_dir.path().to_path_buf()));
let fetcher = ContentFetcher::new(store.clone());
fetcher.register_origin(origin);
let file_id = FileId(42);
let file_meta = FileMeta {
id: file_id,
virtual_path: VirtualPath::new("/Artist/Album/test.flac"),
real_path: RealPath {
origin_id,
path: PathBuf::from("/test.flac"),
},
size: test_content.len() as u64,
mtime: SystemTime::now(),
content_hash: None,
audio: None,
};
fetcher.register_file(file_meta);
let manifest = fetcher.fetch_file(file_id).await.unwrap();
assert_eq!(manifest.file_id, file_id);
assert_eq!(manifest.total_size, test_content.len() as u64);
assert_eq!(manifest.chunks.len(), 1);
let chunk_data = store.get(&manifest.chunks[0].hash).await.unwrap();
assert_eq!(&chunk_data[..], test_content);
}
#[tokio::test]
async fn test_reader_with_fetcher_integration() {
let origin_dir = TempDir::new().unwrap();
let cas_dir = TempDir::new().unwrap();
let test_content = b"Audio file content for reader integration test";
let test_file_path = origin_dir.path().join("song.flac");
std::fs::write(&test_file_path, test_content).unwrap();
let config = CasConfig {
chunks_dir: cas_dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(CasStore::open(config).await.unwrap());
let origin_id = OriginId::from("local");
let origin = Arc::new(LocalOrigin::new(origin_id.clone(), origin_dir.path().to_path_buf()));
let fetcher = ContentFetcher::new(store.clone());
fetcher.register_origin(origin);
let file_id = FileId(100);
let file_meta = FileMeta {
id: file_id,
virtual_path: VirtualPath::new("/Test/song.flac"),
real_path: RealPath {
origin_id,
path: PathBuf::from("/song.flac"),
},
size: test_content.len() as u64,
mtime: SystemTime::now(),
content_hash: None,
audio: None,
};
fetcher.register_file(file_meta);
let reader = FileReader::with_fetcher(store, Arc::new(fetcher));
let result = reader
.read(file_id, 0, test_content.len() as u32)
.await
.unwrap();
assert_eq!(&result[..], test_content);
let result2 = reader.read(file_id, 0, 10).await.unwrap();
assert_eq!(&result2[..], &test_content[..10]);
}