diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index 0760d3b..922be35 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -227,6 +227,19 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.12", +] + [[package]] name = "dirs" version = "5.0.1" @@ -674,8 +687,10 @@ version = "0.1.0" dependencies = [ "hex", "serde", + "tempfile", "thiserror", "tokio", + "toml", "xxhash-rust", ] @@ -711,6 +726,7 @@ name = "musicfs-origins" version = "0.1.0" dependencies = [ "async-trait", + "dashmap", "musicfs-core", "tempfile", "tokio", @@ -1058,6 +1074,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1335,6 +1360,47 @@ dependencies = [ "syn", ] +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "toml_write", + "winnow", +] + +[[package]] +name = "toml_write" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" + [[package]] name = "tracing" version = "0.1.44" @@ -1612,6 +1678,15 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/musicfs/Cargo.toml b/musicfs/Cargo.toml index 9111ca3..f4bdc9c 100644 --- a/musicfs/Cargo.toml +++ b/musicfs/Cargo.toml @@ -23,6 +23,10 @@ anyhow = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" rmp-serde = "1" +toml = "0.8" + +# Concurrent collections +dashmap = "5" # Logging tracing = "0.1" diff --git a/musicfs/crates/musicfs-core/Cargo.toml b/musicfs/crates/musicfs-core/Cargo.toml index 8c7df9a..212f0fc 100644 --- a/musicfs/crates/musicfs-core/Cargo.toml +++ b/musicfs/crates/musicfs-core/Cargo.toml @@ -6,6 +6,10 @@ edition.workspace = true [dependencies] thiserror.workspace = true serde.workspace = true +toml.workspace = true tokio = { workspace = true, features = ["sync"] } xxhash-rust.workspace = true hex.workspace = true + +[dev-dependencies] +tempfile.workspace = true diff --git a/musicfs/crates/musicfs-core/src/config.rs b/musicfs/crates/musicfs-core/src/config.rs new file mode 100644 index 0000000..8f7f070 --- /dev/null +++ b/musicfs/crates/musicfs-core/src/config.rs @@ -0,0 +1,190 @@ +use crate::OriginId; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub mount_point: PathBuf, + pub cache_dir: PathBuf, + pub origins: Vec, + + #[serde(default)] + pub cache: CacheConfig, + + #[serde(default)] + pub health: HealthConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OriginConfig { + pub id: String, + pub origin_type: OriginType, + pub priority: u8, + + #[serde(default = "default_enabled")] + pub enabled: bool, + + #[serde(flatten)] + pub settings: HashMap, +} + +fn default_enabled() -> bool { + true +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "lowercase")] +pub enum OriginType { + Local, + Nfs, + Smb, + S3, + Sftp, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CacheConfig { + #[serde(default = "default_metadata_cache_mb")] + pub metadata_cache_mb: u64, + + #[serde(default = "default_content_cache_gb")] + pub content_cache_gb: u64, +} + +impl Default for CacheConfig { + fn default() -> Self { + Self { + metadata_cache_mb: default_metadata_cache_mb(), + content_cache_gb: default_content_cache_gb(), + } + } +} + +fn default_metadata_cache_mb() -> u64 { + 100 +} +fn default_content_cache_gb() -> u64 { + 10 +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthConfig { + #[serde(default = "default_check_interval_secs")] + pub check_interval_secs: u64, + + #[serde(default = "default_timeout_ms")] + pub timeout_ms: u64, + + #[serde(default = "default_unhealthy_threshold")] + pub unhealthy_threshold: u32, + + #[serde(default)] + pub per_origin_thresholds: HashMap, +} + +impl Default for HealthConfig { + fn default() -> Self { + let mut per_origin = HashMap::new(); + per_origin.insert(OriginType::Local, 1); + per_origin.insert(OriginType::Nfs, 3); + per_origin.insert(OriginType::Smb, 3); + per_origin.insert(OriginType::S3, 3); + per_origin.insert(OriginType::Sftp, 3); + + Self { + check_interval_secs: default_check_interval_secs(), + timeout_ms: default_timeout_ms(), + unhealthy_threshold: default_unhealthy_threshold(), + per_origin_thresholds: per_origin, + } + } +} + +impl HealthConfig { + pub fn threshold_for(&self, origin_type: OriginType) -> u32 { + self.per_origin_thresholds + .get(&origin_type) + .copied() + .unwrap_or(self.unhealthy_threshold) + } +} + +fn default_check_interval_secs() -> u64 { + 30 +} +fn default_timeout_ms() -> u64 { + 5000 +} +fn default_unhealthy_threshold() -> u32 { + 3 +} + +impl Config { + pub fn from_file(path: &std::path::Path) -> Result { + let content = + std::fs::read_to_string(path).map_err(|e| ConfigError::Read(e.to_string()))?; + toml::from_str(&content).map_err(|e| ConfigError::Parse(e.to_string())) + } + + pub fn origin_id(&self, id: &str) -> Option { + self.origins + .iter() + .find(|o| o.id == id) + .map(|_| OriginId::from(id)) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("Failed to read config: {0}")] + Read(String), + + #[error("Failed to parse config: {0}")] + Parse(String), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_config() { + let toml = r#" +mount_point = "/mnt/music" +cache_dir = "/home/user/.cache/musicfs" + +[[origins]] +id = "local" +origin_type = "local" +priority = 1 +path = "/mnt/nas/music" + +[[origins]] +id = "backup" +origin_type = "s3" +priority = 2 +bucket = "music-backup" +region = "us-east-1" +"#; + + let config: Config = toml::from_str(toml).unwrap(); + assert_eq!(config.origins.len(), 2); + assert_eq!(config.origins[0].priority, 1); + assert_eq!(config.origins[1].origin_type, OriginType::S3); + } + + #[test] + fn test_default_health_thresholds() { + let health = HealthConfig::default(); + assert_eq!(health.threshold_for(OriginType::Local), 1); + assert_eq!(health.threshold_for(OriginType::Sftp), 3); + } + + #[test] + fn test_cache_defaults() { + let cache = CacheConfig::default(); + assert_eq!(cache.metadata_cache_mb, 100); + assert_eq!(cache.content_cache_gb, 10); + } +} diff --git a/musicfs/crates/musicfs-core/src/error.rs b/musicfs/crates/musicfs-core/src/error.rs index a767e1e..97fa695 100644 --- a/musicfs/crates/musicfs-core/src/error.rs +++ b/musicfs/crates/musicfs-core/src/error.rs @@ -28,6 +28,15 @@ pub enum Error { #[error("Operation not permitted (read-only filesystem)")] ReadOnly, + + #[error("No origin available to serve request")] + NoOriginAvailable, + + #[error("Maximum retries exceeded")] + MaxRetriesExceeded, + + #[error("Origin error: {0}")] + Origin(String), } pub type Result = std::result::Result; diff --git a/musicfs/crates/musicfs-core/src/events.rs b/musicfs/crates/musicfs-core/src/events.rs index e4c3aac..b7fb005 100644 --- a/musicfs/crates/musicfs-core/src/events.rs +++ b/musicfs/crates/musicfs-core/src/events.rs @@ -60,6 +60,13 @@ pub enum Event { CacheEviction { bytes_freed: u64, }, + AllOriginsUnhealthy { + candidate_count: usize, + }, + OriginHealthChanged { + origin_id: OriginId, + healthy: bool, + }, } #[cfg(test)] diff --git a/musicfs/crates/musicfs-core/src/lib.rs b/musicfs/crates/musicfs-core/src/lib.rs index cb90d46..7ede588 100644 --- a/musicfs/crates/musicfs-core/src/lib.rs +++ b/musicfs/crates/musicfs-core/src/lib.rs @@ -1,8 +1,10 @@ +pub mod config; pub mod error; pub mod events; pub mod resolver; pub mod types; +pub use config::{CacheConfig, Config, ConfigError, HealthConfig, OriginConfig, OriginType}; pub use error::{Error, Result}; pub use events::{Event, EventBus}; pub use resolver::{PathResolver, PathTemplate}; diff --git a/musicfs/crates/musicfs-origins/Cargo.toml b/musicfs/crates/musicfs-origins/Cargo.toml index 112c0da..349a5b2 100644 --- a/musicfs/crates/musicfs-origins/Cargo.toml +++ b/musicfs/crates/musicfs-origins/Cargo.toml @@ -6,7 +6,8 @@ edition.workspace = true [dependencies] musicfs-core = { path = "../musicfs-core" } async-trait.workspace = true -tokio = { workspace = true, features = ["fs", "sync"] } +dashmap.workspace = true +tokio = { workspace = true, features = ["fs", "sync", "time"] } tracing.workspace = true [dev-dependencies] diff --git a/musicfs/crates/musicfs-origins/src/failover.rs b/musicfs/crates/musicfs-origins/src/failover.rs new file mode 100644 index 0000000..ee9f618 --- /dev/null +++ b/musicfs/crates/musicfs-origins/src/failover.rs @@ -0,0 +1,224 @@ +use crate::registry::OriginRegistry; +use crate::traits::Origin; +use musicfs_core::{Error, RealPath, Result}; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, warn}; + +#[derive(Debug, Clone)] +pub struct RetryConfig { + pub max_attempts: u32, + pub delays: Vec, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self::spec_compliant() + } +} + +impl RetryConfig { + pub fn spec_compliant() -> Self { + Self { + max_attempts: 3, + delays: vec![ + Duration::from_millis(100), + Duration::from_millis(500), + Duration::from_millis(2000), + ], + } + } + + pub fn with_delays(delays: Vec) -> Self { + Self { + max_attempts: delays.len() as u32, + delays, + } + } + + fn delay_for_attempt(&self, attempt: u32) -> Duration { + self.delays + .get(attempt as usize) + .copied() + .unwrap_or(*self.delays.last().unwrap_or(&Duration::from_millis(100))) + } +} + +pub struct FailoverExecutor { + registry: Arc, + retry_config: RetryConfig, +} + +impl FailoverExecutor { + pub fn new(registry: Arc, retry_config: RetryConfig) -> Self { + Self { + registry, + retry_config, + } + } + + pub async fn read_with_failover( + &self, + path: &RealPath, + offset: u64, + size: u32, + ) -> Result> { + let origins = self.registry.route_all(path); + + if origins.is_empty() { + if let Some(origin) = self.registry.route_with_fallback(path) { + warn!( + "No healthy origins, using fallback origin {}", + origin.id() + ); + return self.read_with_retry(&origin, &path.path, offset, size).await; + } + return Err(Error::NoOriginAvailable); + } + + let mut last_error = None; + + for origin in origins { + let start = std::time::Instant::now(); + match self.read_with_retry(&origin, &path.path, offset, size).await { + Ok(data) => { + let latency = start.elapsed().as_millis() as u64; + self.registry.record_latency(origin.id(), latency); + return Ok(data); + } + Err(e) => { + warn!("Origin {} failed: {}, trying next", origin.id(), e); + last_error = Some(e); + } + } + } + + Err(last_error.unwrap_or(Error::NoOriginAvailable)) + } + + async fn read_with_retry( + &self, + origin: &Arc, + path: &std::path::Path, + offset: u64, + size: u32, + ) -> Result> { + for attempt in 0..self.retry_config.max_attempts { + match origin.read(path, offset, size).await { + Ok(data) => return Ok(data), + Err(e) if attempt + 1 < self.retry_config.max_attempts => { + let delay = self.retry_config.delay_for_attempt(attempt); + debug!( + "Retry {}/{} for {} after {:?}: {}", + attempt + 1, + self.retry_config.max_attempts, + origin.id(), + delay, + e + ); + tokio::time::sleep(delay).await; + } + Err(e) => return Err(e), + } + } + + Err(Error::MaxRetriesExceeded) + } + + pub async fn read_full_with_failover(&self, path: &RealPath) -> Result> { + let origins = self.registry.route_all(path); + + if origins.is_empty() { + if let Some(origin) = self.registry.route_with_fallback(path) { + warn!( + "No healthy origins for full read, using fallback {}", + origin.id() + ); + return self.read_full_with_retry(&origin, &path.path).await; + } + return Err(Error::NoOriginAvailable); + } + + let mut last_error = None; + + for origin in origins { + let start = std::time::Instant::now(); + match self.read_full_with_retry(&origin, &path.path).await { + Ok(data) => { + let latency = start.elapsed().as_millis() as u64; + self.registry.record_latency(origin.id(), latency); + return Ok(data); + } + Err(e) => { + warn!("Origin {} failed full read: {}, trying next", origin.id(), e); + last_error = Some(e); + } + } + } + + Err(last_error.unwrap_or(Error::NoOriginAvailable)) + } + + async fn read_full_with_retry( + &self, + origin: &Arc, + path: &std::path::Path, + ) -> Result> { + for attempt in 0..self.retry_config.max_attempts { + match origin.read_full(path).await { + Ok(data) => return Ok(data), + Err(e) if attempt + 1 < self.retry_config.max_attempts => { + let delay = self.retry_config.delay_for_attempt(attempt); + debug!( + "Retry full read {}/{} for {} after {:?}: {}", + attempt + 1, + self.retry_config.max_attempts, + origin.id(), + delay, + e + ); + tokio::time::sleep(delay).await; + } + Err(e) => return Err(e), + } + } + + Err(Error::MaxRetriesExceeded) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_retry_config_default() { + let config = RetryConfig::default(); + assert_eq!(config.max_attempts, 3); + assert_eq!(config.delays[0], Duration::from_millis(100)); + assert_eq!(config.delays[1], Duration::from_millis(500)); + assert_eq!(config.delays[2], Duration::from_millis(2000)); + } + + #[test] + fn test_delay_for_attempt() { + let config = RetryConfig::spec_compliant(); + + assert_eq!(config.delay_for_attempt(0), Duration::from_millis(100)); + assert_eq!(config.delay_for_attempt(1), Duration::from_millis(500)); + assert_eq!(config.delay_for_attempt(2), Duration::from_millis(2000)); + assert_eq!(config.delay_for_attempt(10), Duration::from_millis(2000)); + } + + #[test] + fn test_custom_delays() { + let config = RetryConfig::with_delays(vec![ + Duration::from_millis(50), + Duration::from_millis(100), + ]); + + assert_eq!(config.max_attempts, 2); + assert_eq!(config.delay_for_attempt(0), Duration::from_millis(50)); + assert_eq!(config.delay_for_attempt(1), Duration::from_millis(100)); + } +} diff --git a/musicfs/crates/musicfs-origins/src/health.rs b/musicfs/crates/musicfs-origins/src/health.rs new file mode 100644 index 0000000..39aea6d --- /dev/null +++ b/musicfs/crates/musicfs-origins/src/health.rs @@ -0,0 +1,351 @@ +use crate::traits::Origin; +use dashmap::DashMap; +use musicfs_core::{Event, EventBus, HealthStatus, OriginId, OriginType}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +pub struct HealthMonitor { + origins: DashMap>, + state: DashMap, + check_interval: Duration, + default_threshold: u32, + per_type_thresholds: HashMap, + event_bus: Option>, +} + +#[derive(Debug, Clone)] +pub struct OriginHealthState { + pub status: HealthStatus, + pub last_check: Instant, + pub consecutive_failures: u32, + pub last_latency_ms: Option, +} + +impl Default for OriginHealthState { + fn default() -> Self { + Self { + status: HealthStatus::Unknown, + last_check: Instant::now(), + consecutive_failures: 0, + last_latency_ms: None, + } + } +} + +#[derive(Debug, Clone)] +pub struct HealthSnapshot { + pub healthy: Vec, + pub degraded: Vec, + pub unhealthy: Vec, + pub failure_counts: HashMap, +} + +impl HealthSnapshot { + pub fn is_healthy(&self, id: &OriginId) -> bool { + self.healthy.contains(id) + } + + pub fn is_degraded(&self, id: &OriginId) -> bool { + self.degraded.contains(id) + } + + pub fn is_unhealthy(&self, id: &OriginId) -> bool { + self.unhealthy.contains(id) + } + + pub fn failure_count(&self, id: &OriginId) -> Option { + self.failure_counts.get(id).copied() + } + + pub fn all_unhealthy(&self) -> bool { + self.healthy.is_empty() && self.degraded.is_empty() + } + + pub fn total_candidates(&self) -> usize { + self.healthy.len() + self.degraded.len() + self.unhealthy.len() + } +} + +impl HealthMonitor { + pub fn new(check_interval: Duration) -> Self { + let mut per_type = HashMap::new(); + per_type.insert(OriginType::Local, 1); + per_type.insert(OriginType::Nfs, 3); + per_type.insert(OriginType::Smb, 3); + per_type.insert(OriginType::S3, 3); + per_type.insert(OriginType::Sftp, 3); + + Self { + origins: DashMap::new(), + state: DashMap::new(), + check_interval, + default_threshold: 3, + per_type_thresholds: per_type, + event_bus: None, + } + } + + pub fn with_threshold(mut self, threshold: u32) -> Self { + self.default_threshold = threshold; + self + } + + pub fn with_per_type_thresholds(mut self, thresholds: HashMap) -> Self { + self.per_type_thresholds = thresholds; + self + } + + pub fn with_event_bus(mut self, bus: Arc) -> Self { + self.event_bus = Some(bus); + self + } + + fn threshold_for(&self, origin_type: OriginType) -> u32 { + self.per_type_thresholds + .get(&origin_type) + .copied() + .unwrap_or(self.default_threshold) + } + + pub fn add_origin(&self, origin: Arc) { + let id = origin.id().clone(); + self.origins.insert(id.clone(), origin); + self.state.insert(id, OriginHealthState::default()); + } + + pub fn remove_origin(&self, id: &OriginId) { + self.origins.remove(id); + self.state.remove(id); + } + + pub fn snapshot(&self) -> HealthSnapshot { + let mut healthy = Vec::new(); + let mut degraded = Vec::new(); + let mut unhealthy = Vec::new(); + let mut failure_counts = HashMap::new(); + + for entry in self.state.iter() { + let id = entry.key().clone(); + failure_counts.insert(id.clone(), entry.value().consecutive_failures); + + match entry.value().status { + HealthStatus::Healthy => healthy.push(id), + HealthStatus::Degraded => degraded.push(id), + HealthStatus::Unhealthy => unhealthy.push(id), + HealthStatus::Unknown => degraded.push(id), + } + } + + HealthSnapshot { + healthy, + degraded, + unhealthy, + failure_counts, + } + } + + pub fn start(self: Arc) -> HealthCheckHandle { + let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); + let monitor = self.clone(); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(monitor.check_interval); + + loop { + tokio::select! { + _ = interval.tick() => { + monitor.check_all().await; + } + _ = stop_rx.recv() => { + info!("Health monitor stopping"); + break; + } + } + } + }); + + HealthCheckHandle { stop_tx } + } + + async fn check_all(&self) { + let origins: Vec<_> = self + .origins + .iter() + .map(|e| (e.key().clone(), e.value().clone())) + .collect(); + + for (id, origin) in origins { + self.check_one(&id, &origin).await; + } + } + + async fn check_one(&self, id: &OriginId, origin: &Arc) { + let start = Instant::now(); + let status = origin.health().await; + let latency_ms = start.elapsed().as_millis() as u64; + + let threshold = self.threshold_for(origin.origin_type()); + let prev_healthy = self + .state + .get(id) + .map(|s| s.status == HealthStatus::Healthy) + .unwrap_or(false); + + let mut state = self.state.entry(id.clone()).or_default(); + + match status { + HealthStatus::Healthy => { + if state.status != HealthStatus::Healthy { + info!("Origin {} is now healthy", id); + } + state.status = HealthStatus::Healthy; + state.consecutive_failures = 0; + } + HealthStatus::Degraded => { + if state.status != HealthStatus::Degraded { + warn!("Origin {} is degraded", id); + } + state.status = HealthStatus::Degraded; + } + HealthStatus::Unhealthy => { + state.consecutive_failures += 1; + if state.consecutive_failures >= threshold { + if state.status != HealthStatus::Unhealthy { + warn!( + "Origin {} is now unhealthy ({} failures)", + id, state.consecutive_failures + ); + } + state.status = HealthStatus::Unhealthy; + } else { + debug!( + "Origin {} check failed ({}/{})", + id, state.consecutive_failures, threshold + ); + state.status = HealthStatus::Degraded; + } + } + HealthStatus::Unknown => { + state.status = HealthStatus::Unknown; + } + } + + state.last_check = Instant::now(); + state.last_latency_ms = Some(latency_ms); + + let now_healthy = state.status == HealthStatus::Healthy; + if prev_healthy != now_healthy { + if let Some(bus) = &self.event_bus { + bus.publish(Event::OriginHealthChanged { + origin_id: id.clone(), + healthy: now_healthy, + }); + } + } + } + + pub async fn check_now(&self, id: &OriginId) { + if let Some(origin) = self.origins.get(id) { + self.check_one(id, &origin.clone()).await; + } + } + + pub fn get_state(&self, id: &OriginId) -> Option { + self.state.get(id).map(|e| e.value().clone()) + } +} + +pub struct HealthCheckHandle { + stop_tx: mpsc::Sender<()>, +} + +impl HealthCheckHandle { + pub async fn stop(self) { + let _ = self.stop_tx.send(()).await; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::LocalOrigin; + use tempfile::TempDir; + + #[tokio::test] + async fn test_health_monitor_basic() { + let monitor = HealthMonitor::new(Duration::from_secs(30)); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + monitor.add_origin(origin); + + let snapshot = monitor.snapshot(); + assert!(!snapshot.is_healthy(&OriginId::from("test"))); + } + + #[tokio::test] + async fn test_health_check() { + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + monitor.add_origin(origin); + monitor.check_now(&OriginId::from("test")).await; + + let snapshot = monitor.snapshot(); + assert!(snapshot.is_healthy(&OriginId::from("test"))); + } + + #[tokio::test] + async fn test_failure_tracking() { + let mut thresholds = HashMap::new(); + thresholds.insert(OriginType::Local, 3); + + let monitor = HealthMonitor::new(Duration::from_secs(30)) + .with_per_type_thresholds(thresholds); + + let origin = Arc::new(LocalOrigin::new("missing", std::path::Path::new("/nonexistent"))); + monitor.add_origin(origin); + + monitor.check_now(&OriginId::from("missing")).await; + let state = monitor.get_state(&OriginId::from("missing")).unwrap(); + assert_eq!(state.consecutive_failures, 1); + assert_eq!(state.status, HealthStatus::Degraded); + + monitor.check_now(&OriginId::from("missing")).await; + monitor.check_now(&OriginId::from("missing")).await; + + let state = monitor.get_state(&OriginId::from("missing")).unwrap(); + assert_eq!(state.consecutive_failures, 3); + assert_eq!(state.status, HealthStatus::Unhealthy); + } + + #[tokio::test] + async fn test_local_origin_threshold_is_one() { + let monitor = HealthMonitor::new(Duration::from_secs(30)); + + let origin = Arc::new(LocalOrigin::new("missing", std::path::Path::new("/nonexistent"))); + monitor.add_origin(origin); + + monitor.check_now(&OriginId::from("missing")).await; + let state = monitor.get_state(&OriginId::from("missing")).unwrap(); + assert_eq!(state.consecutive_failures, 1); + assert_eq!(state.status, HealthStatus::Unhealthy); + } + + #[test] + fn test_snapshot_all_unhealthy() { + let snapshot = HealthSnapshot { + healthy: Vec::new(), + degraded: Vec::new(), + unhealthy: vec![OriginId::from("a")], + failure_counts: HashMap::new(), + }; + assert!(snapshot.all_unhealthy()); + } +} diff --git a/musicfs/crates/musicfs-origins/src/lib.rs b/musicfs/crates/musicfs-origins/src/lib.rs index f15d6e0..576c219 100644 --- a/musicfs/crates/musicfs-origins/src/lib.rs +++ b/musicfs/crates/musicfs-origins/src/lib.rs @@ -1,5 +1,14 @@ +mod failover; +mod health; mod local; +mod registry; +mod router; mod traits; +pub use failover::{FailoverExecutor, RetryConfig}; +pub use health::{HealthCheckHandle, HealthMonitor, HealthSnapshot, OriginHealthState}; pub use local::LocalOrigin; -pub use traits::{Origin, OriginType, WatchCallback, WatchEvent, WatchHandle}; +pub use registry::OriginRegistry; +pub use router::{LatencyStats, Router}; +pub use musicfs_core::OriginType; +pub use traits::{Origin, WatchCallback, WatchEvent, WatchHandle}; diff --git a/musicfs/crates/musicfs-origins/src/local.rs b/musicfs/crates/musicfs-origins/src/local.rs index 72a71ad..2f49bf8 100644 --- a/musicfs/crates/musicfs-origins/src/local.rs +++ b/musicfs/crates/musicfs-origins/src/local.rs @@ -1,6 +1,6 @@ -use crate::traits::{Origin, OriginType, WatchCallback, WatchHandle}; +use crate::traits::{Origin, WatchCallback, WatchHandle}; use async_trait::async_trait; -use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result}; +use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, OriginType, Result}; use std::path::{Path, PathBuf}; use tokio::fs; use tokio::io::AsyncRead; diff --git a/musicfs/crates/musicfs-origins/src/registry.rs b/musicfs/crates/musicfs-origins/src/registry.rs new file mode 100644 index 0000000..d0a28b2 --- /dev/null +++ b/musicfs/crates/musicfs-origins/src/registry.rs @@ -0,0 +1,215 @@ +use crate::health::{HealthMonitor, HealthSnapshot}; +use crate::router::Router; +use crate::traits::{Origin, WatchHandle}; +use musicfs_core::{OriginId, RealPath}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tracing::{info, warn}; + +pub struct OriginRegistry { + origins: RwLock>>, + router: Router, + health_monitor: Arc, + watch_handles: RwLock>>, +} + +impl OriginRegistry { + pub fn new(health_monitor: Arc) -> Self { + Self { + origins: RwLock::new(HashMap::new()), + router: Router::new(), + health_monitor, + watch_handles: RwLock::new(HashMap::new()), + } + } + + pub fn register(&self, origin: Arc, priority: u8) { + let id = origin.id().clone(); + info!("Registering origin {} with priority {}", id, priority); + + self.router.set_priority(id.clone(), priority); + self.health_monitor.add_origin(origin.clone()); + self.origins.write().unwrap().insert(id, origin); + } + + pub fn unregister(&self, id: &OriginId) { + info!("Unregistering origin {}", id); + + if let Some(handles) = self.watch_handles.write().unwrap().remove(id) { + info!("Dropping {} watch handles for origin {}", handles.len(), id); + } + + self.origins.write().unwrap().remove(id); + self.router.remove_priority(id); + self.health_monitor.remove_origin(id); + } + + pub fn register_watch(&self, origin_id: &OriginId, handle: WatchHandle) { + self.watch_handles + .write() + .unwrap() + .entry(origin_id.clone()) + .or_default() + .push(handle); + } + + pub fn get(&self, id: &OriginId) -> Option> { + self.origins.read().unwrap().get(id).cloned() + } + + pub fn list(&self) -> Vec> { + self.origins.read().unwrap().values().cloned().collect() + } + + pub fn route(&self, path: &RealPath) -> Option> { + let origins = self.origins.read().unwrap(); + let health = self.health_monitor.snapshot(); + + let candidates: Vec<_> = origins + .iter() + .filter(|(id, _)| self.can_serve(id, path)) + .map(|(id, origin)| (id.clone(), origin.clone())) + .collect(); + + if candidates.is_empty() { + warn!("No origin can serve path: {:?}", path); + return None; + } + + let candidate_ids: Vec<_> = candidates.iter().map(|(id, _)| id.clone()).collect(); + let selected = self.router.select(&candidate_ids, &health)?; + + candidates + .into_iter() + .find(|(id, _)| id == &selected) + .map(|(_, origin)| origin) + } + + pub fn route_with_fallback(&self, path: &RealPath) -> Option> { + let origins = self.origins.read().unwrap(); + let health = self.health_monitor.snapshot(); + + let candidates: Vec<_> = origins + .iter() + .filter(|(id, _)| self.can_serve(id, path)) + .map(|(id, origin)| (id.clone(), origin.clone())) + .collect(); + + if candidates.is_empty() { + return None; + } + + let candidate_ids: Vec<_> = candidates.iter().map(|(id, _)| id.clone()).collect(); + let selected = self.router.select_with_fallback(&candidate_ids, &health)?; + + candidates + .into_iter() + .find(|(id, _)| id == &selected) + .map(|(_, origin)| origin) + } + + pub fn route_all(&self, path: &RealPath) -> Vec> { + let origins = self.origins.read().unwrap(); + let health = self.health_monitor.snapshot(); + + let mut result: Vec<_> = origins + .iter() + .filter(|(id, _)| self.can_serve(id, path) && health.is_healthy(id)) + .map(|(_, origin)| origin.clone()) + .collect(); + + result.sort_by_key(|o| self.router.get_priority(o.id())); + result + } + + fn can_serve(&self, origin_id: &OriginId, path: &RealPath) -> bool { + path.origin_id == *origin_id + } + + pub fn health(&self) -> HealthSnapshot { + self.health_monitor.snapshot() + } + + pub fn record_latency(&self, id: &OriginId, latency_ms: u64) { + self.router.record_latency(id, latency_ms); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::LocalOrigin; + use std::path::PathBuf; + use std::time::Duration; + use tempfile::TempDir; + + #[test] + fn test_register_and_get() { + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + let registry = OriginRegistry::new(monitor); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + registry.register(origin.clone(), 1); + + let retrieved = registry.get(&OriginId::from("test")); + assert!(retrieved.is_some()); + } + + #[test] + fn test_unregister() { + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + let registry = OriginRegistry::new(monitor); + + let dir = TempDir::new().unwrap(); + let origin = Arc::new(LocalOrigin::new("test", dir.path())); + + registry.register(origin, 1); + registry.unregister(&OriginId::from("test")); + + assert!(registry.get(&OriginId::from("test")).is_none()); + } + + #[tokio::test] + async fn test_route_by_priority() { + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + let registry = OriginRegistry::new(monitor.clone()); + + let dir1 = TempDir::new().unwrap(); + let dir2 = TempDir::new().unwrap(); + + let origin1 = Arc::new(LocalOrigin::new("primary", dir1.path())); + let origin2 = Arc::new(LocalOrigin::new("backup", dir2.path())); + + registry.register(origin1, 1); + registry.register(origin2, 2); + + monitor.check_now(&OriginId::from("primary")).await; + monitor.check_now(&OriginId::from("backup")).await; + + let path = RealPath { + origin_id: OriginId::from("primary"), + path: PathBuf::from("/test.flac"), + }; + + let routed = registry.route(&path); + assert!(routed.is_some()); + assert_eq!(routed.unwrap().id(), &OriginId::from("primary")); + } + + #[test] + fn test_list_origins() { + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + let registry = OriginRegistry::new(monitor); + + let dir1 = TempDir::new().unwrap(); + let dir2 = TempDir::new().unwrap(); + + registry.register(Arc::new(LocalOrigin::new("a", dir1.path())), 1); + registry.register(Arc::new(LocalOrigin::new("b", dir2.path())), 2); + + let list = registry.list(); + assert_eq!(list.len(), 2); + } +} diff --git a/musicfs/crates/musicfs-origins/src/router.rs b/musicfs/crates/musicfs-origins/src/router.rs new file mode 100644 index 0000000..add63bb --- /dev/null +++ b/musicfs/crates/musicfs-origins/src/router.rs @@ -0,0 +1,225 @@ +use crate::health::HealthSnapshot; +use dashmap::DashMap; +use musicfs_core::{Event, EventBus, OriginId}; +use std::sync::Arc; +use std::time::Instant; +use tracing::{debug, warn}; + +pub struct Router { + priorities: DashMap, + latency_stats: DashMap, + event_bus: Option>, +} + +#[derive(Debug, Clone, Default)] +pub struct LatencyStats { + pub samples: Vec, + pub p50_ms: u64, + pub p99_ms: u64, + pub last_update: Option, +} + +impl LatencyStats { + pub fn record(&mut self, latency_ms: u64) { + self.samples.push(latency_ms); + + if self.samples.len() > 100 { + self.samples.remove(0); + } + + if !self.samples.is_empty() { + let mut sorted = self.samples.clone(); + sorted.sort_unstable(); + + let p50_idx = sorted.len() / 2; + let p99_idx = (sorted.len() * 99) / 100; + + self.p50_ms = sorted[p50_idx]; + self.p99_ms = sorted.get(p99_idx).copied().unwrap_or(self.p50_ms); + } + + self.last_update = Some(Instant::now()); + } +} + +impl Router { + pub fn new() -> Self { + Self { + priorities: DashMap::new(), + latency_stats: DashMap::new(), + event_bus: None, + } + } + + pub fn with_event_bus(mut self, bus: Arc) -> Self { + self.event_bus = Some(bus); + self + } + + pub fn set_priority(&self, id: OriginId, priority: u8) { + self.priorities.insert(id, priority); + } + + pub fn remove_priority(&self, id: &OriginId) { + self.priorities.remove(id); + self.latency_stats.remove(id); + } + + pub fn get_priority(&self, id: &OriginId) -> u8 { + self.priorities.get(id).map(|p| *p).unwrap_or(100) + } + + pub fn record_latency(&self, id: &OriginId, latency_ms: u64) { + self.latency_stats + .entry(id.clone()) + .or_default() + .record(latency_ms); + } + + pub fn select(&self, candidates: &[OriginId], health: &HealthSnapshot) -> Option { + candidates + .iter() + .filter(|id| health.is_healthy(id)) + .min_by_key(|id| { + let priority = self.get_priority(id); + let latency = self.latency_stats.get(*id).map(|s| s.p50_ms).unwrap_or(0); + (priority, latency) + }) + .cloned() + } + + pub fn select_with_fallback( + &self, + candidates: &[OriginId], + health: &HealthSnapshot, + ) -> Option { + if let Some(id) = self.select(candidates, health) { + return Some(id); + } + + debug!("No healthy origins, trying degraded"); + if let Some(id) = candidates + .iter() + .filter(|id| health.is_degraded(id)) + .min_by_key(|id| self.get_priority(id)) + .cloned() + { + return Some(id); + } + + warn!("All origins unhealthy, selecting least-bad by failure count"); + + if let Some(bus) = &self.event_bus { + bus.publish(Event::AllOriginsUnhealthy { + candidate_count: candidates.len(), + }); + } + + candidates + .iter() + .min_by_key(|id| { + let failures = health.failure_count(id).unwrap_or(u32::MAX); + let priority = self.get_priority(id); + (failures, priority) + }) + .cloned() + } +} + +impl Default for Router { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + fn mock_health(healthy: &[&str], degraded: &[&str]) -> HealthSnapshot { + HealthSnapshot { + healthy: healthy.iter().map(|s| OriginId::from(*s)).collect(), + degraded: degraded.iter().map(|s| OriginId::from(*s)).collect(), + unhealthy: Vec::new(), + failure_counts: HashMap::new(), + } + } + + #[test] + fn test_select_by_priority() { + let router = Router::new(); + router.set_priority(OriginId::from("high"), 1); + router.set_priority(OriginId::from("low"), 2); + + let candidates = vec![OriginId::from("low"), OriginId::from("high")]; + let health = mock_health(&["high", "low"], &[]); + + let selected = router.select(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("high"))); + } + + #[test] + fn test_select_skips_unhealthy() { + let router = Router::new(); + router.set_priority(OriginId::from("high"), 1); + router.set_priority(OriginId::from("low"), 2); + + let candidates = vec![OriginId::from("high"), OriginId::from("low")]; + let health = mock_health(&["low"], &[]); + + let selected = router.select(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("low"))); + } + + #[test] + fn test_latency_affects_tiebreak() { + let router = Router::new(); + router.set_priority(OriginId::from("a"), 1); + router.set_priority(OriginId::from("b"), 1); + + router.record_latency(&OriginId::from("a"), 100); + router.record_latency(&OriginId::from("b"), 10); + + let candidates = vec![OriginId::from("a"), OriginId::from("b")]; + let health = mock_health(&["a", "b"], &[]); + + let selected = router.select(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("b"))); + } + + #[test] + fn test_fallback_to_degraded() { + let router = Router::new(); + router.set_priority(OriginId::from("a"), 1); + router.set_priority(OriginId::from("b"), 2); + + let candidates = vec![OriginId::from("a"), OriginId::from("b")]; + let health = mock_health(&[], &["b"]); + + let selected = router.select_with_fallback(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("b"))); + } + + #[test] + fn test_fallback_least_bad() { + let router = Router::new(); + router.set_priority(OriginId::from("a"), 1); + router.set_priority(OriginId::from("b"), 2); + + let candidates = vec![OriginId::from("a"), OriginId::from("b")]; + let mut failure_counts = HashMap::new(); + failure_counts.insert(OriginId::from("a"), 5); + failure_counts.insert(OriginId::from("b"), 2); + + let health = HealthSnapshot { + healthy: Vec::new(), + degraded: Vec::new(), + unhealthy: vec![OriginId::from("a"), OriginId::from("b")], + failure_counts, + }; + + let selected = router.select_with_fallback(&candidates, &health); + assert_eq!(selected, Some(OriginId::from("b"))); + } +} diff --git a/musicfs/crates/musicfs-origins/src/traits.rs b/musicfs/crates/musicfs-origins/src/traits.rs index 322dc68..343c7d1 100644 --- a/musicfs/crates/musicfs-origins/src/traits.rs +++ b/musicfs/crates/musicfs-origins/src/traits.rs @@ -1,17 +1,8 @@ use async_trait::async_trait; -use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result}; +use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, OriginType, Result}; use std::path::{Path, PathBuf}; use tokio::io::AsyncRead; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum OriginType { - Local, - Nfs, - Smb, - S3, - Sftp, -} - #[async_trait] pub trait Origin: Send + Sync { fn id(&self) -> &OriginId;