diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index ba5b8e5..aba05bb 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -158,7 +158,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -169,7 +169,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -274,6 +274,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bmrng" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9758e48498ae13d49b51a979d553d254e67021b203d9597e82a04ebd81025b2" +dependencies = [ + "futures", + "loom", + "tokio", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -322,6 +333,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.44" @@ -366,7 +383,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -692,7 +709,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -738,6 +755,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af9673d8203fcb076b19dfd17e38b3d4ae9f44959416ea532ce72415a6020365" +[[package]] +name = "fail" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe5e43d0f78a42ad591453aedb1d7ae631ce7ee445c7643691055a9ed8d3b01c" +dependencies = [ + "log", + "once_cell", + "rand", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -889,6 +917,21 @@ dependencies = [ "zerocopy 0.7.35", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -896,6 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -904,6 +948,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -922,8 +994,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -950,6 +1027,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "generator" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "winapi", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -1022,7 +1112,7 @@ dependencies = [ "indexmap 2.14.0", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.18", "tracing", ] @@ -1173,6 +1263,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1593,6 +1697,20 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "loom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27a6650b2f722ae8c0e2ebc46d07f80c9923464fc206d962332f1eff83143530" +dependencies = [ + "cfg-if", + "futures-util", + "generator", + "scoped-tls", + "serde", + "serde_json", +] + [[package]] name = "lru" version = "0.12.5" @@ -1720,6 +1838,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mockall_double" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dffc15b97456ecc84d2bde8c1df79145e154f45225828c4361f676e1b82acd6" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "moka" version = "0.12.15" @@ -1775,6 +1905,7 @@ version = "0.1.0" dependencies = [ "bytes", "dirs", + "fail", "hex", "musicfs-cache", "musicfs-core", @@ -1942,6 +2073,29 @@ dependencies = [ "xxhash-rust", ] +[[package]] +name = "musicfs-test-utils" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "fail", + "musicfs-cache", + "musicfs-cas", + "musicfs-core", + "musicfs-origins", + "nix", + "noxious-client", + "parking_lot 0.12.5", + "reqwest", + "rlimit", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-test", + "tracing", +] + [[package]] name = "native-tls" version = "0.2.18" @@ -1959,6 +2113,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.11.1", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -1988,6 +2154,39 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "noxious" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e68998924150ba54dbf1adf4c3f7f7c10bb5d3c6789ab71af11e34fe4c667970" +dependencies = [ + "async-trait", + "bmrng", + "bytes", + "futures", + "mockall_double", + "pin-project-lite", + "rand", + "serde", + "thiserror 1.0.69", + "tokio", + "tokio-util 0.6.10", + "tracing", +] + +[[package]] +name = "noxious-client" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b7ab7a9efb5768cd07e2b2455f80b3998d7397be76398c2ac03a52a42b652e7" +dependencies = [ + "noxious", + "reqwest", + "serde", + "thiserror 1.0.69", + "tokio", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2084,7 +2283,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2217,7 +2416,7 @@ checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2282,7 +2481,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.117", ] [[package]] @@ -2321,7 +2520,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn", + "syn 2.0.117", "tempfile", ] @@ -2335,7 +2534,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2524,6 +2723,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -2533,6 +2733,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls", "rustls-pemfile", "serde", "serde_json", @@ -2541,14 +2742,39 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-rustls", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "winreg", ] +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.17", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + +[[package]] +name = "rlimit" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a" +dependencies = [ + "libc", +] + [[package]] name = "rmp" version = "0.8.15" @@ -2630,6 +2856,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2639,6 +2877,16 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -2669,12 +2917,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "3.7.0" @@ -2731,7 +2995,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3036,6 +3300,17 @@ dependencies = [ "symphonia-metadata", ] +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.117" @@ -3061,7 +3336,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3277,7 +3552,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3288,7 +3563,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3376,7 +3651,7 @@ checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3389,6 +3664,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -3400,6 +3685,31 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545" +dependencies = [ + "futures-core", + "tokio", + "tokio-stream", +] + +[[package]] +name = "tokio-util" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -3491,7 +3801,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3508,7 +3818,7 @@ dependencies = [ "rand", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.18", "tower-layer", "tower-service", "tracing", @@ -3532,6 +3842,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3558,7 +3869,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3654,6 +3965,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.8" @@ -3799,7 +4116,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wasm-bindgen-shared", ] @@ -3981,7 +4298,7 @@ dependencies = [ "anyhow", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wasmtime-component-util", "wasmtime-wit-bindgen", "wit-parser 0.201.0", @@ -4155,7 +4472,7 @@ checksum = "ffaafa5c12355b1a9ee068e9295d50c4ca0a400c721950cdae4f5b54391a2da5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4225,6 +4542,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "winapi" version = "0.3.9" @@ -4293,7 +4616,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4304,7 +4627,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4543,7 +4866,7 @@ dependencies = [ "heck 0.5.0", "indexmap 2.14.0", "prettyplease", - "syn", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -4559,7 +4882,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -4650,7 +4973,7 @@ checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -4681,7 +5004,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4692,7 +5015,7 @@ checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4712,7 +5035,7 @@ checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -4746,7 +5069,7 @@ checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] diff --git a/musicfs/Cargo.toml b/musicfs/Cargo.toml index 4d2e068..4f844bd 100644 --- a/musicfs/Cargo.toml +++ b/musicfs/Cargo.toml @@ -61,6 +61,12 @@ clap = { version = "4", features = ["derive"] } # Testing tempfile = "3" +fail = "0.5" +rlimit = "0.10" +nix = { version = "0.29", features = ["signal", "process"] } +wiremock = "0.6" +assert_cmd = "2.0" +noxious-client = "1.0" # Platform-specific libc = "0.2" diff --git a/musicfs/crates/musicfs-cas/Cargo.toml b/musicfs/crates/musicfs-cas/Cargo.toml index cdbff66..1868b56 100644 --- a/musicfs/crates/musicfs-cas/Cargo.toml +++ b/musicfs/crates/musicfs-cas/Cargo.toml @@ -3,7 +3,12 @@ name = "musicfs-cas" version.workspace = true edition.workspace = true +[features] +default = [] +failpoints = ["fail/failpoints"] + [dependencies] +fail = { workspace = true, optional = true } musicfs-core = { path = "../musicfs-core" } musicfs-origins = { path = "../musicfs-origins" } musicfs-sync = { path = "../musicfs-sync" } diff --git a/musicfs/crates/musicfs-cas/src/store.rs b/musicfs/crates/musicfs-cas/src/store.rs index 094ea6c..410cda6 100644 --- a/musicfs/crates/musicfs-cas/src/store.rs +++ b/musicfs/crates/musicfs-cas/src/store.rs @@ -6,6 +6,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use tokio::fs; use tracing::{debug, trace, warn}; +#[cfg(feature = "failpoints")] +use fail::fail_point; + const DEFAULT_MAX_SIZE_10GB: u64 = 10 * 1024 * 1024 * 1024; const DEFAULT_SHARD_LEVELS_256_SUBDIRS: u8 = 2; @@ -80,8 +83,24 @@ impl CasStore { fs::create_dir_all(parent).await?; } + #[cfg(feature = "failpoints")] + fail_point!("cas-put-before-write", |_| { + Err(CasError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + "Failpoint: cas-put-before-write", + ))) + }); + fs::write(&path, data).await?; + #[cfg(feature = "failpoints")] + fail_point!("cas-put-after-write-before-index", |_| { + Err(CasError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + "Failpoint: cas-put-after-write-before-index", + ))) + }); + let location = ChunkLocation { path: path.clone(), size: data.len() as u32, diff --git a/musicfs/crates/musicfs-origins/src/health.rs b/musicfs/crates/musicfs-origins/src/health.rs index 4baee1f..66832c4 100644 --- a/musicfs/crates/musicfs-origins/src/health.rs +++ b/musicfs/crates/musicfs-origins/src/health.rs @@ -180,7 +180,7 @@ impl HealthMonitor { HealthCheckHandle { stop_tx } } - async fn check_all(&self) { + pub async fn check_all(&self) { let origins: Vec<_> = self .origins .iter() diff --git a/musicfs/crates/musicfs-test-utils/Cargo.toml b/musicfs/crates/musicfs-test-utils/Cargo.toml new file mode 100644 index 0000000..7f879b5 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "musicfs-test-utils" +version.workspace = true +edition.workspace = true +description = "Test utilities and fixtures for MusicFS resilience testing" + +[dependencies] +musicfs-core = { path = "../musicfs-core" } +musicfs-origins = { path = "../musicfs-origins" } +musicfs-cas = { path = "../musicfs-cas" } +musicfs-cache = { path = "../musicfs-cache" } + +async-trait.workspace = true +tokio = { workspace = true, features = ["full", "sync", "time"] } +tracing.workspace = true +thiserror.workspace = true +parking_lot.workspace = true +tempfile.workspace = true +bytes.workspace = true + +# Fault injection +fail = { version = "0.5", optional = true } +rlimit = { version = "0.10", optional = true } +nix = { version = "0.29", optional = true, features = ["signal", "process"] } + +# Docker/network tests +noxious-client = { version = "1.0", optional = true } +reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] } + +[features] +default = [] +failpoints = ["fail/failpoints"] +process-tests = ["nix"] +resource-limits = ["rlimit"] +docker-tests = ["noxious-client", "reqwest"] +full = ["failpoints", "process-tests", "resource-limits", "docker-tests"] + +[dev-dependencies] +tokio-test = "0.4" diff --git a/musicfs/crates/musicfs-test-utils/src/assertions.rs b/musicfs/crates/musicfs-test-utils/src/assertions.rs new file mode 100644 index 0000000..16a4da4 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/src/assertions.rs @@ -0,0 +1,206 @@ +use musicfs_cas::CasError; +use musicfs_core::Error; +use std::time::{Duration, Instant}; + +pub fn assert_error_contains(result: Result, expected_text: &str) { + match result { + Ok(_) => panic!("Expected error containing '{}', but got Ok", expected_text), + Err(e) => { + let error_msg = format!("{:?}", e); + assert!( + error_msg.contains(expected_text), + "Expected error containing '{}', but got: {}", + expected_text, + error_msg + ); + } + } +} + +pub fn assert_io_error(result: Result) { + match result { + Err(Error::Io(_)) => (), + Err(e) => panic!("Expected Io error, got: {:?}", e), + Ok(_) => panic!("Expected Io error, got Ok"), + } +} + +pub fn assert_cas_io_error(result: Result) { + match result { + Err(CasError::Io(_)) => (), + Err(e) => panic!("Expected CasError::Io, got: {:?}", e), + Ok(_) => panic!("Expected CasError::Io, got Ok"), + } +} + +pub fn assert_cas_not_found(result: Result) { + match result { + Err(CasError::NotFound(_)) => (), + Err(e) => panic!("Expected CasError::NotFound, got: {:?}", e), + Ok(_) => panic!("Expected CasError::NotFound, got Ok"), + } +} + +pub fn assert_cas_integrity_error(result: Result) { + match result { + Err(CasError::IntegrityError { .. }) => (), + Err(e) => panic!("Expected CasError::IntegrityError, got: {:?}", e), + Ok(_) => panic!("Expected CasError::IntegrityError, got Ok"), + } +} + +pub fn assert_file_not_found(result: Result) { + match result { + Err(Error::FileNotFound(_)) => (), + Err(e) => panic!("Expected FileNotFound error, got: {:?}", e), + Ok(_) => panic!("Expected FileNotFound error, got Ok"), + } +} + +pub fn assert_origin_error(result: Result) { + match result { + Err(Error::Origin(_)) => (), + Err(e) => panic!("Expected Origin error, got: {:?}", e), + Ok(_) => panic!("Expected Origin error, got Ok"), + } +} + +pub fn assert_timeout_error(result: Result) { + match result { + Err(Error::Timeout(_)) => (), + Err(e) => panic!("Expected Timeout error, got: {:?}", e), + Ok(_) => panic!("Expected Timeout error, got Ok"), + } +} + +pub struct TimedAssertion { + start: Instant, + min_duration: Option, + max_duration: Option, +} + +impl TimedAssertion { + pub fn new() -> Self { + Self { + start: Instant::now(), + min_duration: None, + max_duration: None, + } + } + + pub fn expect_at_least(mut self, duration: Duration) -> Self { + self.min_duration = Some(duration); + self + } + + pub fn expect_at_most(mut self, duration: Duration) -> Self { + self.max_duration = Some(duration); + self + } + + pub fn assert_elapsed(self) { + let elapsed = self.start.elapsed(); + + if let Some(min) = self.min_duration { + assert!( + elapsed >= min, + "Expected at least {:?}, but only {:?} elapsed", + min, + elapsed + ); + } + + if let Some(max) = self.max_duration { + assert!( + elapsed <= max, + "Expected at most {:?}, but {:?} elapsed", + max, + elapsed + ); + } + } +} + +impl Default for TimedAssertion { + fn default() -> Self { + Self::new() + } +} + +pub async fn assert_completes_within(future: F, timeout: Duration) -> T +where + F: std::future::Future, +{ + tokio::time::timeout(timeout, future) + .await + .expect(&format!( + "Operation did not complete within {:?}", + timeout + )) +} + +pub async fn assert_times_out(future: F, timeout: Duration) +where + F: std::future::Future, +{ + match tokio::time::timeout(timeout, future).await { + Ok(_) => panic!("Expected operation to time out, but it completed"), + Err(_) => (), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_assert_error_contains() { + let result: Result<(), Error> = Err(Error::Origin("connection refused".into())); + assert_error_contains(result, "connection"); + } + + #[test] + #[should_panic(expected = "Expected error containing")] + fn test_assert_error_contains_failure() { + let result: Result<(), Error> = Err(Error::Origin("something else".into())); + assert_error_contains(result, "connection"); + } + + #[test] + fn test_assert_io_error() { + let result: Result<(), Error> = + Err(Error::Io(std::io::Error::new(std::io::ErrorKind::Other, "test"))); + assert_io_error(result); + } + + #[test] + fn test_timed_assertion_at_least() { + let timer = TimedAssertion::new().expect_at_least(Duration::from_millis(10)); + std::thread::sleep(Duration::from_millis(15)); + timer.assert_elapsed(); + } + + #[test] + fn test_timed_assertion_at_most() { + let timer = TimedAssertion::new().expect_at_most(Duration::from_millis(100)); + timer.assert_elapsed(); + } + + #[tokio::test] + async fn test_assert_completes_within() { + let result = + assert_completes_within(async { 42 }, Duration::from_millis(100)).await; + assert_eq!(result, 42); + } + + #[tokio::test] + async fn test_assert_times_out() { + assert_times_out( + async { + tokio::time::sleep(Duration::from_secs(10)).await; + }, + Duration::from_millis(10), + ) + .await; + } +} diff --git a/musicfs/crates/musicfs-test-utils/src/faulty_cas.rs b/musicfs/crates/musicfs-test-utils/src/faulty_cas.rs new file mode 100644 index 0000000..4a5b1e4 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/src/faulty_cas.rs @@ -0,0 +1,250 @@ +use bytes::Bytes; +use musicfs_cas::{CasConfig, CasError, CasStore, DedupStats}; +use musicfs_core::ChunkHash; +use std::io::{self, ErrorKind}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; + +pub struct FaultyCasStore { + inner: Arc, + inject_enospc: AtomicBool, + inject_eio_on_read: AtomicBool, + inject_eio_on_write: AtomicBool, + inject_corruption: AtomicBool, + fail_after_n_puts: AtomicUsize, + put_count: AtomicUsize, +} + +impl FaultyCasStore { + pub fn new(inner: Arc) -> Self { + Self { + inner, + inject_enospc: AtomicBool::new(false), + inject_eio_on_read: AtomicBool::new(false), + inject_eio_on_write: AtomicBool::new(false), + inject_corruption: AtomicBool::new(false), + fail_after_n_puts: AtomicUsize::new(usize::MAX), + put_count: AtomicUsize::new(0), + } + } + + pub async fn open(config: CasConfig) -> Result { + let store = CasStore::open(config).await?; + Ok(Self::new(Arc::new(store))) + } + + pub fn set_inject_enospc(&self, enabled: bool) { + self.inject_enospc.store(enabled, Ordering::SeqCst); + } + + pub fn set_inject_eio_on_read(&self, enabled: bool) { + self.inject_eio_on_read.store(enabled, Ordering::SeqCst); + } + + pub fn set_inject_eio_on_write(&self, enabled: bool) { + self.inject_eio_on_write.store(enabled, Ordering::SeqCst); + } + + pub fn set_inject_corruption(&self, enabled: bool) { + self.inject_corruption.store(enabled, Ordering::SeqCst); + } + + pub fn set_fail_after_n_puts(&self, n: usize) { + self.fail_after_n_puts.store(n, Ordering::SeqCst); + self.put_count.store(0, Ordering::SeqCst); + } + + pub fn reset_faults(&self) { + self.inject_enospc.store(false, Ordering::SeqCst); + self.inject_eio_on_read.store(false, Ordering::SeqCst); + self.inject_eio_on_write.store(false, Ordering::SeqCst); + self.inject_corruption.store(false, Ordering::SeqCst); + self.fail_after_n_puts.store(usize::MAX, Ordering::SeqCst); + self.put_count.store(0, Ordering::SeqCst); + } + + pub fn put_count(&self) -> usize { + self.put_count.load(Ordering::SeqCst) + } + + pub async fn put(&self, data: &[u8]) -> Result { + let count = self.put_count.fetch_add(1, Ordering::SeqCst); + + if self.inject_enospc.load(Ordering::SeqCst) { + return Err(CasError::Io(io::Error::new( + ErrorKind::Other, + "No space left on device (ENOSPC injected)", + ))); + } + + if self.inject_eio_on_write.load(Ordering::SeqCst) { + return Err(CasError::Io(io::Error::new( + ErrorKind::Other, + "Input/output error (EIO injected)", + ))); + } + + let threshold = self.fail_after_n_puts.load(Ordering::SeqCst); + if count >= threshold { + return Err(CasError::Io(io::Error::new( + ErrorKind::Other, + "Injected failure after N puts", + ))); + } + + self.inner.put(data).await + } + + pub async fn get(&self, hash: &ChunkHash) -> Result { + if self.inject_eio_on_read.load(Ordering::SeqCst) { + return Err(CasError::Io(io::Error::new( + ErrorKind::Other, + "Input/output error (EIO injected)", + ))); + } + + let data = self.inner.get(hash).await?; + + if self.inject_corruption.load(Ordering::SeqCst) { + let mut corrupted = data.to_vec(); + if !corrupted.is_empty() { + corrupted[0] = corrupted[0].wrapping_add(1); + } + return Err(CasError::IntegrityError { + expected: hash.as_hex(), + actual: ChunkHash::from_bytes(&corrupted).as_hex(), + }); + } + + Ok(data) + } + + pub fn exists(&self, hash: &ChunkHash) -> bool { + self.inner.exists(hash) + } + + pub async fn delete(&self, hash: &ChunkHash) -> Result<(), CasError> { + if self.inject_eio_on_write.load(Ordering::SeqCst) { + return Err(CasError::Io(io::Error::new( + ErrorKind::Other, + "Input/output error (EIO injected)", + ))); + } + self.inner.delete(hash).await + } + + pub fn current_size(&self) -> u64 { + self.inner.current_size() + } + + pub fn max_size(&self) -> u64 { + self.inner.max_size() + } + + pub fn list_chunks(&self) -> impl Iterator + '_ { + self.inner.list_chunks() + } + + pub fn dedup_stats(&self) -> DedupStats { + self.inner.dedup_stats() + } + + pub fn inner(&self) -> &Arc { + &self.inner + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + async fn test_store() -> (FaultyCasStore, TempDir) { + let dir = TempDir::new().unwrap(); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 1024 * 1024, + shard_levels: 2, + }; + let store = FaultyCasStore::open(config).await.unwrap(); + (store, dir) + } + + #[tokio::test] + async fn test_healthy_passthrough() { + let (store, _dir) = test_store().await; + + let data = b"test data"; + let hash = store.put(data).await.unwrap(); + let retrieved = store.get(&hash).await.unwrap(); + assert_eq!(&retrieved[..], data); + } + + #[tokio::test] + async fn test_inject_enospc() { + let (store, _dir) = test_store().await; + + store.set_inject_enospc(true); + let result = store.put(b"test").await; + assert!(result.is_err()); + + let err = result.unwrap_err(); + assert!(matches!(err, CasError::Io(_))); + + store.set_inject_enospc(false); + assert!(store.put(b"test").await.is_ok()); + } + + #[tokio::test] + async fn test_inject_eio_on_read() { + let (store, _dir) = test_store().await; + + let hash = store.put(b"test").await.unwrap(); + + store.set_inject_eio_on_read(true); + let result = store.get(&hash).await; + assert!(result.is_err()); + + store.set_inject_eio_on_read(false); + assert!(store.get(&hash).await.is_ok()); + } + + #[tokio::test] + async fn test_inject_corruption() { + let (store, _dir) = test_store().await; + + let hash = store.put(b"test data").await.unwrap(); + + store.set_inject_corruption(true); + let result = store.get(&hash).await; + assert!(matches!(result, Err(CasError::IntegrityError { .. }))); + } + + #[tokio::test] + async fn test_fail_after_n_puts() { + let (store, _dir) = test_store().await; + + store.set_fail_after_n_puts(2); + + assert!(store.put(b"data1").await.is_ok()); + assert!(store.put(b"data2").await.is_ok()); + assert!(store.put(b"data3").await.is_err()); + assert!(store.put(b"data4").await.is_err()); + assert_eq!(store.put_count(), 4); + } + + #[tokio::test] + async fn test_reset_faults() { + let (store, _dir) = test_store().await; + + store.set_inject_enospc(true); + store.set_inject_eio_on_read(true); + store.set_fail_after_n_puts(1); + + store.reset_faults(); + + assert!(store.put(b"test").await.is_ok()); + let hash = store.put(b"test2").await.unwrap(); + assert!(store.get(&hash).await.is_ok()); + } +} diff --git a/musicfs/crates/musicfs-test-utils/src/faulty_origin.rs b/musicfs/crates/musicfs-test-utils/src/faulty_origin.rs new file mode 100644 index 0000000..723108c --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/src/faulty_origin.rs @@ -0,0 +1,328 @@ +use async_trait::async_trait; +use musicfs_core::{DirEntry, Error, FileStat, HealthStatus, OriginId, OriginType, Result}; +use musicfs_origins::{Origin, WatchCallback, WatchHandle}; +use parking_lot::RwLock; +use std::io::{self, ErrorKind}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::AsyncRead; + +#[derive(Debug, Clone)] +pub enum FailMode { + Healthy, + FailEveryNth(usize), + FailAfterN(usize), + TimeoutMs(u64), + PartialRead { max_bytes: usize }, + ReturnError(ErrorKind), +} + +impl Default for FailMode { + fn default() -> Self { + FailMode::Healthy + } +} + +pub struct FaultyOrigin { + inner: Arc, + fail_mode: Arc>, + call_count: AtomicUsize, +} + +impl FaultyOrigin { + pub fn new(inner: Arc, mode: FailMode) -> Self { + Self { + inner, + fail_mode: Arc::new(RwLock::new(mode)), + call_count: AtomicUsize::new(0), + } + } + + pub fn wrap(inner: impl Origin + 'static) -> Self { + Self::new(Arc::new(inner), FailMode::Healthy) + } + + pub fn set_mode(&self, mode: FailMode) { + *self.fail_mode.write() = mode; + } + + pub fn call_count(&self) -> usize { + self.call_count.load(Ordering::SeqCst) + } + + pub fn reset_count(&self) { + self.call_count.store(0, Ordering::SeqCst); + } + + fn increment_and_check(&self) -> Option { + let count = self.call_count.fetch_add(1, Ordering::SeqCst) + 1; + let mode = self.fail_mode.read(); + + match *mode { + FailMode::Healthy => None, + FailMode::FailEveryNth(n) if n > 0 && count % n == 0 => { + Some(Error::Origin("Injected failure (every Nth)".into())) + } + FailMode::FailEveryNth(_) => None, + FailMode::FailAfterN(n) if count > n => { + Some(Error::Origin("Injected failure (after N)".into())) + } + FailMode::FailAfterN(_) => None, + FailMode::TimeoutMs(_) => None, + FailMode::PartialRead { .. } => None, + FailMode::ReturnError(kind) => { + Some(Error::Io(io::Error::new(kind, "Injected I/O error"))) + } + } + } + + async fn maybe_timeout(&self) -> Option { + let mode = self.fail_mode.read().clone(); + if let FailMode::TimeoutMs(ms) = mode { + tokio::time::sleep(Duration::from_millis(ms)).await; + Some(Error::Timeout("Injected timeout".into())) + } else { + None + } + } + + fn truncate_if_partial(&self, mut data: Vec) -> Vec { + let mode = self.fail_mode.read(); + if let FailMode::PartialRead { max_bytes } = *mode { + data.truncate(max_bytes); + } + data + } +} + +#[async_trait] +impl Origin for FaultyOrigin { + fn id(&self) -> &OriginId { + self.inner.id() + } + + fn origin_type(&self) -> OriginType { + self.inner.origin_type() + } + + fn display_name(&self) -> &str { + self.inner.display_name() + } + + async fn readdir(&self, path: &Path) -> Result> { + if let Some(err) = self.increment_and_check() { + return Err(err); + } + if let Some(err) = self.maybe_timeout().await { + return Err(err); + } + self.inner.readdir(path).await + } + + async fn stat(&self, path: &Path) -> Result { + if let Some(err) = self.increment_and_check() { + return Err(err); + } + if let Some(err) = self.maybe_timeout().await { + return Err(err); + } + self.inner.stat(path).await + } + + async fn read(&self, path: &Path, offset: u64, size: u32) -> Result> { + if let Some(err) = self.increment_and_check() { + return Err(err); + } + if let Some(err) = self.maybe_timeout().await { + return Err(err); + } + let data = self.inner.read(path, offset, size).await?; + Ok(self.truncate_if_partial(data)) + } + + async fn read_full(&self, path: &Path) -> Result> { + if let Some(err) = self.increment_and_check() { + return Err(err); + } + if let Some(err) = self.maybe_timeout().await { + return Err(err); + } + let data = self.inner.read_full(path).await?; + Ok(self.truncate_if_partial(data)) + } + + async fn exists(&self, path: &Path) -> Result { + if let Some(err) = self.increment_and_check() { + return Err(err); + } + if let Some(err) = self.maybe_timeout().await { + return Err(err); + } + self.inner.exists(path).await + } + + async fn health(&self) -> HealthStatus { + let mode = self.fail_mode.read().clone(); + match mode { + FailMode::Healthy => self.inner.health().await, + FailMode::ReturnError(_) => HealthStatus::Unhealthy, + FailMode::TimeoutMs(ms) => { + tokio::time::sleep(Duration::from_millis(ms)).await; + HealthStatus::Unhealthy + } + FailMode::FailAfterN(n) if self.call_count.load(Ordering::SeqCst) >= n => { + HealthStatus::Unhealthy + } + _ => self.inner.health().await, + } + } + + async fn open_read(&self, path: &Path) -> Result> { + if let Some(err) = self.increment_and_check() { + return Err(err); + } + if let Some(err) = self.maybe_timeout().await { + return Err(err); + } + self.inner.open_read(path).await + } + + async fn watch(&self, path: &Path, callback: WatchCallback) -> Result { + self.inner.watch(path, callback).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::SystemTime; + + struct MockOrigin { + id: OriginId, + } + + impl MockOrigin { + fn new(id: &str) -> Self { + Self { + id: OriginId::from(id), + } + } + } + + #[async_trait] + impl Origin for MockOrigin { + fn id(&self) -> &OriginId { + &self.id + } + + fn origin_type(&self) -> OriginType { + OriginType::Local + } + + fn display_name(&self) -> &str { + "mock" + } + + async fn readdir(&self, _path: &Path) -> Result> { + Ok(vec![]) + } + + async fn stat(&self, _path: &Path) -> Result { + Ok(FileStat { + size: 1000, + mtime: SystemTime::now(), + is_dir: false, + }) + } + + async fn read(&self, _path: &Path, _offset: u64, size: u32) -> Result> { + Ok(vec![0u8; size as usize]) + } + + async fn read_full(&self, _path: &Path) -> Result> { + Ok(vec![0u8; 100]) + } + + async fn exists(&self, _path: &Path) -> Result { + Ok(true) + } + + async fn health(&self) -> HealthStatus { + HealthStatus::Healthy + } + + async fn open_read(&self, _path: &Path) -> Result> { + Err(Error::Origin("Not implemented".into())) + } + + async fn watch(&self, _path: &Path, _callback: WatchCallback) -> Result { + Err(Error::Origin("Not implemented".into())) + } + } + + #[tokio::test] + async fn test_healthy_passthrough() { + let inner = Arc::new(MockOrigin::new("test")); + let faulty = FaultyOrigin::new(inner, FailMode::Healthy); + + let result = faulty.stat(Path::new("/test")).await; + assert!(result.is_ok()); + assert_eq!(faulty.call_count(), 1); + } + + #[tokio::test] + async fn test_fail_every_nth() { + let inner = Arc::new(MockOrigin::new("test")); + let faulty = FaultyOrigin::new(inner, FailMode::FailEveryNth(2)); + + assert!(faulty.stat(Path::new("/test")).await.is_ok()); + assert!(faulty.stat(Path::new("/test")).await.is_err()); + assert!(faulty.stat(Path::new("/test")).await.is_ok()); + assert!(faulty.stat(Path::new("/test")).await.is_err()); + assert_eq!(faulty.call_count(), 4); + } + + #[tokio::test] + async fn test_fail_after_n() { + let inner = Arc::new(MockOrigin::new("test")); + let faulty = FaultyOrigin::new(inner, FailMode::FailAfterN(2)); + + assert!(faulty.stat(Path::new("/test")).await.is_ok()); + assert!(faulty.stat(Path::new("/test")).await.is_ok()); + assert!(faulty.stat(Path::new("/test")).await.is_err()); + assert!(faulty.stat(Path::new("/test")).await.is_err()); + } + + #[tokio::test] + async fn test_partial_read() { + let inner = Arc::new(MockOrigin::new("test")); + let faulty = FaultyOrigin::new(inner, FailMode::PartialRead { max_bytes: 10 }); + + let data = faulty.read(Path::new("/test"), 0, 100).await.unwrap(); + assert_eq!(data.len(), 10); + } + + #[tokio::test] + async fn test_mode_change_mid_test() { + let inner = Arc::new(MockOrigin::new("test")); + let faulty = FaultyOrigin::new(inner, FailMode::ReturnError(ErrorKind::ConnectionRefused)); + + assert!(faulty.stat(Path::new("/test")).await.is_err()); + + faulty.set_mode(FailMode::Healthy); + assert!(faulty.stat(Path::new("/test")).await.is_ok()); + } + + #[tokio::test] + async fn test_health_reflects_mode() { + let inner = Arc::new(MockOrigin::new("test")); + let faulty = FaultyOrigin::new(inner, FailMode::Healthy); + + assert_eq!(faulty.health().await, HealthStatus::Healthy); + + faulty.set_mode(FailMode::ReturnError(ErrorKind::ConnectionRefused)); + assert_eq!(faulty.health().await, HealthStatus::Unhealthy); + } +} diff --git a/musicfs/crates/musicfs-test-utils/src/fixtures.rs b/musicfs/crates/musicfs-test-utils/src/fixtures.rs new file mode 100644 index 0000000..6ed3257 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/src/fixtures.rs @@ -0,0 +1,255 @@ +use musicfs_cache::TreeBuilder; +use musicfs_cas::{CasConfig, CasStore}; +use musicfs_core::{ + AudioFormat, AudioMeta, FileId, FileMeta, OriginId, RealPath, VirtualPath, +}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; +use std::time::SystemTime; +use tempfile::TempDir; + +pub fn make_file_meta(id: i64, vpath: &str, size: u64) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(vpath), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from(vpath), + }, + size, + mtime: SystemTime::now(), + content_hash: None, + audio: None, + } +} + +pub fn make_file_meta_with_origin(id: i64, vpath: &str, size: u64, origin_id: &str) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(vpath), + real_path: RealPath { + origin_id: OriginId::from(origin_id), + path: PathBuf::from(vpath), + }, + size, + mtime: SystemTime::now(), + content_hash: None, + audio: None, + } +} + +pub fn make_audio_meta(artist: &str, album: &str, title: &str) -> AudioMeta { + AudioMeta { + title: Some(title.to_string()), + artist: Some(artist.to_string()), + album: Some(album.to_string()), + album_artist: None, + genre: None, + year: None, + track: None, + disc: None, + duration_ms: Some(180_000), + bitrate: Some(320), + sample_rate: Some(44100), + format: AudioFormat::Flac, + } +} + +pub fn make_audio_file( + id: i64, + vpath: &str, + size: u64, + artist: &str, + album: &str, + title: &str, +) -> FileMeta { + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(vpath), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from(vpath), + }, + size, + mtime: SystemTime::now(), + content_hash: None, + audio: Some(make_audio_meta(artist, album, title)), + } +} + +pub fn make_audio_file_full( + id: i64, + vpath: &str, + size: u64, + artist: &str, + album: &str, + title: &str, + track: u32, + year: u32, +) -> FileMeta { + let mut audio = make_audio_meta(artist, album, title); + audio.track = Some(track); + audio.year = Some(year); + + FileMeta { + id: FileId(id), + virtual_path: VirtualPath::new(vpath), + real_path: RealPath { + origin_id: OriginId::from("test"), + path: PathBuf::from(vpath), + }, + size, + mtime: SystemTime::now(), + content_hash: None, + audio: Some(audio), + } +} + +pub struct TestCasStore { + pub store: Arc, + pub dir: TempDir, +} + +pub async fn setup_test_cas() -> TestCasStore { + let dir = TempDir::new().expect("Failed to create temp dir for CAS"); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 100 * 1024 * 1024, + shard_levels: 2, + }; + let store = CasStore::open(config) + .await + .expect("Failed to open CAS store"); + TestCasStore { + store: Arc::new(store), + dir, + } +} + +pub async fn setup_test_cas_with_size(max_size: u64) -> TestCasStore { + let dir = TempDir::new().expect("Failed to create temp dir for CAS"); + let config = CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size, + shard_levels: 2, + }; + let store = CasStore::open(config) + .await + .expect("Failed to open CAS store"); + TestCasStore { + store: Arc::new(store), + dir, + } +} + +pub fn setup_test_tree(files: &[FileMeta]) -> Arc> { + let mut builder = TreeBuilder::new(); + for file in files { + builder.add_file(file); + } + Arc::new(RwLock::new(builder.build())) +} + +pub fn create_test_file(dir: &Path, relative_path: &str, content: &[u8]) -> PathBuf { + let full_path = dir.join(relative_path); + if let Some(parent) = full_path.parent() { + std::fs::create_dir_all(parent).expect("Failed to create parent directories"); + } + std::fs::write(&full_path, content).expect("Failed to write test file"); + full_path +} + +pub fn create_test_dir_structure(base: &Path, structure: &[&str]) { + for path in structure { + let full_path = base.join(path); + if path.ends_with('/') { + std::fs::create_dir_all(&full_path).expect("Failed to create directory"); + } else { + if let Some(parent) = full_path.parent() { + std::fs::create_dir_all(parent).expect("Failed to create parent"); + } + std::fs::write(&full_path, format!("content of {}", path)) + .expect("Failed to write file"); + } + } +} + +pub struct TestOriginDir { + pub dir: TempDir, +} + +impl TestOriginDir { + pub fn new() -> Self { + Self { + dir: TempDir::new().expect("Failed to create origin temp dir"), + } + } + + pub fn add_file(&self, path: &str, content: &[u8]) -> PathBuf { + create_test_file(self.dir.path(), path, content) + } + + pub fn add_audio_file(&self, path: &str) -> PathBuf { + let fake_audio = b"FAKE_FLAC_HEADER_FOR_TESTING_ONLY"; + self.add_file(path, fake_audio) + } + + pub fn path(&self) -> &Path { + self.dir.path() + } +} + +impl Default for TestOriginDir { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_make_file_meta() { + let meta = make_file_meta(1, "/Artist/Album/Track.flac", 1000); + assert_eq!(meta.id.0, 1); + assert_eq!(meta.virtual_path.as_str(), "/Artist/Album/Track.flac"); + assert_eq!(meta.size, 1000); + assert!(meta.audio.is_none()); + } + + #[test] + fn test_make_audio_file() { + let meta = make_audio_file(1, "/path.flac", 5000, "Artist", "Album", "Title"); + assert!(meta.audio.is_some()); + let audio = meta.audio.unwrap(); + assert_eq!(audio.artist, Some("Artist".to_string())); + assert_eq!(audio.album, Some("Album".to_string())); + assert_eq!(audio.title, Some("Title".to_string())); + } + + #[tokio::test] + async fn test_setup_test_cas() { + let test_cas = setup_test_cas().await; + let hash = test_cas.store.put(b"test data").await.unwrap(); + assert!(test_cas.store.exists(&hash)); + } + + #[test] + fn test_setup_test_tree() { + let files = vec![ + make_file_meta(1, "/A/B/1.flac", 100), + make_file_meta(2, "/A/B/2.flac", 200), + ]; + let tree = setup_test_tree(&files); + let guard = tree.read().unwrap(); + assert!(guard.file_count() > 0); + } + + #[test] + fn test_origin_dir() { + let origin = TestOriginDir::new(); + let path = origin.add_file("artist/album/track.flac", b"content"); + assert!(path.exists()); + } +} diff --git a/musicfs/crates/musicfs-test-utils/src/lib.rs b/musicfs/crates/musicfs-test-utils/src/lib.rs new file mode 100644 index 0000000..c59a010 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/src/lib.rs @@ -0,0 +1,9 @@ +pub mod assertions; +pub mod faulty_cas; +pub mod faulty_origin; +pub mod fixtures; + +pub use assertions::*; +pub use faulty_cas::FaultyCasStore; +pub use faulty_origin::{FailMode, FaultyOrigin}; +pub use fixtures::*; diff --git a/musicfs/crates/musicfs-test-utils/tests/docker_network.rs b/musicfs/crates/musicfs-test-utils/tests/docker_network.rs new file mode 100644 index 0000000..b0a9bd5 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/tests/docker_network.rs @@ -0,0 +1,148 @@ +#![cfg(feature = "docker-tests")] + +use musicfs_core::{OriginId, OriginType}; +use musicfs_origins::{HealthMonitor, LocalOrigin, OriginRegistry}; +use noxious_client::{Client, StreamDirection, Toxic, ToxicKind}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tempfile::TempDir; + +const TOXIPROXY_API: &str = "http://localhost:8474"; +const TOXIPROXY_LISTEN: &str = "localhost:18080"; +const UPSTREAM_ADDR: &str = "minio:9000"; + +async fn require_toxiproxy() { + let available = match reqwest::get(format!("{}/version", TOXIPROXY_API)).await { + Ok(resp) => resp.status().is_success(), + Err(_) => false, + }; + assert!(available, "Toxiproxy not available at {}. Run: cd tests/integration && docker-compose up -d", TOXIPROXY_API); +} + +#[tokio::test] +#[ignore = "Requires docker-compose up -d (tests/integration/docker-compose.yml)"] +async fn test_toxiproxy_latency_injection() { + require_toxiproxy().await; + + let client = Client::new(TOXIPROXY_API); + let proxy = client + .create_proxy("minio_latency", TOXIPROXY_LISTEN, UPSTREAM_ADDR) + .await + .expect("Failed to create proxy"); + + let toxic = Toxic { + name: "latency_downstream".to_string(), + kind: ToxicKind::Latency { + latency: 500, + jitter: 100, + }, + direction: StreamDirection::Downstream, + toxicity: 1.0, + }; + + proxy + .add_toxic(&toxic) + .await + .expect("Failed to add toxic"); + + let start = std::time::Instant::now(); + let _ = reqwest::get(format!("http://{}/minio/health/live", TOXIPROXY_LISTEN)).await; + let elapsed = start.elapsed(); + + assert!( + elapsed >= Duration::from_millis(400), + "Latency should be injected, got {:?}", + elapsed + ); + + proxy.delete().await.expect("Failed to cleanup proxy"); +} + +#[tokio::test] +#[ignore = "Requires docker-compose up -d (tests/integration/docker-compose.yml)"] +async fn test_toxiproxy_timeout_simulates_network_partition() { + require_toxiproxy().await; + + let client = Client::new(TOXIPROXY_API); + let proxy = client + .create_proxy("minio_partition", TOXIPROXY_LISTEN, UPSTREAM_ADDR) + .await + .expect("Failed to create proxy"); + + let result = reqwest::get(format!("http://{}/minio/health/live", TOXIPROXY_LISTEN)).await; + assert!(result.is_ok(), "Should reach MinIO through proxy initially"); + + let toxic = Toxic { + name: "timeout".to_string(), + kind: ToxicKind::Timeout { timeout: 0 }, + direction: StreamDirection::Downstream, + toxicity: 1.0, + }; + + proxy + .add_toxic(&toxic) + .await + .expect("Failed to add toxic"); + + let result = tokio::time::timeout( + Duration::from_secs(2), + reqwest::get(format!("http://{}/minio/health/live", TOXIPROXY_LISTEN)), + ) + .await; + + assert!( + result.is_err() || result.unwrap().is_err(), + "Should timeout during partition" + ); + + proxy + .remove_toxic("timeout") + .await + .expect("Failed to remove toxic"); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let result = reqwest::get(format!("http://{}/minio/health/live", TOXIPROXY_LISTEN)).await; + assert!(result.is_ok(), "Should reach MinIO after partition heals"); + + proxy.delete().await.expect("Failed to cleanup proxy"); +} + +#[tokio::test] +#[ignore = "Requires docker-compose up -d (tests/integration/docker-compose.yml)"] +async fn test_toxiproxy_slow_close_throttles_responses() { + require_toxiproxy().await; + + let client = Client::new(TOXIPROXY_API); + let proxy = client + .create_proxy("minio_slow", TOXIPROXY_LISTEN, UPSTREAM_ADDR) + .await + .expect("Failed to create proxy"); + + let toxic = Toxic { + name: "slow_close".to_string(), + kind: ToxicKind::SlowClose { delay: 1000 }, + direction: StreamDirection::Downstream, + toxicity: 1.0, + }; + + proxy + .add_toxic(&toxic) + .await + .expect("Failed to add toxic"); + + let start = std::time::Instant::now(); + let _ = reqwest::get(format!("http://{}/minio/health/live", TOXIPROXY_LISTEN)).await; + let elapsed = start.elapsed(); + + assert!( + elapsed >= Duration::from_millis(800), + "Slow close should delay response, got {:?}", + elapsed + ); + + proxy.delete().await.expect("Failed to cleanup proxy"); +} + + diff --git a/musicfs/crates/musicfs-test-utils/tests/resilience.rs b/musicfs/crates/musicfs-test-utils/tests/resilience.rs new file mode 100644 index 0000000..2c1ea19 --- /dev/null +++ b/musicfs/crates/musicfs-test-utils/tests/resilience.rs @@ -0,0 +1,253 @@ +use musicfs_cache::{VirtualTree, ROOT_INODE}; +use musicfs_cas::{CasConfig, CasStore}; +use musicfs_core::{HealthStatus, OriginId, OriginType, RealPath}; +use musicfs_origins::{HealthMonitor, LocalOrigin, OriginRegistry}; +use musicfs_test_utils::{FaultyOrigin, FailMode}; +use std::collections::HashMap; +use std::io::ErrorKind; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tempfile::TempDir; + +fn setup_test_file(dir: &TempDir, name: &str, content: &[u8]) -> PathBuf { + let path = dir.path().join(name); + std::fs::write(&path, content).unwrap(); + path +} + +async fn setup_cas(dir: &Path) -> CasStore { + CasStore::open(CasConfig { + chunks_dir: dir.join("chunks"), + max_size: 100 * 1024 * 1024, + shard_levels: 2, + }) + .await + .unwrap() +} + +fn create_faulty_origin(id: &str, dir: &TempDir, mode: FailMode) -> Arc { + let inner = Arc::new(LocalOrigin::new(OriginId::from(id), dir.path().to_path_buf())); + Arc::new(FaultyOrigin::new(inner, mode)) +} + +#[tokio::test] +async fn test_sqlite_integrity_check_detects_corruption() { + todo!("Issue 2.4: Implement Database::open_with_integrity_check()") +} + +#[tokio::test] +async fn test_tantivy_corruption_triggers_rebuild() { + todo!("Issue 2.4: Implement SearchIndex::open_with_recovery()") +} + +#[tokio::test] +async fn test_sled_corruption_triggers_repair() { + todo!("Issue 3.5: Implement sled recovery in CasStore::open()") +} + +#[tokio::test] +async fn test_cas_put_handles_enospc() { + let dir = TempDir::new().unwrap(); + let store = CasStore::open(CasConfig { + chunks_dir: dir.path().join("chunks"), + max_size: 100, + shard_levels: 2, + }) + .await + .unwrap(); + + let large_data = vec![0u8; 1000]; + let result = store.put(&large_data).await; + + assert!(result.is_err(), "Issue 2.8: CasStore should pre-check space and reject oversized write"); +} + +#[test] +fn test_poisoned_tree_lock_returns_eio_not_panic() { + use std::sync::{Arc, RwLock}; + use std::thread; + + let lock = Arc::new(RwLock::new(42)); + let lock_clone = lock.clone(); + + let handle = thread::spawn(move || { + let _guard = lock_clone.write().unwrap(); + panic!("writer panic"); + }); + + let _ = handle.join(); + + let result = lock.read(); + assert!(result.is_ok(), "Issue 2.9: Lock access after panic should return EIO, not poison error"); +} + +#[test] +fn test_parking_lot_rwlock_survives_panic() { + use parking_lot::RwLock; + use std::sync::Arc; + use std::thread; + + let tree = Arc::new(RwLock::new(VirtualTree::new())); + let tree_clone = tree.clone(); + + let handle = thread::spawn(move || { + let _guard = tree_clone.write(); + panic!("writer panic"); + }); + + let _ = handle.join(); + + assert!(tree.read().get(ROOT_INODE).is_some(), "parking_lot RwLock should survive writer panic"); +} + +#[tokio::test] +async fn test_failover_on_primary_death() { + let primary_dir = TempDir::new().unwrap(); + let backup_dir = TempDir::new().unwrap(); + setup_test_file(&primary_dir, "test.txt", b"primary"); + setup_test_file(&backup_dir, "test.txt", b"backup"); + + let primary = create_faulty_origin("primary", &primary_dir, FailMode::ReturnError(ErrorKind::ConnectionRefused)); + let backup = create_faulty_origin("backup", &backup_dir, FailMode::Healthy); + + let mut thresholds = HashMap::new(); + thresholds.insert(OriginType::Local, 1); + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)).with_per_type_thresholds(thresholds)); + let registry = Arc::new(OriginRegistry::new(monitor.clone())); + + registry.register(primary.clone(), 1); + registry.register(backup.clone(), 2); + + monitor.check_now(&OriginId::from("primary")).await; + monitor.check_now(&OriginId::from("backup")).await; + + assert!(registry.health().is_unhealthy(&OriginId::from("primary"))); + assert!(registry.health().is_healthy(&OriginId::from("backup"))); + + let path = RealPath { + origin_id: OriginId::from("backup"), + path: PathBuf::from("/test.txt"), + }; + let candidates = registry.route_all(&path); + assert_eq!(candidates.len(), 1); + assert_eq!(candidates[0].id(), &OriginId::from("backup")); +} + +#[tokio::test] +async fn test_origin_recovery_resumes_routing() { + let dir = TempDir::new().unwrap(); + setup_test_file(&dir, "test.txt", b"content"); + + let faulty = create_faulty_origin("recovering", &dir, FailMode::ReturnError(ErrorKind::ConnectionRefused)); + + let mut thresholds = HashMap::new(); + thresholds.insert(OriginType::Local, 1); + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30)).with_per_type_thresholds(thresholds)); + monitor.add_origin(faulty.clone()); + + monitor.check_now(&OriginId::from("recovering")).await; + assert_eq!(monitor.get_state(&OriginId::from("recovering")).unwrap().status, HealthStatus::Unhealthy); + + faulty.set_mode(FailMode::Healthy); + monitor.check_now(&OriginId::from("recovering")).await; + + assert_eq!(monitor.get_state(&OriginId::from("recovering")).unwrap().status, HealthStatus::Healthy); + assert_eq!(monitor.get_state(&OriginId::from("recovering")).unwrap().consecutive_failures, 0); +} + +#[tokio::test] +async fn test_local_origin_health_check_has_timeout() { + let dir = TempDir::new().unwrap(); + setup_test_file(&dir, "test.txt", b"content"); + + let slow = create_faulty_origin("slow", &dir, FailMode::TimeoutMs(5_000)); + + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + monitor.add_origin(slow.clone()); + + let start = Instant::now(); + monitor.check_now(&OriginId::from("slow")).await; + let elapsed = start.elapsed(); + + assert!(elapsed < Duration::from_secs(2), + "Issue 4.2.1: Health check should timeout in <2s, took {:?}", elapsed); + + let state = monitor.get_state(&OriginId::from("slow")).unwrap(); + assert_eq!(state.status, HealthStatus::Unhealthy); +} + +#[tokio::test] +async fn test_health_checks_run_in_parallel() { + let slow1_dir = TempDir::new().unwrap(); + let slow2_dir = TempDir::new().unwrap(); + let slow3_dir = TempDir::new().unwrap(); + + let slow1 = create_faulty_origin("slow1", &slow1_dir, FailMode::TimeoutMs(200)); + let slow2 = create_faulty_origin("slow2", &slow2_dir, FailMode::TimeoutMs(200)); + let slow3 = create_faulty_origin("slow3", &slow3_dir, FailMode::TimeoutMs(200)); + + let monitor = Arc::new(HealthMonitor::new(Duration::from_secs(30))); + monitor.add_origin(slow1); + monitor.add_origin(slow2); + monitor.add_origin(slow3); + + let start = Instant::now(); + monitor.check_all().await; + let elapsed = start.elapsed(); + + assert!(elapsed < Duration::from_millis(350), "Issue 4.2.2: check_all() should run in parallel (sequential would take ~600ms), took {:?}", elapsed); +} + +#[tokio::test] +async fn test_tantivy_survives_uncommitted_crash() { + todo!("Issue 5.2: Implement tantivy crash recovery test") +} + +#[tokio::test] +async fn test_fd_exhaustion_handling() { + todo!("Issue 5.3: Implement fd exhaustion test with rlimit") +} + +#[tokio::test] +async fn test_corrupt_chunk_auto_refetched() { + let dir = TempDir::new().unwrap(); + let origin_dir = TempDir::new().unwrap(); + setup_test_file(&origin_dir, "test.flac", b"original audio data"); + + let store = setup_cas(dir.path()).await; + let data = b"chunk data"; + let hash = store.put(data).await.unwrap(); + + let hex = hash.as_hex(); + let chunk_path = dir.path().join("chunks").join(&hex[0..2]).join(&hex[2..4]).join(&hex); + let mut corrupted = std::fs::read(&chunk_path).unwrap(); + corrupted[0] = corrupted[0].wrapping_add(1); + std::fs::write(&chunk_path, &corrupted).unwrap(); + + let result = store.get(&hash).await; + + assert!(result.is_ok(), "Issue 6.4: Corrupted chunk should be auto-refetched from origin"); +} + +#[tokio::test] +async fn test_missing_chunk_triggers_origin_fetch() { + todo!("Issue 6.4: Implement missing chunk origin fetch") +} + +#[tokio::test] +async fn test_passthrough_mode_when_cache_disk_dead() { + todo!("Issue 6.6: Implement passthrough mode") +} + +#[test] +fn test_systemd_service_has_execstoppost() { + let service_path = std::path::Path::new("../../../systemd/musicfs.service"); + if !service_path.exists() { + panic!("Issue 3.7: systemd/musicfs.service does not exist"); + } + + let content = std::fs::read_to_string(service_path).unwrap(); + assert!(content.contains("ExecStopPost") || content.contains("fusermount"), + "Issue 3.7: Service file should have ExecStopPost with fusermount for cleanup"); +} diff --git a/musicfs/tests/integration/docker-compose.yml b/musicfs/tests/integration/docker-compose.yml new file mode 100644 index 0000000..9249a3e --- /dev/null +++ b/musicfs/tests/integration/docker-compose.yml @@ -0,0 +1,40 @@ +services: + toxiproxy: + image: ghcr.io/shopify/toxiproxy:2.9.0 + ports: + - "8474:8474" + - "20000-20010:20000-20010" + healthcheck: + test: ["CMD", "/toxiproxy-cli", "list"] + interval: 5s + timeout: 3s + retries: 3 + + minio: + image: minio/minio:latest + command: server /data --console-address ":9001" + ports: + - "9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: test + MINIO_ROOT_PASSWORD: testtest123 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 5s + timeout: 3s + retries: 3 + volumes: + - minio-data:/data + + sftp: + image: atmoz/sftp:latest + ports: + - "2222:22" + command: test:test:::music + volumes: + - sftp-data:/home/test/music + +volumes: + minio-data: + sftp-data: