From b88583707d3bb036201ba119f21ff3370d425f64 Mon Sep 17 00:00:00 2001 From: Alexander Date: Sun, 17 May 2026 23:32:18 +0200 Subject: [PATCH] feat: add metadata enrichment integration with music-agregator - Add SyncedFile message and subdir scoping to RescanOrigin proto - Add label, album_type, cover_url fields to UpdateMetadataRequest/MetadataResponse - Implement OriginScanner: walk, hash, diff, ingest with live FUSE tree and content fetcher registration - Add enrichment DB columns: enrichment_source, enriched_at, enrichment_attempts, genres_json, label, album_type, cover_url - Add EnrichmentUpdate struct and update_enrichment DB method - Wire BatchUpdateMetadata to write enrichment fields alongside audio metadata - Wire gRPC server into CLI mount command with --grpc-port flag - Pass VirtualTree and ContentFetcher to scanner so rescanned files are immediately visible and readable via FUSE --- Cargo.lock | 3 + crates/musicfs-cache/src/db.rs | 79 ++++++- crates/musicfs-cache/src/lib.rs | 2 +- crates/musicfs-cache/src/schema.sql | 9 + crates/musicfs-cli/src/main.rs | 52 ++++- crates/musicfs-cli/src/metadata.rs | 6 + crates/musicfs-grpc/Cargo.toml | 3 + crates/musicfs-grpc/proto/musicfs.proto | 19 ++ crates/musicfs-grpc/src/lib.rs | 1 + crates/musicfs-grpc/src/metadata.rs | 70 +++++-- crates/musicfs-grpc/src/scanner.rs | 261 ++++++++++++++++++++++++ crates/musicfs-grpc/src/server.rs | 132 ++++++++++-- 12 files changed, 595 insertions(+), 42 deletions(-) create mode 100644 crates/musicfs-grpc/src/scanner.rs diff --git a/Cargo.lock b/Cargo.lock index 742c412..6dae04f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2050,8 +2050,11 @@ dependencies = [ "hex", "hmac", "musicfs-cache", + "musicfs-cas", "musicfs-core", + "musicfs-metadata", "musicfs-search", + "parking_lot 0.12.5", "prost", "reqwest", "serde", diff --git a/crates/musicfs-cache/src/db.rs b/crates/musicfs-cache/src/db.rs index cf160b6..0f9f797 100644 --- a/crates/musicfs-cache/src/db.rs +++ b/crates/musicfs-cache/src/db.rs @@ -786,6 +786,70 @@ impl Database { Ok(()) } + pub fn update_enrichment( + &self, + file_id: FileId, + enrichment: &EnrichmentUpdate, + ) -> Result<()> { + let conn = self.conn.lock().unwrap(); + + let mut set_clauses = vec![ + "label = ?1".to_string(), + "album_type = ?2".to_string(), + "cover_url = ?3".to_string(), + "enrichment_source = ?4".to_string(), + "enriched_at = strftime('%s', 'now')".to_string(), + "enrichment_attempts = 0".to_string(), + "last_enrichment_error = NULL".to_string(), + ]; + + let mut params_vec: Vec> = vec![ + Box::new(enrichment.label.clone()), + Box::new(enrichment.album_type.clone()), + Box::new(enrichment.cover_url.clone()), + Box::new(enrichment.source.clone()), + ]; + + if let Some(ref genres) = enrichment.genres_json { + params_vec.push(Box::new(genres.clone())); + set_clauses.push(format!("genres_json = ?{}", params_vec.len())); + } + if let Some(ref genre) = enrichment.primary_genre { + params_vec.push(Box::new(genre.clone())); + set_clauses.push(format!("genre = ?{}", params_vec.len())); + } + + params_vec.push(Box::new(file_id.0)); + let id_param = params_vec.len(); + + let sql = format!( + "UPDATE files SET {} WHERE id = ?{}", + set_clauses.join(", "), + id_param + ); + + let params_refs: Vec<&dyn rusqlite::ToSql> = + params_vec.iter().map(|p| p.as_ref()).collect(); + + let rows = conn + .execute(&sql, params_refs.as_slice()) + .map_err(|e| Error::Database(format!("update_enrichment failed: {}", e)))?; + + if rows == 0 { + return Err(Error::FileNotFound(format!( + "file id {} not found", + file_id.0 + ))); + } + + debug!( + id = file_id.0, + source = &enrichment.source, + "updated enrichment metadata" + ); + Ok(()) + } + pub fn clear_overlay(&self, file_id: FileId) -> Result<()> { let conn = self.conn.lock().unwrap(); @@ -802,7 +866,10 @@ impl Database { mb_recording_id = NULL, mb_album_id = NULL, mb_artist_id = NULL, mb_album_artist_id = NULL, mb_release_group_id = NULL, replaygain_track_gain = NULL, replaygain_track_peak = NULL, replaygain_album_gain = NULL, replaygain_album_peak = NULL, channels = NULL, bits_per_sample = NULL, encoder = NULL, - custom_tags = NULL, format_layout = NULL + custom_tags = NULL, format_layout = NULL, + label = NULL, album_type = NULL, cover_url = NULL, genres_json = NULL, + enrichment_source = NULL, enriched_at = NULL, + enrichment_attempts = 0, last_enrichment_error = NULL WHERE id = ?1 "#, params![file_id.0], @@ -948,6 +1015,16 @@ pub struct TrashedFile { pub origin_id: OriginId, } +#[derive(Debug, Clone, Default)] +pub struct EnrichmentUpdate { + pub label: Option, + pub album_type: Option, + pub cover_url: Option, + pub genres_json: Option, + pub primary_genre: Option, + pub source: String, +} + #[derive(Debug, Clone, Default)] pub struct TrashedFilter { pub origin_id: Option, diff --git a/crates/musicfs-cache/src/lib.rs b/crates/musicfs-cache/src/lib.rs index 1223eb2..f70c872 100644 --- a/crates/musicfs-cache/src/lib.rs +++ b/crates/musicfs-cache/src/lib.rs @@ -11,7 +11,7 @@ mod prefetch; mod tree; pub use artwork::{ArtworkCache, ArtworkError, CachedArtwork}; -pub use db::{Database, TrashedFile, TrashedFilter}; +pub use db::{Database, EnrichmentUpdate, TrashedFile, TrashedFilter}; pub use eviction::{EvictionError, EvictionPolicy, LruEviction}; pub use format_handler::{FormatError, FormatHandler, FormatHandlerRegistry}; pub use format_layout::FormatLayout; diff --git a/crates/musicfs-cache/src/schema.sql b/crates/musicfs-cache/src/schema.sql index f7df19b..430d82f 100644 --- a/crates/musicfs-cache/src/schema.sql +++ b/crates/musicfs-cache/src/schema.sql @@ -46,6 +46,15 @@ CREATE TABLE IF NOT EXISTS files ( encoder TEXT, custom_tags TEXT, format_layout BLOB, + + label TEXT, + album_type TEXT, + cover_url TEXT, + genres_json TEXT, + enrichment_source TEXT, + enriched_at INTEGER, + enrichment_attempts INTEGER NOT NULL DEFAULT 0, + last_enrichment_error TEXT, origin_mtime INTEGER NOT NULL, origin_size INTEGER NOT NULL, diff --git a/crates/musicfs-cli/src/main.rs b/crates/musicfs-cli/src/main.rs index 807d906..3d1a4a7 100644 --- a/crates/musicfs-cli/src/main.rs +++ b/crates/musicfs-cli/src/main.rs @@ -10,6 +10,7 @@ use musicfs_cache::{ use musicfs_cas::{CasConfig, CasStore, ContentFetcher, FileReader}; use musicfs_core::{FileId, FileMeta, LoggingConfig, OriginId, RealPath, VirtualPath}; use musicfs_fuse::MusicFs; +use musicfs_grpc::{MetadataServiceImpl, MusicFsServer as GrpcServer}; use musicfs_metadata::MetadataParser; use musicfs_origins::{LocalOrigin, Origin}; use parking_lot::RwLock; @@ -47,6 +48,8 @@ enum Commands { origin: Option, #[arg(short = 'd', long, help = "Cache directory")] cache_dir: Option, + #[arg(long, default_value = "50052", help = "gRPC server port")] + grpc_port: u16, }, Status, Cache { @@ -165,6 +168,7 @@ fn main() -> Result<()> { mountpoint, origin, cache_dir, + grpc_port, } => { let mut config = if let Some(config_path) = config { musicfs_core::Config::from_file(&config_path)? @@ -213,7 +217,7 @@ fn main() -> Result<()> { } let _guard = init_logging(&config.logging)?; - run_mount(config) + run_mount(config, grpc_port) } Commands::Status => { init_basic_logging(&cli.log_level); @@ -259,11 +263,11 @@ fn run_metadata(endpoint: String, command: MetadataCommand) -> Result<()> { runtime.block_on(metadata::run_metadata(command, &endpoint)) } -fn run_mount(config: musicfs_core::Config) -> Result<()> { +fn run_mount(config: musicfs_core::Config, grpc_port: u16) -> Result<()> { let runtime = tokio::runtime::Runtime::new().context("Failed to create Tokio runtime")?; let handle = runtime.handle().clone(); - let (tree, reader, db, overlay_reader) = runtime.block_on(async { + let (tree, reader, db, overlay_reader, origin_root, fetcher) = runtime.block_on(async { info!(mountpoint = ?config.mount_point, "Mount configuration"); info!("Cache directory: {:?}", config.cache_dir); @@ -364,7 +368,7 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> { let tree = Arc::new(RwLock::new(tree)); - let reader = Arc::new(FileReader::with_fetcher(store.clone(), fetcher)); + let reader = Arc::new(FileReader::with_fetcher(store.clone(), fetcher.clone())); // Create overlay reader for metadata synthesis let overlay_reader = Arc::new(OverlayReader::new( @@ -373,7 +377,15 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> { reader.clone(), )); - Ok::<_, anyhow::Error>((tree, reader, db, overlay_reader)) + let first_origin_root = config + .origins + .iter() + .find(|o| o.enabled && o.origin_type == musicfs_core::OriginType::Local) + .and_then(|o| o.settings.get("path").and_then(|v| v.as_str())) + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("/")); + + Ok::<_, anyhow::Error>((tree, reader, db, overlay_reader, first_origin_root, fetcher)) })?; check_stale_mount(&config.mount_point)?; @@ -388,6 +400,8 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> { .context("Failed to write PID file")?; info!(pid_path = ?pid_path, "PID file written"); + let grpc_db = db.clone(); + let tree_for_grpc = tree.clone(); let tree_for_restore = tree.clone(); let db_for_restore = db.clone(); @@ -411,6 +425,34 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> { let shutdown_token = tokio_util::sync::CancellationToken::new(); + let event_bus = Arc::new(musicfs_core::EventBus::default()); + let grpc_event_bus = event_bus.clone(); + let grpc_origin_root = origin_root.clone(); + let grpc_shutdown = shutdown_token.clone(); + + runtime.spawn(async move { + let addr = format!("0.0.0.0:{}", grpc_port).parse().unwrap(); + + let grpc_tree = tree_for_grpc.clone(); + let grpc_fetcher = fetcher.clone(); + let musicfs_server = GrpcServer::new(grpc_event_bus, grpc_db.clone(), grpc_tree, grpc_fetcher, grpc_origin_root); + let metadata_server = MetadataServiceImpl::new(grpc_db); + + info!(%addr, "gRPC server starting"); + + let result = tonic::transport::Server::builder() + .add_service(musicfs_grpc::proto::musicfs::v1::music_fs_server::MusicFsServer::new(musicfs_server)) + .add_service(musicfs_grpc::proto::musicfs::v1::metadata_service_server::MetadataServiceServer::new(metadata_server)) + .serve_with_shutdown(addr, async move { + grpc_shutdown.cancelled().await; + }) + .await; + + if let Err(e) = result { + tracing::error!(error = %e, "gRPC server error"); + } + }); + runtime.block_on(async { let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; diff --git a/crates/musicfs-cli/src/metadata.rs b/crates/musicfs-cli/src/metadata.rs index adef048..324d6ab 100644 --- a/crates/musicfs-cli/src/metadata.rs +++ b/crates/musicfs-cli/src/metadata.rs @@ -387,6 +387,9 @@ async fn run_set( replaygain_track_peak: fields.replaygain_track_peak, replaygain_album_gain: fields.replaygain_album_gain, replaygain_album_peak: fields.replaygain_album_peak, + label: None, + album_type: None, + cover_url: None, custom_tags: fields.custom_tags, } } else { @@ -416,6 +419,9 @@ async fn run_set( replaygain_track_peak: None, replaygain_album_gain: None, replaygain_album_peak: None, + label: None, + album_type: None, + cover_url: None, custom_tags: HashMap::new(), } }; diff --git a/crates/musicfs-grpc/Cargo.toml b/crates/musicfs-grpc/Cargo.toml index f9742bb..341a08d 100644 --- a/crates/musicfs-grpc/Cargo.toml +++ b/crates/musicfs-grpc/Cargo.toml @@ -5,8 +5,11 @@ edition.workspace = true [dependencies] musicfs-cache = { path = "../musicfs-cache" } +musicfs-cas = { path = "../musicfs-cas" } +musicfs-metadata = { path = "../musicfs-metadata" } musicfs-search = { path = "../musicfs-search" } musicfs-core = { path = "../musicfs-core" } +parking_lot.workspace = true tonic.workspace = true prost.workspace = true tokio.workspace = true diff --git a/crates/musicfs-grpc/proto/musicfs.proto b/crates/musicfs-grpc/proto/musicfs.proto index 7d07dae..f4a01ff 100644 --- a/crates/musicfs-grpc/proto/musicfs.proto +++ b/crates/musicfs-grpc/proto/musicfs.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package musicfs.v1; +option go_package = "homelab.lan/music-agregator/gen/musicfs/v1;musicfsv1"; + service MusicFS { rpc Search(SearchRequest) returns (SearchResponse); rpc SearchStream(SearchRequest) returns (stream SearchResult); @@ -152,6 +154,10 @@ message OriginInfo { message OriginRequest { string origin_id = 1; + // Optional subdirectory to scope the scan (relative to origin root). + // If empty, scans the entire origin. + // Example: "Metallica - Master of Puppets (1986) [FLAC]" + optional string subdir = 2; } message OriginHealthResponse { @@ -167,6 +173,13 @@ message SyncProgress { uint32 total = 3; string current_path = 4; uint64 bytes_synced = 5; + repeated SyncedFile new_files = 6; +} + +message SyncedFile { + string path = 1; + int64 file_id = 2; + string virtual_path = 3; } message EventFilter { @@ -226,6 +239,9 @@ message MetadataResponse { optional uint32 channels = 34; optional uint32 bits_per_sample = 35; optional string encoder = 36; + optional string label = 40; + optional string album_type = 41; + optional string cover_url = 42; map custom_tags = 50; } @@ -255,6 +271,9 @@ message UpdateMetadataRequest { optional float replaygain_track_peak = 31; optional float replaygain_album_gain = 32; optional float replaygain_album_peak = 33; + optional string label = 40; + optional string album_type = 41; + optional string cover_url = 42; map custom_tags = 50; } diff --git a/crates/musicfs-grpc/src/lib.rs b/crates/musicfs-grpc/src/lib.rs index b480215..1fff46a 100644 --- a/crates/musicfs-grpc/src/lib.rs +++ b/crates/musicfs-grpc/src/lib.rs @@ -7,6 +7,7 @@ pub mod proto { } mod metadata; +pub mod scanner; mod search_service; mod server; mod webhook; diff --git a/crates/musicfs-grpc/src/metadata.rs b/crates/musicfs-grpc/src/metadata.rs index a679a40..3929372 100644 --- a/crates/musicfs-grpc/src/metadata.rs +++ b/crates/musicfs-grpc/src/metadata.rs @@ -5,7 +5,7 @@ use crate::proto::musicfs::v1::{ ClearOverlayRequest, ClearOverlayResponse, GetMetadataRequest, ImportMetadataRequest, ImportProgress, MetadataResponse, UpdateMetadataRequest, UpdateMetadataResponse, }; -use musicfs_cache::Database; +use musicfs_cache::{Database, EnrichmentUpdate}; use musicfs_core::{AudioMeta, FileId, VirtualPath}; use std::sync::Arc; use tokio::sync::mpsc; @@ -63,6 +63,9 @@ impl MetadataServiceImpl { channels: meta.channels, bits_per_sample: meta.bits_per_sample, encoder: meta.encoder.clone(), + label: None, + album_type: None, + cover_url: None, custom_tags: Default::default(), } } @@ -160,24 +163,40 @@ impl MetadataService for MetadataServiceImpl { let audio_meta = Self::request_to_audio_meta(&req); - match self.db.update_metadata(file_id, &audio_meta) { - Ok(()) => { - debug!(file_id = req.file_id, "Metadata updated successfully"); - Ok(Response::new(UpdateMetadataResponse { - file_id: req.file_id, - success: true, - error_message: None, - })) - } - Err(e) => { - warn!(file_id = req.file_id, error = %e, "Failed to update metadata"); - Ok(Response::new(UpdateMetadataResponse { + if let Err(e) = self.db.update_metadata(file_id, &audio_meta) { + warn!(file_id = req.file_id, error = %e, "Failed to update metadata"); + return Ok(Response::new(UpdateMetadataResponse { + file_id: req.file_id, + success: false, + error_message: Some(e.to_string()), + })); + } + + if req.label.is_some() || req.album_type.is_some() || req.cover_url.is_some() { + let enrichment = EnrichmentUpdate { + label: req.label.clone(), + album_type: req.album_type.clone(), + cover_url: req.cover_url.clone(), + genres_json: None, + primary_genre: None, + source: "orchestrator".to_string(), + }; + if let Err(e) = self.db.update_enrichment(file_id, &enrichment) { + warn!(file_id = req.file_id, error = %e, "Failed to update enrichment"); + return Ok(Response::new(UpdateMetadataResponse { file_id: req.file_id, success: false, error_message: Some(e.to_string()), - })) + })); } } + + debug!(file_id = req.file_id, "Metadata updated successfully"); + Ok(Response::new(UpdateMetadataResponse { + file_id: req.file_id, + success: true, + error_message: None, + })) } #[instrument(level = "info", skip(self, request), fields(method = "clear_overlay"))] @@ -239,7 +258,28 @@ impl MetadataService for MetadataServiceImpl { let error_message = if let Some(ref metadata_req) = item.metadata { let audio_meta = MetadataServiceImpl::request_to_audio_meta(metadata_req); match db.update_metadata(file_id, &audio_meta) { - Ok(()) => None, + Ok(()) => { + if metadata_req.label.is_some() + || metadata_req.album_type.is_some() + || metadata_req.cover_url.is_some() + { + let enrichment = EnrichmentUpdate { + label: metadata_req.label.clone(), + album_type: metadata_req.album_type.clone(), + cover_url: metadata_req.cover_url.clone(), + genres_json: None, + primary_genre: None, + source: "orchestrator".to_string(), + }; + if let Err(e) = db.update_enrichment(file_id, &enrichment) { + Some(e.to_string()) + } else { + None + } + } else { + None + } + } Err(e) => Some(e.to_string()), } } else { diff --git a/crates/musicfs-grpc/src/scanner.rs b/crates/musicfs-grpc/src/scanner.rs new file mode 100644 index 0000000..3695545 --- /dev/null +++ b/crates/musicfs-grpc/src/scanner.rs @@ -0,0 +1,261 @@ +use musicfs_cache::{Database, VirtualTree}; +use musicfs_cas::ContentFetcher; +use musicfs_core::{AudioMeta, Error, Event, EventBus, FileId, FileMeta, OriginId, RealPath, Result, VirtualPath}; +use musicfs_metadata::MetadataParser; +use parking_lot::RwLock; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::UNIX_EPOCH; +use tokio::sync::mpsc; +use tracing::{info, warn}; + +pub struct ScanResult { + pub new_files: Vec, + pub changed: u32, + pub deleted: u32, + pub unchanged: u32, + pub bytes_synced: u64, +} + +pub struct SyncedFileInfo { + pub path: String, + pub file_id: FileId, + pub virtual_path: String, +} + +#[derive(Debug, Clone)] +pub struct ScanProgress { + pub phase: String, + pub current: u32, + pub total: u32, + pub current_path: String, + pub bytes_synced: u64, +} + +pub struct OriginScanner { + db: Arc, + event_bus: Arc, + tree: Arc>, + fetcher: Arc, + parser: MetadataParser, +} + +impl OriginScanner { + pub fn new( + db: Arc, + event_bus: Arc, + tree: Arc>, + fetcher: Arc, + ) -> Self { + Self { + db, + event_bus, + tree, + fetcher, + parser: MetadataParser, + } + } + + pub async fn scan( + &self, + origin_id: &OriginId, + origin_root: &Path, + subdir: Option<&str>, + progress_tx: mpsc::Sender, + ) -> Result { + let scan_root = match subdir { + Some(sub) if !sub.is_empty() => origin_root.join(sub), + _ => origin_root.to_path_buf(), + }; + + if !scan_root.exists() { + return Err(Error::Origin(format!( + "scan path does not exist: {}", + scan_root.display() + ))); + } + + // Phase 1: Scanning + let audio_files = self.collect_audio_files(&scan_root, &progress_tx)?; + let total_files = audio_files.len() as u32; + info!(files = total_files, "scan phase complete"); + + // Phase 2: Hashing + categorization + let mut new_files = Vec::new(); + let mut unchanged = 0u32; + + for (i, abs_path) in audio_files.iter().enumerate() { + let _ = progress_tx.try_send(ScanProgress { + phase: "hashing".to_string(), + current: i as u32 + 1, + total: total_files, + current_path: abs_path.display().to_string(), + bytes_synced: 0, + }); + + let rel_path = abs_path.strip_prefix(origin_root).unwrap_or(abs_path); + + let existing = self.db.get_file_by_real_path(origin_id, rel_path)?; + if existing.is_some() { + unchanged += 1; + continue; + } + + let size = std::fs::metadata(abs_path) + .map(|m| m.len()) + .unwrap_or(0); + + new_files.push(DiscoveredFile { + abs_path: abs_path.clone(), + rel_path: rel_path.to_path_buf(), + size, + }); + } + + info!( + new = new_files.len(), + unchanged = unchanged, + "hash phase complete" + ); + + // Phase 3: Indexing + let mut synced = Vec::new(); + let mut bytes_synced = 0u64; + let ingest_total = new_files.len() as u32; + + for (i, file) in new_files.iter().enumerate() { + let _ = progress_tx.try_send(ScanProgress { + phase: "indexing".to_string(), + current: i as u32 + 1, + total: ingest_total, + current_path: file.abs_path.display().to_string(), + bytes_synced, + }); + + let audio_meta = match self.parser.parse_file(&file.abs_path) { + Ok(meta) => meta, + Err(e) => { + warn!(path = %file.abs_path.display(), error = %e, "parse failed, using defaults"); + AudioMeta::default() + } + }; + + let virtual_path = derive_virtual_path(&audio_meta, &file.rel_path); + + let file_id = self.db.upsert_file( + origin_id, + &file.rel_path, + &virtual_path, + &audio_meta, + UNIX_EPOCH, + file.size, + )?; + + let file_meta = FileMeta { + id: file_id, + virtual_path: virtual_path.clone(), + real_path: RealPath { + origin_id: origin_id.clone(), + path: file.rel_path.clone(), + }, + size: file.size, + mtime: UNIX_EPOCH, + content_hash: None, + audio: Some(audio_meta), + }; + + { + let mut tree = self.tree.write(); + tree.insert_file(&file_meta); + } + + self.fetcher.register_file(file_meta.clone()); + + self.event_bus.publish(Event::FileAdded { + path: virtual_path.clone(), + origin_id: origin_id.clone(), + }); + + bytes_synced += file.size; + + synced.push(SyncedFileInfo { + path: file.abs_path.display().to_string(), + file_id, + virtual_path: virtual_path.as_str().to_string(), + }); + } + + Ok(ScanResult { + new_files: synced, + changed: 0, + deleted: 0, + unchanged, + bytes_synced, + }) + } + + fn collect_audio_files( + &self, + scan_root: &Path, + progress_tx: &mpsc::Sender, + ) -> Result> { + let mut files = Vec::new(); + self.walk_dir(scan_root, &mut files, progress_tx)?; + Ok(files) + } + + fn walk_dir( + &self, + dir: &Path, + files: &mut Vec, + progress_tx: &mpsc::Sender, + ) -> Result<()> { + let entries = std::fs::read_dir(dir) + .map_err(|e| Error::Origin(format!("read_dir {}: {}", dir.display(), e)))?; + + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + self.walk_dir(&path, files, progress_tx)?; + } else if is_audio_file(&path) { + files.push(path.clone()); + let _ = progress_tx.try_send(ScanProgress { + phase: "scanning".to_string(), + current: files.len() as u32, + total: 0, + current_path: path.display().to_string(), + bytes_synced: 0, + }); + } + } + + Ok(()) + } +} + +fn derive_virtual_path(meta: &AudioMeta, rel_path: &Path) -> VirtualPath { + let artist = meta.artist.as_deref().unwrap_or("Unknown Artist"); + let album = meta.album.as_deref().unwrap_or("Unknown Album"); + let filename = rel_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown"); + + VirtualPath::new(format!("/{}/{}/{}", artist, album, filename)) +} + +fn is_audio_file(path: &Path) -> bool { + matches!( + path.extension() + .and_then(|e| e.to_str()) + .map(|e| e.to_lowercase()) + .as_deref(), + Some("flac" | "mp3" | "ogg" | "wav" | "m4a" | "aac" | "opus") + ) +} + +struct DiscoveredFile { + abs_path: PathBuf, + rel_path: PathBuf, + size: u64, +} diff --git a/crates/musicfs-grpc/src/server.rs b/crates/musicfs-grpc/src/server.rs index f8bde00..1d5d1de 100644 --- a/crates/musicfs-grpc/src/server.rs +++ b/crates/musicfs-grpc/src/server.rs @@ -2,11 +2,11 @@ use crate::proto::musicfs::v1::{ music_fs_server::MusicFs, CacheStats, ClearCacheRequest, ClearCacheResponse, Empty, Event, EventFilter, HealthStatus, MountState, OriginHealthResponse, OriginRequest, OriginsResponse, PrefetchProgress, PrefetchRequest, SearchRequest, SearchResponse, SearchResult, - ShutdownRequest, StatusResponse, SyncProgress, TierStats, + ShutdownRequest, StatusResponse, SyncProgress, SyncedFile, TierStats, }; use musicfs_core::{Event as CoreEvent, EventBus}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -16,14 +16,30 @@ pub struct MusicFsServer { start_time: Instant, event_bus: Arc, version: String, + scanner: Arc, + origin_root: std::path::PathBuf, } impl MusicFsServer { - pub fn new(event_bus: Arc) -> Self { + pub fn new( + event_bus: Arc, + db: Arc, + tree: Arc>, + fetcher: Arc, + origin_root: std::path::PathBuf, + ) -> Self { + let scanner = Arc::new(crate::scanner::OriginScanner::new( + db, + event_bus.clone(), + tree, + fetcher, + )); Self { start_time: Instant::now(), event_bus, version: env!("CARGO_PKG_VERSION").to_string(), + scanner, + origin_root, } } @@ -368,24 +384,85 @@ impl MusicFs for MusicFsServer { request: Request, ) -> Result, Status> { let req = request.into_inner(); - info!(origin_id = %req.origin_id, "gRPC rescan_origin started"); + let subdir = req.subdir.as_deref().filter(|s| !s.is_empty()); + info!( + origin_id = %req.origin_id, + subdir = ?subdir, + "gRPC rescan_origin started" + ); let (tx, rx) = mpsc::channel(32); + let (progress_tx, mut progress_rx) = mpsc::channel::(64); + + let origin_id = musicfs_core::OriginId::from(req.origin_id.as_str()); + let scanner = self.scanner.clone(); + let origin_root = self.origin_root.clone(); + let subdir_owned = subdir.map(|s| s.to_string()); tokio::spawn(async move { - let phases = ["scanning", "indexing", "complete"]; - for (i, phase) in phases.iter().enumerate() { - let progress = SyncProgress { - phase: phase.to_string(), - current: i as u32 + 1, - total: phases.len() as u32, - current_path: String::new(), - bytes_synced: 0, - }; - if tx.send(Ok(progress)).await.is_err() { - break; + let forward_handle = { + let tx = tx.clone(); + tokio::spawn(async move { + while let Some(progress) = progress_rx.recv().await { + let proto = SyncProgress { + phase: progress.phase, + current: progress.current, + total: progress.total, + current_path: progress.current_path, + bytes_synced: progress.bytes_synced, + new_files: vec![], + }; + if tx.send(Ok(proto)).await.is_err() { + break; + } + } + }) + }; + + let result = scanner + .scan( + &origin_id, + &origin_root, + subdir_owned.as_deref(), + progress_tx, + ) + .await; + + forward_handle.abort(); + + match result { + Ok(scan_result) => { + let synced_files: Vec = scan_result + .new_files + .iter() + .map(|f| SyncedFile { + path: f.path.clone(), + file_id: f.file_id.0, + virtual_path: f.virtual_path.clone(), + }) + .collect(); + + let _ = tx + .send(Ok(SyncProgress { + phase: "complete".to_string(), + current: scan_result.new_files.len() as u32 + + scan_result.changed + + scan_result.deleted, + total: scan_result.new_files.len() as u32 + + scan_result.changed + + scan_result.deleted + + scan_result.unchanged, + current_path: String::new(), + bytes_synced: scan_result.bytes_synced, + new_files: synced_files, + })) + .await; + } + Err(e) => { + let _ = tx + .send(Err(Status::internal(format!("rescan failed: {}", e)))) + .await; } - tokio::time::sleep(Duration::from_millis(100)).await; } }); @@ -438,10 +515,26 @@ impl MusicFs for MusicFsServer { mod tests { use super::*; + async fn make_test_server() -> (MusicFsServer, tempfile::TempDir) { + let event_bus = Arc::new(EventBus::new(16)); + let db = Arc::new(musicfs_cache::Database::open_memory().unwrap()); + let tree = Arc::new(parking_lot::RwLock::new( + musicfs_cache::TreeBuilder::new().build(), + )); + let dir = tempfile::tempdir().unwrap(); + let cfg = musicfs_cas::CasConfig { + chunks_dir: dir.path().join("chunks"), + ..Default::default() + }; + let store = Arc::new(musicfs_cas::CasStore::open(cfg).await.unwrap()); + let fetcher = Arc::new(musicfs_cas::ContentFetcher::new(store)); + let origin_root = std::path::PathBuf::from("/tmp/test-origin"); + (MusicFsServer::new(event_bus, db, tree, fetcher, origin_root), dir) + } + #[tokio::test] async fn test_get_status() { - let event_bus = Arc::new(EventBus::new(16)); - let server = MusicFsServer::new(event_bus); + let (server, _dir) = make_test_server().await; let response = server.get_status(Request::new(Empty {})).await.unwrap(); let status = response.into_inner(); @@ -452,8 +545,7 @@ mod tests { #[tokio::test] async fn test_get_cache_stats() { - let event_bus = Arc::new(EventBus::new(16)); - let server = MusicFsServer::new(event_bus); + let (server, _dir) = make_test_server().await; let response = server .get_cache_stats(Request::new(Empty {}))