diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index f99063f..4dd942a 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -1938,6 +1938,7 @@ dependencies = [ "parking_lot 0.12.5", "sd-notify", "tokio", + "tokio-util 0.7.18", "tracing", "tracing-appender", "tracing-journald", @@ -2090,6 +2091,7 @@ dependencies = [ "musicfs-cas", "musicfs-core", "musicfs-origins", + "musicfs-search", "nix", "noxious-client", "parking_lot 0.12.5", @@ -2100,6 +2102,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-test", + "tokio-util 0.7.18", "tracing", ] @@ -3735,6 +3738,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/musicfs/Cargo.toml b/musicfs/Cargo.toml index 366d16a..4dc5de8 100644 --- a/musicfs/Cargo.toml +++ b/musicfs/Cargo.toml @@ -13,6 +13,7 @@ repository = "https://github.com/user/musicfs" [workspace.dependencies] # Async runtime tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["rt"] } async-trait = "0.1" # Error handling diff --git a/musicfs/crates/musicfs-cache/src/db.rs b/musicfs/crates/musicfs-cache/src/db.rs index 21bb61a..b2f759e 100644 --- a/musicfs/crates/musicfs-cache/src/db.rs +++ b/musicfs/crates/musicfs-cache/src/db.rs @@ -6,7 +6,7 @@ use rusqlite::{params, Connection, OptionalExtension}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; const SCHEMA: &str = include_str!("schema.sql"); @@ -32,6 +32,34 @@ impl Database { Ok(db) } + pub fn open_with_integrity_check(path: &Path) -> Result { + debug!(?path, "Opening database with integrity check"); + + let conn = Connection::open(path) + .map_err(|e| Error::Database(format!("open failed: {}", e)))?; + + let integrity: String = conn + .query_row("PRAGMA integrity_check(1)", [], |row| row.get(0)) + .map_err(|e| Error::Database(format!("integrity check failed: {}", e)))?; + + if integrity != "ok" { + warn!(path = ?path, result = %integrity, "Database integrity check failed"); + return Err(Error::DatabaseCorrupted(format!( + "integrity check failed: {}", integrity + ))); + } + + conn.execute_batch(SCHEMA) + .map_err(|e| Error::Database(format!("schema init failed: {}", e)))?; + + let db = Self { + conn: Arc::new(Mutex::new(conn)), + }; + let count = db.file_count().unwrap_or(0); + info!(path = ?path, file_count = count, "Database opened (integrity verified)"); + Ok(db) + } + pub fn open_memory() -> Result { let conn = Connection::open_in_memory() .map_err(|e| Error::Database(format!("open_in_memory failed: {}", e)))?; diff --git a/musicfs/crates/musicfs-cas/src/store.rs b/musicfs/crates/musicfs-cas/src/store.rs index 410cda6..4943b73 100644 --- a/musicfs/crates/musicfs-cas/src/store.rs +++ b/musicfs/crates/musicfs-cas/src/store.rs @@ -4,7 +4,7 @@ use musicfs_core::ChunkHash; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::fs; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; #[cfg(feature = "failpoints")] use fail::fail_point; @@ -45,7 +45,27 @@ impl CasStore { fs::create_dir_all(&config.chunks_dir).await?; let index_path = config.chunks_dir.join("index.sled"); - let index = sled::open(&index_path)?; + let index = match sled::open(&index_path) { + Ok(db) => db, + Err(e) => { + warn!(error = %e, path = ?index_path, "sled index corrupted, attempting recovery"); + + match sled::Config::new().path(&index_path).open() { + Ok(db) => { + info!("sled index repaired successfully"); + db + } + Err(repair_err) => { + warn!(error = %repair_err, "sled repair failed, recreating index"); + if index_path.exists() { + std::fs::remove_dir_all(&index_path) + .map_err(CasError::Io)?; + } + sled::open(&index_path)? + } + } + } + }; let current_size = Self::calculate_size(&config.chunks_dir).await; @@ -79,6 +99,22 @@ impl CasStore { return Ok(hash); } + if self.config.max_size > 0 { + let new_size = self.current_size.load(Ordering::SeqCst) + data.len() as u64; + if new_size > self.config.max_size { + warn!( + current_size = self.current_size.load(Ordering::SeqCst), + chunk_size = data.len(), + max_size = self.config.max_size, + "CAS store full, rejecting write" + ); + return Err(CasError::StoreFull { + current: self.current_size.load(Ordering::SeqCst), + max: self.config.max_size, + }); + } + } + if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } @@ -251,6 +287,9 @@ pub enum CasError { #[error("Serialization error: {0}")] Serialization(String), + + #[error("Store full: {current} / {max} bytes")] + StoreFull { current: u64, max: u64 }, } #[cfg(test)] diff --git a/musicfs/crates/musicfs-cli/Cargo.toml b/musicfs/crates/musicfs-cli/Cargo.toml index 6b5a2b5..3dc43fd 100644 --- a/musicfs/crates/musicfs-cli/Cargo.toml +++ b/musicfs/crates/musicfs-cli/Cargo.toml @@ -17,6 +17,7 @@ musicfs-metadata.path = "../musicfs-metadata" clap.workspace = true tokio.workspace = true +tokio-util.workspace = true tracing.workspace = true tracing-subscriber.workspace = true tracing-appender.workspace = true diff --git a/musicfs/crates/musicfs-cli/src/main.rs b/musicfs/crates/musicfs-cli/src/main.rs index 3dcfdc7..b3ae111 100644 --- a/musicfs/crates/musicfs-cli/src/main.rs +++ b/musicfs/crates/musicfs-cli/src/main.rs @@ -208,6 +208,8 @@ fn run_mount( } info!("MusicFS ready, PID {}", std::process::id()); + let shutdown_token = tokio_util::sync::CancellationToken::new(); + runtime.block_on(async { let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; @@ -223,6 +225,13 @@ fn run_mount( } } + info!("Beginning ordered shutdown"); + shutdown_token.cancel(); + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + info!("Background tasks stopped"); + Ok::<_, anyhow::Error>(()) })?; diff --git a/musicfs/crates/musicfs-core/src/error.rs b/musicfs/crates/musicfs-core/src/error.rs index 26d92b7..4b754e5 100644 --- a/musicfs/crates/musicfs-core/src/error.rs +++ b/musicfs/crates/musicfs-core/src/error.rs @@ -23,6 +23,9 @@ pub enum Error { #[error("Database error: {0}")] Database(String), + #[error("Database corrupted: {0}")] + DatabaseCorrupted(String), + #[error("NFS stale file handle")] NfsStaleHandle, diff --git a/musicfs/crates/musicfs-core/src/lib.rs b/musicfs/crates/musicfs-core/src/lib.rs index 39884ac..bf51af4 100644 --- a/musicfs/crates/musicfs-core/src/lib.rs +++ b/musicfs/crates/musicfs-core/src/lib.rs @@ -4,6 +4,7 @@ pub mod error; pub mod events; pub mod metrics; pub mod resolver; +pub mod supervisor; pub mod types; pub use config::{ diff --git a/musicfs/crates/musicfs-core/src/supervisor.rs b/musicfs/crates/musicfs-core/src/supervisor.rs new file mode 100644 index 0000000..6c8df86 --- /dev/null +++ b/musicfs/crates/musicfs-core/src/supervisor.rs @@ -0,0 +1,181 @@ +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use tracing::{error, warn}; + +pub struct TaskSupervisor { + tasks: Arc>>, +} + +struct TaskEntry { + handle: JoinHandle<()>, + status: TaskStatus, + restart_count: u32, + last_restart: Option, +} + +#[derive(Debug, Clone)] +pub enum TaskStatus { + Running, + Failed { error: String, at: Instant }, + Restarting { attempt: u32 }, + Stopped, +} + +impl Default for TaskSupervisor { + fn default() -> Self { + Self::new() + } +} + +impl TaskSupervisor { + pub fn new() -> Self { + Self { + tasks: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub fn spawn_supervised(&self, name: &str, future: F) + where + F: std::future::Future + Send + 'static, + { + let name_owned = name.to_string(); + + let handle = tokio::spawn(async move { + future.await; + }); + + self.tasks.write().insert( + name_owned, + TaskEntry { + handle, + status: TaskStatus::Running, + restart_count: 0, + last_restart: None, + }, + ); + } + + pub fn spawn_critical(&self, name: &str, factory: F) + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: std::future::Future + Send + 'static, + { + let tasks = self.tasks.clone(); + let name_owned = name.to_string(); + + let monitor_handle = tokio::spawn(async move { + let mut restart_count = 0u32; + let max_restarts = 5u32; + let backoff_durations = [ + Duration::from_secs(1), + Duration::from_secs(5), + Duration::from_secs(30), + ]; + + loop { + let handle = tokio::spawn(factory()); + + { + let mut t = tasks.write(); + if let Some(entry) = t.get_mut(&name_owned) { + entry.status = TaskStatus::Running; + } + } + + match handle.await { + Ok(()) => { + let mut t = tasks.write(); + if let Some(entry) = t.get_mut(&name_owned) { + entry.status = TaskStatus::Stopped; + } + break; + } + Err(e) => { + restart_count += 1; + + if restart_count > max_restarts { + error!(task = %name_owned, "Task exceeded max restarts ({}), giving up", max_restarts); + let mut t = tasks.write(); + if let Some(entry) = t.get_mut(&name_owned) { + entry.status = TaskStatus::Failed { + error: format!("Exceeded max restarts: {}", e), + at: Instant::now(), + }; + } + break; + } + + let backoff_idx = + (restart_count as usize - 1).min(backoff_durations.len() - 1); + let backoff = backoff_durations[backoff_idx]; + + warn!( + task = %name_owned, + error = %e, + attempt = restart_count, + backoff_ms = backoff.as_millis() as u64, + "Critical task failed, restarting with backoff" + ); + + { + let mut t = tasks.write(); + if let Some(entry) = t.get_mut(&name_owned) { + entry.status = TaskStatus::Restarting { + attempt: restart_count, + }; + entry.restart_count = restart_count; + entry.last_restart = Some(Instant::now()); + } + } + + tokio::time::sleep(backoff).await; + } + } + } + }); + + self.tasks.write().insert( + name.to_string(), + TaskEntry { + handle: monitor_handle, + status: TaskStatus::Running, + restart_count: 0, + last_restart: None, + }, + ); + } + + pub fn task_status(&self, name: &str) -> TaskStatus { + let mut tasks = self.tasks.write(); + if let Some(entry) = tasks.get_mut(name) { + if entry.handle.is_finished() { + entry.status = TaskStatus::Failed { + error: "Task exited".into(), + at: Instant::now(), + }; + } + entry.status.clone() + } else { + TaskStatus::Stopped + } + } + + pub fn check_all(&self) -> Vec<(String, TaskStatus)> { + let mut tasks = self.tasks.write(); + tasks + .iter_mut() + .map(|(name, entry)| { + if entry.handle.is_finished() { + entry.status = TaskStatus::Failed { + error: "Task exited".into(), + at: Instant::now(), + }; + } + (name.clone(), entry.status.clone()) + }) + .collect() + } +} diff --git a/musicfs/crates/musicfs-search/src/index.rs b/musicfs/crates/musicfs-search/src/index.rs index 86c39cc..196a9ba 100644 --- a/musicfs/crates/musicfs-search/src/index.rs +++ b/musicfs/crates/musicfs-search/src/index.rs @@ -6,7 +6,7 @@ use tantivy::collector::TopDocs; use tantivy::query::{BooleanQuery, FuzzyTermQuery, Occur, Query, QueryParser}; use tantivy::schema::{Field, Schema, Value, STORED, TEXT, INDEXED}; use tantivy::{Index, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument, Term}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; const SCHEMA_VERSION: u32 = 1; @@ -95,6 +95,28 @@ impl SearchIndex { }) } + pub fn open_with_recovery(index_path: &Path) -> Result { + match Self::open(index_path) { + Ok(index) => { + let docs = index.reader.searcher().num_docs(); + info!(docs, "Search index opened successfully"); + Ok(index) + } + Err(e) => { + warn!( + error = %e, + path = ?index_path, + "Search index corrupted, rebuilding from scratch" + ); + if index_path.exists() { + std::fs::remove_dir_all(index_path) + .map_err(SearchError::Io)?; + } + Self::open(index_path) + } + } + } + pub fn index_file(&self, file: &FileMeta) -> Result<(), SearchError> { let mut doc = TantivyDocument::new(); diff --git a/musicfs/crates/musicfs-test-utils/Cargo.toml b/musicfs/crates/musicfs-test-utils/Cargo.toml index 5259417..5354848 100644 --- a/musicfs/crates/musicfs-test-utils/Cargo.toml +++ b/musicfs/crates/musicfs-test-utils/Cargo.toml @@ -9,6 +9,7 @@ musicfs-core = { path = "../musicfs-core" } musicfs-origins = { path = "../musicfs-origins" } musicfs-cas = { path = "../musicfs-cas" } musicfs-cache = { path = "../musicfs-cache" } +musicfs-search = { path = "../musicfs-search" } async-trait.workspace = true tokio = { workspace = true, features = ["full", "sync", "time"] } @@ -37,5 +38,6 @@ full = ["failpoints", "process-tests", "resource-limits", "docker-tests"] [dev-dependencies] tokio-test = "0.4" +tokio-util.workspace = true sd-notify.workspace = true libc.workspace = true diff --git a/musicfs/crates/musicfs-test-utils/tests/resilience.rs b/musicfs/crates/musicfs-test-utils/tests/resilience.rs index 1dd70a2..c9c1c3e 100644 --- a/musicfs/crates/musicfs-test-utils/tests/resilience.rs +++ b/musicfs/crates/musicfs-test-utils/tests/resilience.rs @@ -1,14 +1,20 @@ -use musicfs_cache::{VirtualTree, ROOT_INODE}; +use musicfs_cache::{Database, VirtualTree, ROOT_INODE}; use musicfs_cas::{CasConfig, CasStore}; -use musicfs_core::{HealthStatus, OriginId, OriginType, RealPath}; +use musicfs_core::supervisor::{TaskStatus, TaskSupervisor}; +use musicfs_core::{ + AudioMeta, FileId, FileMeta, HealthStatus, OriginId, OriginType, RealPath, VirtualPath, +}; use musicfs_origins::{HealthMonitor, LocalOrigin, OriginRegistry}; -use musicfs_test_utils::{FaultyOrigin, FailMode}; +use musicfs_search::SearchIndex; +use musicfs_test_utils::{FailMode, FaultyOrigin}; use std::collections::HashMap; use std::io::ErrorKind; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, UNIX_EPOCH}; use tempfile::TempDir; +use tokio_util::sync::CancellationToken; fn setup_test_file(dir: &TempDir, name: &str, content: &[u8]) -> PathBuf { let path = dir.path().join(name); @@ -31,19 +37,101 @@ fn create_faulty_origin(id: &str, dir: &TempDir, mode: FailMode) -> Arc FileMeta { + let name = Path::new(path) + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown") + .to_string(); + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(path), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from(path), + }, + size, + mtime: UNIX_EPOCH, + content_hash: None, + audio: Some(AudioMeta { + title: Some(name), + ..Default::default() + }), + } +} + #[tokio::test] async fn test_sqlite_integrity_check_detects_corruption() { - todo!("Issue 2.4: Implement Database::open_with_integrity_check()") + let dir = TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + + { + let db = Database::open(&db_path).unwrap(); + db.upsert_file( + &OriginId::from("test"), + Path::new("/test.flac"), + &VirtualPath::new("/Test.flac"), + &AudioMeta::default(), + UNIX_EPOCH, + 1000, + ) + .unwrap(); + } + + let mut data = std::fs::read(&db_path).unwrap(); + let mid = data.len() / 2; + data[mid..mid + 100].fill(0xFF); + std::fs::write(&db_path, &data).unwrap(); + + let result = Database::open_with_integrity_check(&db_path); + assert!(result.is_err()); } #[tokio::test] async fn test_tantivy_corruption_triggers_rebuild() { - todo!("Issue 2.4: Implement SearchIndex::open_with_recovery()") + let dir = TempDir::new().unwrap(); + let index_path = dir.path().join("search_idx"); + + { + let index = SearchIndex::open(&index_path).unwrap(); + index.index_file(&make_file_meta(1, "/a.flac", 1000)).unwrap(); + index.commit().unwrap(); + } + + std::fs::write(index_path.join("meta.json"), b"corrupted").unwrap(); + + let index = SearchIndex::open_with_recovery(&index_path).unwrap(); + let results = index.search("a", 10).unwrap(); + assert_eq!(results.len(), 0); } #[tokio::test] async fn test_sled_corruption_triggers_repair() { - todo!("Issue 3.5: Implement sled recovery in CasStore::open()") + let dir = TempDir::new().unwrap(); + let chunks_dir = dir.path().join("chunks"); + let config = CasConfig { + chunks_dir: chunks_dir.clone(), + max_size: 10_000_000, + shard_levels: 2, + }; + + { + let store = CasStore::open(config.clone()).await.unwrap(); + store.put(b"test data").await.unwrap(); + } + + let sled_dir = chunks_dir.join("index.sled"); + if sled_dir.exists() { + for entry in std::fs::read_dir(&sled_dir).unwrap() { + let entry = entry.unwrap(); + if entry.metadata().unwrap().is_file() { + std::fs::write(entry.path(), b"corrupted").unwrap(); + } + } + } + + let result = CasStore::open(config).await; + assert!(result.is_ok(), "sled should recover from corruption"); } #[tokio::test] @@ -203,9 +291,21 @@ async fn test_health_checks_run_in_parallel() { assert!(elapsed < Duration::from_millis(350), "Issue 4.2.2: check_all() should run in parallel (sequential would take ~600ms), took {:?}", elapsed); } -#[tokio::test] -async fn test_tantivy_survives_uncommitted_crash() { - todo!("Issue 5.2: Implement tantivy crash recovery test") +#[test] +fn test_tantivy_survives_uncommitted_crash() { + let dir = TempDir::new().unwrap(); + let index_path = dir.path().join("search_idx"); + + { + let index = SearchIndex::open(&index_path).unwrap(); + index.index_file(&make_file_meta(1, "/a.flac", 1000)).unwrap(); + index.commit().unwrap(); + index.index_file(&make_file_meta(2, "/b.flac", 1000)).unwrap(); + } + + let index = SearchIndex::open(&index_path).unwrap(); + let results = index.search("a", 10).unwrap(); + assert_eq!(results.len(), 1); } #[tokio::test] @@ -304,6 +404,86 @@ fn test_sd_notify_ready_sent() { std::env::remove_var("NOTIFY_SOCKET"); } +#[tokio::test] +async fn test_shutdown_cancels_background_tasks() { + let token = CancellationToken::new(); + let stopped = Arc::new(AtomicBool::new(false)); + let stopped_clone = stopped.clone(); + let token_clone = token.clone(); + + tokio::spawn(async move { + token_clone.cancelled().await; + stopped_clone.store(true, Ordering::SeqCst); + }); + + assert!(!stopped.load(Ordering::SeqCst)); + token.cancel(); + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(stopped.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn test_shutdown_flushes_tantivy() { + let dir = TempDir::new().unwrap(); + let idx_path = dir.path().join("idx"); + + { + let index = SearchIndex::open(&idx_path).unwrap(); + index.index_file(&make_file_meta(1, "/a.flac", 1000)).unwrap(); + index.commit().unwrap(); + } + + let index2 = SearchIndex::open(&idx_path).unwrap(); + assert_eq!(index2.search("a", 10).unwrap().len(), 1); +} + +#[tokio::test] +async fn test_supervisor_detects_task_completion() { + let supervisor = TaskSupervisor::new(); + supervisor.spawn_supervised("fast", async {}); + tokio::time::sleep(Duration::from_millis(50)).await; +} + +#[tokio::test] +async fn test_supervisor_detects_panic() { + let supervisor = TaskSupervisor::new(); + supervisor.spawn_supervised("panicker", async { + panic!("boom"); + }); + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(matches!( + supervisor.task_status("panicker"), + TaskStatus::Failed { .. } + )); +} + +#[tokio::test] +async fn test_supervisor_restarts_critical_task() { + let count = Arc::new(AtomicU32::new(0)); + let c = count.clone(); + + let supervisor = TaskSupervisor::new(); + supervisor.spawn_critical("restartable", move || { + let c = c.clone(); + async move { + let n = c.fetch_add(1, Ordering::SeqCst); + if n == 0 { + panic!("first run fails"); + } + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + } + } + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + assert_eq!(count.load(Ordering::SeqCst), 2); + assert!(matches!( + supervisor.task_status("restartable"), + TaskStatus::Running + )); +} + #[tokio::test] async fn test_sigterm_triggers_shutdown() { use std::process::{Command, Stdio};