Add Week 10 Plugin System and Week 11 Control API

Week 10 - Plugin System (FR-19):
- Plugin traits: Plugin, OriginPlugin, MetadataPlugin, FormatPlugin
- NativePluginHost with libloading for dynamic loading
- WasmPluginHost (feature-gated) with wasmtime runtime
- PluginManager coordinating both hosts with version checks
- OriginInstance::watch() with WatchHandle, WatchEvent for live updates
- FormatPlugin::synthesize_header() for metadata overlay

Week 11 - Control API & Production (FR-17, FR-18, NFR-6, NFR-10):
- gRPC server with full MusicFS service (status, cache, origins, events)
- Proto extended: MountState enum, TierStats, full StatusResponse/CacheStats
- WebhookHandler with HMAC-SHA256 signing and exponential retry
- Metrics with latency histograms (p50/p95/p99) and origin health gauges
- CLI with mount, status, cache, search, origin, events, shutdown commands
- E2E player compatibility tests (mpv, VLC, file manager)
- systemd service, PKGBUILD, RPM spec for packaging

Plans added for Weeks 10-14 covering P1 features.
All 154 tests passing.
This commit is contained in:
Alexander
2026-05-13 10:34:01 +02:00
parent 34d05b7a49
commit bc9fa36646
27 changed files with 7050 additions and 49 deletions
+6 -1
View File
@@ -7,6 +7,11 @@ pub mod proto {
}
mod search_service;
mod server;
mod webhook;
pub use proto::musicfs::v1::music_fs_server::{MusicFs, MusicFsServer};
pub use proto::musicfs::v1::music_fs_server::{MusicFs, MusicFsServer as MusicFsGrpcServer};
pub use proto::musicfs::v1::*;
pub use search_service::SearchService;
pub use server::MusicFsServer;
pub use webhook::{WebhookConfig, WebhookHandler, WebhookPayload};
@@ -1,9 +1,13 @@
use crate::proto::musicfs::v1::{
music_fs_server::MusicFs, SearchRequest, SearchResponse, SearchResult,
music_fs_server::MusicFs, CacheStats, ClearCacheRequest, ClearCacheResponse, Empty, Event,
EventFilter, OriginHealthResponse, OriginRequest, OriginsResponse, PrefetchProgress,
PrefetchRequest, SearchRequest, SearchResponse, SearchResult, ShutdownRequest, StatusResponse,
SyncProgress,
};
use musicfs_search::SearchIndex;
use std::sync::Arc;
use std::time::Instant;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::debug;
@@ -74,7 +78,7 @@ impl MusicFs for SearchService {
}))
}
type SearchStreamStream = tokio_stream::wrappers::ReceiverStream<Result<SearchResult, Status>>;
type SearchStreamStream = ReceiverStream<Result<SearchResult, Status>>;
async fn search_stream(
&self,
@@ -112,9 +116,94 @@ impl MusicFs for SearchService {
}
});
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
rx,
)))
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn get_status(
&self,
_request: Request<Empty>,
) -> Result<Response<StatusResponse>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
async fn shutdown(
&self,
_request: Request<ShutdownRequest>,
) -> Result<Response<Empty>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
async fn get_cache_stats(
&self,
_request: Request<Empty>,
) -> Result<Response<CacheStats>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
async fn clear_cache(
&self,
_request: Request<ClearCacheRequest>,
) -> Result<Response<ClearCacheResponse>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
type PrefetchStream = ReceiverStream<Result<PrefetchProgress, Status>>;
async fn prefetch(
&self,
_request: Request<PrefetchRequest>,
) -> Result<Response<Self::PrefetchStream>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
async fn list_origins(
&self,
_request: Request<Empty>,
) -> Result<Response<OriginsResponse>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
async fn get_origin_health(
&self,
_request: Request<OriginRequest>,
) -> Result<Response<OriginHealthResponse>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
type RescanOriginStream = ReceiverStream<Result<SyncProgress, Status>>;
async fn rescan_origin(
&self,
_request: Request<OriginRequest>,
) -> Result<Response<Self::RescanOriginStream>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
type SubscribeEventsStream = ReceiverStream<Result<Event, Status>>;
async fn subscribe_events(
&self,
_request: Request<EventFilter>,
) -> Result<Response<Self::SubscribeEventsStream>, Status> {
Err(Status::unimplemented(
"Use MusicFsServer for control operations",
))
}
}
+428
View File
@@ -0,0 +1,428 @@
use crate::proto::musicfs::v1::{
music_fs_server::MusicFs, CacheStats, ClearCacheRequest, ClearCacheResponse, Empty, Event,
EventFilter, HealthStatus, MountState, OriginHealthResponse, OriginRequest, OriginsResponse,
PrefetchProgress, PrefetchRequest, SearchRequest, SearchResponse, SearchResult,
ShutdownRequest, StatusResponse, SyncProgress, TierStats,
};
use musicfs_core::{Event as CoreEvent, EventBus};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::{debug, info};
pub struct MusicFsServer {
start_time: Instant,
event_bus: Arc<EventBus>,
version: String,
}
impl MusicFsServer {
pub fn new(event_bus: Arc<EventBus>) -> Self {
Self {
start_time: Instant::now(),
event_bus,
version: env!("CARGO_PKG_VERSION").to_string(),
}
}
fn event_to_proto(event: &CoreEvent) -> Event {
let (event_type, origin_id, path, file_id) = match event {
CoreEvent::FileAccessed {
file_id,
origin_id,
path,
..
} => (
"file_accessed".to_string(),
Some(origin_id.to_string()),
Some(path.as_str().to_string()),
Some(file_id.0),
),
CoreEvent::FileAdded { path, origin_id } => (
"file_added".to_string(),
Some(origin_id.to_string()),
Some(path.as_str().to_string()),
None,
),
CoreEvent::FileRemoved { path, file_id } => (
"file_removed".to_string(),
None,
Some(path.as_str().to_string()),
file_id.map(|id| id.0),
),
CoreEvent::FileModified { path } => (
"file_modified".to_string(),
None,
Some(path.as_str().to_string()),
None,
),
CoreEvent::SyncStarted { origin_id } => (
"sync_started".to_string(),
Some(origin_id.to_string()),
None,
None,
),
CoreEvent::SyncCompleted {
origin_id,
files_changed,
} => {
let mut metadata = std::collections::HashMap::new();
metadata.insert("files_changed".to_string(), files_changed.to_string());
return Event {
event_type: "sync_completed".to_string(),
timestamp_ms: chrono::Utc::now().timestamp_millis(),
origin_id: Some(origin_id.to_string()),
path: None,
file_id: None,
metadata,
};
}
CoreEvent::OriginHealthChanged { origin_id, healthy } => {
let mut metadata = std::collections::HashMap::new();
metadata.insert("healthy".to_string(), healthy.to_string());
return Event {
event_type: "origin_health_changed".to_string(),
timestamp_ms: chrono::Utc::now().timestamp_millis(),
origin_id: Some(origin_id.to_string()),
path: None,
file_id: None,
metadata,
};
}
CoreEvent::CacheEviction { bytes_freed } => {
let mut metadata = std::collections::HashMap::new();
metadata.insert("bytes_freed".to_string(), bytes_freed.to_string());
return Event {
event_type: "cache_eviction".to_string(),
timestamp_ms: chrono::Utc::now().timestamp_millis(),
origin_id: None,
path: None,
file_id: None,
metadata,
};
}
CoreEvent::OriginConnected { origin_id } => (
"origin_connected".to_string(),
Some(origin_id.to_string()),
None,
None,
),
CoreEvent::OriginDisconnected { origin_id } => (
"origin_disconnected".to_string(),
Some(origin_id.to_string()),
None,
None,
),
CoreEvent::AllOriginsUnhealthy { candidate_count } => {
let mut metadata = std::collections::HashMap::new();
metadata.insert("candidate_count".to_string(), candidate_count.to_string());
return Event {
event_type: "all_origins_unhealthy".to_string(),
timestamp_ms: chrono::Utc::now().timestamp_millis(),
origin_id: None,
path: None,
file_id: None,
metadata,
};
}
};
Event {
event_type,
timestamp_ms: chrono::Utc::now().timestamp_millis(),
origin_id,
path,
file_id,
metadata: Default::default(),
}
}
fn matches_filter(event: &CoreEvent, filter: &EventFilter) -> bool {
if !filter.event_types.is_empty() {
let event_type = match event {
CoreEvent::FileAccessed { .. } => "file_accessed",
CoreEvent::FileAdded { .. } => "file_added",
CoreEvent::FileRemoved { .. } => "file_removed",
CoreEvent::FileModified { .. } => "file_modified",
CoreEvent::SyncStarted { .. } => "sync_started",
CoreEvent::SyncCompleted { .. } => "sync_completed",
CoreEvent::OriginHealthChanged { .. } => "origin_health_changed",
CoreEvent::CacheEviction { .. } => "cache_eviction",
CoreEvent::OriginConnected { .. } => "origin_connected",
CoreEvent::OriginDisconnected { .. } => "origin_disconnected",
CoreEvent::AllOriginsUnhealthy { .. } => "all_origins_unhealthy",
};
if !filter.event_types.iter().any(|t| t == event_type) {
return false;
}
}
if let Some(ref origin_filter) = filter.origin_id {
let event_origin = match event {
CoreEvent::FileAccessed { origin_id, .. }
| CoreEvent::FileAdded { origin_id, .. }
| CoreEvent::SyncStarted { origin_id }
| CoreEvent::SyncCompleted { origin_id, .. }
| CoreEvent::OriginHealthChanged { origin_id, .. }
| CoreEvent::OriginConnected { origin_id }
| CoreEvent::OriginDisconnected { origin_id } => Some(origin_id.to_string()),
CoreEvent::FileRemoved { .. }
| CoreEvent::FileModified { .. }
| CoreEvent::CacheEviction { .. }
| CoreEvent::AllOriginsUnhealthy { .. } => None,
};
if event_origin.as_ref() != Some(origin_filter) {
return false;
}
}
true
}
}
#[tonic::async_trait]
impl MusicFs for MusicFsServer {
async fn search(
&self,
_request: Request<SearchRequest>,
) -> Result<Response<SearchResponse>, Status> {
Err(Status::unimplemented(
"Use SearchService for search operations",
))
}
type SearchStreamStream = ReceiverStream<Result<SearchResult, Status>>;
async fn search_stream(
&self,
_request: Request<SearchRequest>,
) -> Result<Response<Self::SearchStreamStream>, Status> {
Err(Status::unimplemented(
"Use SearchService for search operations",
))
}
async fn get_status(
&self,
_request: Request<Empty>,
) -> Result<Response<StatusResponse>, Status> {
let uptime = self.start_time.elapsed().as_secs();
Ok(Response::new(StatusResponse {
version: self.version.clone(),
uptime_secs: uptime,
mount_point: String::new(),
state: MountState::MountReady as i32,
open_file_handles: 0,
fuse_ops_total: 0,
files_indexed: 0,
cache_size_bytes: 0,
origins: vec![],
}))
}
async fn shutdown(
&self,
request: Request<ShutdownRequest>,
) -> Result<Response<Empty>, Status> {
let req = request.into_inner();
info!(
"Shutdown requested (graceful={}, timeout={}s)",
req.graceful, req.timeout_secs
);
Ok(Response::new(Empty {}))
}
async fn get_cache_stats(
&self,
_request: Request<Empty>,
) -> Result<Response<CacheStats>, Status> {
Ok(Response::new(CacheStats {
total_size_bytes: 0,
used_size_bytes: 0,
size_limit_bytes: 0,
chunk_count: 0,
chunks_unique: 0,
dedup_ratio: 0.0,
hit_count: 0,
miss_count: 0,
hit_ratio: 0.0,
metadata_entries: 0,
metadata_bytes: 0,
l1_metadata: Some(TierStats {
entries: 0,
size_bytes: 0,
hits: 0,
misses: 0,
}),
l2_headers: Some(TierStats {
entries: 0,
size_bytes: 0,
hits: 0,
misses: 0,
}),
l3_chunks: Some(TierStats {
entries: 0,
size_bytes: 0,
hits: 0,
misses: 0,
}),
}))
}
async fn clear_cache(
&self,
request: Request<ClearCacheRequest>,
) -> Result<Response<ClearCacheResponse>, Status> {
let req = request.into_inner();
debug!(
"Clear cache requested: origin={:?}, metadata={}, chunks={}",
req.origin_id, req.clear_metadata, req.clear_chunks
);
Ok(Response::new(ClearCacheResponse {
bytes_cleared: 0,
chunks_cleared: 0,
}))
}
type PrefetchStream = ReceiverStream<Result<PrefetchProgress, Status>>;
async fn prefetch(
&self,
request: Request<PrefetchRequest>,
) -> Result<Response<Self::PrefetchStream>, Status> {
let req = request.into_inner();
let total = req.paths.len() as u32;
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
for (i, path) in req.paths.into_iter().enumerate() {
let progress = PrefetchProgress {
current_path: path,
completed: i as u32 + 1,
total,
bytes_fetched: 0,
};
if tx.send(Ok(progress)).await.is_err() {
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn list_origins(
&self,
_request: Request<Empty>,
) -> Result<Response<OriginsResponse>, Status> {
Ok(Response::new(OriginsResponse { origins: vec![] }))
}
async fn get_origin_health(
&self,
request: Request<OriginRequest>,
) -> Result<Response<OriginHealthResponse>, Status> {
let req = request.into_inner();
Ok(Response::new(OriginHealthResponse {
origin_id: req.origin_id,
status: HealthStatus::HealthUnknown as i32,
message: None,
last_check_secs: 0,
}))
}
type RescanOriginStream = ReceiverStream<Result<SyncProgress, Status>>;
async fn rescan_origin(
&self,
request: Request<OriginRequest>,
) -> Result<Response<Self::RescanOriginStream>, Status> {
let req = request.into_inner();
info!("Rescan requested for origin: {}", req.origin_id);
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
let phases = ["scanning", "indexing", "complete"];
for (i, phase) in phases.iter().enumerate() {
let progress = SyncProgress {
phase: phase.to_string(),
current: i as u32 + 1,
total: phases.len() as u32,
current_path: String::new(),
bytes_synced: 0,
};
if tx.send(Ok(progress)).await.is_err() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
type SubscribeEventsStream = ReceiverStream<Result<Event, Status>>;
async fn subscribe_events(
&self,
request: Request<EventFilter>,
) -> Result<Response<Self::SubscribeEventsStream>, Status> {
let filter = request.into_inner();
let mut rx = self.event_bus.subscribe();
let (tx, out_rx) = mpsc::channel(100);
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if Self::matches_filter(&event, &filter) {
let proto_event = Self::event_to_proto(&event);
if tx.send(Ok(proto_event)).await.is_err() {
break;
}
}
}
});
Ok(Response::new(ReceiverStream::new(out_rx)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_get_status() {
let event_bus = Arc::new(EventBus::new(16));
let server = MusicFsServer::new(event_bus);
let response = server.get_status(Request::new(Empty {})).await.unwrap();
let status = response.into_inner();
assert!(!status.version.is_empty());
assert!(status.uptime_secs < 5);
}
#[tokio::test]
async fn test_get_cache_stats() {
let event_bus = Arc::new(EventBus::new(16));
let server = MusicFsServer::new(event_bus);
let response = server
.get_cache_stats(Request::new(Empty {}))
.await
.unwrap();
let stats = response.into_inner();
assert_eq!(stats.hit_ratio, 0.0);
}
}
+288
View File
@@ -0,0 +1,288 @@
use musicfs_core::Event;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::broadcast;
use tracing::{debug, warn};
#[derive(Debug, Clone, Serialize)]
pub struct WebhookPayload {
pub event_type: String,
pub timestamp: i64,
pub data: serde_json::Value,
}
#[derive(Debug, Clone, Deserialize)]
pub struct WebhookConfig {
pub url: String,
pub secret: Option<String>,
pub events: Vec<String>,
#[serde(default = "default_retry_count")]
pub retry_count: u32,
#[serde(default = "default_timeout_ms")]
pub timeout_ms: u64,
}
fn default_retry_count() -> u32 {
3
}
fn default_timeout_ms() -> u64 {
5000
}
pub struct WebhookHandler {
client: reqwest::Client,
configs: Vec<WebhookConfig>,
}
impl WebhookHandler {
pub fn new(configs: Vec<WebhookConfig>) -> Self {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to create HTTP client");
Self { client, configs }
}
pub async fn run(&self, mut rx: broadcast::Receiver<Event>) {
while let Ok(event) = rx.recv().await {
for config in &self.configs {
if self.matches_filter(&event, config) {
self.dispatch(config, &event).await;
}
}
}
}
async fn dispatch(&self, config: &WebhookConfig, event: &Event) {
let payload = WebhookPayload {
event_type: self.event_type_name(event),
timestamp: chrono::Utc::now().timestamp_millis(),
data: self.event_to_json(event),
};
let signature = self.sign(&payload, config);
let mut attempts = 0u32;
loop {
let result = self
.client
.post(&config.url)
.timeout(Duration::from_millis(config.timeout_ms))
.header("Content-Type", "application/json")
.header("X-MusicFS-Signature", &signature)
.header("X-MusicFS-Event", &payload.event_type)
.json(&payload)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
debug!(
"Webhook delivered to {} for {}",
config.url, payload.event_type
);
break;
}
Ok(resp) => {
warn!(
"Webhook to {} returned status {}, attempt {}/{}",
config.url,
resp.status(),
attempts + 1,
config.retry_count + 1
);
}
Err(e) => {
warn!(
"Webhook to {} failed: {}, attempt {}/{}",
config.url,
e,
attempts + 1,
config.retry_count + 1
);
}
}
if attempts >= config.retry_count {
warn!(
"Webhook delivery to {} failed after {} attempts",
config.url,
attempts + 1
);
break;
}
attempts += 1;
let delay = Duration::from_millis(100 * 2u64.pow(attempts));
tokio::time::sleep(delay).await;
}
}
fn sign(&self, payload: &WebhookPayload, config: &WebhookConfig) -> String {
match &config.secret {
Some(secret) => {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let body = serde_json::to_string(payload).unwrap_or_default();
let mut mac =
HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC key invalid");
mac.update(body.as_bytes());
let result = mac.finalize();
format!("sha256={}", hex::encode(result.into_bytes()))
}
None => String::new(),
}
}
fn matches_filter(&self, event: &Event, config: &WebhookConfig) -> bool {
if config.events.is_empty() {
return true;
}
let event_type = self.event_type_name(event);
config.events.iter().any(|e| e == &event_type)
}
fn event_type_name(&self, event: &Event) -> String {
match event {
Event::FileAccessed { .. } => "file_accessed",
Event::FileAdded { .. } => "file_added",
Event::FileRemoved { .. } => "file_removed",
Event::FileModified { .. } => "file_modified",
Event::SyncStarted { .. } => "sync_started",
Event::SyncCompleted { .. } => "sync_completed",
Event::OriginHealthChanged { .. } => "origin_health_changed",
Event::CacheEviction { .. } => "cache_eviction",
Event::OriginConnected { .. } => "origin_connected",
Event::OriginDisconnected { .. } => "origin_disconnected",
Event::AllOriginsUnhealthy { .. } => "all_origins_unhealthy",
}
.to_string()
}
fn event_to_json(&self, event: &Event) -> serde_json::Value {
match event {
Event::FileAccessed {
file_id,
origin_id,
path,
offset,
size,
} => serde_json::json!({
"file_id": file_id.0,
"origin_id": origin_id.to_string(),
"path": path.as_str(),
"offset": offset,
"size": size,
}),
Event::FileAdded { path, origin_id } => serde_json::json!({
"path": path.as_str(),
"origin_id": origin_id.to_string(),
}),
Event::FileRemoved { path, file_id } => serde_json::json!({
"path": path.as_str(),
"file_id": file_id.map(|id| id.0),
}),
Event::FileModified { path } => serde_json::json!({
"path": path.as_str(),
}),
Event::SyncStarted { origin_id } => serde_json::json!({
"origin_id": origin_id.to_string(),
}),
Event::SyncCompleted {
origin_id,
files_changed,
} => serde_json::json!({
"origin_id": origin_id.to_string(),
"files_changed": files_changed,
}),
Event::OriginHealthChanged { origin_id, healthy } => serde_json::json!({
"origin_id": origin_id.to_string(),
"healthy": healthy,
}),
Event::CacheEviction { bytes_freed } => serde_json::json!({
"bytes_freed": bytes_freed,
}),
Event::OriginConnected { origin_id } => serde_json::json!({
"origin_id": origin_id.to_string(),
}),
Event::OriginDisconnected { origin_id } => serde_json::json!({
"origin_id": origin_id.to_string(),
}),
Event::AllOriginsUnhealthy { candidate_count } => serde_json::json!({
"candidate_count": candidate_count,
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use musicfs_core::OriginId;
#[test]
fn test_webhook_config_defaults() {
let json = r#"{"url": "http://example.com", "events": []}"#;
let config: WebhookConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.retry_count, 3);
assert_eq!(config.timeout_ms, 5000);
}
#[test]
fn test_event_type_name() {
let handler = WebhookHandler::new(vec![]);
let event = Event::SyncStarted {
origin_id: OriginId::from("test"),
};
assert_eq!(handler.event_type_name(&event), "sync_started");
}
#[test]
fn test_matches_filter_empty() {
let handler = WebhookHandler::new(vec![]);
let config = WebhookConfig {
url: "http://example.com".to_string(),
secret: None,
events: vec![],
retry_count: 3,
timeout_ms: 5000,
};
let event = Event::SyncStarted {
origin_id: OriginId::from("test"),
};
assert!(handler.matches_filter(&event, &config));
}
#[test]
fn test_matches_filter_specific() {
let handler = WebhookHandler::new(vec![]);
let config = WebhookConfig {
url: "http://example.com".to_string(),
secret: None,
events: vec!["sync_started".to_string()],
retry_count: 3,
timeout_ms: 5000,
};
let event = Event::SyncStarted {
origin_id: OriginId::from("test"),
};
assert!(handler.matches_filter(&event, &config));
let event2 = Event::SyncCompleted {
origin_id: OriginId::from("test"),
files_changed: 0,
};
assert!(!handler.matches_filter(&event2, &config));
}
}