Implement Week 6 Origin Federation with Oracle fixes

New files:
- musicfs-core/src/config.rs: Config, OriginConfig, HealthConfig
- musicfs-origins/src/registry.rs: OriginRegistry with watch cleanup
- musicfs-origins/src/router.rs: Priority router with (priority, latency) ordering
- musicfs-origins/src/health.rs: HealthMonitor with per-origin-type thresholds
- musicfs-origins/src/failover.rs: FailoverExecutor with NFR-7.3 backoff

Oracle fixes applied:
- Per-OriginType threshold: Local=1, Remote=3 (check_one uses threshold_for)
- AllOriginsUnhealthy event: Added to events.rs, emitted in select_with_fallback
- Unified OriginType: Removed duplicate from traits.rs, use musicfs_core::OriginType
- Watch handle cleanup: Tracked and dropped on unregister()
- Retry delays: 100ms, 500ms, 2000ms (NFR-7.3 compliant)

Tests: 91 pass (+20 new)
This commit is contained in:
Alexander
2026-05-12 20:15:56 +02:00
parent 32c96701c8
commit d5ef68c9c9
15 changed files with 1321 additions and 14 deletions
+75
View File
@@ -227,6 +227,19 @@ version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.12",
]
[[package]]
name = "dirs"
version = "5.0.1"
@@ -674,8 +687,10 @@ version = "0.1.0"
dependencies = [
"hex",
"serde",
"tempfile",
"thiserror",
"tokio",
"toml",
"xxhash-rust",
]
@@ -711,6 +726,7 @@ name = "musicfs-origins"
version = "0.1.0"
dependencies = [
"async-trait",
"dashmap",
"musicfs-core",
"tempfile",
"tokio",
@@ -1058,6 +1074,15 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_spanned"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3"
dependencies = [
"serde",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -1335,6 +1360,47 @@ dependencies = [
"syn",
]
[[package]]
name = "toml"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.22.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
dependencies = [
"indexmap",
"serde",
"serde_spanned",
"toml_datetime",
"toml_write",
"winnow",
]
[[package]]
name = "toml_write"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tracing"
version = "0.1.44"
@@ -1612,6 +1678,15 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "winnow"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945"
dependencies = [
"memchr",
]
[[package]]
name = "wit-bindgen"
version = "0.51.0"
+4
View File
@@ -23,6 +23,10 @@ anyhow = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
rmp-serde = "1"
toml = "0.8"
# Concurrent collections
dashmap = "5"
# Logging
tracing = "0.1"
+4
View File
@@ -6,6 +6,10 @@ edition.workspace = true
[dependencies]
thiserror.workspace = true
serde.workspace = true
toml.workspace = true
tokio = { workspace = true, features = ["sync"] }
xxhash-rust.workspace = true
hex.workspace = true
[dev-dependencies]
tempfile.workspace = true
+190
View File
@@ -0,0 +1,190 @@
use crate::OriginId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
/// Top-level musicfs configuration; `Config::from_file` loads it from a TOML document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
/// Mount point path (presumably where the filesystem is exposed — confirm with the mount code).
pub mount_point: PathBuf,
/// Cache directory path (presumably backs the on-disk caches — confirm with the cache code).
pub cache_dir: PathBuf,
/// Configured origins. Required: the field has no serde default, so the key must be present.
pub origins: Vec<OriginConfig>,
/// Cache sizing; falls back to `CacheConfig::default()` when the key is absent.
#[serde(default)]
pub cache: CacheConfig,
/// Health-check settings; falls back to `HealthConfig::default()` when the key is absent.
#[serde(default)]
pub health: HealthConfig,
}
/// One `[[origins]]` entry of the configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OriginConfig {
/// Identifier of the origin; `Config::origin_id` looks entries up by this string.
pub id: String,
/// Backend kind of this origin.
pub origin_type: OriginType,
/// Routing priority.
/// NOTE(review): presumably forwarded to the origin registry, whose router prefers
/// lower values — confirm at the call site.
pub priority: u8,
/// Whether the origin is enabled; defaults to `true` when the key is omitted.
#[serde(default = "default_enabled")]
pub enabled: bool,
/// Every remaining key of the entry (for example `path`, `bucket` or `region`),
/// collected here by `#[serde(flatten)]`.
#[serde(flatten)]
pub settings: HashMap<String, toml::Value>,
}
/// Serde default for `OriginConfig::enabled`.
fn default_enabled() -> bool {
true
}
/// Kind of storage backend behind an origin.
///
/// Serialized in lowercase because of `rename_all = "lowercase"`, e.g. `"local"` or `"s3"`.
/// The `Eq` + `Hash` derives allow it to be used as a `HashMap` key, which
/// `HealthConfig::per_origin_thresholds` relies on.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "lowercase")]
pub enum OriginType {
Local,
Nfs,
Smb,
S3,
Sftp,
}
/// Cache sizing limits.
///
/// Each field carries its own serde default, so a partially filled `[cache]`
/// table deserializes to the same values that `CacheConfig::default()` produces
/// for the keys that were left out.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
    /// Metadata cache budget (megabytes, going by the field name). Defaults to 100.
    #[serde(default = "default_metadata_cache_mb")]
    pub metadata_cache_mb: u64,
    /// Content cache budget (gigabytes, going by the field name). Defaults to 10.
    #[serde(default = "default_content_cache_gb")]
    pub content_cache_gb: u64,
}

impl Default for CacheConfig {
    /// Builds the configuration from the same functions serde uses per field.
    fn default() -> Self {
        let metadata_cache_mb = default_metadata_cache_mb();
        let content_cache_gb = default_content_cache_gb();
        CacheConfig {
            metadata_cache_mb,
            content_cache_gb,
        }
    }
}

/// Serde default for `CacheConfig::metadata_cache_mb`.
fn default_metadata_cache_mb() -> u64 {
    100
}

/// Serde default for `CacheConfig::content_cache_gb`.
fn default_content_cache_gb() -> u64 {
    10
}
/// Health-check tuning.
///
/// Every field has a serde default. The per-type threshold map uses the same
/// default function as `Default::default()`, so a `[health]` table that omits
/// `per_origin_thresholds` still gets `Local = 1` and `3` for the remote
/// types. (Previously a bare `#[serde(default)]` produced an empty map in that
/// case, so a parsed config and `HealthConfig::default()` disagreed.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthConfig {
    /// Seconds between health-check rounds (going by the field name). Defaults to 30.
    #[serde(default = "default_check_interval_secs")]
    pub check_interval_secs: u64,
    /// Health-check timeout in milliseconds (going by the field name). Defaults to 5000.
    #[serde(default = "default_timeout_ms")]
    pub timeout_ms: u64,
    /// Threshold used for origin types missing from `per_origin_thresholds`. Defaults to 3.
    #[serde(default = "default_unhealthy_threshold")]
    pub unhealthy_threshold: u32,
    /// Per-origin-type thresholds; an entry here takes precedence over
    /// `unhealthy_threshold` (see `threshold_for`).
    #[serde(default = "default_per_origin_thresholds")]
    pub per_origin_thresholds: HashMap<OriginType, u32>,
}

impl Default for HealthConfig {
    /// Builds the configuration from the same functions serde uses per field.
    fn default() -> Self {
        Self {
            check_interval_secs: default_check_interval_secs(),
            timeout_ms: default_timeout_ms(),
            unhealthy_threshold: default_unhealthy_threshold(),
            per_origin_thresholds: default_per_origin_thresholds(),
        }
    }
}

/// Default per-type thresholds: `Local` is 1, every other origin type is 3.
///
/// Shared by `Default for HealthConfig` and by serde so both construction
/// paths agree.
fn default_per_origin_thresholds() -> HashMap<OriginType, u32> {
    let mut thresholds = HashMap::new();
    thresholds.insert(OriginType::Local, 1);
    thresholds.insert(OriginType::Nfs, 3);
    thresholds.insert(OriginType::Smb, 3);
    thresholds.insert(OriginType::S3, 3);
    thresholds.insert(OriginType::Sftp, 3);
    thresholds
}
impl HealthConfig {
    /// Returns the threshold that applies to `origin_type`.
    ///
    /// An entry in `per_origin_thresholds` wins; otherwise the global
    /// `unhealthy_threshold` is returned.
    pub fn threshold_for(&self, origin_type: OriginType) -> u32 {
        match self.per_origin_thresholds.get(&origin_type) {
            Some(&specific) => specific,
            None => self.unhealthy_threshold,
        }
    }
}
/// Serde default for `HealthConfig::check_interval_secs`.
fn default_check_interval_secs() -> u64 {
30
}
/// Serde default for `HealthConfig::timeout_ms`.
fn default_timeout_ms() -> u64 {
5000
}
/// Serde default for `HealthConfig::unhealthy_threshold`.
fn default_unhealthy_threshold() -> u32 {
3
}
impl Config {
    /// Reads `path` and parses its contents as a TOML `Config`.
    ///
    /// # Errors
    /// * `ConfigError::Read` when the file cannot be read.
    /// * `ConfigError::Parse` when the contents do not deserialize.
    ///
    /// In both cases the underlying error is carried as its string form.
    pub fn from_file(path: &std::path::Path) -> Result<Self, ConfigError> {
        let raw = match std::fs::read_to_string(path) {
            Ok(text) => text,
            Err(io_err) => return Err(ConfigError::Read(io_err.to_string())),
        };
        match toml::from_str(&raw) {
            Ok(parsed) => Ok(parsed),
            Err(parse_err) => Err(ConfigError::Parse(parse_err.to_string())),
        }
    }

    /// Returns an `OriginId` built from `id` when an origin with that id is
    /// configured, and `None` otherwise.
    pub fn origin_id(&self, id: &str) -> Option<OriginId> {
        let is_configured = self.origins.iter().any(|origin| origin.id == id);
        if is_configured {
            Some(OriginId::from(id))
        } else {
            None
        }
    }
}
/// Errors returned by `Config::from_file`.
///
/// Both variants carry the underlying error rendered as a string.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
/// The configuration file could not be read.
#[error("Failed to read config: {0}")]
Read(String),
/// The file contents could not be deserialized into a `Config`.
#[error("Failed to parse config: {0}")]
Parse(String),
}
// Unit tests for configuration parsing and defaults.
#[cfg(test)]
mod tests {
use super::*;
// Parses a document with two origins; the extra keys (`path`, `bucket`,
// `region`) are absorbed by the flattened `settings` map.
#[test]
fn test_parse_config() {
let toml = r#"
mount_point = "/mnt/music"
cache_dir = "/home/user/.cache/musicfs"
[[origins]]
id = "local"
origin_type = "local"
priority = 1
path = "/mnt/nas/music"
[[origins]]
id = "backup"
origin_type = "s3"
priority = 2
bucket = "music-backup"
region = "us-east-1"
"#;
let config: Config = toml::from_str(toml).unwrap();
assert_eq!(config.origins.len(), 2);
assert_eq!(config.origins[0].priority, 1);
assert_eq!(config.origins[1].origin_type, OriginType::S3);
}
// `HealthConfig::default()` gives Local a threshold of 1 and Sftp a threshold of 3.
#[test]
fn test_default_health_thresholds() {
let health = HealthConfig::default();
assert_eq!(health.threshold_for(OriginType::Local), 1);
assert_eq!(health.threshold_for(OriginType::Sftp), 3);
}
// `CacheConfig::default()` matches the per-field serde defaults.
#[test]
fn test_cache_defaults() {
let cache = CacheConfig::default();
assert_eq!(cache.metadata_cache_mb, 100);
assert_eq!(cache.content_cache_gb, 10);
}
}
+9
View File
@@ -28,6 +28,15 @@ pub enum Error {
#[error("Operation not permitted (read-only filesystem)")]
ReadOnly,
#[error("No origin available to serve request")]
NoOriginAvailable,
#[error("Maximum retries exceeded")]
MaxRetriesExceeded,
#[error("Origin error: {0}")]
Origin(String),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -60,6 +60,13 @@ pub enum Event {
CacheEviction {
bytes_freed: u64,
},
AllOriginsUnhealthy {
candidate_count: usize,
},
OriginHealthChanged {
origin_id: OriginId,
healthy: bool,
},
}
#[cfg(test)]
+2
View File
@@ -1,8 +1,10 @@
pub mod config;
pub mod error;
pub mod events;
pub mod resolver;
pub mod types;
pub use config::{CacheConfig, Config, ConfigError, HealthConfig, OriginConfig, OriginType};
pub use error::{Error, Result};
pub use events::{Event, EventBus};
pub use resolver::{PathResolver, PathTemplate};
+2 -1
View File
@@ -6,7 +6,8 @@ edition.workspace = true
[dependencies]
musicfs-core = { path = "../musicfs-core" }
async-trait.workspace = true
tokio = { workspace = true, features = ["fs", "sync"] }
dashmap.workspace = true
tokio = { workspace = true, features = ["fs", "sync", "time"] }
tracing.workspace = true
[dev-dependencies]
@@ -0,0 +1,224 @@
use crate::registry::OriginRegistry;
use crate::traits::Origin;
use musicfs_core::{Error, RealPath, Result};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};
/// Retry policy applied to a single origin before the executor gives up on it.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Total number of read attempts per origin; the first try counts.
    pub max_attempts: u32,
    /// Back-off delays; entry `i` is the pause after failed attempt `i`.
    pub delays: Vec<Duration>,
}

impl Default for RetryConfig {
    /// Same as [`RetryConfig::spec_compliant`].
    fn default() -> Self {
        Self::spec_compliant()
    }
}

impl RetryConfig {
    /// Delay used when `delays` is empty.
    const FALLBACK_DELAY: Duration = Duration::from_millis(100);

    /// Three attempts with delays of 100 ms, 500 ms and 2000 ms.
    ///
    /// NOTE(review): the retry loops in this file do not sleep after the last
    /// attempt, so with three attempts the 2000 ms entry is never used.
    /// Confirm against NFR-7.3 whether four attempts were intended.
    pub fn spec_compliant() -> Self {
        Self {
            max_attempts: 3,
            delays: vec![
                Duration::from_millis(100),
                Duration::from_millis(500),
                Duration::from_millis(2000),
            ],
        }
    }

    /// Builds a policy that allows one attempt per supplied delay.
    ///
    /// At least one attempt is always allowed: an empty `delays` previously
    /// produced `max_attempts == 0`, which made the retry loops fail without
    /// ever contacting the origin. The length is converted with saturation
    /// instead of a truncating cast.
    pub fn with_delays(delays: Vec<Duration>) -> Self {
        let capped = delays.len().min(u32::MAX as usize) as u32;
        Self {
            max_attempts: capped.max(1),
            delays,
        }
    }

    /// Returns the pause that follows failed attempt number `attempt` (zero-based).
    ///
    /// Attempts past the end of `delays` reuse the last entry; an empty
    /// `delays` yields 100 ms.
    fn delay_for_attempt(&self, attempt: u32) -> Duration {
        self.delays
            .get(attempt as usize)
            .or_else(|| self.delays.last())
            .copied()
            .unwrap_or(Self::FALLBACK_DELAY)
    }
}
/// Performs reads against registered origins, retrying each origin and moving
/// on to the next one when it keeps failing.
pub struct FailoverExecutor {
// Source of candidate origins and sink for latency measurements.
registry: Arc<OriginRegistry>,
// Per-origin retry policy.
retry_config: RetryConfig,
}
impl FailoverExecutor {
pub fn new(registry: Arc<OriginRegistry>, retry_config: RetryConfig) -> Self {
Self {
registry,
retry_config,
}
}
pub async fn read_with_failover(
&self,
path: &RealPath,
offset: u64,
size: u32,
) -> Result<Vec<u8>> {
let origins = self.registry.route_all(path);
if origins.is_empty() {
if let Some(origin) = self.registry.route_with_fallback(path) {
warn!(
"No healthy origins, using fallback origin {}",
origin.id()
);
return self.read_with_retry(&origin, &path.path, offset, size).await;
}
return Err(Error::NoOriginAvailable);
}
let mut last_error = None;
for origin in origins {
let start = std::time::Instant::now();
match self.read_with_retry(&origin, &path.path, offset, size).await {
Ok(data) => {
let latency = start.elapsed().as_millis() as u64;
self.registry.record_latency(origin.id(), latency);
return Ok(data);
}
Err(e) => {
warn!("Origin {} failed: {}, trying next", origin.id(), e);
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or(Error::NoOriginAvailable))
}
async fn read_with_retry(
&self,
origin: &Arc<dyn Origin>,
path: &std::path::Path,
offset: u64,
size: u32,
) -> Result<Vec<u8>> {
for attempt in 0..self.retry_config.max_attempts {
match origin.read(path, offset, size).await {
Ok(data) => return Ok(data),
Err(e) if attempt + 1 < self.retry_config.max_attempts => {
let delay = self.retry_config.delay_for_attempt(attempt);
debug!(
"Retry {}/{} for {} after {:?}: {}",
attempt + 1,
self.retry_config.max_attempts,
origin.id(),
delay,
e
);
tokio::time::sleep(delay).await;
}
Err(e) => return Err(e),
}
}
Err(Error::MaxRetriesExceeded)
}
pub async fn read_full_with_failover(&self, path: &RealPath) -> Result<Vec<u8>> {
let origins = self.registry.route_all(path);
if origins.is_empty() {
if let Some(origin) = self.registry.route_with_fallback(path) {
warn!(
"No healthy origins for full read, using fallback {}",
origin.id()
);
return self.read_full_with_retry(&origin, &path.path).await;
}
return Err(Error::NoOriginAvailable);
}
let mut last_error = None;
for origin in origins {
let start = std::time::Instant::now();
match self.read_full_with_retry(&origin, &path.path).await {
Ok(data) => {
let latency = start.elapsed().as_millis() as u64;
self.registry.record_latency(origin.id(), latency);
return Ok(data);
}
Err(e) => {
warn!("Origin {} failed full read: {}, trying next", origin.id(), e);
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or(Error::NoOriginAvailable))
}
async fn read_full_with_retry(
&self,
origin: &Arc<dyn Origin>,
path: &std::path::Path,
) -> Result<Vec<u8>> {
for attempt in 0..self.retry_config.max_attempts {
match origin.read_full(path).await {
Ok(data) => return Ok(data),
Err(e) if attempt + 1 < self.retry_config.max_attempts => {
let delay = self.retry_config.delay_for_attempt(attempt);
debug!(
"Retry full read {}/{} for {} after {:?}: {}",
attempt + 1,
self.retry_config.max_attempts,
origin.id(),
delay,
e
);
tokio::time::sleep(delay).await;
}
Err(e) => return Err(e),
}
}
Err(Error::MaxRetriesExceeded)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a millisecond `Duration`.
    fn ms(value: u64) -> Duration {
        Duration::from_millis(value)
    }

    /// The default policy is three attempts with 100/500/2000 ms delays.
    #[test]
    fn test_retry_config_default() {
        let defaults = RetryConfig::default();
        assert_eq!(defaults.max_attempts, 3);
        assert_eq!(defaults.delays[0], ms(100));
        assert_eq!(defaults.delays[1], ms(500));
        assert_eq!(defaults.delays[2], ms(2000));
    }

    /// Delays are looked up by attempt index; out-of-range attempts reuse the last entry.
    #[test]
    fn test_delay_for_attempt() {
        let policy = RetryConfig::spec_compliant();
        assert_eq!(policy.delay_for_attempt(0), ms(100));
        assert_eq!(policy.delay_for_attempt(1), ms(500));
        assert_eq!(policy.delay_for_attempt(2), ms(2000));
        assert_eq!(policy.delay_for_attempt(10), ms(2000));
    }

    /// `with_delays` derives the attempt count from the number of delays.
    #[test]
    fn test_custom_delays() {
        let custom = vec![ms(50), ms(100)];
        let policy = RetryConfig::with_delays(custom);
        assert_eq!(policy.max_attempts, 2);
        assert_eq!(policy.delay_for_attempt(0), ms(50));
        assert_eq!(policy.delay_for_attempt(1), ms(100));
    }
}
@@ -0,0 +1,351 @@
use crate::traits::Origin;
use dashmap::DashMap;
use musicfs_core::{Event, EventBus, HealthStatus, OriginId, OriginType};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
/// Tracks the health of registered origins and, once started, re-checks them
/// on a fixed interval.
pub struct HealthMonitor {
// Origins to probe, keyed by id.
origins: DashMap<OriginId, Arc<dyn Origin>>,
// Latest recorded health state per origin id.
state: DashMap<OriginId, OriginHealthState>,
// Period of the background check loop.
check_interval: Duration,
// Threshold used for origin types missing from `per_type_thresholds`.
default_threshold: u32,
// Consecutive-failure threshold per origin type.
per_type_thresholds: HashMap<OriginType, u32>,
// Optional bus that receives `OriginHealthChanged` events.
event_bus: Option<Arc<EventBus>>,
}
/// Health bookkeeping kept for one origin.
#[derive(Debug, Clone)]
pub struct OriginHealthState {
/// Status recorded by the most recent check.
pub status: HealthStatus,
/// When the state was last written (creation time until the first check completes).
pub last_check: Instant,
/// Number of unhealthy results since the last healthy one.
pub consecutive_failures: u32,
/// Duration of the most recent health call in milliseconds; `None` before the first check.
pub last_latency_ms: Option<u64>,
}
impl Default for OriginHealthState {
/// Initial state: status `Unknown`, no failures, no latency, `last_check` set to now.
fn default() -> Self {
Self {
status: HealthStatus::Unknown,
last_check: Instant::now(),
consecutive_failures: 0,
last_latency_ms: None,
}
}
}
/// Point-in-time view of origin health, grouped by status.
#[derive(Debug, Clone)]
pub struct HealthSnapshot {
    /// Origins whose last recorded status is healthy.
    pub healthy: Vec<OriginId>,
    /// Origins treated as degraded.
    pub degraded: Vec<OriginId>,
    /// Origins whose last recorded status is unhealthy.
    pub unhealthy: Vec<OriginId>,
    /// Consecutive failure count per origin.
    pub failure_counts: HashMap<OriginId, u32>,
}

impl HealthSnapshot {
    /// True when `id` is listed as healthy.
    pub fn is_healthy(&self, id: &OriginId) -> bool {
        self.healthy.iter().any(|known| known == id)
    }

    /// True when `id` is listed as degraded.
    pub fn is_degraded(&self, id: &OriginId) -> bool {
        self.degraded.iter().any(|known| known == id)
    }

    /// True when `id` is listed as unhealthy.
    pub fn is_unhealthy(&self, id: &OriginId) -> bool {
        self.unhealthy.iter().any(|known| known == id)
    }

    /// Consecutive failure count recorded for `id`, if the snapshot has one.
    pub fn failure_count(&self, id: &OriginId) -> Option<u32> {
        match self.failure_counts.get(id) {
            Some(&count) => Some(count),
            None => None,
        }
    }

    /// True when no origin is healthy or degraded.
    ///
    /// This is also true for a snapshot that lists no origins at all.
    pub fn all_unhealthy(&self) -> bool {
        let any_usable = !self.healthy.is_empty() || !self.degraded.is_empty();
        !any_usable
    }

    /// Number of origins across all three groups.
    pub fn total_candidates(&self) -> usize {
        let groups = [&self.healthy, &self.degraded, &self.unhealthy];
        groups.iter().map(|group| group.len()).sum()
    }
}
impl HealthMonitor {
    /// Creates a monitor that checks every `check_interval` once started.
    ///
    /// Defaults: threshold 1 for `Local`, 3 for every other origin type, a
    /// fallback threshold of 3, and no event bus.
    pub fn new(check_interval: Duration) -> Self {
        let mut per_type = HashMap::new();
        per_type.insert(OriginType::Local, 1);
        per_type.insert(OriginType::Nfs, 3);
        per_type.insert(OriginType::Smb, 3);
        per_type.insert(OriginType::S3, 3);
        per_type.insert(OriginType::Sftp, 3);
        Self {
            origins: DashMap::new(),
            state: DashMap::new(),
            check_interval,
            default_threshold: 3,
            per_type_thresholds: per_type,
            event_bus: None,
        }
    }

    /// Sets the threshold used for origin types without a per-type entry.
    pub fn with_threshold(mut self, threshold: u32) -> Self {
        self.default_threshold = threshold;
        self
    }

    /// Replaces the whole per-type threshold map (defaults are not merged in).
    pub fn with_per_type_thresholds(mut self, thresholds: HashMap<OriginType, u32>) -> Self {
        self.per_type_thresholds = thresholds;
        self
    }

    /// Attaches an event bus that receives `Event::OriginHealthChanged`.
    pub fn with_event_bus(mut self, bus: Arc<EventBus>) -> Self {
        self.event_bus = Some(bus);
        self
    }

    /// Threshold for `origin_type`: the per-type entry, else the default threshold.
    fn threshold_for(&self, origin_type: OriginType) -> u32 {
        self.per_type_thresholds
            .get(&origin_type)
            .copied()
            .unwrap_or(self.default_threshold)
    }

    /// Starts tracking `origin` with a fresh `Unknown` state.
    ///
    /// Re-adding an id that is already tracked resets its state.
    pub fn add_origin(&self, origin: Arc<dyn Origin>) {
        let id = origin.id().clone();
        // Insert the state before the origin so that a concurrent check never
        // sees an origin whose state slot does not exist yet.
        self.state.insert(id.clone(), OriginHealthState::default());
        self.origins.insert(id, origin);
    }

    /// Stops tracking `id` and discards its state.
    pub fn remove_origin(&self, id: &OriginId) {
        self.origins.remove(id);
        self.state.remove(id);
    }

    /// Groups all tracked origins by status.
    ///
    /// Origins with status `Unknown` are reported in the `degraded` group.
    pub fn snapshot(&self) -> HealthSnapshot {
        let mut healthy = Vec::new();
        let mut degraded = Vec::new();
        let mut unhealthy = Vec::new();
        let mut failure_counts = HashMap::new();
        for entry in self.state.iter() {
            let id = entry.key().clone();
            failure_counts.insert(id.clone(), entry.value().consecutive_failures);
            match entry.value().status {
                HealthStatus::Healthy => healthy.push(id),
                HealthStatus::Degraded | HealthStatus::Unknown => degraded.push(id),
                HealthStatus::Unhealthy => unhealthy.push(id),
            }
        }
        HealthSnapshot {
            healthy,
            degraded,
            unhealthy,
            failure_counts,
        }
    }

    /// Spawns the periodic check loop on the current tokio runtime.
    ///
    /// The loop runs `check_all` on every interval tick and exits when the
    /// returned handle's stop channel yields.
    /// NOTE(review): `tokio::time::interval` is documented to panic for a zero
    /// period — confirm callers never pass `Duration::ZERO`.
    pub fn start(self: Arc<Self>) -> HealthCheckHandle {
        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
        let monitor = self;
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(monitor.check_interval);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        monitor.check_all().await;
                    }
                    _ = stop_rx.recv() => {
                        info!("Health monitor stopping");
                        break;
                    }
                }
            }
        });
        HealthCheckHandle { stop_tx }
    }

    /// Checks every tracked origin, one after another.
    async fn check_all(&self) {
        // Copy the origins out first so no map guard is alive across an await.
        let origins: Vec<_> = self
            .origins
            .iter()
            .map(|e| (e.key().clone(), e.value().clone()))
            .collect();
        for (id, origin) in origins {
            self.check_one(&id, &origin).await;
        }
    }

    /// Probes `origin` and folds the result into the state stored for `id`.
    ///
    /// * `Healthy` resets the failure counter.
    /// * `Unhealthy` increments it; the stored status becomes `Unhealthy` once
    ///   the counter reaches the threshold for the origin's type, and
    ///   `Degraded` before that.
    /// * `Degraded` and `Unknown` are stored as-is and leave the counter alone.
    ///
    /// If the origin was removed while the probe was in flight, the result is
    /// discarded. The earlier `entry().or_default()` re-created state for the
    /// removed origin, which then lingered in `snapshot()`.
    ///
    /// An `OriginHealthChanged` event is published when the origin moves into
    /// or out of the `Healthy` status.
    async fn check_one(&self, id: &OriginId, origin: &Arc<dyn Origin>) {
        let start = Instant::now();
        let status = origin.health().await;
        let latency_ms = start.elapsed().as_millis() as u64;
        let threshold = self.threshold_for(origin.origin_type());

        // The map guard lives only inside this block, so it is released
        // before the event is published.
        let (prev_healthy, now_healthy) = {
            let mut state = match self.state.get_mut(id) {
                Some(state) => state,
                None => {
                    debug!(
                        "Origin {} was removed during its health check; discarding result",
                        id
                    );
                    return;
                }
            };
            let prev_healthy = state.status == HealthStatus::Healthy;

            match status {
                HealthStatus::Healthy => {
                    if state.status != HealthStatus::Healthy {
                        info!("Origin {} is now healthy", id);
                    }
                    state.status = HealthStatus::Healthy;
                    state.consecutive_failures = 0;
                }
                HealthStatus::Degraded => {
                    if state.status != HealthStatus::Degraded {
                        warn!("Origin {} is degraded", id);
                    }
                    state.status = HealthStatus::Degraded;
                }
                HealthStatus::Unhealthy => {
                    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
                    if state.consecutive_failures >= threshold {
                        if state.status != HealthStatus::Unhealthy {
                            warn!(
                                "Origin {} is now unhealthy ({} failures)",
                                id, state.consecutive_failures
                            );
                        }
                        state.status = HealthStatus::Unhealthy;
                    } else {
                        debug!(
                            "Origin {} check failed ({}/{})",
                            id, state.consecutive_failures, threshold
                        );
                        state.status = HealthStatus::Degraded;
                    }
                }
                HealthStatus::Unknown => {
                    state.status = HealthStatus::Unknown;
                }
            }

            state.last_check = Instant::now();
            state.last_latency_ms = Some(latency_ms);
            (prev_healthy, state.status == HealthStatus::Healthy)
        };

        if prev_healthy != now_healthy {
            if let Some(bus) = &self.event_bus {
                bus.publish(Event::OriginHealthChanged {
                    origin_id: id.clone(),
                    healthy: now_healthy,
                });
            }
        }
    }

    /// Runs a health check for `id` immediately; does nothing for unknown ids.
    pub async fn check_now(&self, id: &OriginId) {
        // Clone the Arc and release the map guard before awaiting: a DashMap
        // guard held across `.await` keeps its shard locked and can deadlock
        // against `add_origin`/`remove_origin`.
        let origin = self.origins.get(id).map(|entry| Arc::clone(entry.value()));
        if let Some(origin) = origin {
            self.check_one(id, &origin).await;
        }
    }

    /// Returns a copy of the state stored for `id`, if it is tracked.
    pub fn get_state(&self, id: &OriginId) -> Option<OriginHealthState> {
        self.state.get(id).map(|e| e.value().clone())
    }
}
/// Handle returned by `HealthMonitor::start`; used to stop the background loop.
///
/// NOTE(review): this struct holds the only sender of the stop channel. If the
/// handle is dropped without calling `stop`, the receiver in the loop
/// presumably yields `None`, which the `_ =` select arm also matches, so the
/// loop would stop as well — confirm this is intended.
pub struct HealthCheckHandle {
stop_tx: mpsc::Sender<()>,
}
impl HealthCheckHandle {
/// Sends the stop signal to the background loop.
///
/// A send error (receiver already gone) is ignored.
pub async fn stop(self) {
let _ = self.stop_tx.send(()).await;
}
}
// Unit tests for the health monitor and its snapshot type.
#[cfg(test)]
mod tests {
use super::*;
use crate::LocalOrigin;
use tempfile::TempDir;
// A freshly added origin has status Unknown, so it is not reported as healthy.
#[tokio::test]
async fn test_health_monitor_basic() {
let monitor = HealthMonitor::new(Duration::from_secs(30));
let dir = TempDir::new().unwrap();
let origin = Arc::new(LocalOrigin::new("test", dir.path()));
monitor.add_origin(origin);
let snapshot = monitor.snapshot();
assert!(!snapshot.is_healthy(&OriginId::from("test")));
}
// After an explicit check, an origin backed by an existing directory is healthy.
#[tokio::test]
async fn test_health_check() {
let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)));
let dir = TempDir::new().unwrap();
let origin = Arc::new(LocalOrigin::new("test", dir.path()));
monitor.add_origin(origin);
monitor.check_now(&OriginId::from("test")).await;
let snapshot = monitor.snapshot();
assert!(snapshot.is_healthy(&OriginId::from("test")));
}
// With the Local threshold raised to 3, the first failure is Degraded and
// the third consecutive failure is Unhealthy.
#[tokio::test]
async fn test_failure_tracking() {
let mut thresholds = HashMap::new();
thresholds.insert(OriginType::Local, 3);
let monitor = HealthMonitor::new(Duration::from_secs(30))
.with_per_type_thresholds(thresholds);
// The assertions below expect the health check to fail for this path.
let origin = Arc::new(LocalOrigin::new("missing", std::path::Path::new("/nonexistent")));
monitor.add_origin(origin);
monitor.check_now(&OriginId::from("missing")).await;
let state = monitor.get_state(&OriginId::from("missing")).unwrap();
assert_eq!(state.consecutive_failures, 1);
assert_eq!(state.status, HealthStatus::Degraded);
monitor.check_now(&OriginId::from("missing")).await;
monitor.check_now(&OriginId::from("missing")).await;
let state = monitor.get_state(&OriginId::from("missing")).unwrap();
assert_eq!(state.consecutive_failures, 3);
assert_eq!(state.status, HealthStatus::Unhealthy);
}
// With the default thresholds a Local origin is Unhealthy after one failure.
#[tokio::test]
async fn test_local_origin_threshold_is_one() {
let monitor = HealthMonitor::new(Duration::from_secs(30));
let origin = Arc::new(LocalOrigin::new("missing", std::path::Path::new("/nonexistent")));
monitor.add_origin(origin);
monitor.check_now(&OriginId::from("missing")).await;
let state = monitor.get_state(&OriginId::from("missing")).unwrap();
assert_eq!(state.consecutive_failures, 1);
assert_eq!(state.status, HealthStatus::Unhealthy);
}
// A snapshot with only unhealthy origins reports all_unhealthy().
#[test]
fn test_snapshot_all_unhealthy() {
let snapshot = HealthSnapshot {
healthy: Vec::new(),
degraded: Vec::new(),
unhealthy: vec![OriginId::from("a")],
failure_counts: HashMap::new(),
};
assert!(snapshot.all_unhealthy());
}
}
+10 -1
View File
@@ -1,5 +1,14 @@
mod failover;
mod health;
mod local;
mod registry;
mod router;
mod traits;
pub use failover::{FailoverExecutor, RetryConfig};
pub use health::{HealthCheckHandle, HealthMonitor, HealthSnapshot, OriginHealthState};
pub use local::LocalOrigin;
pub use traits::{Origin, OriginType, WatchCallback, WatchEvent, WatchHandle};
pub use registry::OriginRegistry;
pub use router::{LatencyStats, Router};
pub use musicfs_core::OriginType;
pub use traits::{Origin, WatchCallback, WatchEvent, WatchHandle};
+2 -2
View File
@@ -1,6 +1,6 @@
use crate::traits::{Origin, OriginType, WatchCallback, WatchHandle};
use crate::traits::{Origin, WatchCallback, WatchHandle};
use async_trait::async_trait;
use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result};
use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, OriginType, Result};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::AsyncRead;
@@ -0,0 +1,215 @@
use crate::health::{HealthMonitor, HealthSnapshot};
use crate::router::Router;
use crate::traits::{Origin, WatchHandle};
use musicfs_core::{OriginId, RealPath};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::{info, warn};
/// Central table of registered origins, together with their routing
/// priorities, health monitor and watch handles.
pub struct OriginRegistry {
// Registered origins keyed by id.
origins: RwLock<HashMap<OriginId, Arc<dyn Origin>>>,
// Holds priorities and latency statistics used for selection.
router: Router,
// Shared monitor that origins are added to on registration.
health_monitor: Arc<HealthMonitor>,
// Watch handles per origin; removed (and therefore dropped) on unregister.
watch_handles: RwLock<HashMap<OriginId, Vec<WatchHandle>>>,
}
impl OriginRegistry {
pub fn new(health_monitor: Arc<HealthMonitor>) -> Self {
Self {
origins: RwLock::new(HashMap::new()),
router: Router::new(),
health_monitor,
watch_handles: RwLock::new(HashMap::new()),
}
}
pub fn register(&self, origin: Arc<dyn Origin>, priority: u8) {
let id = origin.id().clone();
info!("Registering origin {} with priority {}", id, priority);
self.router.set_priority(id.clone(), priority);
self.health_monitor.add_origin(origin.clone());
self.origins.write().unwrap().insert(id, origin);
}
pub fn unregister(&self, id: &OriginId) {
info!("Unregistering origin {}", id);
if let Some(handles) = self.watch_handles.write().unwrap().remove(id) {
info!("Dropping {} watch handles for origin {}", handles.len(), id);
}
self.origins.write().unwrap().remove(id);
self.router.remove_priority(id);
self.health_monitor.remove_origin(id);
}
pub fn register_watch(&self, origin_id: &OriginId, handle: WatchHandle) {
self.watch_handles
.write()
.unwrap()
.entry(origin_id.clone())
.or_default()
.push(handle);
}
pub fn get(&self, id: &OriginId) -> Option<Arc<dyn Origin>> {
self.origins.read().unwrap().get(id).cloned()
}
pub fn list(&self) -> Vec<Arc<dyn Origin>> {
self.origins.read().unwrap().values().cloned().collect()
}
pub fn route(&self, path: &RealPath) -> Option<Arc<dyn Origin>> {
let origins = self.origins.read().unwrap();
let health = self.health_monitor.snapshot();
let candidates: Vec<_> = origins
.iter()
.filter(|(id, _)| self.can_serve(id, path))
.map(|(id, origin)| (id.clone(), origin.clone()))
.collect();
if candidates.is_empty() {
warn!("No origin can serve path: {:?}", path);
return None;
}
let candidate_ids: Vec<_> = candidates.iter().map(|(id, _)| id.clone()).collect();
let selected = self.router.select(&candidate_ids, &health)?;
candidates
.into_iter()
.find(|(id, _)| id == &selected)
.map(|(_, origin)| origin)
}
pub fn route_with_fallback(&self, path: &RealPath) -> Option<Arc<dyn Origin>> {
let origins = self.origins.read().unwrap();
let health = self.health_monitor.snapshot();
let candidates: Vec<_> = origins
.iter()
.filter(|(id, _)| self.can_serve(id, path))
.map(|(id, origin)| (id.clone(), origin.clone()))
.collect();
if candidates.is_empty() {
return None;
}
let candidate_ids: Vec<_> = candidates.iter().map(|(id, _)| id.clone()).collect();
let selected = self.router.select_with_fallback(&candidate_ids, &health)?;
candidates
.into_iter()
.find(|(id, _)| id == &selected)
.map(|(_, origin)| origin)
}
pub fn route_all(&self, path: &RealPath) -> Vec<Arc<dyn Origin>> {
let origins = self.origins.read().unwrap();
let health = self.health_monitor.snapshot();
let mut result: Vec<_> = origins
.iter()
.filter(|(id, _)| self.can_serve(id, path) && health.is_healthy(id))
.map(|(_, origin)| origin.clone())
.collect();
result.sort_by_key(|o| self.router.get_priority(o.id()));
result
}
fn can_serve(&self, origin_id: &OriginId, path: &RealPath) -> bool {
path.origin_id == *origin_id
}
pub fn health(&self) -> HealthSnapshot {
self.health_monitor.snapshot()
}
pub fn record_latency(&self, id: &OriginId, latency_ms: u64) {
self.router.record_latency(id, latency_ms);
}
}
// Unit tests for origin registration, removal and routing.
#[cfg(test)]
mod tests {
use super::*;
use crate::LocalOrigin;
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
// A registered origin can be fetched back by id.
#[test]
fn test_register_and_get() {
let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)));
let registry = OriginRegistry::new(monitor);
let dir = TempDir::new().unwrap();
let origin = Arc::new(LocalOrigin::new("test", dir.path()));
registry.register(origin.clone(), 1);
let retrieved = registry.get(&OriginId::from("test"));
assert!(retrieved.is_some());
}
// After unregistering, the origin is no longer returned.
#[test]
fn test_unregister() {
let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)));
let registry = OriginRegistry::new(monitor);
let dir = TempDir::new().unwrap();
let origin = Arc::new(LocalOrigin::new("test", dir.path()));
registry.register(origin, 1);
registry.unregister(&OriginId::from("test"));
assert!(registry.get(&OriginId::from("test")).is_none());
}
// Routing a path that names "primary" returns that origin once it has been
// checked and found healthy.
#[tokio::test]
async fn test_route_by_priority() {
let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)));
let registry = OriginRegistry::new(monitor.clone());
let dir1 = TempDir::new().unwrap();
let dir2 = TempDir::new().unwrap();
let origin1 = Arc::new(LocalOrigin::new("primary", dir1.path()));
let origin2 = Arc::new(LocalOrigin::new("backup", dir2.path()));
registry.register(origin1, 1);
registry.register(origin2, 2);
// Origins start as Unknown; explicit checks are needed before routing
// considers them healthy.
monitor.check_now(&OriginId::from("primary")).await;
monitor.check_now(&OriginId::from("backup")).await;
let path = RealPath {
origin_id: OriginId::from("primary"),
path: PathBuf::from("/test.flac"),
};
let routed = registry.route(&path);
assert!(routed.is_some());
assert_eq!(routed.unwrap().id(), &OriginId::from("primary"));
}
// list() returns every registered origin.
#[test]
fn test_list_origins() {
let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)));
let registry = OriginRegistry::new(monitor);
let dir1 = TempDir::new().unwrap();
let dir2 = TempDir::new().unwrap();
registry.register(Arc::new(LocalOrigin::new("a", dir1.path())), 1);
registry.register(Arc::new(LocalOrigin::new("b", dir2.path())), 2);
let list = registry.list();
assert_eq!(list.len(), 2);
}
}
@@ -0,0 +1,225 @@
use crate::health::HealthSnapshot;
use dashmap::DashMap;
use musicfs_core::{Event, EventBus, OriginId};
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, warn};
/// Chooses among candidate origins using priority, latency and health.
pub struct Router {
// Routing priority per origin; lower values win in `select`.
priorities: DashMap<OriginId, u8>,
// Rolling latency statistics per origin, used as a tie-breaker.
latency_stats: DashMap<OriginId, LatencyStats>,
// Optional bus that receives `AllOriginsUnhealthy` events.
event_bus: Option<Arc<EventBus>>,
}
/// Rolling latency statistics for one origin.
#[derive(Debug, Clone, Default)]
pub struct LatencyStats {
    /// Recent samples in arrival order, oldest first.
    pub samples: Vec<u64>,
    /// Median of the current samples, in milliseconds.
    pub p50_ms: u64,
    /// 99th-percentile of the current samples, in milliseconds.
    pub p99_ms: u64,
    /// When the last sample was recorded; `None` before the first one.
    pub last_update: Option<Instant>,
}

impl LatencyStats {
    /// Adds a sample and refreshes both percentiles.
    ///
    /// When the window exceeds 100 samples, the oldest one is dropped. The
    /// percentiles are read from a sorted copy at indices `len / 2` and
    /// `len * 99 / 100`.
    pub fn record(&mut self, latency_ms: u64) {
        self.samples.push(latency_ms);
        if self.samples.len() > 100 {
            self.samples.remove(0);
        }

        let mut ordered = self.samples.clone();
        ordered.sort_unstable();
        let count = ordered.len();
        if count > 0 {
            let median = ordered[count / 2];
            self.p50_ms = median;
            self.p99_ms = match ordered.get(count * 99 / 100) {
                Some(&tail) => tail,
                None => median,
            };
        }

        self.last_update = Some(Instant::now());
    }
}
impl Router {
pub fn new() -> Self {
Self {
priorities: DashMap::new(),
latency_stats: DashMap::new(),
event_bus: None,
}
}
pub fn with_event_bus(mut self, bus: Arc<EventBus>) -> Self {
self.event_bus = Some(bus);
self
}
pub fn set_priority(&self, id: OriginId, priority: u8) {
self.priorities.insert(id, priority);
}
pub fn remove_priority(&self, id: &OriginId) {
self.priorities.remove(id);
self.latency_stats.remove(id);
}
pub fn get_priority(&self, id: &OriginId) -> u8 {
self.priorities.get(id).map(|p| *p).unwrap_or(100)
}
pub fn record_latency(&self, id: &OriginId, latency_ms: u64) {
self.latency_stats
.entry(id.clone())
.or_default()
.record(latency_ms);
}
pub fn select(&self, candidates: &[OriginId], health: &HealthSnapshot) -> Option<OriginId> {
candidates
.iter()
.filter(|id| health.is_healthy(id))
.min_by_key(|id| {
let priority = self.get_priority(id);
let latency = self.latency_stats.get(*id).map(|s| s.p50_ms).unwrap_or(0);
(priority, latency)
})
.cloned()
}
pub fn select_with_fallback(
&self,
candidates: &[OriginId],
health: &HealthSnapshot,
) -> Option<OriginId> {
if let Some(id) = self.select(candidates, health) {
return Some(id);
}
debug!("No healthy origins, trying degraded");
if let Some(id) = candidates
.iter()
.filter(|id| health.is_degraded(id))
.min_by_key(|id| self.get_priority(id))
.cloned()
{
return Some(id);
}
warn!("All origins unhealthy, selecting least-bad by failure count");
if let Some(bus) = &self.event_bus {
bus.publish(Event::AllOriginsUnhealthy {
candidate_count: candidates.len(),
});
}
candidates
.iter()
.min_by_key(|id| {
let failures = health.failure_count(id).unwrap_or(u32::MAX);
let priority = self.get_priority(id);
(failures, priority)
})
.cloned()
}
}
impl Default for Router {
fn default() -> Self {
Self::new()
}
}
// Unit tests for router selection and fallback ordering.
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
// Builds a snapshot with the given healthy and degraded ids and nothing else.
fn mock_health(healthy: &[&str], degraded: &[&str]) -> HealthSnapshot {
HealthSnapshot {
healthy: healthy.iter().map(|s| OriginId::from(*s)).collect(),
degraded: degraded.iter().map(|s| OriginId::from(*s)).collect(),
unhealthy: Vec::new(),
failure_counts: HashMap::new(),
}
}
// The lower priority value wins regardless of candidate order.
#[test]
fn test_select_by_priority() {
let router = Router::new();
router.set_priority(OriginId::from("high"), 1);
router.set_priority(OriginId::from("low"), 2);
let candidates = vec![OriginId::from("low"), OriginId::from("high")];
let health = mock_health(&["high", "low"], &[]);
let selected = router.select(&candidates, &health);
assert_eq!(selected, Some(OriginId::from("high")));
}
// Candidates that are not healthy are ignored by select().
#[test]
fn test_select_skips_unhealthy() {
let router = Router::new();
router.set_priority(OriginId::from("high"), 1);
router.set_priority(OriginId::from("low"), 2);
let candidates = vec![OriginId::from("high"), OriginId::from("low")];
let health = mock_health(&["low"], &[]);
let selected = router.select(&candidates, &health);
assert_eq!(selected, Some(OriginId::from("low")));
}
// With equal priorities the lower p50 latency wins.
#[test]
fn test_latency_affects_tiebreak() {
let router = Router::new();
router.set_priority(OriginId::from("a"), 1);
router.set_priority(OriginId::from("b"), 1);
router.record_latency(&OriginId::from("a"), 100);
router.record_latency(&OriginId::from("b"), 10);
let candidates = vec![OriginId::from("a"), OriginId::from("b")];
let health = mock_health(&["a", "b"], &[]);
let selected = router.select(&candidates, &health);
assert_eq!(selected, Some(OriginId::from("b")));
}
// Without healthy candidates the degraded one is chosen.
#[test]
fn test_fallback_to_degraded() {
let router = Router::new();
router.set_priority(OriginId::from("a"), 1);
router.set_priority(OriginId::from("b"), 2);
let candidates = vec![OriginId::from("a"), OriginId::from("b")];
let health = mock_health(&[], &["b"]);
let selected = router.select_with_fallback(&candidates, &health);
assert_eq!(selected, Some(OriginId::from("b")));
}
// When everything is unhealthy, the fewest failures win over a better priority.
#[test]
fn test_fallback_least_bad() {
let router = Router::new();
router.set_priority(OriginId::from("a"), 1);
router.set_priority(OriginId::from("b"), 2);
let candidates = vec![OriginId::from("a"), OriginId::from("b")];
let mut failure_counts = HashMap::new();
failure_counts.insert(OriginId::from("a"), 5);
failure_counts.insert(OriginId::from("b"), 2);
let health = HealthSnapshot {
healthy: Vec::new(),
degraded: Vec::new(),
unhealthy: vec![OriginId::from("a"), OriginId::from("b")],
failure_counts,
};
let selected = router.select_with_fallback(&candidates, &health);
assert_eq!(selected, Some(OriginId::from("b")));
}
}
+1 -10
View File
@@ -1,17 +1,8 @@
use async_trait::async_trait;
use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, Result};
use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, OriginType, Result};
use std::path::{Path, PathBuf};
use tokio::io::AsyncRead;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OriginType {
Local,
Nfs,
Smb,
S3,
Sftp,
}
#[async_trait]
pub trait Origin: Send + Sync {
fn id(&self) -> &OriginId;