feat: add metadata enrichment integration with music-agregator

- Add SyncedFile message and subdir scoping to RescanOrigin proto
- Add label, album_type, cover_url fields to UpdateMetadataRequest/MetadataResponse
- Implement OriginScanner: walk, hash, diff, ingest with live FUSE tree and content fetcher registration
- Add enrichment DB columns: enrichment_source, enriched_at, enrichment_attempts, genres_json, label, album_type, cover_url
- Add EnrichmentUpdate struct and update_enrichment DB method
- Wire BatchUpdateMetadata to write enrichment fields alongside audio metadata
- Wire gRPC server into CLI mount command with --grpc-port flag
- Pass VirtualTree and ContentFetcher to scanner so rescanned files are immediately visible and readable via FUSE
This commit is contained in:
Alexander
2026-05-17 23:32:18 +02:00
parent 18024dbc62
commit b88583707d
12 changed files with 595 additions and 42 deletions
Generated
+3
View File
@@ -2050,8 +2050,11 @@ dependencies = [
"hex",
"hmac",
"musicfs-cache",
"musicfs-cas",
"musicfs-core",
"musicfs-metadata",
"musicfs-search",
"parking_lot 0.12.5",
"prost",
"reqwest",
"serde",
+78 -1
View File
@@ -786,6 +786,70 @@ impl Database {
Ok(())
}
pub fn update_enrichment(
&self,
file_id: FileId,
enrichment: &EnrichmentUpdate,
) -> Result<()> {
let conn = self.conn.lock().unwrap();
let mut set_clauses = vec![
"label = ?1".to_string(),
"album_type = ?2".to_string(),
"cover_url = ?3".to_string(),
"enrichment_source = ?4".to_string(),
"enriched_at = strftime('%s', 'now')".to_string(),
"enrichment_attempts = 0".to_string(),
"last_enrichment_error = NULL".to_string(),
];
let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = vec![
Box::new(enrichment.label.clone()),
Box::new(enrichment.album_type.clone()),
Box::new(enrichment.cover_url.clone()),
Box::new(enrichment.source.clone()),
];
if let Some(ref genres) = enrichment.genres_json {
params_vec.push(Box::new(genres.clone()));
set_clauses.push(format!("genres_json = ?{}", params_vec.len()));
}
if let Some(ref genre) = enrichment.primary_genre {
params_vec.push(Box::new(genre.clone()));
set_clauses.push(format!("genre = ?{}", params_vec.len()));
}
params_vec.push(Box::new(file_id.0));
let id_param = params_vec.len();
let sql = format!(
"UPDATE files SET {} WHERE id = ?{}",
set_clauses.join(", "),
id_param
);
let params_refs: Vec<&dyn rusqlite::ToSql> =
params_vec.iter().map(|p| p.as_ref()).collect();
let rows = conn
.execute(&sql, params_refs.as_slice())
.map_err(|e| Error::Database(format!("update_enrichment failed: {}", e)))?;
if rows == 0 {
return Err(Error::FileNotFound(format!(
"file id {} not found",
file_id.0
)));
}
debug!(
id = file_id.0,
source = &enrichment.source,
"updated enrichment metadata"
);
Ok(())
}
pub fn clear_overlay(&self, file_id: FileId) -> Result<()> {
let conn = self.conn.lock().unwrap();
@@ -802,7 +866,10 @@ impl Database {
mb_recording_id = NULL, mb_album_id = NULL, mb_artist_id = NULL, mb_album_artist_id = NULL, mb_release_group_id = NULL,
replaygain_track_gain = NULL, replaygain_track_peak = NULL, replaygain_album_gain = NULL, replaygain_album_peak = NULL,
channels = NULL, bits_per_sample = NULL, encoder = NULL,
custom_tags = NULL, format_layout = NULL
custom_tags = NULL, format_layout = NULL,
label = NULL, album_type = NULL, cover_url = NULL, genres_json = NULL,
enrichment_source = NULL, enriched_at = NULL,
enrichment_attempts = 0, last_enrichment_error = NULL
WHERE id = ?1
"#,
params![file_id.0],
@@ -948,6 +1015,16 @@ pub struct TrashedFile {
pub origin_id: OriginId,
}
#[derive(Debug, Clone, Default)]
pub struct EnrichmentUpdate {
pub label: Option<String>,
pub album_type: Option<String>,
pub cover_url: Option<String>,
pub genres_json: Option<String>,
pub primary_genre: Option<String>,
pub source: String,
}
#[derive(Debug, Clone, Default)]
pub struct TrashedFilter {
pub origin_id: Option<OriginId>,
+1 -1
View File
@@ -11,7 +11,7 @@ mod prefetch;
mod tree;
pub use artwork::{ArtworkCache, ArtworkError, CachedArtwork};
pub use db::{Database, TrashedFile, TrashedFilter};
pub use db::{Database, EnrichmentUpdate, TrashedFile, TrashedFilter};
pub use eviction::{EvictionError, EvictionPolicy, LruEviction};
pub use format_handler::{FormatError, FormatHandler, FormatHandlerRegistry};
pub use format_layout::FormatLayout;
+9
View File
@@ -46,6 +46,15 @@ CREATE TABLE IF NOT EXISTS files (
encoder TEXT,
custom_tags TEXT,
format_layout BLOB,
label TEXT,
album_type TEXT,
cover_url TEXT,
genres_json TEXT,
enrichment_source TEXT,
enriched_at INTEGER,
enrichment_attempts INTEGER NOT NULL DEFAULT 0,
last_enrichment_error TEXT,
origin_mtime INTEGER NOT NULL,
origin_size INTEGER NOT NULL,
+47 -5
View File
@@ -10,6 +10,7 @@ use musicfs_cache::{
use musicfs_cas::{CasConfig, CasStore, ContentFetcher, FileReader};
use musicfs_core::{FileId, FileMeta, LoggingConfig, OriginId, RealPath, VirtualPath};
use musicfs_fuse::MusicFs;
use musicfs_grpc::{MetadataServiceImpl, MusicFsServer as GrpcServer};
use musicfs_metadata::MetadataParser;
use musicfs_origins::{LocalOrigin, Origin};
use parking_lot::RwLock;
@@ -47,6 +48,8 @@ enum Commands {
origin: Option<PathBuf>,
#[arg(short = 'd', long, help = "Cache directory")]
cache_dir: Option<PathBuf>,
#[arg(long, default_value = "50052", help = "gRPC server port")]
grpc_port: u16,
},
Status,
Cache {
@@ -165,6 +168,7 @@ fn main() -> Result<()> {
mountpoint,
origin,
cache_dir,
grpc_port,
} => {
let mut config = if let Some(config_path) = config {
musicfs_core::Config::from_file(&config_path)?
@@ -213,7 +217,7 @@ fn main() -> Result<()> {
}
let _guard = init_logging(&config.logging)?;
run_mount(config)
run_mount(config, grpc_port)
}
Commands::Status => {
init_basic_logging(&cli.log_level);
@@ -259,11 +263,11 @@ fn run_metadata(endpoint: String, command: MetadataCommand) -> Result<()> {
runtime.block_on(metadata::run_metadata(command, &endpoint))
}
fn run_mount(config: musicfs_core::Config) -> Result<()> {
fn run_mount(config: musicfs_core::Config, grpc_port: u16) -> Result<()> {
let runtime = tokio::runtime::Runtime::new().context("Failed to create Tokio runtime")?;
let handle = runtime.handle().clone();
let (tree, reader, db, overlay_reader) = runtime.block_on(async {
let (tree, reader, db, overlay_reader, origin_root, fetcher) = runtime.block_on(async {
info!(mountpoint = ?config.mount_point, "Mount configuration");
info!("Cache directory: {:?}", config.cache_dir);
@@ -364,7 +368,7 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> {
let tree = Arc::new(RwLock::new(tree));
let reader = Arc::new(FileReader::with_fetcher(store.clone(), fetcher));
let reader = Arc::new(FileReader::with_fetcher(store.clone(), fetcher.clone()));
// Create overlay reader for metadata synthesis
let overlay_reader = Arc::new(OverlayReader::new(
@@ -373,7 +377,15 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> {
reader.clone(),
));
Ok::<_, anyhow::Error>((tree, reader, db, overlay_reader))
let first_origin_root = config
.origins
.iter()
.find(|o| o.enabled && o.origin_type == musicfs_core::OriginType::Local)
.and_then(|o| o.settings.get("path").and_then(|v| v.as_str()))
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/"));
Ok::<_, anyhow::Error>((tree, reader, db, overlay_reader, first_origin_root, fetcher))
})?;
check_stale_mount(&config.mount_point)?;
@@ -388,6 +400,8 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> {
.context("Failed to write PID file")?;
info!(pid_path = ?pid_path, "PID file written");
let grpc_db = db.clone();
let tree_for_grpc = tree.clone();
let tree_for_restore = tree.clone();
let db_for_restore = db.clone();
@@ -411,6 +425,34 @@ fn run_mount(config: musicfs_core::Config) -> Result<()> {
let shutdown_token = tokio_util::sync::CancellationToken::new();
let event_bus = Arc::new(musicfs_core::EventBus::default());
let grpc_event_bus = event_bus.clone();
let grpc_origin_root = origin_root.clone();
let grpc_shutdown = shutdown_token.clone();
runtime.spawn(async move {
let addr = format!("0.0.0.0:{}", grpc_port).parse().unwrap();
let grpc_tree = tree_for_grpc.clone();
let grpc_fetcher = fetcher.clone();
let musicfs_server = GrpcServer::new(grpc_event_bus, grpc_db.clone(), grpc_tree, grpc_fetcher, grpc_origin_root);
let metadata_server = MetadataServiceImpl::new(grpc_db);
info!(%addr, "gRPC server starting");
let result = tonic::transport::Server::builder()
.add_service(musicfs_grpc::proto::musicfs::v1::music_fs_server::MusicFsServer::new(musicfs_server))
.add_service(musicfs_grpc::proto::musicfs::v1::metadata_service_server::MetadataServiceServer::new(metadata_server))
.serve_with_shutdown(addr, async move {
grpc_shutdown.cancelled().await;
})
.await;
if let Err(e) = result {
tracing::error!(error = %e, "gRPC server error");
}
});
runtime.block_on(async {
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
+6
View File
@@ -387,6 +387,9 @@ async fn run_set(
replaygain_track_peak: fields.replaygain_track_peak,
replaygain_album_gain: fields.replaygain_album_gain,
replaygain_album_peak: fields.replaygain_album_peak,
label: None,
album_type: None,
cover_url: None,
custom_tags: fields.custom_tags,
}
} else {
@@ -416,6 +419,9 @@ async fn run_set(
replaygain_track_peak: None,
replaygain_album_gain: None,
replaygain_album_peak: None,
label: None,
album_type: None,
cover_url: None,
custom_tags: HashMap::new(),
}
};
+3
View File
@@ -5,8 +5,11 @@ edition.workspace = true
[dependencies]
musicfs-cache = { path = "../musicfs-cache" }
musicfs-cas = { path = "../musicfs-cas" }
musicfs-metadata = { path = "../musicfs-metadata" }
musicfs-search = { path = "../musicfs-search" }
musicfs-core = { path = "../musicfs-core" }
parking_lot.workspace = true
tonic.workspace = true
prost.workspace = true
tokio.workspace = true
+19
View File
@@ -2,6 +2,8 @@ syntax = "proto3";
package musicfs.v1;
option go_package = "homelab.lan/music-agregator/gen/musicfs/v1;musicfsv1";
service MusicFS {
rpc Search(SearchRequest) returns (SearchResponse);
rpc SearchStream(SearchRequest) returns (stream SearchResult);
@@ -152,6 +154,10 @@ message OriginInfo {
message OriginRequest {
string origin_id = 1;
// Optional subdirectory to scope the scan (relative to origin root).
// If empty, scans the entire origin.
// Example: "Metallica - Master of Puppets (1986) [FLAC]"
optional string subdir = 2;
}
message OriginHealthResponse {
@@ -167,6 +173,13 @@ message SyncProgress {
uint32 total = 3;
string current_path = 4;
uint64 bytes_synced = 5;
repeated SyncedFile new_files = 6;
}
message SyncedFile {
string path = 1;
int64 file_id = 2;
string virtual_path = 3;
}
message EventFilter {
@@ -226,6 +239,9 @@ message MetadataResponse {
optional uint32 channels = 34;
optional uint32 bits_per_sample = 35;
optional string encoder = 36;
optional string label = 40;
optional string album_type = 41;
optional string cover_url = 42;
map<string, string> custom_tags = 50;
}
@@ -255,6 +271,9 @@ message UpdateMetadataRequest {
optional float replaygain_track_peak = 31;
optional float replaygain_album_gain = 32;
optional float replaygain_album_peak = 33;
optional string label = 40;
optional string album_type = 41;
optional string cover_url = 42;
map<string, string> custom_tags = 50;
}
+1
View File
@@ -7,6 +7,7 @@ pub mod proto {
}
mod metadata;
pub mod scanner;
mod search_service;
mod server;
mod webhook;
+55 -15
View File
@@ -5,7 +5,7 @@ use crate::proto::musicfs::v1::{
ClearOverlayRequest, ClearOverlayResponse, GetMetadataRequest, ImportMetadataRequest,
ImportProgress, MetadataResponse, UpdateMetadataRequest, UpdateMetadataResponse,
};
use musicfs_cache::Database;
use musicfs_cache::{Database, EnrichmentUpdate};
use musicfs_core::{AudioMeta, FileId, VirtualPath};
use std::sync::Arc;
use tokio::sync::mpsc;
@@ -63,6 +63,9 @@ impl MetadataServiceImpl {
channels: meta.channels,
bits_per_sample: meta.bits_per_sample,
encoder: meta.encoder.clone(),
label: None,
album_type: None,
cover_url: None,
custom_tags: Default::default(),
}
}
@@ -160,24 +163,40 @@ impl MetadataService for MetadataServiceImpl {
let audio_meta = Self::request_to_audio_meta(&req);
match self.db.update_metadata(file_id, &audio_meta) {
Ok(()) => {
debug!(file_id = req.file_id, "Metadata updated successfully");
Ok(Response::new(UpdateMetadataResponse {
file_id: req.file_id,
success: true,
error_message: None,
}))
}
Err(e) => {
warn!(file_id = req.file_id, error = %e, "Failed to update metadata");
Ok(Response::new(UpdateMetadataResponse {
if let Err(e) = self.db.update_metadata(file_id, &audio_meta) {
warn!(file_id = req.file_id, error = %e, "Failed to update metadata");
return Ok(Response::new(UpdateMetadataResponse {
file_id: req.file_id,
success: false,
error_message: Some(e.to_string()),
}));
}
if req.label.is_some() || req.album_type.is_some() || req.cover_url.is_some() {
let enrichment = EnrichmentUpdate {
label: req.label.clone(),
album_type: req.album_type.clone(),
cover_url: req.cover_url.clone(),
genres_json: None,
primary_genre: None,
source: "orchestrator".to_string(),
};
if let Err(e) = self.db.update_enrichment(file_id, &enrichment) {
warn!(file_id = req.file_id, error = %e, "Failed to update enrichment");
return Ok(Response::new(UpdateMetadataResponse {
file_id: req.file_id,
success: false,
error_message: Some(e.to_string()),
}))
}));
}
}
debug!(file_id = req.file_id, "Metadata updated successfully");
Ok(Response::new(UpdateMetadataResponse {
file_id: req.file_id,
success: true,
error_message: None,
}))
}
#[instrument(level = "info", skip(self, request), fields(method = "clear_overlay"))]
@@ -239,7 +258,28 @@ impl MetadataService for MetadataServiceImpl {
let error_message = if let Some(ref metadata_req) = item.metadata {
let audio_meta = MetadataServiceImpl::request_to_audio_meta(metadata_req);
match db.update_metadata(file_id, &audio_meta) {
Ok(()) => None,
Ok(()) => {
if metadata_req.label.is_some()
|| metadata_req.album_type.is_some()
|| metadata_req.cover_url.is_some()
{
let enrichment = EnrichmentUpdate {
label: metadata_req.label.clone(),
album_type: metadata_req.album_type.clone(),
cover_url: metadata_req.cover_url.clone(),
genres_json: None,
primary_genre: None,
source: "orchestrator".to_string(),
};
if let Err(e) = db.update_enrichment(file_id, &enrichment) {
Some(e.to_string())
} else {
None
}
} else {
None
}
}
Err(e) => Some(e.to_string()),
}
} else {
+261
View File
@@ -0,0 +1,261 @@
use musicfs_cache::{Database, VirtualTree};
use musicfs_cas::ContentFetcher;
use musicfs_core::{AudioMeta, Error, Event, EventBus, FileId, FileMeta, OriginId, RealPath, Result, VirtualPath};
use musicfs_metadata::MetadataParser;
use parking_lot::RwLock;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use tokio::sync::mpsc;
use tracing::{info, warn};
pub struct ScanResult {
pub new_files: Vec<SyncedFileInfo>,
pub changed: u32,
pub deleted: u32,
pub unchanged: u32,
pub bytes_synced: u64,
}
pub struct SyncedFileInfo {
pub path: String,
pub file_id: FileId,
pub virtual_path: String,
}
#[derive(Debug, Clone)]
pub struct ScanProgress {
pub phase: String,
pub current: u32,
pub total: u32,
pub current_path: String,
pub bytes_synced: u64,
}
pub struct OriginScanner {
db: Arc<Database>,
event_bus: Arc<EventBus>,
tree: Arc<RwLock<VirtualTree>>,
fetcher: Arc<ContentFetcher>,
parser: MetadataParser,
}
impl OriginScanner {
pub fn new(
db: Arc<Database>,
event_bus: Arc<EventBus>,
tree: Arc<RwLock<VirtualTree>>,
fetcher: Arc<ContentFetcher>,
) -> Self {
Self {
db,
event_bus,
tree,
fetcher,
parser: MetadataParser,
}
}
pub async fn scan(
&self,
origin_id: &OriginId,
origin_root: &Path,
subdir: Option<&str>,
progress_tx: mpsc::Sender<ScanProgress>,
) -> Result<ScanResult> {
let scan_root = match subdir {
Some(sub) if !sub.is_empty() => origin_root.join(sub),
_ => origin_root.to_path_buf(),
};
if !scan_root.exists() {
return Err(Error::Origin(format!(
"scan path does not exist: {}",
scan_root.display()
)));
}
// Phase 1: Scanning
let audio_files = self.collect_audio_files(&scan_root, &progress_tx)?;
let total_files = audio_files.len() as u32;
info!(files = total_files, "scan phase complete");
// Phase 2: Hashing + categorization
let mut new_files = Vec::new();
let mut unchanged = 0u32;
for (i, abs_path) in audio_files.iter().enumerate() {
let _ = progress_tx.try_send(ScanProgress {
phase: "hashing".to_string(),
current: i as u32 + 1,
total: total_files,
current_path: abs_path.display().to_string(),
bytes_synced: 0,
});
let rel_path = abs_path.strip_prefix(origin_root).unwrap_or(abs_path);
let existing = self.db.get_file_by_real_path(origin_id, rel_path)?;
if existing.is_some() {
unchanged += 1;
continue;
}
let size = std::fs::metadata(abs_path)
.map(|m| m.len())
.unwrap_or(0);
new_files.push(DiscoveredFile {
abs_path: abs_path.clone(),
rel_path: rel_path.to_path_buf(),
size,
});
}
info!(
new = new_files.len(),
unchanged = unchanged,
"hash phase complete"
);
// Phase 3: Indexing
let mut synced = Vec::new();
let mut bytes_synced = 0u64;
let ingest_total = new_files.len() as u32;
for (i, file) in new_files.iter().enumerate() {
let _ = progress_tx.try_send(ScanProgress {
phase: "indexing".to_string(),
current: i as u32 + 1,
total: ingest_total,
current_path: file.abs_path.display().to_string(),
bytes_synced,
});
let audio_meta = match self.parser.parse_file(&file.abs_path) {
Ok(meta) => meta,
Err(e) => {
warn!(path = %file.abs_path.display(), error = %e, "parse failed, using defaults");
AudioMeta::default()
}
};
let virtual_path = derive_virtual_path(&audio_meta, &file.rel_path);
let file_id = self.db.upsert_file(
origin_id,
&file.rel_path,
&virtual_path,
&audio_meta,
UNIX_EPOCH,
file.size,
)?;
let file_meta = FileMeta {
id: file_id,
virtual_path: virtual_path.clone(),
real_path: RealPath {
origin_id: origin_id.clone(),
path: file.rel_path.clone(),
},
size: file.size,
mtime: UNIX_EPOCH,
content_hash: None,
audio: Some(audio_meta),
};
{
let mut tree = self.tree.write();
tree.insert_file(&file_meta);
}
self.fetcher.register_file(file_meta.clone());
self.event_bus.publish(Event::FileAdded {
path: virtual_path.clone(),
origin_id: origin_id.clone(),
});
bytes_synced += file.size;
synced.push(SyncedFileInfo {
path: file.abs_path.display().to_string(),
file_id,
virtual_path: virtual_path.as_str().to_string(),
});
}
Ok(ScanResult {
new_files: synced,
changed: 0,
deleted: 0,
unchanged,
bytes_synced,
})
}
fn collect_audio_files(
&self,
scan_root: &Path,
progress_tx: &mpsc::Sender<ScanProgress>,
) -> Result<Vec<PathBuf>> {
let mut files = Vec::new();
self.walk_dir(scan_root, &mut files, progress_tx)?;
Ok(files)
}
fn walk_dir(
&self,
dir: &Path,
files: &mut Vec<PathBuf>,
progress_tx: &mpsc::Sender<ScanProgress>,
) -> Result<()> {
let entries = std::fs::read_dir(dir)
.map_err(|e| Error::Origin(format!("read_dir {}: {}", dir.display(), e)))?;
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
self.walk_dir(&path, files, progress_tx)?;
} else if is_audio_file(&path) {
files.push(path.clone());
let _ = progress_tx.try_send(ScanProgress {
phase: "scanning".to_string(),
current: files.len() as u32,
total: 0,
current_path: path.display().to_string(),
bytes_synced: 0,
});
}
}
Ok(())
}
}
fn derive_virtual_path(meta: &AudioMeta, rel_path: &Path) -> VirtualPath {
let artist = meta.artist.as_deref().unwrap_or("Unknown Artist");
let album = meta.album.as_deref().unwrap_or("Unknown Album");
let filename = rel_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown");
VirtualPath::new(format!("/{}/{}/{}", artist, album, filename))
}
fn is_audio_file(path: &Path) -> bool {
matches!(
path.extension()
.and_then(|e| e.to_str())
.map(|e| e.to_lowercase())
.as_deref(),
Some("flac" | "mp3" | "ogg" | "wav" | "m4a" | "aac" | "opus")
)
}
struct DiscoveredFile {
abs_path: PathBuf,
rel_path: PathBuf,
size: u64,
}
+112 -20
View File
@@ -2,11 +2,11 @@ use crate::proto::musicfs::v1::{
music_fs_server::MusicFs, CacheStats, ClearCacheRequest, ClearCacheResponse, Empty, Event,
EventFilter, HealthStatus, MountState, OriginHealthResponse, OriginRequest, OriginsResponse,
PrefetchProgress, PrefetchRequest, SearchRequest, SearchResponse, SearchResult,
ShutdownRequest, StatusResponse, SyncProgress, TierStats,
ShutdownRequest, StatusResponse, SyncProgress, SyncedFile, TierStats,
};
use musicfs_core::{Event as CoreEvent, EventBus};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
@@ -16,14 +16,30 @@ pub struct MusicFsServer {
start_time: Instant,
event_bus: Arc<EventBus>,
version: String,
scanner: Arc<crate::scanner::OriginScanner>,
origin_root: std::path::PathBuf,
}
impl MusicFsServer {
pub fn new(event_bus: Arc<EventBus>) -> Self {
pub fn new(
event_bus: Arc<EventBus>,
db: Arc<musicfs_cache::Database>,
tree: Arc<parking_lot::RwLock<musicfs_cache::VirtualTree>>,
fetcher: Arc<musicfs_cas::ContentFetcher>,
origin_root: std::path::PathBuf,
) -> Self {
let scanner = Arc::new(crate::scanner::OriginScanner::new(
db,
event_bus.clone(),
tree,
fetcher,
));
Self {
start_time: Instant::now(),
event_bus,
version: env!("CARGO_PKG_VERSION").to_string(),
scanner,
origin_root,
}
}
@@ -368,24 +384,85 @@ impl MusicFs for MusicFsServer {
request: Request<OriginRequest>,
) -> Result<Response<Self::RescanOriginStream>, Status> {
let req = request.into_inner();
info!(origin_id = %req.origin_id, "gRPC rescan_origin started");
let subdir = req.subdir.as_deref().filter(|s| !s.is_empty());
info!(
origin_id = %req.origin_id,
subdir = ?subdir,
"gRPC rescan_origin started"
);
let (tx, rx) = mpsc::channel(32);
let (progress_tx, mut progress_rx) = mpsc::channel::<crate::scanner::ScanProgress>(64);
let origin_id = musicfs_core::OriginId::from(req.origin_id.as_str());
let scanner = self.scanner.clone();
let origin_root = self.origin_root.clone();
let subdir_owned = subdir.map(|s| s.to_string());
tokio::spawn(async move {
let phases = ["scanning", "indexing", "complete"];
for (i, phase) in phases.iter().enumerate() {
let progress = SyncProgress {
phase: phase.to_string(),
current: i as u32 + 1,
total: phases.len() as u32,
current_path: String::new(),
bytes_synced: 0,
};
if tx.send(Ok(progress)).await.is_err() {
break;
let forward_handle = {
let tx = tx.clone();
tokio::spawn(async move {
while let Some(progress) = progress_rx.recv().await {
let proto = SyncProgress {
phase: progress.phase,
current: progress.current,
total: progress.total,
current_path: progress.current_path,
bytes_synced: progress.bytes_synced,
new_files: vec![],
};
if tx.send(Ok(proto)).await.is_err() {
break;
}
}
})
};
let result = scanner
.scan(
&origin_id,
&origin_root,
subdir_owned.as_deref(),
progress_tx,
)
.await;
forward_handle.abort();
match result {
Ok(scan_result) => {
let synced_files: Vec<SyncedFile> = scan_result
.new_files
.iter()
.map(|f| SyncedFile {
path: f.path.clone(),
file_id: f.file_id.0,
virtual_path: f.virtual_path.clone(),
})
.collect();
let _ = tx
.send(Ok(SyncProgress {
phase: "complete".to_string(),
current: scan_result.new_files.len() as u32
+ scan_result.changed
+ scan_result.deleted,
total: scan_result.new_files.len() as u32
+ scan_result.changed
+ scan_result.deleted
+ scan_result.unchanged,
current_path: String::new(),
bytes_synced: scan_result.bytes_synced,
new_files: synced_files,
}))
.await;
}
Err(e) => {
let _ = tx
.send(Err(Status::internal(format!("rescan failed: {}", e))))
.await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
@@ -438,10 +515,26 @@ impl MusicFs for MusicFsServer {
mod tests {
use super::*;
async fn make_test_server() -> (MusicFsServer, tempfile::TempDir) {
let event_bus = Arc::new(EventBus::new(16));
let db = Arc::new(musicfs_cache::Database::open_memory().unwrap());
let tree = Arc::new(parking_lot::RwLock::new(
musicfs_cache::TreeBuilder::new().build(),
));
let dir = tempfile::tempdir().unwrap();
let cfg = musicfs_cas::CasConfig {
chunks_dir: dir.path().join("chunks"),
..Default::default()
};
let store = Arc::new(musicfs_cas::CasStore::open(cfg).await.unwrap());
let fetcher = Arc::new(musicfs_cas::ContentFetcher::new(store));
let origin_root = std::path::PathBuf::from("/tmp/test-origin");
(MusicFsServer::new(event_bus, db, tree, fetcher, origin_root), dir)
}
#[tokio::test]
async fn test_get_status() {
let event_bus = Arc::new(EventBus::new(16));
let server = MusicFsServer::new(event_bus);
let (server, _dir) = make_test_server().await;
let response = server.get_status(Request::new(Empty {})).await.unwrap();
let status = response.into_inner();
@@ -452,8 +545,7 @@ mod tests {
#[tokio::test]
async fn test_get_cache_stats() {
let event_bus = Arc::new(EventBus::new(16));
let server = MusicFsServer::new(event_bus);
let (server, _dir) = make_test_server().await;
let response = server
.get_cache_stats(Request::new(Empty {}))