// gRPC `MusicFs` service implementation backed by an event bus and an origin scanner.
//
// NOTE(review): generic arguments appear to be missing throughout this view of the
// file (e.g. `Arc` with no type parameter, `Request,`, `Result, Status>`). This looks
// like an extraction artifact; confirm against the real source before editing types.
use crate::proto::musicfs::v1::{
    music_fs_server::MusicFs, CacheStats, ClearCacheRequest, ClearCacheResponse, Empty, Event,
    EventFilter, HealthStatus, MountState, OriginHealthResponse, OriginRequest, OriginsResponse,
    PrefetchProgress, PrefetchRequest, SearchRequest, SearchResponse, SearchResult,
    ShutdownRequest, StatusResponse, SyncProgress, SyncedFile, TierStats,
};
use musicfs_core::{Event as CoreEvent, EventBus};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::{debug, info, instrument};

/// State held by the gRPC server.
///
/// Keeps the construction instant (used to report uptime), a handle to the event
/// bus (used by `subscribe_events`), the crate version string, the origin scanner
/// (used by `rescan_origin`) and the origin root path handed to that scanner.
pub struct MusicFsServer {
    start_time: Instant,
    event_bus: Arc,
    version: String,
    scanner: Arc,
    origin_root: std::path::PathBuf,
}

impl MusicFsServer {
    /// Creates a server and the `OriginScanner` it owns.
    ///
    /// `db`, `tree` and `fetcher` are not stored on the server; they are passed
    /// straight to `OriginScanner::new` together with a clone of `event_bus`.
    /// The start time is captured here, and the version is taken from
    /// `CARGO_PKG_VERSION` at compile time.
    pub fn new(
        event_bus: Arc,
        db: Arc,
        tree: Arc>,
        fetcher: Arc,
        origin_root: std::path::PathBuf,
    ) -> Self {
        let scanner = Arc::new(crate::scanner::OriginScanner::new(
            db,
            event_bus.clone(),
            tree,
            fetcher,
        ));
        Self {
            start_time: Instant::now(),
            event_bus,
            version: env!("CARGO_PKG_VERSION").to_string(),
            scanner,
            origin_root,
        }
    }

    /// Converts a core event into its protobuf `Event` representation.
    ///
    /// Variants that only carry an origin, path and/or file id are mapped to a
    /// tuple and assembled at the bottom with empty metadata. Variants that carry
    /// an extra value (`files_changed`, `healthy`, `bytes_freed`,
    /// `candidate_count`) return early with that value stringified into the
    /// `metadata` map.
    ///
    /// `timestamp_ms` is the wall-clock time at which this conversion runs
    /// (`chrono::Utc::now()`), not a timestamp read from the event.
    fn event_to_proto(event: &CoreEvent) -> Event {
        let (event_type, origin_id, path, file_id) = match event {
            CoreEvent::FileAccessed {
                file_id,
                origin_id,
                path,
                ..
            } => (
                "file_accessed".to_string(),
                Some(origin_id.to_string()),
                Some(path.as_str().to_string()),
                Some(file_id.0),
            ),
            CoreEvent::FileAdded { path, origin_id } => (
                "file_added".to_string(),
                Some(origin_id.to_string()),
                Some(path.as_str().to_string()),
                None,
            ),
            // Here `file_id` is mapped through `Option::map`, so it may be absent.
            CoreEvent::FileRemoved { path, file_id } => (
                "file_removed".to_string(),
                None,
                Some(path.as_str().to_string()),
                file_id.map(|id| id.0),
            ),
            CoreEvent::FileModified { path } => (
                "file_modified".to_string(),
                None,
                Some(path.as_str().to_string()),
                None,
            ),
            CoreEvent::SyncStarted { origin_id } => (
                "sync_started".to_string(),
                Some(origin_id.to_string()),
                None,
                None,
            ),
            CoreEvent::SyncCompleted {
                origin_id,
                files_changed,
            } => {
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("files_changed".to_string(), files_changed.to_string());
                return Event {
                    event_type: "sync_completed".to_string(),
                    timestamp_ms: chrono::Utc::now().timestamp_millis(),
                    origin_id: Some(origin_id.to_string()),
                    path: None,
                    file_id: None,
                    metadata,
                };
            }
            CoreEvent::OriginHealthChanged { origin_id, healthy } => {
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("healthy".to_string(), healthy.to_string());
                return Event {
                    event_type: "origin_health_changed".to_string(),
                    timestamp_ms: chrono::Utc::now().timestamp_millis(),
                    origin_id: Some(origin_id.to_string()),
                    path: None,
                    file_id: None,
                    metadata,
                };
            }
            CoreEvent::CacheEviction { bytes_freed } => {
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("bytes_freed".to_string(), bytes_freed.to_string());
                return Event {
                    event_type: "cache_eviction".to_string(),
                    timestamp_ms: chrono::Utc::now().timestamp_millis(),
                    origin_id: None,
                    path: None,
                    file_id: None,
                    metadata,
                };
            }
            CoreEvent::OriginConnected { origin_id } => (
                "origin_connected".to_string(),
                Some(origin_id.to_string()),
                None,
                None,
            ),
            CoreEvent::OriginDisconnected { origin_id } => (
                "origin_disconnected".to_string(),
                Some(origin_id.to_string()),
                None,
                None,
            ),
            CoreEvent::AllOriginsUnhealthy { candidate_count } => {
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("candidate_count".to_string(), candidate_count.to_string());
                return Event {
                    event_type: "all_origins_unhealthy".to_string(),
                    timestamp_ms: chrono::Utc::now().timestamp_millis(),
                    origin_id: None,
                    path: None,
                    file_id: None,
                    metadata,
                };
            }
        };
        // Shared tail for the tuple-returning arms: no metadata attached.
        Event {
            event_type,
            timestamp_ms: chrono::Utc::now().timestamp_millis(),
            origin_id,
            path,
            file_id,
            metadata: Default::default(),
        }
    }

    /// Returns `true` when `event` passes every criterion set in `filter`.
    ///
    /// * An empty `event_types` list disables type filtering; otherwise the
    ///   event's type name must equal one of the listed strings.
    /// * When `origin_id` is set, the event's origin (stringified) must equal it.
    ///   Variants that carry no origin (`FileRemoved`, `FileModified`,
    ///   `CacheEviction`, `AllOriginsUnhealthy`) are therefore rejected whenever
    ///   an origin filter is present.
    ///
    /// The type names here are the same strings `event_to_proto` emits; the two
    /// tables are maintained separately and must be kept in sync by hand.
    fn matches_filter(event: &CoreEvent, filter: &EventFilter) -> bool {
        if !filter.event_types.is_empty() {
            let event_type = match event {
                CoreEvent::FileAccessed { .. } => "file_accessed",
                CoreEvent::FileAdded { .. } => "file_added",
                CoreEvent::FileRemoved { .. } => "file_removed",
                CoreEvent::FileModified { .. } => "file_modified",
                CoreEvent::SyncStarted { .. } => "sync_started",
                CoreEvent::SyncCompleted { .. } => "sync_completed",
                CoreEvent::OriginHealthChanged { .. } => "origin_health_changed",
                CoreEvent::CacheEviction { .. } => "cache_eviction",
                CoreEvent::OriginConnected { .. } => "origin_connected",
                CoreEvent::OriginDisconnected { .. } => "origin_disconnected",
                CoreEvent::AllOriginsUnhealthy { .. } => "all_origins_unhealthy",
            };
            if !filter.event_types.iter().any(|t| t == event_type) {
                return false;
            }
        }
        if let Some(ref origin_filter) = filter.origin_id {
            let event_origin = match event {
                CoreEvent::FileAccessed { origin_id, .. }
                | CoreEvent::FileAdded { origin_id, .. }
                | CoreEvent::SyncStarted { origin_id }
                | CoreEvent::SyncCompleted { origin_id, .. }
                | CoreEvent::OriginHealthChanged { origin_id, .. }
                | CoreEvent::OriginConnected { origin_id }
                | CoreEvent::OriginDisconnected { origin_id } => Some(origin_id.to_string()),
                CoreEvent::FileRemoved { .. }
                | CoreEvent::FileModified { .. }
                | CoreEvent::CacheEviction { .. }
                | CoreEvent::AllOriginsUnhealthy { .. } => None,
            };
            // `None != Some(_)`, so origin-less events fail an origin filter.
            if event_origin.as_ref() != Some(origin_filter) {
                return false;
            }
        }
        true
    }
}

#[tonic::async_trait]
impl MusicFs for MusicFsServer {
    /// Not served here: always returns `Status::unimplemented`, pointing the
    /// caller at `SearchService`.
    async fn search(
        &self,
        _request: Request,
    ) -> Result, Status> {
        Err(Status::unimplemented(
            "Use SearchService for search operations",
        ))
    }

    /// Stream type for `search_stream`.
    type SearchStreamStream = ReceiverStream>;

    /// Not served here: always returns `Status::unimplemented`, pointing the
    /// caller at `SearchService`.
    async fn search_stream(
        &self,
        _request: Request,
    ) -> Result, Status> {
        Err(Status::unimplemented(
            "Use SearchService for search operations",
        ))
    }

    /// Reports server status.
    ///
    /// Only `version` and `uptime_secs` are derived from server state. Every
    /// other field is a fixed value in this implementation (empty mount point,
    /// `MountReady`, zero counters, no origins).
    #[instrument(level = "debug", skip(self, _request), fields(method = "get_status"))]
    async fn get_status(
        &self,
        _request: Request,
    ) -> Result, Status> {
        debug!("gRPC get_status called");
        let uptime = self.start_time.elapsed().as_secs();
        Ok(Response::new(StatusResponse {
            version: self.version.clone(),
            uptime_secs: uptime,
            mount_point: String::new(),
            state: MountState::MountReady as i32,
            open_file_handles: 0,
            fuse_ops_total: 0,
            files_indexed: 0,
            cache_size_bytes: 0,
            origins: vec![],
        }))
    }

    /// Logs the shutdown request and acknowledges it.
    ///
    /// NOTE(review): nothing in this method triggers an actual shutdown;
    /// `graceful` and `timeout_secs` are only logged. Confirm whether shutdown is
    /// handled elsewhere or this is a stub.
    #[instrument(level = "info", skip(self, request), fields(method = "shutdown"))]
    async fn shutdown(&self, request: Request) -> Result, Status> {
        let req = request.into_inner();
        info!(
            graceful = req.graceful,
            timeout_secs = req.timeout_secs,
            "gRPC shutdown requested"
        );
        Ok(Response::new(Empty {}))
    }

    /// Returns cache statistics.
    ///
    /// Every counter, ratio and per-tier stat is a literal zero in this
    /// implementation; no cache is consulted.
    #[instrument(
        level = "debug",
        skip(self, _request),
        fields(method = "get_cache_stats")
    )]
    async fn get_cache_stats(
        &self,
        _request: Request,
    ) -> Result, Status> {
        debug!("gRPC get_cache_stats called");
        Ok(Response::new(CacheStats {
            total_size_bytes: 0,
            used_size_bytes: 0,
            size_limit_bytes: 0,
            chunk_count: 0,
            chunks_unique: 0,
            dedup_ratio: 0.0,
            hit_count: 0,
            miss_count: 0,
            hit_ratio: 0.0,
            metadata_entries: 0,
            metadata_bytes: 0,
            l1_metadata: Some(TierStats {
                entries: 0,
                size_bytes: 0,
                hits: 0,
                misses: 0,
            }),
            l2_headers: Some(TierStats {
                entries: 0,
                size_bytes: 0,
                hits: 0,
                misses: 0,
            }),
            l3_chunks: Some(TierStats {
                entries: 0,
                size_bytes: 0,
                hits: 0,
                misses: 0,
            }),
        }))
    }

    /// Logs the clear-cache request and reports zero bytes/chunks cleared.
    ///
    /// No cache is touched by this method; the request fields are only logged.
    #[instrument(level = "info", skip(self, request), fields(method = "clear_cache"))]
    async fn clear_cache(
        &self,
        request: Request,
    ) -> Result, Status> {
        let req = request.into_inner();
        info!(
            origin_id = ?req.origin_id,
            clear_metadata = req.clear_metadata,
            clear_chunks = req.clear_chunks,
            "gRPC clear_cache"
        );
        Ok(Response::new(ClearCacheResponse {
            bytes_cleared: 0,
            chunks_cleared: 0,
        }))
    }

    /// Stream type for `prefetch`.
    type PrefetchStream = ReceiverStream>;

    /// Streams one `PrefetchProgress` message per requested path.
    ///
    /// A background task walks `req.paths` in order and emits a message with a
    /// 1-based `completed` counter and `bytes_fetched: 0`. No fetching is
    /// performed in this method. The task stops early if the receiving side of
    /// the channel has been dropped.
    #[instrument(level = "debug", skip(self, request), fields(method = "prefetch"))]
    async fn prefetch(
        &self,
        request: Request,
    ) -> Result, Status> {
        let req = request.into_inner();
        // NOTE(review): `usize as u32` truncates silently for very large path lists.
        let total = req.paths.len() as u32;
        debug!(file_count = total, "gRPC prefetch started");
        let (tx, rx) = mpsc::channel(32);
        tokio::spawn(async move {
            for (i, path) in req.paths.into_iter().enumerate() {
                let progress = PrefetchProgress {
                    current_path: path,
                    completed: i as u32 + 1,
                    total,
                    bytes_fetched: 0,
                };
                // A send error means the receiver is gone; stop producing.
                if tx.send(Ok(progress)).await.is_err() {
                    break;
                }
            }
        });
        Ok(Response::new(ReceiverStream::new(rx)))
    }

    /// Returns the list of origins, which is always empty in this implementation.
    #[instrument(level = "debug", skip(self, _request), fields(method = "list_origins"))]
    async fn list_origins(
        &self,
        _request: Request,
    ) -> Result, Status> {
        debug!("gRPC list_origins called");
        Ok(Response::new(OriginsResponse { origins: vec![] }))
    }

    /// Echoes the requested origin id with a fixed `HealthUnknown` status, no
    /// message and `last_check_secs: 0`. No health check is performed here.
    #[instrument(
        level = "debug",
        skip(self, request),
        fields(method = "get_origin_health")
    )]
    async fn get_origin_health(
        &self,
        request: Request,
    ) -> Result, Status> {
        let req = request.into_inner();
        debug!(origin_id = %req.origin_id, "gRPC get_origin_health");
        Ok(Response::new(OriginHealthResponse {
            origin_id: req.origin_id,
            status: HealthStatus::HealthUnknown as i32,
            message: None,
            last_check_secs: 0,
        }))
    }

    /// Stream type for `rescan_origin`.
    type RescanOriginStream = ReceiverStream>;

    /// Runs a scan of the origin (optionally restricted to `subdir`) and streams
    /// progress to the client.
    ///
    /// Two tasks are spawned:
    /// * an outer task that calls `scanner.scan(..)` and, when it returns, sends
    ///   either a final `"complete"` `SyncProgress` (including the list of new
    ///   files) or an `internal` error status;
    /// * an inner forwarder that converts each scanner progress message into a
    ///   `SyncProgress` with an empty `new_files` list and pushes it to the
    ///   client channel.
    ///
    /// An empty `subdir` string is treated the same as no subdir.
    #[instrument(level = "info", skip(self, request), fields(method = "rescan_origin"))]
    async fn rescan_origin(
        &self,
        request: Request,
    ) -> Result, Status> {
        let req = request.into_inner();
        let subdir = req.subdir.as_deref().filter(|s| !s.is_empty());
        // NOTE(review): the message literal below spans a line break in this view of
        // the file, which embeds a newline in the log message. Probably a wrapping
        // artifact of "gRPC rescan_origin started"; confirm against the real source.
        info!(
            origin_id = %req.origin_id,
            subdir = ?subdir,
            "gRPC 
rescan_origin started"
        );
        // `tx`/`rx` carry messages to the client; `progress_tx`/`progress_rx` carry
        // raw progress from the scanner to the forwarder task.
        let (tx, rx) = mpsc::channel(32);
        let (progress_tx, mut progress_rx) = mpsc::channel::(64);
        let origin_id = musicfs_core::OriginId::from(req.origin_id.as_str());
        let scanner = self.scanner.clone();
        let origin_root = self.origin_root.clone();
        // Owned copy so the borrowed `subdir` does not have to outlive `req`.
        let subdir_owned = subdir.map(|s| s.to_string());
        tokio::spawn(async move {
            let forward_handle = {
                let tx = tx.clone();
                tokio::spawn(async move {
                    while let Some(progress) = progress_rx.recv().await {
                        let proto = SyncProgress {
                            phase: progress.phase,
                            current: progress.current,
                            total: progress.total,
                            current_path: progress.current_path,
                            bytes_synced: progress.bytes_synced,
                            new_files: vec![],
                        };
                        // Receiver dropped: nobody is listening any more.
                        if tx.send(Ok(proto)).await.is_err() {
                            break;
                        }
                    }
                })
            };
            let result = scanner
                .scan(
                    &origin_id,
                    &origin_root,
                    subdir_owned.as_deref(),
                    progress_tx,
                )
                .await;
            // NOTE(review): `abort()` cancels the forwarder without waiting for it to
            // drain `progress_rx`, so progress messages still queued when the scan
            // returns may never reach the client. Confirm this is intended.
            forward_handle.abort();
            match result {
                Ok(scan_result) => {
                    let synced_files: Vec = scan_result
                        .new_files
                        .iter()
                        .map(|f| SyncedFile {
                            path: f.path.clone(),
                            file_id: f.file_id.0,
                            virtual_path: f.virtual_path.clone(),
                        })
                        .collect();
                    // Final summary: `current` counts new + changed + deleted entries,
                    // `total` additionally includes unchanged ones. The send result is
                    // ignored because the client may already have disconnected.
                    let _ = tx
                        .send(Ok(SyncProgress {
                            phase: "complete".to_string(),
                            current: scan_result.new_files.len() as u32
                                + scan_result.changed
                                + scan_result.deleted,
                            total: scan_result.new_files.len() as u32
                                + scan_result.changed
                                + scan_result.deleted
                                + scan_result.unchanged,
                            current_path: String::new(),
                            bytes_synced: scan_result.bytes_synced,
                            new_files: synced_files,
                        }))
                        .await;
                }
                Err(e) => {
                    // Surface the scan failure to the client as an INTERNAL status.
                    let _ = tx
                        .send(Err(Status::internal(format!("rescan failed: {}", e))))
                        .await;
                }
            }
        });
        Ok(Response::new(ReceiverStream::new(rx)))
    }

    /// Stream type for `subscribe_events`.
    type SubscribeEventsStream = ReceiverStream>;

    /// Subscribes the client to the event bus and streams matching events.
    ///
    /// A background task receives from the bus subscription, drops events that
    /// fail `matches_filter`, converts the rest with `event_to_proto` and sends
    /// them to the client. A `Lagged` error is logged and the loop continues; a
    /// `Closed` error or a failed send to the client ends the task.
    ///
    /// NOTE(review): a disconnected client is only detected when a send is
    /// attempted, so with a filter that matches nothing the task appears to keep
    /// running until the bus closes. Confirm whether that is acceptable.
    #[instrument(
        level = "info",
        skip(self, request),
        fields(method = "subscribe_events")
    )]
    async fn subscribe_events(
        &self,
        request: Request,
    ) -> Result, Status> {
        info!("gRPC subscribe_events: client connected");
        let filter = request.into_inner();
        let mut rx = self.event_bus.subscribe();
        let (tx, out_rx) = mpsc::channel(100);
        tokio::spawn(async move {
            loop {
                match rx.recv().await {
                    Ok(event) => {
                        if Self::matches_filter(&event, &filter) {
                            let proto_event = Self::event_to_proto(&event);
                            // Client stream dropped: stop forwarding.
                            if tx.send(Ok(proto_event)).await.is_err() {
                                break;
                            }
                        }
                    }
                    // This subscriber fell behind and `n` events were skipped; keep going.
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                        tracing::warn!(skipped = n, "Event subscriber lagged, skipped events");
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                        tracing::debug!("Event channel closed");
                        break;
                    }
                }
            }
        });
        Ok(Response::new(ReceiverStream::new(out_rx)))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a server backed by an in-memory database, an empty tree and a CAS
    /// store rooted in a temporary directory.
    ///
    /// The `TempDir` is returned alongside the server so the caller keeps it
    /// alive for the duration of the test.
    async fn make_test_server() -> (MusicFsServer, tempfile::TempDir) {
        let event_bus = Arc::new(EventBus::new(16));
        let db = Arc::new(musicfs_cache::Database::open_memory().unwrap());
        let tree = Arc::new(parking_lot::RwLock::new(
            musicfs_cache::TreeBuilder::new().build(),
        ));
        let dir = tempfile::tempdir().unwrap();
        let cfg = musicfs_cas::CasConfig {
            chunks_dir: dir.path().join("chunks"),
            ..Default::default()
        };
        let store = Arc::new(musicfs_cas::CasStore::open(cfg).await.unwrap());
        let fetcher = Arc::new(musicfs_cas::ContentFetcher::new(store));
        let origin_root = std::path::PathBuf::from("/tmp/test-origin");
        (
            MusicFsServer::new(event_bus, db, tree, fetcher, origin_root),
            dir,
        )
    }

    /// `get_status` reports a non-empty version and an uptime under five seconds
    /// for a freshly built server.
    #[tokio::test]
    async fn test_get_status() {
        let (server, _dir) = make_test_server().await;
        let response = server.get_status(Request::new(Empty {})).await.unwrap();
        let status = response.into_inner();
        assert!(!status.version.is_empty());
        assert!(status.uptime_secs < 5);
    }

    /// `get_cache_stats` reports a hit ratio of exactly zero.
    #[tokio::test]
    async fn test_get_cache_stats() {
        let (server, _dir) = make_test_server().await;
        let response = server
            .get_cache_stats(Request::new(Empty {}))
            .await
            .unwrap();
        let stats = response.into_inner();
        assert_eq!(stats.hit_ratio, 0.0);
    }
}