Implement Week 7 Remote Origins with Oracle fixes

- Add credentials.rs with CredentialStore, redacted Debug (session_token shows [REDACTED])
- Add nfs.rs with ESTALE retry using Fn closure, 5s health timeout
- Add smb.rs with ENOTCONN retry handling, 5s health timeout
- Add s3.rs/sftp.rs feature-gated stubs with security documentation
- Add error variants: S3, Sftp, Timeout, Credential, NfsStaleHandle
- Fix delta.rs unused imports

Oracle fixes applied:
- SMB retry_on_disconnect for ENOTCONN (errno 107)
- session_token Debug shows [REDACTED] when Some, None otherwise
- NFS/SMB health checks wrapped with tokio::time::timeout(5s)

102 tests pass, 0 warnings.
This commit is contained in:
Alexander
2026-05-12 22:26:19 +02:00
parent d5ef68c9c9
commit 09f019730f
13 changed files with 691 additions and 3 deletions
+7 -1
View File
@@ -1,14 +1,20 @@
mod failover;
mod health;
mod local;
mod nfs;
mod registry;
mod router;
mod s3;
mod sftp;
mod smb;
mod traits;
pub use failover::{FailoverExecutor, RetryConfig};
pub use health::{HealthCheckHandle, HealthMonitor, HealthSnapshot, OriginHealthState};
pub use local::LocalOrigin;
pub use musicfs_core::OriginType;
pub use nfs::NfsOrigin;
pub use registry::OriginRegistry;
pub use router::{LatencyStats, Router};
pub use musicfs_core::OriginType;
pub use smb::SmbOrigin;
pub use traits::{Origin, WatchCallback, WatchEvent, WatchHandle};
+162
View File
@@ -0,0 +1,162 @@
use crate::local::LocalOrigin;
use crate::traits::{Origin, WatchCallback, WatchHandle};
use async_trait::async_trait;
use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, OriginType, Result};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, warn};
pub struct NfsOrigin {
inner: LocalOrigin,
max_retries: u32,
display_name: String,
}
impl NfsOrigin {
pub fn new(id: impl Into<OriginId>, mount_point: impl Into<PathBuf>) -> Self {
let mount_point = mount_point.into();
let display_name = format!("NFS: {}", mount_point.display());
Self {
inner: LocalOrigin::new(id, &mount_point),
max_retries: 3,
display_name,
}
}
pub fn with_max_retries(mut self, retries: u32) -> Self {
self.max_retries = retries;
self
}
async fn retry_on_stale<T, F, Fut>(&self, op: F) -> Result<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let mut delay = Duration::from_millis(100);
for attempt in 0..self.max_retries {
match op().await {
Ok(result) => return Ok(result),
Err(e) => {
if let Some(io_err) = e.downcast_io() {
#[cfg(unix)]
if io_err.raw_os_error() == Some(libc::ESTALE) {
warn!(
"NFS stale handle (attempt {}/{}), retrying after {:?}",
attempt + 1,
self.max_retries,
delay
);
sleep(delay).await;
delay *= 2;
continue;
}
}
return Err(e);
}
}
}
Err(musicfs_core::Error::NfsStaleHandle)
}
}
#[async_trait]
impl Origin for NfsOrigin {
fn id(&self) -> &OriginId {
self.inner.id()
}
fn origin_type(&self) -> OriginType {
OriginType::Nfs
}
fn display_name(&self) -> &str {
&self.display_name
}
async fn readdir(&self, path: &Path) -> Result<Vec<DirEntry>> {
self.retry_on_stale(|| self.inner.readdir(path)).await
}
async fn stat(&self, path: &Path) -> Result<FileStat> {
self.retry_on_stale(|| self.inner.stat(path)).await
}
async fn read(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>> {
self.retry_on_stale(|| self.inner.read(path, offset, size))
.await
}
async fn read_full(&self, path: &Path) -> Result<Vec<u8>> {
self.retry_on_stale(|| self.inner.read_full(path)).await
}
async fn exists(&self, path: &Path) -> Result<bool> {
self.retry_on_stale(|| self.inner.exists(path)).await
}
async fn health(&self) -> HealthStatus {
let health_timeout = Duration::from_secs(5);
match tokio::time::timeout(health_timeout, self.inner.stat(Path::new("/"))).await {
Ok(Ok(_)) => HealthStatus::Healthy,
Ok(Err(_)) | Err(_) => HealthStatus::Unhealthy,
}
}
async fn open_read(&self, path: &Path) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>> {
self.inner.open_read(path).await
}
async fn watch(&self, path: &Path, callback: WatchCallback) -> Result<WatchHandle> {
debug!("NFS watch - inotify may be unreliable over NFS, consider polling");
self.inner.watch(path, callback).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_nfs_origin_basic() {
let dir = TempDir::new().unwrap();
std::fs::write(dir.path().join("test.flac"), b"audio").unwrap();
let origin = NfsOrigin::new("nfs-test", dir.path());
let entries = origin.readdir(Path::new("/")).await.unwrap();
assert_eq!(entries.len(), 1);
let data = origin.read(Path::new("/test.flac"), 0, 5).await.unwrap();
assert_eq!(&data, b"audio");
}
#[tokio::test]
async fn test_nfs_origin_health() {
let dir = TempDir::new().unwrap();
let origin = NfsOrigin::new("nfs-test", dir.path());
assert_eq!(origin.health().await, HealthStatus::Healthy);
}
#[tokio::test]
async fn test_nfs_origin_type() {
let dir = TempDir::new().unwrap();
let origin = NfsOrigin::new("nfs-test", dir.path());
assert_eq!(origin.origin_type(), OriginType::Nfs);
}
#[test]
fn test_retry_uses_fn_not_fnmut() {
fn assert_fn<F: Fn() -> Fut, Fut>(_: F) {}
let closure = || async { Ok::<_, musicfs_core::Error>(()) };
assert_fn(closure);
}
}
+51
View File
@@ -0,0 +1,51 @@
//! S3-compatible object storage origin
//!
//! This module is feature-gated behind the `s3` feature to avoid heavy AWS SDK dependencies.
//!
//! # Oracle Security Fixes (MUST IMPLEMENT)
//!
//! 1. **Range EOF** - Clamp range to `min(requested_end, file_size)` to avoid 416 errors
//! 2. **Health check** - Use `head_bucket` not `list_objects_v2` (lighter operation)
//! 3. **Timeout handling** - Wrap all remote calls with `tokio::time::timeout(30s)`
//!
//! # Example Implementation (when feature enabled)
//!
//! ```ignore
//! async fn read(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>> {
//! // Oracle fix: Clamp range to file size to avoid 416 error
//! let file_size = self.stat(path).await?.size;
//! let end = std::cmp::min(offset + size as u64, file_size).saturating_sub(1);
//!
//! if offset >= file_size {
//! return Ok(Vec::new()); // EOF
//! }
//!
//! let range = format!("bytes={}-{}", offset, end);
//!
//! // Oracle fix: Add timeout to prevent hung connections
//! let resp = tokio::time::timeout(
//! Duration::from_secs(30),
//! self.client.get_object().bucket(&self.bucket).key(&key).range(range).send()
//! )
//! .await
//! .map_err(|_| Error::Timeout("S3 read timed out".into()))?
//! .map_err(|e| Error::S3(e.to_string()))?;
//!
//! // ...
//! }
//!
//! async fn health(&self) -> HealthStatus {
//! // Oracle fix: Use head_bucket instead of list_objects_v2 (lighter)
//! match self.client.head_bucket().bucket(&self.bucket).send().await {
//! Ok(_) => HealthStatus::Healthy,
//! Err(_) => HealthStatus::Unhealthy,
//! }
//! }
//! ```
#[cfg(feature = "s3")]
mod implementation {
// Full S3 implementation would go here when aws-sdk-s3 is enabled
}
@@ -0,0 +1,12 @@
#![allow(dead_code)]
//! SFTP origin - feature-gated to avoid russh/deadpool dependencies
#[cfg(feature = "sftp")]
mod implementation {
// Full SFTP implementation with connection pooling
// Oracle fixes to implement:
// 1. Use deadpool connection pool, not Arc<Mutex<SftpSession>>
// 2. Verify SSH host keys against ~/.ssh/known_hosts
// 3. Wrap all operations with tokio::time::timeout(30s)
// 4. Cap open_read to actual file size, not u32::MAX
}
+154
View File
@@ -0,0 +1,154 @@
use crate::local::LocalOrigin;
use crate::traits::{Origin, WatchCallback, WatchHandle};
use async_trait::async_trait;
use musicfs_core::{DirEntry, FileStat, HealthStatus, OriginId, OriginType, Result};
use std::future::Future;
use std::path::{Path, PathBuf};
use tracing::{debug, warn};
pub struct SmbOrigin {
inner: LocalOrigin,
share_path: String,
}
impl SmbOrigin {
pub fn from_mount(
id: impl Into<OriginId>,
mount_point: impl Into<PathBuf>,
share_path: impl Into<String>,
) -> Self {
let mount_point = mount_point.into();
let share_path = share_path.into();
Self {
inner: LocalOrigin::new(id, &mount_point),
share_path,
}
}
pub async fn is_mounted(&self) -> bool {
self.inner.exists(Path::new("/")).await.unwrap_or(false)
}
async fn retry_on_disconnect<T, F, Fut>(&self, op: F) -> Result<T>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T>>,
{
const MAX_RETRIES: u32 = 3;
for attempt in 0..MAX_RETRIES {
match op().await {
Ok(val) => return Ok(val),
Err(e) => {
if Self::is_enotconn(&e) && attempt < MAX_RETRIES - 1 {
debug!(attempt, "SMB ENOTCONN, retrying");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
return Err(e);
}
}
}
unreachable!()
}
#[cfg(unix)]
fn is_enotconn(err: &musicfs_core::Error) -> bool {
if let musicfs_core::Error::Io(io_err) = err {
io_err.raw_os_error() == Some(libc::ENOTCONN)
} else {
false
}
}
#[cfg(not(unix))]
fn is_enotconn(_err: &musicfs_core::Error) -> bool {
false
}
}
#[async_trait]
impl Origin for SmbOrigin {
fn id(&self) -> &OriginId {
self.inner.id()
}
fn origin_type(&self) -> OriginType {
OriginType::Smb
}
fn display_name(&self) -> &str {
&self.share_path
}
async fn readdir(&self, path: &Path) -> Result<Vec<DirEntry>> {
self.retry_on_disconnect(|| self.inner.readdir(path)).await
}
async fn stat(&self, path: &Path) -> Result<FileStat> {
self.retry_on_disconnect(|| self.inner.stat(path)).await
}
async fn read(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>> {
self.retry_on_disconnect(|| self.inner.read(path, offset, size)).await
}
async fn read_full(&self, path: &Path) -> Result<Vec<u8>> {
self.retry_on_disconnect(|| self.inner.read_full(path)).await
}
async fn exists(&self, path: &Path) -> Result<bool> {
self.retry_on_disconnect(|| self.inner.exists(path)).await
}
async fn health(&self) -> HealthStatus {
let health_timeout = std::time::Duration::from_secs(5);
match tokio::time::timeout(health_timeout, self.is_mounted()).await {
Ok(true) => HealthStatus::Healthy,
Ok(false) | Err(_) => HealthStatus::Unhealthy,
}
}
async fn open_read(&self, path: &Path) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>> {
self.inner.open_read(path).await
}
async fn watch(&self, path: &Path, callback: WatchCallback) -> Result<WatchHandle> {
warn!("SMB watch using inotify - may be unreliable. Consider polling for remote mounts.");
self.inner.watch(path, callback).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_smb_origin_basic() {
let dir = TempDir::new().unwrap();
std::fs::write(dir.path().join("test.flac"), b"audio").unwrap();
let origin = SmbOrigin::from_mount("smb-test", dir.path(), "//server/share");
let entries = origin.readdir(Path::new("/")).await.unwrap();
assert_eq!(entries.len(), 1);
}
#[tokio::test]
async fn test_smb_origin_type() {
let dir = TempDir::new().unwrap();
let origin = SmbOrigin::from_mount("smb-test", dir.path(), "//server/share");
assert_eq!(origin.origin_type(), OriginType::Smb);
}
#[tokio::test]
async fn test_smb_display_name() {
let dir = TempDir::new().unwrap();
let origin = SmbOrigin::from_mount("smb-test", dir.path(), "//server/music");
assert_eq!(origin.display_name(), "//server/music");
}
}