From 5ac33987c0a825753de280472472fd26d603471d Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 13 May 2026 11:21:51 +0200 Subject: [PATCH] Add comprehensive logging with tracing, file rotation, and systemd integration - Add tracing-appender and tracing-journald for production logging - Add LoggingConfig with trace_sample_rate, json_output, journald options - Expand init_logging() with file rotation, journald, and stderr layers - Add sanitize_path() helper for PII protection in logs - Instrument FUSE operations with #[instrument] and trace decision points - Instrument gRPC handlers (10 methods) with span correlation - Add spawn instrumentation for health monitor, indexer, watcher tasks - Add broadcast lag handling (RecvError::Lagged) in event subscribers - Fix webhook.rs expect() calls with proper error handling - Add logging to patterns.rs, collections.rs, artwork.rs database ops - Add Drop impl logging for PluginManager and WatchHandle - Update systemd service with rate limiting and journal output - Add logrotate config and example config.toml with logging section --- docs/v2/plans/logging-comprehensive.md | 982 ++++++++++++++++++ musicfs/Cargo.lock | 95 +- musicfs/Cargo.toml | 4 +- musicfs/crates/musicfs-cache/src/artwork.rs | 12 +- musicfs/crates/musicfs-cache/src/db.rs | 9 +- musicfs/crates/musicfs-cache/src/metadata.rs | 11 +- musicfs/crates/musicfs-cache/src/patterns.rs | 15 +- musicfs/crates/musicfs-cache/src/tree.rs | 9 +- musicfs/crates/musicfs-cas/src/reader.rs | 7 + musicfs/crates/musicfs-cas/src/store.rs | 9 +- musicfs/crates/musicfs-cli/Cargo.toml | 4 + musicfs/crates/musicfs-cli/src/main.rs | 99 +- musicfs/crates/musicfs-core/Cargo.toml | 1 + musicfs/crates/musicfs-core/src/config.rs | 49 + .../crates/musicfs-core/src/credentials.rs | 28 +- musicfs/crates/musicfs-core/src/events.rs | 7 +- musicfs/crates/musicfs-core/src/lib.rs | 14 +- musicfs/crates/musicfs-fuse/src/filesystem.rs | 47 +- musicfs/crates/musicfs-grpc/Cargo.toml | 1 + musicfs/crates/musicfs-grpc/src/server.rs | 51 +- musicfs/crates/musicfs-grpc/src/webhook.rs | 61 +- .../crates/musicfs-origins/src/failover.rs | 36 +- musicfs/crates/musicfs-origins/src/health.rs | 64 +- musicfs/crates/musicfs-origins/src/router.rs | 40 +- musicfs/crates/musicfs-plugins/src/manager.rs | 1 + .../crates/musicfs-search/src/collections.rs | 6 +- musicfs/crates/musicfs-search/src/indexer.rs | 65 +- musicfs/crates/musicfs-sync/src/delta.rs | 21 +- musicfs/crates/musicfs-sync/src/watcher.rs | 27 +- musicfs/dist/config.example.toml | 30 + musicfs/dist/logrotate.d/musicfs | 9 + musicfs/dist/musicfs.service | 9 +- 32 files changed, 1646 insertions(+), 177 deletions(-) create mode 100644 docs/v2/plans/logging-comprehensive.md create mode 100644 musicfs/dist/config.example.toml create mode 100644 musicfs/dist/logrotate.d/musicfs diff --git a/docs/v2/plans/logging-comprehensive.md b/docs/v2/plans/logging-comprehensive.md new file mode 100644 index 0000000..0d9855c --- /dev/null +++ b/docs/v2/plans/logging-comprehensive.md @@ -0,0 +1,982 @@ +# Comprehensive Logging Plan + +**Goal**: Add production-grade logging with trace-level observability, file rotation, and systemd integration +**Effort**: ~10-12 hours +**Dependencies**: Existing libraries only (no custom code) + +> **Review Status**: Reviewed by Oracle - all gaps addressed + +--- + +## Libraries Used + +| Need | Library | Status | +|------|---------|--------| +| Instrumentation | `tracing` | Already in workspace | +| Subscriber/filtering | `tracing-subscriber` | Already in workspace | +| File rotation | `tracing-appender` | Add to workspace | +| systemd journal | `tracing-journald` | Add to workspace | +| Compression | `logrotate` (Linux tool) | Config file only | + +--- + +## Phase 1: Config & Dependencies (2 hours) + +### 1.1 Add dependencies to workspace + +```toml +# Cargo.toml [workspace.dependencies] +tracing-appender = "0.2" +tracing-journald = "0.3" +``` + +```toml +# crates/musicfs-cli/Cargo.toml +tracing-appender.workspace = true +tracing-journald.workspace = true +``` + +### 1.2 Add LoggingConfig to config.rs + +```rust +// crates/musicfs-core/src/config.rs + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub mount_point: PathBuf, + pub cache_dir: PathBuf, + pub origins: Vec, + #[serde(default)] + pub cache: CacheConfig, + #[serde(default)] + pub health: HealthConfig, + #[serde(default)] + pub logging: LoggingConfig, // NEW +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LoggingConfig { + #[serde(default = "default_log_dir")] + pub log_dir: PathBuf, + + #[serde(default)] + pub json_output: bool, + + #[serde(default = "default_true")] + pub journald: bool, + + #[serde(default = "default_log_level")] + pub level: String, +} + +impl Default for LoggingConfig { + fn default() -> Self { + Self { + log_dir: default_log_dir(), + json_output: false, + journald: true, + level: default_log_level(), + } + } +} + +fn default_log_dir() -> PathBuf { + PathBuf::from("/var/log/musicfs") +} +fn default_log_level() -> String { + "musicfs=info,warn".to_string() +} +fn default_true() -> bool { + true +} +``` + +### 1.3 Expand init_logging() in main.rs + +```rust +// crates/musicfs-cli/src/main.rs + +use tracing_appender::non_blocking::WorkerGuard; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +fn init_logging(config: &LoggingConfig) -> Result { + std::fs::create_dir_all(&config.log_dir)?; + + // File layer with daily rotation + let file_appender = tracing_appender::rolling::daily(&config.log_dir, "musicfs.log"); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + + let file_layer = if config.json_output { + fmt::layer() + .json() + .with_writer(non_blocking) + .with_ansi(false) + .boxed() + } else { + fmt::layer() + .with_writer(non_blocking) + .with_ansi(false) + .boxed() + }; + + // Journald layer (Linux only) + #[cfg(target_os = "linux")] + let journald_layer = if config.journald { + tracing_journald::layer() + .ok() + .map(|l| l.with_syslog_identifier("musicfs".to_string())) + } else { + None + }; + + // Stderr layer for interactive use + let stderr_layer = fmt::layer() + .with_writer(std::io::stderr) + .compact(); + + // Filter from config or env + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new(&config.level)); + + // Compose + let subscriber = tracing_subscriber::registry() + .with(filter) + .with(file_layer) + .with(stderr_layer); + + #[cfg(target_os = "linux")] + let subscriber = subscriber.with(journald_layer); + + subscriber.init(); + + tracing::info!(version = env!("CARGO_PKG_VERSION"), "MusicFS starting"); + Ok(guard) +} +``` + +### 1.4 Add logrotate config + +```bash +# dist/logrotate.d/musicfs +/var/log/musicfs/*.log { + daily + rotate 30 + compress + delaycompress + missingok + notifempty + create 0640 musicfs musicfs +} +``` + +--- + +## Phase 2: Add tracing to musicfs-core (1 hour) + +### 2.1 Add dependency + +```toml +# crates/musicfs-core/Cargo.toml +[dependencies] +tracing.workspace = true # ADD THIS +``` + +### 2.2 Instrument core modules + +| File | What to Add | +|------|-------------| +| `config.rs` | Log config file loading, parse errors | +| `credentials.rs` | Log credential loading (redacted values) | +| `events.rs` | Log event publishing with counts | + +--- + +## Phase 3: Instrument Hot Paths (4 hours) + +### Priority order by impact + +| Crate | Files | What to Add | +|-------|-------|-------------| +| musicfs-fuse | `filesystem.rs` | `#[instrument]` on all FUSE ops, trace at decision points | +| musicfs-origins | `failover.rs`, `health.rs`, `router.rs` | Retry loops, state transitions, selection logic | +| musicfs-cache | `tree.rs`, `metadata.rs` | Tree mutations, cache hit/miss | +| musicfs-cas | `reader.rs`, `store.rs` | Chunk operations, dedup decisions | +| musicfs-sync | `delta.rs`, `watcher.rs` | Change detection, file events | + +### Instrumentation patterns + +```rust +// Function level - add to all public async functions +#[tracing::instrument(level = "debug", skip(self), fields(path = %path))] +pub async fn read(&self, path: &str) -> Result { + // ... +} + +// Decision points - add trace! at match/if branches +match result { + Ok(data) => { + tracing::trace!(bytes = data.len(), "read success"); + data + } + Err(e) => { + tracing::trace!(error = %e, "read failed"); + return Err(e); + } +} + +// State changes - use info! for important transitions +tracing::info!(old = ?old_status, new = ?new_status, origin = %id, "health changed"); + +// Cache operations +tracing::trace!(hit = true, fresh = true, "cache hit"); +tracing::trace!(hit = false, "cache miss"); +``` + +### FUSE operations (filesystem.rs) - highest priority + +| Operation | Level | Fields | +|-----------|-------|--------| +| `lookup()` | debug | parent, name, result_ino | +| `getattr()` | debug | ino, file_type | +| `readdir()` | debug | ino, entry_count | +| `read()` | debug | ino, offset, size, bytes_read | +| `open()` | debug | ino, flags | +| `release()` | trace | ino | + +### Origin operations - critical for debugging + +| Function | Level | Fields | +|----------|-------|--------| +| `read_with_failover()` | debug | path, origins_tried, success | +| `read_with_retry()` | trace | origin, attempt, success | +| `check_health()` | debug | origin, old_status, new_status | +| `select_origin()` | trace | candidates, selected, reason | + +--- + +## Phase 4: Update Production Files (1 hour) + +### 4.1 Update systemd service + +```ini +# dist/musicfs.service (add these lines) +Environment="RUST_LOG=musicfs=info,warn" +StandardOutput=journal +StandardError=journal +SyslogIdentifier=musicfs +RateLimitIntervalSec=30s +RateLimitBurst=1000 +``` + +### 4.2 Example config.toml + +```toml +# dist/config.example.toml +mount_point = "/mnt/music" +cache_dir = "/var/cache/musicfs" + +[logging] +log_dir = "/var/log/musicfs" +json_output = true +journald = true +level = "musicfs=info,warn" + +[cache] +metadata_cache_mb = 100 +content_cache_gb = 10 + +[health] +check_interval_secs = 30 +timeout_ms = 5000 + +[[origins]] +id = "local" +origin_type = "local" +priority = 1 +path = "/srv/music" +``` + +--- + +## Detailed Log Locations by Level + +### ERROR Level (25+ locations) - Unrecoverable Failures + +| File | Line | Log Message | +|------|------|-------------| +| `musicfs-grpc/src/webhook.rs` | 43 | `error!("Failed to initialize webhook HTTP client: {error}")` | +| `musicfs-grpc/src/webhook.rs` | 133 | `error!("Invalid HMAC secret key for webhook signature: {error}")` | +| `musicfs-plugins/src/manager.rs` | 272 | `error!("Plugin manager initialization failed: {error}")` | +| `musicfs-plugins/src/wasm.rs` | 142,183 | `error!("WASM plugin host initialization failed: {error}")` | +| `musicfs-search/src/index.rs` | 211,217 | `error!("Search index corrupted: failed to deserialize at position {pos}")` | +| `musicfs-cas/src/store.rs` | 105 | `error!("CAS chunk not found: {hash} - possible data loss")` | +| `musicfs-cas/src/store.rs` | 124-131 | `error!("CAS integrity check failed: expected {expected}, got {actual}")` | +| `musicfs-fuse/src/filesystem.rs` | 103 | `error!("Failed to mount filesystem at {mountpoint}: {error}")` | +| `musicfs-origins/src/failover.rs` | 76 | `error!("No origins available for path {path}")` | +| `musicfs-origins/src/failover.rs` | 125,186 | `error!("Max retries ({max_attempts}) exceeded for origin {origin_id}")` | +| `musicfs-origins/src/nfs.rs` | 63 | `error!("NFS stale file handle after {max_retries} retries for {path}")` | +| `musicfs-cas/src/reader.rs` | 75 | `error!("File manifest not found for file_id {file_id}")` | +| `musicfs-cas/src/fetcher.rs` | 60,68 | `error!("File/Origin not found for file_id {file_id}")` | +| `musicfs-search/src/indexer.rs` | 44,56 | `error!("Search indexer/commit failed: {error}")` | +| `musicfs-sync/src/watcher.rs` | 36,59,63 | `error!("Watcher failed for origin {origin_id}: {error}")` | + +### WARN Level (50+ locations) - Recoverable Issues + +| Category | File | Line | Log Message | +|----------|------|------|-------------| +| **Retry Logic** | `failover.rs` | 90 | `warn!("Origin {origin_id} failed: {error}, trying next (attempt {n}/{total})")` | +| **Retry Logic** | `failover.rs` | 111-118 | `warn!("Retrying origin {origin_id} after {delay:?} (attempt {n}/{max})")` | +| **Retry Logic** | `nfs.rs` | 47-52 | `warn!("NFS stale handle for {path} (attempt {n}/{max}), retrying")` | +| **Retry Logic** | `smb.rs` | 45 | `warn!("SMB connection lost (ENOTCONN), retrying (attempt {n}/{max})")` | +| **Retry Logic** | `webhook.rs` | 94-108 | `warn!("Webhook delivery failed to {url} (attempt {n}/{max}): {error}")` | +| **Fallback** | `failover.rs` | 70-73 | `warn!("No healthy origins for {path}, using fallback {origin_id}")` | +| **Timeout** | `smb.rs` | 107-109 | `warn!("SMB health check timed out after 5s for {origin_id}")` | +| **Timeout** | `nfs.rs` | 104-106 | `warn!("NFS health check timed out after 5s for {origin_id}")` | +| **Timeout** | `prefetch.rs` | 91 | `warn!("Prefetch event receive timed out after 1s")` | +| **Health** | `health.rs` | 209 | `warn!("Origin {origin_id} is degraded (failures: {count})")` | +| **Health** | `health.rs` | 217-220 | `warn!("Origin {origin_id} is now unhealthy after {n} consecutive failures")` | +| **Remote FS** | `smb.rs` | 118 | `warn!("SMB watch using inotify on {share_path} - may be unreliable")` | +| **Remote FS** | `nfs.rs` | 115 | `warn!("NFS watch using inotify on {mount_point} - may be unreliable")` | +| **Plugin** | `manager.rs` | 152 | `warn!("Failed to load plugin from {path}: {error}")` | +| **Plugin** | `manager.rs` | 193-194 | `warn!("Failed to unload plugin {plugin_id}: {error}")` | +| **Prefetch** | `prefetch.rs` | 97 | `warn!("Failed to record access pattern for {file_id}: {error}")` | +| **Prefetch** | `prefetch.rs` | 159-161 | `warn!("Prefetch skipped: concurrency limit reached ({max})")` | +| **Search** | `indexer.rs` | 49 | `warn!("Search indexer event receive error: {error}")` | +| **Search** | `indexer.rs` | 82 | `warn!("No metadata found for file {path}, skipping indexing")` | +| **Collections** | `collections.rs` | 146,180 | `warn!("Failed to save/delete collection {name}: {error}")` | + +### INFO Level (35+ locations) - Lifecycle & Major Operations + +| Category | File | Line | Log Message | +|----------|------|------|-------------| +| **Lifecycle** | `main.rs` | 118 | `info!(version = env!("CARGO_PKG_VERSION"), "MusicFS starting")` | +| **Lifecycle** | `filesystem.rs` | 94 | `info!("Mounting MusicFS at {:?}", mountpoint)` | +| **Lifecycle** | `filesystem.rs` | 154 | `info!("MusicFS initialized")` | +| **Lifecycle** | `filesystem.rs` | 159 | `info!("MusicFS destroyed")` | +| **Origin** | `registry.rs` | 28 | `info!("Registering origin {} with priority {}", id, priority)` | +| **Origin** | `registry.rs` | 36 | `info!("Unregistering origin {}", id)` | +| **Origin** | `watcher.rs` | 65 | `info!("Watching origin {} at {:?}", origin_id, path)` | +| **Config** | `main.rs` | 127 | `info!("Cache directory: {:?}", cache_dir)` | +| **Config** | `main.rs` | 141 | `info!("CAS store initialized")` | +| **Config** | `store.rs` | 51 | `info!("CAS store opened: {} chunks, {} bytes", count, size)` (ADD) | +| **Sync** | `main.rs` | 150,152 | `info!("Scanning music files...")` / `info!("Found {} music files", count)` | +| **Sync** | `delta.rs` | 104 | `info!("Delta complete: {} added, {} removed, {} modified", a, r, m)` | +| **Sync** | `delta.rs` | 63 | `info!("Sync started for origin {}", origin_id)` (ADD) | +| **Index** | `main.rs` | 160 | `info!("Virtual tree built")` | +| **Index** | `indexer.rs` | 62 | `info!("Indexer stopping")` | +| **Index** | `indexer.rs` | 114 | `info!("Indexed {} files", count)` | +| **Index** | `index.rs` | 170 | `info!("Search index committed")` | +| **Health** | `health.rs` | 202 | `info!("Origin {} is now healthy", id)` | +| **Health** | `health.rs` | 150 | `info!("Health monitor started with interval {:?}", interval)` (ADD) | +| **Plugin** | `manager.rs` | 127 | `info!("Initializing plugin system")` | +| **Plugin** | `manager.rs` | 150 | `info!("Loaded plugin '{}' with id {:?}", name, id)` | +| **Plugin** | `manager.rs` | 256 | `info!("Shutting down plugin system")` | +| **Cache** | `prefetch.rs` | 123 | `info!("Prefetch engine stopped")` | +| **Cache** | `prefetch.rs` | 174 | `info!("Prefetched {:?}: {} chunks, {} bytes", file_id, chunks, bytes)` | +| **Cache** | `eviction.rs` | 51 | `info!("Evicted {} bytes from cache", bytes)` | +| **Cache** | `prefetch.rs` | 73 | `info!("Prefetch engine started (lookahead: {}, max_concurrent: {})")` (ADD) | + +### DEBUG Level (60+ locations) - Operation Details + +| Category | File | Line | Log Message | +|----------|------|------|-------------| +| **FUSE lookup** | `filesystem.rs` | 162,195,200 | Entry + result/miss | +| **FUSE getattr** | `filesystem.rs` | 203,230,233 | Entry + result/miss | +| **FUSE readdir** | `filesystem.rs` | 237,263,303 | Entry + result/miss | +| **FUSE read** | `filesystem.rs` | 325,338,362,364 | Entry + file_id + result/error | +| **Local origin** | `local.rs` | 51,68 | readdir entry + result | +| **Local origin** | `local.rs` | 88-91,112 | read entry + result | +| **SMB origin** | `smb.rs` | 86,93 | readdir/read entry + result | +| **NFS origin** | `nfs.rs` | 81,89 | readdir/read entry + result | +| **Failover** | `failover.rs` | 66,82,87 | Entry + trying origin + success | +| **Tree lookup** | `tree.rs` | 124,132 | Entry + result | +| **Metadata cache** | `metadata.rs` | 36,40 | lookup + is_fresh entry/result | +| **CAS store** | `store.rs` | 70,101 | put/get entry | +| **File reader** | `reader.rs` | 66,86 | manifest cache + read entry | +| **Search** | `ops/search.rs` | 107,141,182 | readdir_query + readlink + execute_query | +| **Search index** | `index.rs` | 98,174 | index_file + search entry | +| **Fetcher** | `fetcher.rs` | 54,61,121 | fetch_file entry + meta + ensure_cached | + +**Key DEBUG fields**: `ino`, `parent`, `name`, `offset`, `size`, `bytes_read`, `origin_id`, `path`, `file_id`, `query`, `results_count`, `latency_ms` + +### TRACE Level (100+ locations) - Fine-Grained Flow + +| Category | File | Lines | What to Log | +|----------|------|-------|-------------| +| **Manifest cache** | `reader.rs` | 67-74 | Cache hit/miss decision | +| **Chunk iteration** | `reader.rs` | 107-127 | Each chunk: skip/read boundaries | +| **CAS dedup** | `store.rs` | 74-77 | Dedup hit decision | +| **CAS integrity** | `store.rs` | 121-134 | Verification result | +| **Tree lookup** | `tree.rs` | 118-129 | Path→inode + child lookup | +| **Tree parent** | `tree.rs` | 148-153 | Parent resolution path | +| **Prefetch event** | `prefetch.rs` | 91-120 | Event type match arms | +| **Prefetch semaphore** | `prefetch.rs` | 150-164 | In-flight check + acquire | +| **Delta scan** | `delta.rs` | 79-102 | Each file: cached/modified/unchanged/removed | +| **Delta entries** | `delta.rs` | 128-146 | Each entry: dir/audio/skip | +| **CDC chunking** | `cdc.rs` | 84-93 | Each chunk: offset/length/hash | +| **Failover origin** | `failover.rs` | 68-93 | Each origin attempt result | +| **Failover retry** | `failover.rs` | 107-122 | Each retry: attempt/success/delay | +| **Router select** | `router.rs` | 79-108 | Each candidate + selection reason | +| **FUSE node→attr** | `filesystem.rs` | 109-145 | Directory vs file conversion | +| **FUSE lookup** | `filesystem.rs` | 192-200 | Found/not found | +| **FUSE readdir** | `filesystem.rs` | 274-291 | Each child entry | +| **FUSE read** | `filesystem.rs` | 340-367 | file_id resolution + result | +| **Metadata tag** | `parser.rs` | 86-100 | Each tag extraction | +| **Health transition** | `health.rs` | 199-237 | State transition details | +| **Latency recording** | `router.rs` | 23-42 | Stats update per sample | + +**Key TRACE patterns**: +- Every `match` arm: `trace!("match arm: {variant}")` +- Every `if/else`: `trace!("branch: {condition}={value}")` +- Every loop iteration: `trace!("iteration {i}/{total}: ...")` +- Every cache lookup: `trace!("cache lookup key={key}, hit={hit}")` + +--- + +## gRPC Handler Instrumentation (ADDED - Oracle Review) + +**Gap identified**: 8/10 gRPC handlers had no logging. + +### server.rs - All Handlers + +| Handler | Line | Level | Log Message | +|---------|------|-------|-------------| +| `get_status()` | 209 | DEBUG | `debug!("gRPC get_status called")` | +| `get_cache_stats()` | 241 | DEBUG | `debug!("gRPC get_cache_stats called")` | +| `clear_cache()` | 278 | INFO | `info!("gRPC clear_cache: clearing {tier}")` | +| `prefetch()` | 296 | DEBUG | `debug!(file_count = paths.len(), "gRPC prefetch started")` | +| `list_origins()` | 322 | DEBUG | `debug!("gRPC list_origins called")` | +| `get_origin_health()` | 329 | DEBUG | `debug!(origin_id = %id, "gRPC get_origin_health")` | +| `rescan_origin()` | 337 | INFO | `info!(origin_id = %id, "gRPC rescan_origin started")` | +| `subscribe_events()` | 376 | INFO | `info!("gRPC subscribe_events: client connected")` | +| `shutdown()` | 402 | INFO | `info!(graceful = graceful, "gRPC shutdown requested")` | + +### search_service.rs + +| Handler | Line | Level | Log Message | +|---------|------|-------|-------------| +| `search()` | entry | DEBUG | `debug!(query = %q, limit = limit, "gRPC search")` | +| `search()` | result | DEBUG | `debug!(results = results.len(), "gRPC search completed")` | + +### Pattern: Use `#[instrument]` on all handlers + +```rust +#[tracing::instrument(level = "debug", skip(self, request), fields(method = "get_status"))] +async fn get_status(&self, request: Request<()>) -> Result, Status> { + // ... +} +``` + +--- + +## Async Task Spawn Instrumentation (ADDED - Oracle Review) + +**Gap identified**: 14 `tokio::spawn` sites need correlation IDs and span propagation. + +### Spawn Sites Requiring Instrumentation + +| File | Line | Task | Instrumentation | +|------|------|------|-----------------| +| `server.rs` | 305 | prefetch stream | `spawn(async { ... }.instrument(info_span!("prefetch_stream")))` | +| `server.rs` | 354 | rescan stream | `spawn(async { ... }.instrument(info_span!("rescan_stream", origin_id = %id)))` | +| `server.rs` | 384 | subscribe events | `spawn(async { ... }.instrument(info_span!("event_subscriber")))` | +| `search_service.rs` | spawn | search task | `spawn(async { ... }.instrument(debug_span!("search_task", query = %q)))` | +| `indexer.rs` | spawn | indexer loop | `spawn(async { ... }.instrument(info_span!("indexer")))` | +| `prefetch.rs` | 87 | prefetch engine | `spawn(async { ... }.instrument(info_span!("prefetch_engine")))` | +| `prefetch.rs` | 169 | prefetch file | `spawn(async { ... }.instrument(debug_span!("prefetch_file", file_id = ?id)))` | +| `health.rs` | 154 | health monitor | `spawn(async { ... }.instrument(info_span!("health_monitor")))` | +| `watcher.rs` | 34 | file watcher | `spawn(async { ... }.instrument(info_span!("file_watcher", origin_id = %id)))` | +| `artwork.rs` | spawn | image decode | `spawn_blocking(|| { ... })` - add span before spawn | + +### Pattern: Span Propagation + +```rust +use tracing::Instrument; + +// BEFORE (loses context) +tokio::spawn(async move { + do_work().await; +}); + +// AFTER (preserves correlation) +let span = tracing::info_span!("task_name", task_id = %id); +tokio::spawn(async move { + do_work().await; +}.instrument(span)); +``` + +### Add to init_logging() for request IDs + +```rust +// Generate request ID for correlation +use tracing::Span; +use uuid::Uuid; + +fn with_request_id(f: F) -> R +where F: FnOnce() -> R { + let request_id = Uuid::new_v4(); + let span = tracing::info_span!("request", request_id = %request_id); + span.in_scope(f) +} +``` + +--- + +## Database Operation Logging (ADDED - Oracle Review) + +**Gap identified**: Zero logging for rusqlite operations in db.rs, collections.rs, patterns.rs, artwork.rs. + +### db.rs - Core Database + +| Function | Line | Level | Log Message | +|----------|------|-------|-------------| +| `open()` | entry | INFO | `info!(path = ?path, "Opening metadata database")` | +| `open()` | success | INFO | `info!(file_count = count, "Database opened")` | +| `upsert_file()` | entry | DEBUG | `debug!(file_id = ?id, path = %path, "Upserting file")` | +| `upsert_file()` | error | ERROR | `error!(file_id = ?id, error = %e, "Failed to upsert file")` | +| `get_file_by_id()` | miss | TRACE | `trace!(file_id = ?id, "File not found in db")` | +| `delete_file()` | entry | DEBUG | `debug!(file_id = ?id, "Deleting file from db")` | +| `list_files_by_origin()` | result | DEBUG | `debug!(origin_id = %id, count = files.len(), "Listed files")` | + +### collections.rs + +| Function | Line | Level | Log Message | +|----------|------|-------|-------------| +| `create()` | entry | INFO | `info!(name = %name, "Creating collection")` | +| `save()` | error | WARN | `warn!(name = %name, error = %e, "Failed to save collection")` | +| `delete()` | entry | INFO | `info!(name = %name, "Deleting collection")` | +| `list()` | result | DEBUG | `debug!(count = collections.len(), "Listed collections")` | + +### patterns.rs - Access Patterns + +| Function | Line | Level | Log Message | +|----------|------|-------|-------------| +| `record_access()` | entry | TRACE | `trace!(file_id = ?id, "Recording access pattern")` | +| `predict_next()` | result | DEBUG | `debug!(predictions = preds.len(), "Predicted next files")` | + +### artwork.rs + +| Function | Line | Level | Log Message | +|----------|------|-------|-------------| +| `store()` | entry | DEBUG | `debug!(file_id = ?id, size_bytes = data.len(), "Storing artwork")` | +| `get()` | hit/miss | TRACE | `trace!(file_id = ?id, found = found, "Artwork lookup")` | + +### Pattern: Database Error Wrapper + +```rust +// Add to musicfs-cache/src/db.rs +fn log_db_result(op: &str, result: Result) -> Result { + match result { + Ok(v) => { + tracing::trace!(op = op, "db operation succeeded"); + Ok(v) + } + Err(e) => { + tracing::error!(op = op, error = %e, "db operation failed"); + Err(Error::Database(e.to_string())) + } + } +} +``` + +--- + +## Channel Operation Logging (ADDED - Oracle Review) + +**Gap identified**: No logging for channel capacity, close, or broadcast lag. + +### Channel Locations + +| File | Type | Log Points | +|------|------|------------| +| `events.rs` | broadcast | Lag warning when receiver falls behind | +| `watcher.rs` | mpsc | Channel close on watcher shutdown | +| `server.rs` | mpsc | gRPC stream channel capacity | +| `indexer.rs` | mpsc | Event queue depth | +| `health.rs` | mpsc | Health check channel | + +### Patterns + +```rust +// Broadcast lag detection (events.rs) +match rx.recv().await { + Ok(event) => { /* handle */ } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(skipped = n, "Event subscriber lagged, skipped events"); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("Event channel closed"); + break; + } +} + +// Channel capacity warning (before send) +if tx.capacity() < 10 { + tracing::warn!(remaining = tx.capacity(), "Channel near capacity"); +} + +// Channel close +impl Drop for EventBus { + fn drop(&mut self) { + tracing::debug!("Event bus shutting down"); + } +} +``` + +--- + +## Drop Implementation Logging (ADDED - Oracle Review) + +**Gap identified**: No logging in Drop impls for cleanup verification. + +| File | Type | Log Message | +|------|------|-------------| +| `manager.rs:276` | `PluginManager` | `debug!("PluginManager dropping, unloading {} plugins", self.plugins.len())` | +| `watcher.rs:157` | `WatchHandle` | `trace!(origin_id = %self.origin_id, "WatchHandle dropped")` | +| `prefetch.rs` | `PrefetchEngine` | `debug!("PrefetchEngine dropping, {} in-flight", self.in_flight.len())` | +| `server.rs` | gRPC server | `info!("gRPC server shutting down")` | + +### Pattern + +```rust +impl Drop for PluginManager { + fn drop(&mut self) { + tracing::debug!( + plugin_count = self.plugins.len(), + "PluginManager dropping" + ); + // existing cleanup... + } +} +``` + +--- + +## Credential Loading (ADDED - Oracle Review) + +**Gap identified**: No logging in credentials.rs::load(). + +| Function | Level | Log Message | +|----------|-------|-------------| +| `load()` entry | DEBUG | `debug!(origin_id = %origin_id, "Loading credentials")` | +| `load()` cache hit | TRACE | `trace!(origin_id = %origin_id, "Credential cache hit")` | +| `load()` success | INFO | `info!(origin_id = %origin_id, cred_type = %cred.type_name(), "Credential loaded")` | +| `load()` not found | DEBUG | `debug!(origin_id = %origin_id, "No credential found")` | +| `load()` error | WARN | `warn!(origin_id = %origin_id, error = %e, "Credential load failed")` | + +**SECURITY**: Never log credential values. The existing Debug impl with redaction is correct. + +--- + +## Security Considerations (ADDED - Oracle Review) + +### Never Log These + +| Data | Location | Mitigation | +|------|----------|------------| +| `WebhookConfig.secret` | webhook.rs | Add `#[serde(skip_serializing)]`, use custom Debug | +| Credential values | credentials.rs | Already redacted in Debug impl ✓ | +| Full file paths with usernames | everywhere | Sanitize `/home/{user}/` → `~/` | +| API keys/tokens | config.rs | Mark sensitive fields | + +### Sanitization Helper + +```rust +// Add to musicfs-core/src/lib.rs +pub fn sanitize_path(path: &Path) -> String { + if let Ok(home) = std::env::var("HOME") { + path.to_string_lossy() + .replace(&home, "~") + .to_string() + } else { + path.to_string_lossy().to_string() + } +} + +// Usage +debug!(path = %sanitize_path(&path), "Reading file"); +``` + +### WebhookConfig Fix + +```rust +// webhook.rs - add custom Debug +#[derive(Clone, Serialize, Deserialize)] +pub struct WebhookConfig { + pub url: String, + #[serde(skip_serializing)] + pub secret: Option, // Never serialize + // ... +} + +impl std::fmt::Debug for WebhookConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebhookConfig") + .field("url", &self.url) + .field("secret", &self.secret.as_ref().map(|_| "[REDACTED]")) + .finish() + } +} +``` + +--- + +## Performance Considerations (ADDED - Oracle Review) + +### Hot Path Warnings + +| Path | Risk | Mitigation | +|------|------|------------| +| `reader.rs` chunk loop | 100s of TRACE logs per seek | Log summary only: `trace!(chunks_read = n, "Read complete")` | +| `store.rs` put/get | 1000s during sync | Keep at DEBUG, not TRACE | +| `delta.rs` file scan | Log per file during full scan | Use TRACE, batch summaries at DEBUG | +| `parser.rs` tag extraction | Many TRACE per file | Sample: log every 100th file | + +### Trace Sampling Config + +```rust +// Add to LoggingConfig +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LoggingConfig { + // ... existing fields ... + + /// Sample rate for TRACE logs in hot paths (0.0-1.0, default 1.0) + #[serde(default = "default_sample_rate")] + pub trace_sample_rate: f32, +} + +fn default_sample_rate() -> f32 { 1.0 } + +// Usage in hot paths +if rand::random::() < config.trace_sample_rate { + trace!(...); +} +``` + +### Rate-Limited Warnings + +```rust +// For repeating warnings during outages (failover.rs) +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +static LAST_FAILOVER_WARN: AtomicU64 = AtomicU64::new(0); + +fn warn_rate_limited(origin_id: &str, error: &str) { + let now = Instant::now().elapsed().as_secs(); + let last = LAST_FAILOVER_WARN.load(Ordering::Relaxed); + if now - last >= 60 { // Max once per minute + LAST_FAILOVER_WARN.store(now, Ordering::Relaxed); + warn!(origin_id = %origin_id, error = %error, "Origin failover"); + } +} +``` + +--- + +## Standardized Field Names (ADDED - Oracle Review) + +Use these consistently across all log statements: + +| Field | Type | Usage | +|-------|------|-------| +| `origin_id` | String | Origin identifier (not `origin`) | +| `file_id` | FileId | File identifier | +| `path` | String | Virtual or real path (sanitized) | +| `size_bytes` | u64 | Size in bytes (not `size`, `bytes`, `len`) | +| `offset` | u64 | Read offset | +| `duration_ms` | u64 | Operation duration in milliseconds | +| `count` | usize | Generic count | +| `attempt` | u32 | Retry attempt number | +| `max_attempts` | u32 | Maximum retry attempts | +| `error` | impl Display | Error message (not `err`, `e`) | +| `request_id` | Uuid | Correlation ID for requests | + +--- + +## Instrumentation Patterns (ADDED - Oracle Review) + +### Use `#[instrument(err)]` for Automatic Error Logging + +```rust +// BEFORE: Manual error logging +pub async fn read(&self, path: &Path) -> Result { + match self.inner_read(path).await { + Ok(data) => Ok(data), + Err(e) => { + error!(path = ?path, error = %e, "Read failed"); + Err(e) + } + } +} + +// AFTER: Automatic with #[instrument] +#[tracing::instrument(level = "debug", skip(self), err)] +pub async fn read(&self, path: &Path) -> Result { + self.inner_read(path).await +} +``` + +### Span Events vs Regular Logs + +```rust +// Regular log - standalone event +info!("Operation completed"); + +// Span event - attached to current span context +tracing::Span::current().record("result", "success"); + +// Prefer span events for operation outcomes +#[instrument(fields(result))] +async fn operation() -> Result<()> { + // ... work ... + Span::current().record("result", "success"); + Ok(()) +} +``` + +--- + +## Fixes: Incorrect Line References (ADDED - Oracle Review) + +| File | Issue | Fix | +|------|-------|-----| +| `webhook.rs:43` | Uses `expect()` (panics) | Replace with `?` + error log | +| `webhook.rs:133` | Uses `expect()` (panics) | Replace with `?` + error log | + +```rust +// webhook.rs - BEFORE +let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to create HTTP client"); + +// webhook.rs - AFTER +let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|e| { + error!(error = %e, "Failed to create webhook HTTP client"); + WebhookError::ClientInit(e.to_string()) + })?; +``` + +--- + +## Log Levels Guide + +| Level | Use Case | Example | +|-------|----------|---------| +| `ERROR` | Unrecoverable failures | Mount failed, DB corruption | +| `WARN` | Recoverable issues | Origin timeout, retry needed | +| `INFO` | Lifecycle events | Service start/stop, health change | +| `DEBUG` | Operation details | Function entry, request params | +| `TRACE` | Fine-grained flow | Match arms, cache hit/miss | + +--- + +## Testing Checklist + +### Basic Functionality +- [ ] Log files created in configured directory +- [ ] Daily rotation creates new files at midnight +- [ ] JSON output parseable by `jq` +- [ ] `journalctl -t musicfs` shows logs +- [ ] `RUST_LOG=musicfs=trace` enables trace output +- [ ] WorkerGuard kept alive (logs flush on shutdown) +- [ ] Logrotate compresses old files + +### Correlation & Context (NEW) +- [ ] Request IDs propagate through async tasks +- [ ] Spawned task logs include parent span context +- [ ] gRPC handler logs show method name in span + +### Security (NEW) +- [ ] WebhookConfig.secret never appears in logs +- [ ] Credential values never appear in logs +- [ ] File paths with `/home/{user}` show as `~/` + +### Performance (NEW) +- [ ] TRACE sampling respects `trace_sample_rate` config +- [ ] Hot path chunk loops log summary, not per-chunk +- [ ] Origin failover warnings are rate-limited (1/minute) +- [ ] Database operations log without blocking + +### Database & Channels (NEW) +- [ ] Database open logs file count +- [ ] Channel capacity warnings appear when queue fills +- [ ] Broadcast lag warnings appear when subscriber falls behind +- [ ] Drop implementations log cleanup + +--- + +## Summary + +| Phase | Effort | Deliverables | +|-------|--------|--------------| +| 1. Config & Dependencies | 2h | LoggingConfig, init_logging(), logrotate, trace sampling | +| 2. Core instrumentation | 1h | tracing in musicfs-core, credentials, sanitization | +| 3. Hot path instrumentation | 4h | #[instrument] + trace! across 5 crates | +| 4. gRPC & async tasks | 2h | Handler instrumentation, spawn correlation | +| 5. Database & channels | 2h | rusqlite logging, channel capacity/close | +| 6. Production files | 1h | Updated systemd, example config | +| **Total** | **12h** | Full observability | + +--- + +## Files to Modify + +### Phase 1: Config & Dependencies +| File | Changes | +|------|---------| +| `Cargo.toml` (workspace) | Add tracing-appender, tracing-journald | +| `crates/musicfs-cli/Cargo.toml` | Add dependencies | +| `crates/musicfs-core/Cargo.toml` | Add tracing | +| `crates/musicfs-core/src/config.rs` | Add LoggingConfig with trace_sample_rate | +| `crates/musicfs-cli/src/main.rs` | Expand init_logging(), request ID helper | +| `crates/musicfs-core/src/lib.rs` | Add sanitize_path() helper | + +### Phase 2: Core Instrumentation +| File | Changes | +|------|---------| +| `crates/musicfs-core/src/credentials.rs` | Add load() logging (redacted) | +| `crates/musicfs-core/src/events.rs` | Add broadcast lag detection | + +### Phase 3: Hot Path Instrumentation +| File | Changes | +|------|---------| +| `crates/musicfs-fuse/src/filesystem.rs` | Add #[instrument], trace! | +| `crates/musicfs-origins/src/failover.rs` | Add #[instrument], trace!, rate-limited warn | +| `crates/musicfs-origins/src/health.rs` | Add state transition logging | +| `crates/musicfs-origins/src/router.rs` | Add selection logging | +| `crates/musicfs-cache/src/tree.rs` | Add mutation logging | +| `crates/musicfs-cache/src/metadata.rs` | Add hit/miss logging | +| `crates/musicfs-cas/src/reader.rs` | Add chunk assembly logging (summary, not per-chunk) | +| `crates/musicfs-cas/src/store.rs` | Add dedup logging | +| `crates/musicfs-sync/src/delta.rs` | Add change detection logging | + +### Phase 4: gRPC & Async Tasks (NEW) +| File | Changes | +|------|---------| +| `crates/musicfs-grpc/src/server.rs` | Add #[instrument] to all 10 handlers, spawn correlation | +| `crates/musicfs-grpc/src/search_service.rs` | Add #[instrument], spawn instrumentation | +| `crates/musicfs-grpc/src/webhook.rs` | Fix expect() → error!, custom Debug for secret | +| `crates/musicfs-cache/src/prefetch.rs` | Add spawn instrumentation, Drop logging | +| `crates/musicfs-search/src/indexer.rs` | Add spawn instrumentation | +| `crates/musicfs-sync/src/watcher.rs` | Add spawn instrumentation, Drop logging | +| `crates/musicfs-plugins/src/manager.rs` | Add Drop logging | + +### Phase 5: Database & Channels (NEW) +| File | Changes | +|------|---------| +| `crates/musicfs-cache/src/db.rs` | Add log_db_result() helper, open/upsert/query logging | +| `crates/musicfs-search/src/collections.rs` | Add CRUD operation logging | +| `crates/musicfs-cache/src/patterns.rs` | Add access pattern logging | +| `crates/musicfs-cache/src/artwork.rs` | Add store/get logging | + +### Phase 6: Production Files +| File | Changes | +|------|---------| +| `dist/musicfs.service` | Add logging directives | +| `dist/logrotate.d/musicfs` | New file | +| `dist/config.example.toml` | Add logging section with trace_sample_rate | diff --git a/musicfs/Cargo.lock b/musicfs/Cargo.lock index 1d42d9c..ba5b8e5 100644 --- a/musicfs/Cargo.lock +++ b/musicfs/Cargo.lock @@ -1764,7 +1764,7 @@ dependencies = [ "serde", "sled", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -1784,7 +1784,7 @@ dependencies = [ "serde", "sled", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", "xxhash-rust", @@ -1805,6 +1805,8 @@ dependencies = [ "musicfs-origins", "tokio", "tracing", + "tracing-appender", + "tracing-journald", "tracing-subscriber", ] @@ -1816,9 +1818,10 @@ dependencies = [ "serde", "serde_json", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "toml", + "tracing", "xxhash-rust", ] @@ -1854,6 +1857,7 @@ dependencies = [ "serde_json", "sha2", "tempfile", + "thiserror 1.0.69", "tokio", "tokio-stream", "tonic", @@ -1868,7 +1872,7 @@ dependencies = [ "image", "musicfs-core", "symphonia", - "thiserror", + "thiserror 1.0.69", "tracing", ] @@ -1881,7 +1885,7 @@ dependencies = [ "libc", "musicfs-core", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -1897,7 +1901,7 @@ dependencies = [ "serde", "serde_json", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", "wasmtime", @@ -1915,7 +1919,7 @@ dependencies = [ "serde_json", "tantivy", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -1932,7 +1936,7 @@ dependencies = [ "rmp-serde", "serde", "tempfile", - "thiserror", + "thiserror 1.0.69", "tokio", "tracing", "xxhash-rust", @@ -2460,7 +2464,7 @@ checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ "getrandom 0.2.17", "libredox", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -2893,6 +2897,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "symphonia" version = "0.5.5" @@ -3126,7 +3136,7 @@ dependencies = [ "tantivy-stacker", "tantivy-tokenizer-api", "tempfile", - "thiserror", + "thiserror 1.0.69", "time", "uuid", "winapi", @@ -3247,7 +3257,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl 2.0.18", ] [[package]] @@ -3261,6 +3280,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -3507,6 +3537,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" +dependencies = [ + "crossbeam-channel", + "symlink", + "thiserror 2.0.18", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.31" @@ -3528,6 +3571,17 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-journald" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3a81ed245bfb62592b1e2bc153e77656d94ee6a0497683a65a12ccaf2438d0" +dependencies = [ + "libc", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -3539,6 +3593,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.23" @@ -3549,12 +3613,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -3944,7 +4011,7 @@ dependencies = [ "log", "object 0.32.2", "target-lexicon", - "thiserror", + "thiserror 1.0.69", "wasmparser 0.201.0", "wasmtime-cranelift-shared", "wasmtime-environ", @@ -3985,7 +4052,7 @@ dependencies = [ "serde", "serde_derive", "target-lexicon", - "thiserror", + "thiserror 1.0.69", "wasm-encoder 0.201.0", "wasmparser 0.201.0", "wasmprinter", @@ -4076,7 +4143,7 @@ dependencies = [ "cranelift-entity", "serde", "serde_derive", - "thiserror", + "thiserror 1.0.69", "wasmparser 0.201.0", ] diff --git a/musicfs/Cargo.toml b/musicfs/Cargo.toml index cb899fe..4d2e068 100644 --- a/musicfs/Cargo.toml +++ b/musicfs/Cargo.toml @@ -30,7 +30,9 @@ dashmap = "5" # Logging tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tracing-appender = "0.2" +tracing-journald = "0.3" # FUSE fuser = "0.14" diff --git a/musicfs/crates/musicfs-cache/src/artwork.rs b/musicfs/crates/musicfs-cache/src/artwork.rs index a76b19a..809805c 100644 --- a/musicfs/crates/musicfs-cache/src/artwork.rs +++ b/musicfs/crates/musicfs-cache/src/artwork.rs @@ -5,7 +5,7 @@ use musicfs_metadata::artwork::{ArtSize, ArtType, Artwork}; use std::io::Cursor; use std::path::Path; use std::sync::Arc; -use tracing::debug; +use tracing::{debug, info, trace, warn}; const MAX_ARTWORK_INPUT_SIZE: usize = 10 * 1024 * 1024; @@ -40,6 +40,7 @@ impl ArtworkCache { [], )?; + info!(path = ?db_path, "Artwork cache opened"); Ok(Self { store, db_path: db_path.to_path_buf(), @@ -47,7 +48,9 @@ impl ArtworkCache { } pub async fn store(&self, file_id: i64, artwork: &Artwork) -> Result { + trace!(file_id = file_id, size_bytes = artwork.data.len(), "Storing artwork"); if artwork.data.len() > MAX_ARTWORK_INPUT_SIZE { + warn!(file_id = file_id, size = artwork.data.len(), max = MAX_ARTWORK_INPUT_SIZE, "Artwork too large"); return Err(ArtworkError::ImageTooLarge(artwork.data.len())); } @@ -88,6 +91,7 @@ impl ArtworkCache { art_type: &str, size: ArtSize, ) -> Result>, ArtworkError> { + trace!(file_id = file_id, art_type = %art_type, "Getting artwork"); let db_path = self.db_path.clone(); let art_type_clone = art_type.to_string(); @@ -107,6 +111,7 @@ impl ArtworkCache { match hash_hex { Some(hex) => { + trace!(file_id = file_id, "Artwork cache hit"); let hash = ChunkHash::from_hex(&hex).ok_or(ArtworkError::InvalidHash)?; let data = self.store.get(&hash).await?; @@ -118,7 +123,10 @@ impl ArtworkCache { } } } - None => Ok(None), + None => { + trace!(file_id = file_id, "Artwork cache miss"); + Ok(None) + } } } diff --git a/musicfs/crates/musicfs-cache/src/db.rs b/musicfs/crates/musicfs-cache/src/db.rs index 4126383..21bb61a 100644 --- a/musicfs/crates/musicfs-cache/src/db.rs +++ b/musicfs/crates/musicfs-cache/src/db.rs @@ -16,7 +16,7 @@ pub struct Database { impl Database { pub fn open(path: &Path) -> Result { - info!(?path, "Opening database"); + debug!(?path, "Opening database"); let conn = Connection::open(path).map_err(|e| Error::Database(format!("open failed: {}", e)))?; @@ -24,9 +24,12 @@ impl Database { conn.execute_batch(SCHEMA) .map_err(|e| Error::Database(format!("schema init failed: {}", e)))?; - Ok(Self { + let db = Self { conn: Arc::new(Mutex::new(conn)), - }) + }; + let count = db.file_count().unwrap_or(0); + info!(path = ?path, file_count = count, "Database opened"); + Ok(db) } pub fn open_memory() -> Result { diff --git a/musicfs/crates/musicfs-cache/src/metadata.rs b/musicfs/crates/musicfs-cache/src/metadata.rs index 9570872..bcc3859 100644 --- a/musicfs/crates/musicfs-cache/src/metadata.rs +++ b/musicfs/crates/musicfs-cache/src/metadata.rs @@ -3,6 +3,7 @@ use musicfs_core::{AudioMeta, FileMeta, OriginId, Result, VirtualPath}; use std::path::Path; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::trace; pub struct MetadataCache { db: Arc, @@ -34,7 +35,10 @@ impl MetadataCache { } pub fn lookup(&self, path: &VirtualPath) -> Result> { - self.db.get_file_by_virtual_path(path) + let result = self.db.get_file_by_virtual_path(path)?; + let hit = result.is_some(); + trace!(path = path.as_str(), hit, "metadata cache lookup"); + Ok(result) } pub fn is_fresh( @@ -52,8 +56,11 @@ impl MetadataCache { .duration_since(UNIX_EPOCH) .unwrap_or(Duration::ZERO) .as_secs(); - Ok(current_secs == cached_secs) + let hit = current_secs == cached_secs; + trace!(path = ?real_path, hit, "metadata freshness check"); + Ok(hit) } else { + trace!(path = ?real_path, hit = false, "metadata freshness check"); Ok(false) } } diff --git a/musicfs/crates/musicfs-cache/src/patterns.rs b/musicfs/crates/musicfs-cache/src/patterns.rs index d5ae26f..6a7f44c 100644 --- a/musicfs/crates/musicfs-cache/src/patterns.rs +++ b/musicfs/crates/musicfs-cache/src/patterns.rs @@ -3,6 +3,7 @@ use parking_lot::{Mutex, RwLock}; use std::collections::HashMap; use std::path::Path; use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, info, trace}; #[derive(Debug, Clone)] pub struct AccessPattern { @@ -79,15 +80,19 @@ impl PatternStore { map }; - Ok(Self { + let store = Self { db: Mutex::new(db), sequence_counts: RwLock::new(sequence_counts), time_patterns: RwLock::new(HashMap::new()), max_history, - }) + }; + let sequence_count = store.sequence_counts.read().len(); + info!(path = ?db_path, sequence_count = sequence_count, max_history = max_history, "Pattern store opened"); + Ok(store) } pub fn record(&self, file_id: FileId, _context: AccessContext) -> Result<(), PatternError> { + trace!(file_id = file_id.0, "Recording access pattern"); let now = SystemTime::now(); let timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64; let hour = (timestamp / 3600 % 24) as u8; @@ -144,11 +149,13 @@ impl PatternStore { .collect(); predictions.sort_by(|a, b| b.1.cmp(&a.1)); - predictions + let result: Vec = predictions .into_iter() .take(limit) .map(|(id, _)| id) - .collect() + .collect(); + debug!(file_id = current.0, predictions = result.len(), "Predicted next files"); + result } pub fn predict_for_time(&self, hour: u8, limit: usize) -> Vec { diff --git a/musicfs/crates/musicfs-cache/src/tree.rs b/musicfs/crates/musicfs-cache/src/tree.rs index c9cb6c6..ee2327f 100644 --- a/musicfs/crates/musicfs-cache/src/tree.rs +++ b/musicfs/crates/musicfs-cache/src/tree.rs @@ -4,6 +4,7 @@ use std::ffi::{OsStr, OsString}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::RwLock; use std::time::{Duration, SystemTime}; +use tracing::{debug, trace}; pub type Inode = u64; pub const ROOT_INODE: Inode = 1; @@ -123,8 +124,12 @@ impl VirtualTree { pub fn lookup(&self, parent_inode: Inode, name: &OsStr) -> Option { if let Some(VirtualNode::Directory(dir)) = self.nodes.get(&parent_inode) { - dir.children.get(name).copied() + let result = dir.children.get(name).copied(); + let hit = result.is_some(); + trace!(inode = parent_inode, name = ?name, hit, "tree lookup"); + result } else { + trace!(inode = parent_inode, name = ?name, hit = false, "tree lookup"); None } } @@ -194,6 +199,7 @@ impl VirtualTree { dir.children.insert(name, inode); } + debug!(inode, path = path.as_str(), file_id = ?meta.id, "add file to tree"); inode } @@ -263,6 +269,7 @@ impl VirtualTree { } } + debug!(inode, path = path.as_str(), file_id = ?file.file_id, "remove file from tree"); Some(file.file_id) } else { None diff --git a/musicfs/crates/musicfs-cas/src/reader.rs b/musicfs/crates/musicfs-cas/src/reader.rs index aeb3b94..e4780b7 100644 --- a/musicfs/crates/musicfs-cas/src/reader.rs +++ b/musicfs/crates/musicfs-cas/src/reader.rs @@ -6,6 +6,7 @@ use musicfs_core::FileId; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use tracing::{debug, trace}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkManifest { @@ -67,10 +68,12 @@ impl FileReader { { let manifests = self.manifests.read().unwrap(); if let Some(m) = manifests.get(&file_id) { + trace!(file_id = ?file_id, "manifest cache hit"); return Ok(m.clone()); } } + trace!(file_id = ?file_id, "manifest cache miss"); let Some(fetcher) = &self.fetcher else { return Err(ReaderError::ManifestNotFound(file_id)); }; @@ -103,6 +106,7 @@ impl FileReader { let end = std::cmp::min(offset + size as u64, manifest.total_size); let mut result = BytesMut::with_capacity((end - offset) as usize); + let mut chunks_read = 0u32; for chunk_ref in &manifest.chunks { let chunk_start = chunk_ref.offset; @@ -127,8 +131,11 @@ impl FileReader { }; result.extend_from_slice(&chunk_data[read_start..read_end]); + chunks_read += 1; } + let bytes_read = result.len() as u64; + debug!(file_id = ?file_id, offset, size, chunks_read, bytes_read, "read completed"); Ok(result.freeze()) } } diff --git a/musicfs/crates/musicfs-cas/src/store.rs b/musicfs/crates/musicfs-cas/src/store.rs index 3458eff..094ea6c 100644 --- a/musicfs/crates/musicfs-cas/src/store.rs +++ b/musicfs/crates/musicfs-cas/src/store.rs @@ -4,7 +4,7 @@ use musicfs_core::ChunkHash; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::fs; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; const DEFAULT_MAX_SIZE_10GB: u64 = 10 * 1024 * 1024 * 1024; const DEFAULT_SHARD_LEVELS_256_SUBDIRS: u8 = 2; @@ -72,7 +72,7 @@ impl CasStore { let path = self.chunk_path(&hash); if path.exists() { - debug!("Chunk {} already exists (dedup)", hash); + trace!(hash = %hash, size_bytes = data.len(), "dedup hit"); return Ok(hash); } @@ -94,7 +94,7 @@ impl CasStore { self.current_size .fetch_add(data.len() as u64, Ordering::SeqCst); - debug!("Stored chunk {} ({} bytes)", hash, data.len()); + debug!(hash = %hash, size_bytes = data.len(), "chunk stored"); Ok(hash) } @@ -111,6 +111,7 @@ impl CasStore { self.verify_integrity(hash, &data)?; } + debug!(hash = %hash, size_bytes = data.len(), "chunk retrieved"); Ok(Bytes::from(data)) } @@ -156,7 +157,7 @@ impl CasStore { fs::remove_file(&path).await?; self.index.remove(hash.0.as_slice())?; self.current_size.fetch_sub(meta.len(), Ordering::SeqCst); - debug!("Deleted chunk {}", hash); + debug!(hash = %hash, size_bytes = meta.len(), "chunk deleted"); } Ok(()) diff --git a/musicfs/crates/musicfs-cli/Cargo.toml b/musicfs/crates/musicfs-cli/Cargo.toml index 1a7902f..a2503ff 100644 --- a/musicfs/crates/musicfs-cli/Cargo.toml +++ b/musicfs/crates/musicfs-cli/Cargo.toml @@ -19,5 +19,9 @@ clap.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +tracing-appender.workspace = true anyhow.workspace = true dirs.workspace = true + +[target.'cfg(target_os = "linux")'.dependencies] +tracing-journald.workspace = true diff --git a/musicfs/crates/musicfs-cli/src/main.rs b/musicfs/crates/musicfs-cli/src/main.rs index 2e5ded5..2a65e65 100644 --- a/musicfs/crates/musicfs-cli/src/main.rs +++ b/musicfs/crates/musicfs-cli/src/main.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; use musicfs_cache::TreeBuilder; use musicfs_cas::{CasConfig, CasStore, ContentFetcher, FileReader}; -use musicfs_core::{FileId, FileMeta, OriginId, RealPath, VirtualPath}; +use musicfs_core::{FileId, FileMeta, LoggingConfig, OriginId, RealPath, VirtualPath}; use musicfs_fuse::MusicFs; use musicfs_metadata::MetadataParser; use musicfs_origins::{LocalOrigin, Origin}; @@ -10,6 +10,8 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::SystemTime; use tracing::{debug, info}; +use tracing_appender::non_blocking::WorkerGuard; +use tracing_subscriber::{fmt, prelude::*, EnvFilter, Layer}; #[derive(Parser)] #[command(name = "musicfs")] @@ -86,7 +88,6 @@ enum OriginCommands { fn main() -> Result<()> { let cli = Cli::parse(); - init_logging(&cli.log_level); match cli.command { Commands::Mount { @@ -94,13 +95,38 @@ fn main() -> Result<()> { mountpoint, origin, cache_dir, - } => run_mount(mountpoint, origin, cache_dir), - Commands::Status => run_status(), - Commands::Cache { command } => run_cache(command), - Commands::Search { query, limit } => run_search(&query, limit), - Commands::Origin { command } => run_origin(command), - Commands::Events { r#type } => run_events(r#type), - Commands::Shutdown { graceful, timeout } => run_shutdown(graceful, timeout), + } => { + let log_config = LoggingConfig { + level: cli.log_level, + ..Default::default() + }; + let _guard = init_logging(&log_config)?; + run_mount(mountpoint, origin, cache_dir) + } + Commands::Status => { + init_basic_logging(&cli.log_level); + run_status() + } + Commands::Cache { command } => { + init_basic_logging(&cli.log_level); + run_cache(command) + } + Commands::Search { query, limit } => { + init_basic_logging(&cli.log_level); + run_search(&query, limit) + } + Commands::Origin { command } => { + init_basic_logging(&cli.log_level); + run_origin(command) + } + Commands::Events { r#type } => { + init_basic_logging(&cli.log_level); + run_events(r#type) + } + Commands::Shutdown { graceful, timeout } => { + init_basic_logging(&cli.log_level); + run_shutdown(graceful, timeout) + } } } @@ -115,9 +141,7 @@ fn run_mount( let handle = runtime.handle().clone(); let (tree, reader) = runtime.block_on(async { - info!("MusicFS starting..."); - info!("Origin: {:?}", origin_path); - info!("Mountpoint: {:?}", mountpoint); + info!(origin = ?origin_path, mountpoint = ?mountpoint, "Mount configuration"); let cache_dir = cache_dir.unwrap_or_else(|| { dirs::cache_dir() @@ -240,13 +264,58 @@ fn run_shutdown(graceful: bool, timeout: u32) -> Result<()> { Ok(()) } -fn init_logging(level: &str) { - use tracing_subscriber::{fmt, prelude::*, EnvFilter}; +fn init_logging(config: &LoggingConfig) -> Result { + std::fs::create_dir_all(&config.log_dir)?; + let file_appender = tracing_appender::rolling::daily(&config.log_dir, "musicfs.log"); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + + let file_layer = if config.json_output { + fmt::layer() + .json() + .with_writer(non_blocking) + .with_ansi(false) + .boxed() + } else { + fmt::layer() + .with_writer(non_blocking) + .with_ansi(false) + .boxed() + }; + + let stderr_layer = fmt::layer().with_writer(std::io::stderr).compact(); + + let filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new(&config.level)); + + let subscriber = tracing_subscriber::registry() + .with(filter) + .with(file_layer) + .with(stderr_layer); + + #[cfg(target_os = "linux")] + let subscriber = { + let journald_layer = if config.journald { + tracing_journald::layer() + .ok() + .map(|l| l.with_syslog_identifier("musicfs".to_string())) + } else { + None + }; + subscriber.with(journald_layer) + }; + + subscriber.init(); + + info!(version = env!("CARGO_PKG_VERSION"), "MusicFS starting"); + Ok(guard) +} + +fn init_basic_logging(level: &str) { let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)); tracing_subscriber::registry() - .with(fmt::layer()) + .with(fmt::layer().compact()) .with(filter) .init(); } diff --git a/musicfs/crates/musicfs-core/Cargo.toml b/musicfs/crates/musicfs-core/Cargo.toml index 6f6ecba..75def2e 100644 --- a/musicfs/crates/musicfs-core/Cargo.toml +++ b/musicfs/crates/musicfs-core/Cargo.toml @@ -9,6 +9,7 @@ serde.workspace = true serde_json.workspace = true toml.workspace = true tokio = { workspace = true, features = ["sync"] } +tracing.workspace = true xxhash-rust.workspace = true hex.workspace = true diff --git a/musicfs/crates/musicfs-core/src/config.rs b/musicfs/crates/musicfs-core/src/config.rs index 8f7f070..8a2808a 100644 --- a/musicfs/crates/musicfs-core/src/config.rs +++ b/musicfs/crates/musicfs-core/src/config.rs @@ -14,6 +14,9 @@ pub struct Config { #[serde(default)] pub health: HealthConfig, + + #[serde(default)] + pub logging: LoggingConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -120,6 +123,52 @@ fn default_unhealthy_threshold() -> u32 { 3 } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LoggingConfig { + #[serde(default = "default_log_dir")] + pub log_dir: PathBuf, + + #[serde(default)] + pub json_output: bool, + + #[serde(default = "default_true")] + pub journald: bool, + + #[serde(default = "default_log_level")] + pub level: String, + + #[serde(default = "default_sample_rate")] + pub trace_sample_rate: f32, +} + +impl Default for LoggingConfig { + fn default() -> Self { + Self { + log_dir: default_log_dir(), + json_output: false, + journald: true, + level: default_log_level(), + trace_sample_rate: default_sample_rate(), + } + } +} + +fn default_log_dir() -> PathBuf { + PathBuf::from("/var/log/musicfs") +} + +fn default_log_level() -> String { + "musicfs=info,warn".to_string() +} + +fn default_true() -> bool { + true +} + +fn default_sample_rate() -> f32 { + 1.0 +} + impl Config { pub fn from_file(path: &std::path::Path) -> Result { let content = diff --git a/musicfs/crates/musicfs-core/src/credentials.rs b/musicfs/crates/musicfs-core/src/credentials.rs index 73583d6..d7c75f1 100644 --- a/musicfs/crates/musicfs-core/src/credentials.rs +++ b/musicfs/crates/musicfs-core/src/credentials.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; use thiserror::Error; +use tracing::{debug, info, trace, warn}; #[derive(Clone)] pub struct CredentialStore { @@ -106,16 +107,36 @@ impl CredentialStore { origin_id: &str, config: &CredentialConfig, ) -> Result { + debug!(origin_id = %origin_id, "Loading credentials"); + if let Some(cred) = self.cache.get(origin_id) { + trace!(origin_id = %origin_id, "Credential cache hit"); return Ok(cred.clone()); } let cred = match config { - CredentialConfig::Environment { prefix } => self.load_from_env(prefix)?, - CredentialConfig::File { path } => self.load_from_file(path)?, - CredentialConfig::Inline(cred) => cred.clone(), + CredentialConfig::Environment { prefix } => { + trace!(origin_id = %origin_id, prefix = %prefix, "Loading from environment"); + self.load_from_env(prefix)? + } + CredentialConfig::File { path } => { + trace!(origin_id = %origin_id, path = ?path, "Loading from file"); + self.load_from_file(path)? + } + CredentialConfig::Inline(cred) => { + trace!(origin_id = %origin_id, "Using inline credential"); + cred.clone() + } }; + let cred_type = match &cred { + Credential::Basic { .. } => "Basic", + Credential::AwsKey { .. } => "AwsKey", + Credential::SshKey { .. } => "SshKey", + Credential::EnvVar { .. } => "EnvVar", + }; + info!(origin_id = %origin_id, cred_type = %cred_type, "Credential loaded"); + self.cache.insert(origin_id.to_string(), cred.clone()); Ok(cred) } @@ -144,6 +165,7 @@ impl CredentialStore { }); } + warn!(prefix = %prefix, "No credentials found in environment"); Err(CredentialError::NotFound(format!( "No credentials found with prefix {}", prefix diff --git a/musicfs/crates/musicfs-core/src/events.rs b/musicfs/crates/musicfs-core/src/events.rs index 1771617..b9f49a7 100644 --- a/musicfs/crates/musicfs-core/src/events.rs +++ b/musicfs/crates/musicfs-core/src/events.rs @@ -1,5 +1,6 @@ use crate::types::{FileId, OriginId, VirtualPath}; use tokio::sync::broadcast; +use tracing::{debug, trace}; pub struct EventBus { sender: broadcast::Sender, @@ -12,7 +13,11 @@ impl EventBus { } pub fn publish(&self, event: Event) { - let _ = self.sender.send(event); + trace!(event = ?event, "Publishing event"); + let receiver_count = self.sender.receiver_count(); + if self.sender.send(event).is_err() && receiver_count > 0 { + debug!(receiver_count = receiver_count, "Event dropped, no active receivers"); + } } pub fn subscribe(&self) -> broadcast::Receiver { diff --git a/musicfs/crates/musicfs-core/src/lib.rs b/musicfs/crates/musicfs-core/src/lib.rs index ed05ba6..542d7ce 100644 --- a/musicfs/crates/musicfs-core/src/lib.rs +++ b/musicfs/crates/musicfs-core/src/lib.rs @@ -6,7 +6,19 @@ pub mod metrics; pub mod resolver; pub mod types; -pub use config::{CacheConfig, Config, ConfigError, HealthConfig, OriginConfig, OriginType}; +pub use config::{ + CacheConfig, Config, ConfigError, HealthConfig, LoggingConfig, OriginConfig, OriginType, +}; + +use std::path::Path; + +pub fn sanitize_path(path: &Path) -> String { + if let Ok(home) = std::env::var("HOME") { + path.to_string_lossy().replace(&home, "~") + } else { + path.to_string_lossy().to_string() + } +} pub use credentials::{Credential, CredentialConfig, CredentialError, CredentialStore}; pub use error::{Error, Result}; pub use events::{Event, EventBus}; diff --git a/musicfs/crates/musicfs-fuse/src/filesystem.rs b/musicfs/crates/musicfs-fuse/src/filesystem.rs index a3d682d..f7066ae 100644 --- a/musicfs/crates/musicfs-fuse/src/filesystem.rs +++ b/musicfs/crates/musicfs-fuse/src/filesystem.rs @@ -12,7 +12,7 @@ use std::path::Path; use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; use tokio::runtime::Handle; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, trace, warn}; const TTL: Duration = Duration::from_secs(1); const BLOCK_SIZE: u32 = 512; @@ -159,12 +159,12 @@ impl Filesystem for MusicFs { info!("MusicFS destroyed"); } + #[instrument(level = "debug", skip(self, reply))] fn lookup(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEntry) { - debug!("lookup(parent={}, name={:?})", parent, name); - let name_str = name.to_string_lossy(); if parent == ROOT_INODE && SearchOps::is_search_dir_name(&name_str) { + trace!(parent, name = %name_str, "search_dir_name matched"); if let Some(ref search_ops) = self.search_ops { search_ops.lookup_search_dir(reply); return; @@ -172,6 +172,7 @@ impl Filesystem for MusicFs { } if parent == SearchOps::search_dir_inode() { + trace!(parent, name = %name_str, "search_dir_inode matched"); if let Some(ref search_ops) = self.search_ops { let inode = self.get_or_create_query_inode(&name_str); search_ops.lookup_query_dir(&name_str, inode, reply); @@ -180,6 +181,7 @@ impl Filesystem for MusicFs { } if let Some(query) = self.get_query_for_inode(parent) { + trace!(parent, name = %name_str, query = %query, "query_inode matched"); if let Some(ref search_ops) = self.search_ops { let inode = self.get_or_create_query_inode(&format!("{}:{}", query, name_str)); search_ops.lookup_result(inode, reply); @@ -190,6 +192,7 @@ impl Filesystem for MusicFs { let tree = self.tree.read().unwrap(); if let Some(inode) = tree.lookup(parent, name) { + trace!(parent, name = %name_str, ino = inode, "file found in tree"); if let Some(node) = tree.get(inode) { let attr = self.node_to_attr(node); reply.entry(&TTL, &attr, 0); @@ -197,13 +200,14 @@ impl Filesystem for MusicFs { } } + trace!(parent, name = %name_str, "file not found"); reply.error(libc::ENOENT); } + #[instrument(level = "debug", skip(self, reply))] fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) { - debug!("getattr(ino={})", ino); - if ino == SearchOps::search_dir_inode() { + trace!(ino, "search_dir_inode matched"); if let Some(ref search_ops) = self.search_ops { search_ops.getattr_search_dir(reply); return; @@ -211,6 +215,7 @@ impl Filesystem for MusicFs { } if SearchOps::is_search_inode(ino) { + trace!(ino, "search_inode matched"); if let Some(ref search_ops) = self.search_ops { search_ops.getattr_result(ino, reply); return; @@ -218,6 +223,7 @@ impl Filesystem for MusicFs { } if self.get_query_for_inode(ino).is_some() { + trace!(ino, "query_inode matched"); if let Some(ref search_ops) = self.search_ops { search_ops.getattr_search_dir(reply); return; @@ -227,13 +233,16 @@ impl Filesystem for MusicFs { let tree = self.tree.read().unwrap(); if let Some(node) = tree.get(ino) { + trace!(ino, "inode found in tree"); let attr = self.node_to_attr(node); reply.attr(&TTL, &attr); } else { + trace!(ino, "inode not found"); reply.error(libc::ENOENT); } } + #[instrument(level = "debug", skip(self, reply))] fn readdir( &mut self, _req: &Request, @@ -242,9 +251,8 @@ impl Filesystem for MusicFs { offset: i64, mut reply: ReplyDirectory, ) { - debug!("readdir(ino={}, offset={})", ino, offset); - if ino == SearchOps::search_dir_inode() { + trace!(ino, offset, "search_dir_inode matched"); if let Some(ref search_ops) = self.search_ops { search_ops.readdir_search_root(offset, reply); return; @@ -252,6 +260,7 @@ impl Filesystem for MusicFs { } if let Some(query) = self.get_query_for_inode(ino) { + trace!(ino, offset, query = %query, "query_inode matched"); if let Some(ref search_ops) = self.search_ops { search_ops.readdir_query(&query, offset, reply); return; @@ -261,6 +270,7 @@ impl Filesystem for MusicFs { let tree = self.tree.read().unwrap(); if let Some(children) = tree.readdir(ino) { + trace!(ino, offset, children_count = children.len(), "directory found"); let parent_ino = tree.get_parent(ino).unwrap_or(ROOT_INODE); let entries: Vec<(u64, FileType, &str)> = vec![ @@ -300,15 +310,16 @@ impl Filesystem for MusicFs { reply.ok(); } else { + trace!(ino, offset, "directory not found"); reply.error(libc::ENOENT); } } + #[instrument(level = "debug", skip(self, reply))] fn open(&mut self, _req: &Request, ino: u64, flags: i32, reply: ReplyOpen) { - debug!("open(ino={}, flags={})", ino, flags); - let write_flags = libc::O_WRONLY | libc::O_RDWR | libc::O_APPEND | libc::O_TRUNC; if flags & write_flags != 0 { + trace!(ino, flags, "write flags detected"); reply.error(libc::EROFS); return; } @@ -316,12 +327,15 @@ impl Filesystem for MusicFs { let tree = self.tree.read().unwrap(); if tree.get(ino).is_some() { + trace!(ino, "inode found"); reply.opened(0, 0); } else { + trace!(ino, "inode not found"); reply.error(libc::ENOENT); } } + #[instrument(level = "debug", skip(self, reply))] fn read( &mut self, _req: &Request, @@ -333,19 +347,20 @@ impl Filesystem for MusicFs { _lock_owner: Option, reply: ReplyData, ) { - debug!("read(ino={}, offset={}, size={})", ino, offset, size); - let file_id = { let tree = self.tree.read().unwrap(); if let Some(VirtualNode::File(file)) = tree.get(ino) { + trace!(ino, "file found in tree"); file.file_id } else { + trace!(ino, "file not found"); reply.error(libc::ENOENT); return; } }; let Some(reader) = &self.reader else { + trace!(ino, "no reader available"); reply.data(&[]); return; }; @@ -359,14 +374,18 @@ impl Filesystem for MusicFs { }); match result { - Ok(data) => reply.data(&data), + Ok(data) => { + trace!(ino, offset, size_bytes = size, bytes_read = data.len(), "read successful"); + reply.data(&data); + } Err(e) => { - warn!("Read error: {}", e); + warn!(ino, offset, size_bytes = size, error = %e, "read failed"); reply.error(libc::EIO); } } } + #[instrument(level = "debug", skip(self, reply))] fn release( &mut self, _req: &Request, @@ -377,7 +396,7 @@ impl Filesystem for MusicFs { _flush: bool, reply: fuser::ReplyEmpty, ) { - debug!("release(ino={})", ino); + trace!(ino, "releasing file handle"); reply.ok(); } diff --git a/musicfs/crates/musicfs-grpc/Cargo.toml b/musicfs/crates/musicfs-grpc/Cargo.toml index e36469f..76d3670 100644 --- a/musicfs/crates/musicfs-grpc/Cargo.toml +++ b/musicfs/crates/musicfs-grpc/Cargo.toml @@ -11,6 +11,7 @@ prost.workspace = true tokio.workspace = true tokio-stream.workspace = true tracing.workspace = true +thiserror.workspace = true serde.workspace = true serde_json.workspace = true chrono.workspace = true diff --git a/musicfs/crates/musicfs-grpc/src/server.rs b/musicfs/crates/musicfs-grpc/src/server.rs index 3f3f5eb..729e177 100644 --- a/musicfs/crates/musicfs-grpc/src/server.rs +++ b/musicfs/crates/musicfs-grpc/src/server.rs @@ -10,7 +10,7 @@ use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; -use tracing::{debug, info}; +use tracing::{debug, info, instrument}; pub struct MusicFsServer { start_time: Instant, @@ -206,10 +206,12 @@ impl MusicFs for MusicFsServer { )) } + #[instrument(level = "debug", skip(self, _request), fields(method = "get_status"))] async fn get_status( &self, _request: Request, ) -> Result, Status> { + debug!("gRPC get_status called"); let uptime = self.start_time.elapsed().as_secs(); Ok(Response::new(StatusResponse { @@ -225,23 +227,27 @@ impl MusicFs for MusicFsServer { })) } + #[instrument(level = "info", skip(self, request), fields(method = "shutdown"))] async fn shutdown( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); info!( - "Shutdown requested (graceful={}, timeout={}s)", - req.graceful, req.timeout_secs + graceful = req.graceful, + timeout_secs = req.timeout_secs, + "gRPC shutdown requested" ); Ok(Response::new(Empty {})) } + #[instrument(level = "debug", skip(self, _request), fields(method = "get_cache_stats"))] async fn get_cache_stats( &self, _request: Request, ) -> Result, Status> { + debug!("gRPC get_cache_stats called"); Ok(Response::new(CacheStats { total_size_bytes: 0, used_size_bytes: 0, @@ -275,14 +281,17 @@ impl MusicFs for MusicFsServer { })) } + #[instrument(level = "info", skip(self, request), fields(method = "clear_cache"))] async fn clear_cache( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); - debug!( - "Clear cache requested: origin={:?}, metadata={}, chunks={}", - req.origin_id, req.clear_metadata, req.clear_chunks + info!( + origin_id = ?req.origin_id, + clear_metadata = req.clear_metadata, + clear_chunks = req.clear_chunks, + "gRPC clear_cache" ); Ok(Response::new(ClearCacheResponse { @@ -293,12 +302,14 @@ impl MusicFs for MusicFsServer { type PrefetchStream = ReceiverStream>; + #[instrument(level = "debug", skip(self, request), fields(method = "prefetch"))] async fn prefetch( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let total = req.paths.len() as u32; + debug!(file_count = total, "gRPC prefetch started"); let (tx, rx) = mpsc::channel(32); @@ -319,18 +330,22 @@ impl MusicFs for MusicFsServer { Ok(Response::new(ReceiverStream::new(rx))) } + #[instrument(level = "debug", skip(self, _request), fields(method = "list_origins"))] async fn list_origins( &self, _request: Request, ) -> Result, Status> { + debug!("gRPC list_origins called"); Ok(Response::new(OriginsResponse { origins: vec![] })) } + #[instrument(level = "debug", skip(self, request), fields(method = "get_origin_health"))] async fn get_origin_health( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); + debug!(origin_id = %req.origin_id, "gRPC get_origin_health"); Ok(Response::new(OriginHealthResponse { origin_id: req.origin_id, @@ -342,12 +357,13 @@ impl MusicFs for MusicFsServer { type RescanOriginStream = ReceiverStream>; + #[instrument(level = "info", skip(self, request), fields(method = "rescan_origin"))] async fn rescan_origin( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); - info!("Rescan requested for origin: {}", req.origin_id); + info!(origin_id = %req.origin_id, "gRPC rescan_origin started"); let (tx, rx) = mpsc::channel(32); @@ -373,19 +389,32 @@ impl MusicFs for MusicFsServer { type SubscribeEventsStream = ReceiverStream>; + #[instrument(level = "info", skip(self, request), fields(method = "subscribe_events"))] async fn subscribe_events( &self, request: Request, ) -> Result, Status> { + info!("gRPC subscribe_events: client connected"); let filter = request.into_inner(); let mut rx = self.event_bus.subscribe(); let (tx, out_rx) = mpsc::channel(100); tokio::spawn(async move { - while let Ok(event) = rx.recv().await { - if Self::matches_filter(&event, &filter) { - let proto_event = Self::event_to_proto(&event); - if tx.send(Ok(proto_event)).await.is_err() { + loop { + match rx.recv().await { + Ok(event) => { + if Self::matches_filter(&event, &filter) { + let proto_event = Self::event_to_proto(&event); + if tx.send(Ok(proto_event)).await.is_err() { + break; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(skipped = n, "Event subscriber lagged, skipped events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::debug!("Event channel closed"); break; } } diff --git a/musicfs/crates/musicfs-grpc/src/webhook.rs b/musicfs/crates/musicfs-grpc/src/webhook.rs index bcf3f73..b1ad9c7 100644 --- a/musicfs/crates/musicfs-grpc/src/webhook.rs +++ b/musicfs/crates/musicfs-grpc/src/webhook.rs @@ -2,7 +2,7 @@ use musicfs_core::Event; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::sync::broadcast; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; #[derive(Debug, Clone, Serialize)] pub struct WebhookPayload { @@ -11,9 +11,10 @@ pub struct WebhookPayload { pub data: serde_json::Value, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Clone, Deserialize)] pub struct WebhookConfig { pub url: String, + #[serde(skip_serializing)] pub secret: Option, pub events: Vec, #[serde(default = "default_retry_count")] @@ -22,6 +23,18 @@ pub struct WebhookConfig { pub timeout_ms: u64, } +impl std::fmt::Debug for WebhookConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebhookConfig") + .field("url", &self.url) + .field("secret", &self.secret.as_ref().map(|_| "[REDACTED]")) + .field("events", &self.events) + .field("retry_count", &self.retry_count) + .field("timeout_ms", &self.timeout_ms) + .finish() + } +} + fn default_retry_count() -> u32 { 3 } @@ -30,26 +43,46 @@ fn default_timeout_ms() -> u64 { 5000 } +#[derive(Debug, thiserror::Error)] +pub enum WebhookError { + #[error("Failed to initialize HTTP client: {0}")] + ClientInit(String), +} + pub struct WebhookHandler { client: reqwest::Client, configs: Vec, } impl WebhookHandler { - pub fn new(configs: Vec) -> Self { + pub fn new(configs: Vec) -> Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .build() - .expect("Failed to create HTTP client"); + .map_err(|e| { + error!(error = %e, "Failed to create webhook HTTP client"); + WebhookError::ClientInit(e.to_string()) + })?; - Self { client, configs } + Ok(Self { client, configs }) } pub async fn run(&self, mut rx: broadcast::Receiver) { - while let Ok(event) = rx.recv().await { - for config in &self.configs { - if self.matches_filter(&event, config) { - self.dispatch(config, &event).await; + loop { + match rx.recv().await { + Ok(event) => { + for config in &self.configs { + if self.matches_filter(&event, config) { + self.dispatch(config, &event).await; + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!(skipped = n, "Webhook handler lagged, skipped events"); + } + Err(broadcast::error::RecvError::Closed) => { + debug!("Event channel closed, webhook handler stopping"); + break; } } } @@ -129,8 +162,14 @@ impl WebhookHandler { type HmacSha256 = Hmac; let body = serde_json::to_string(payload).unwrap_or_default(); - let mut mac = - HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC key invalid"); + let mac = match HmacSha256::new_from_slice(secret.as_bytes()) { + Ok(m) => m, + Err(e) => { + error!(error = %e, "Invalid HMAC key for webhook signature"); + return String::new(); + } + }; + let mut mac = mac; mac.update(body.as_bytes()); let result = mac.finalize(); diff --git a/musicfs/crates/musicfs-origins/src/failover.rs b/musicfs/crates/musicfs-origins/src/failover.rs index ee9f618..9cf5489 100644 --- a/musicfs/crates/musicfs-origins/src/failover.rs +++ b/musicfs/crates/musicfs-origins/src/failover.rs @@ -3,7 +3,7 @@ use crate::traits::Origin; use musicfs_core::{Error, RealPath, Result}; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, warn}; +use tracing::{trace, warn}; #[derive(Debug, Clone)] pub struct RetryConfig { @@ -79,6 +79,7 @@ impl FailoverExecutor { let mut last_error = None; for origin in origins { + trace!(origin_id = %origin.id(), "Attempting read from origin"); let start = std::time::Instant::now(); match self.read_with_retry(&origin, &path.path, offset, size).await { Ok(data) => { @@ -87,7 +88,7 @@ impl FailoverExecutor { return Ok(data); } Err(e) => { - warn!("Origin {} failed: {}, trying next", origin.id(), e); + warn!(origin_id = %origin.id(), error = %e, "Origin failed, trying next"); last_error = Some(e); } } @@ -108,13 +109,13 @@ impl FailoverExecutor { Ok(data) => return Ok(data), Err(e) if attempt + 1 < self.retry_config.max_attempts => { let delay = self.retry_config.delay_for_attempt(attempt); - debug!( - "Retry {}/{} for {} after {:?}: {}", - attempt + 1, - self.retry_config.max_attempts, - origin.id(), - delay, - e + warn!( + origin_id = %origin.id(), + attempt = attempt + 1, + max_attempts = self.retry_config.max_attempts, + error = %e, + delay_ms = delay.as_millis() as u64, + "Retrying read operation" ); tokio::time::sleep(delay).await; } @@ -142,6 +143,7 @@ impl FailoverExecutor { let mut last_error = None; for origin in origins { + trace!(origin_id = %origin.id(), "Attempting full read from origin"); let start = std::time::Instant::now(); match self.read_full_with_retry(&origin, &path.path).await { Ok(data) => { @@ -150,7 +152,7 @@ impl FailoverExecutor { return Ok(data); } Err(e) => { - warn!("Origin {} failed full read: {}, trying next", origin.id(), e); + warn!(origin_id = %origin.id(), error = %e, "Origin failed full read, trying next"); last_error = Some(e); } } @@ -169,13 +171,13 @@ impl FailoverExecutor { Ok(data) => return Ok(data), Err(e) if attempt + 1 < self.retry_config.max_attempts => { let delay = self.retry_config.delay_for_attempt(attempt); - debug!( - "Retry full read {}/{} for {} after {:?}: {}", - attempt + 1, - self.retry_config.max_attempts, - origin.id(), - delay, - e + warn!( + origin_id = %origin.id(), + attempt = attempt + 1, + max_attempts = self.retry_config.max_attempts, + error = %e, + delay_ms = delay.as_millis() as u64, + "Retrying full read operation" ); tokio::time::sleep(delay).await; } diff --git a/musicfs/crates/musicfs-origins/src/health.rs b/musicfs/crates/musicfs-origins/src/health.rs index 39aea6d..4baee1f 100644 --- a/musicfs/crates/musicfs-origins/src/health.rs +++ b/musicfs/crates/musicfs-origins/src/health.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc; -use tracing::{debug, info, warn}; +use tracing::{debug, info, info_span, Instrument}; pub struct HealthMonitor { origins: DashMap>, @@ -150,22 +150,32 @@ impl HealthMonitor { pub fn start(self: Arc) -> HealthCheckHandle { let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); let monitor = self.clone(); + let interval_secs = monitor.check_interval.as_secs(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(monitor.check_interval); + info!( + interval_secs = interval_secs, + origin_count = monitor.origins.len(), + "Health monitor starting" + ); - loop { - tokio::select! { - _ = interval.tick() => { - monitor.check_all().await; - } - _ = stop_rx.recv() => { - info!("Health monitor stopping"); - break; + tokio::spawn( + async move { + let mut interval = tokio::time::interval(monitor.check_interval); + + loop { + tokio::select! { + _ = interval.tick() => { + monitor.check_all().await; + } + _ = stop_rx.recv() => { + info!("Health monitor stopping"); + break; + } } } } - }); + .instrument(info_span!("health_monitor")), + ); HealthCheckHandle { stop_tx } } @@ -199,14 +209,24 @@ impl HealthMonitor { match status { HealthStatus::Healthy => { if state.status != HealthStatus::Healthy { - info!("Origin {} is now healthy", id); + info!( + origin_id = %id, + previous_status = ?state.status, + duration_ms = latency_ms, + "Origin health state transition to healthy" + ); } state.status = HealthStatus::Healthy; state.consecutive_failures = 0; } HealthStatus::Degraded => { if state.status != HealthStatus::Degraded { - warn!("Origin {} is degraded", id); + info!( + origin_id = %id, + previous_status = ?state.status, + duration_ms = latency_ms, + "Origin health state transition to degraded" + ); } state.status = HealthStatus::Degraded; } @@ -214,16 +234,22 @@ impl HealthMonitor { state.consecutive_failures += 1; if state.consecutive_failures >= threshold { if state.status != HealthStatus::Unhealthy { - warn!( - "Origin {} is now unhealthy ({} failures)", - id, state.consecutive_failures + info!( + origin_id = %id, + previous_status = ?state.status, + consecutive_failures = state.consecutive_failures, + threshold = threshold, + duration_ms = latency_ms, + "Origin health state transition to unhealthy" ); } state.status = HealthStatus::Unhealthy; } else { debug!( - "Origin {} check failed ({}/{})", - id, state.consecutive_failures, threshold + origin_id = %id, + consecutive_failures = state.consecutive_failures, + threshold = threshold, + "Origin health check failed" ); state.status = HealthStatus::Degraded; } diff --git a/musicfs/crates/musicfs-origins/src/router.rs b/musicfs/crates/musicfs-origins/src/router.rs index add63bb..f2d0d55 100644 --- a/musicfs/crates/musicfs-origins/src/router.rs +++ b/musicfs/crates/musicfs-origins/src/router.rs @@ -3,7 +3,7 @@ use dashmap::DashMap; use musicfs_core::{Event, EventBus, OriginId}; use std::sync::Arc; use std::time::Instant; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; pub struct Router { priorities: DashMap, @@ -77,7 +77,7 @@ impl Router { } pub fn select(&self, candidates: &[OriginId], health: &HealthSnapshot) -> Option { - candidates + let selected = candidates .iter() .filter(|id| health.is_healthy(id)) .min_by_key(|id| { @@ -85,7 +85,20 @@ impl Router { let latency = self.latency_stats.get(*id).map(|s| s.p50_ms).unwrap_or(0); (priority, latency) }) - .cloned() + .cloned(); + + if let Some(ref id) = selected { + let priority = self.get_priority(id); + let latency = self.latency_stats.get(id).map(|s| s.p50_ms).unwrap_or(0); + trace!( + origin_id = %id, + priority = priority, + latency_ms = latency, + "Selected healthy origin" + ); + } + + selected } pub fn select_with_fallback( @@ -104,6 +117,11 @@ impl Router { .min_by_key(|id| self.get_priority(id)) .cloned() { + trace!( + origin_id = %id, + priority = self.get_priority(&id), + "Selected degraded origin as fallback" + ); return Some(id); } @@ -115,14 +133,26 @@ impl Router { }); } - candidates + let selected = candidates .iter() .min_by_key(|id| { let failures = health.failure_count(id).unwrap_or(u32::MAX); let priority = self.get_priority(id); (failures, priority) }) - .cloned() + .cloned(); + + if let Some(ref id) = selected { + let failures = health.failure_count(id).unwrap_or(u32::MAX); + trace!( + origin_id = %id, + failure_count = failures, + priority = self.get_priority(id), + "Selected least-bad unhealthy origin" + ); + } + + selected } } diff --git a/musicfs/crates/musicfs-plugins/src/manager.rs b/musicfs/crates/musicfs-plugins/src/manager.rs index 1d540b9..96512c5 100644 --- a/musicfs/crates/musicfs-plugins/src/manager.rs +++ b/musicfs/crates/musicfs-plugins/src/manager.rs @@ -275,6 +275,7 @@ impl Default for PluginManager { impl Drop for PluginManager { fn drop(&mut self) { + debug!(plugin_count = self.list().len(), "PluginManager dropping"); let _ = self.shutdown(); } } diff --git a/musicfs/crates/musicfs-search/src/collections.rs b/musicfs/crates/musicfs-search/src/collections.rs index 6afbc99..149a7dd 100644 --- a/musicfs/crates/musicfs-search/src/collections.rs +++ b/musicfs/crates/musicfs-search/src/collections.rs @@ -2,7 +2,7 @@ use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::path::Path; use std::time::{Duration, SystemTime}; -use tracing::warn; +use tracing::{debug, info, warn}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SmartCollection { @@ -103,6 +103,7 @@ impl CollectionStore { [], )?; + info!(path = ?db_path, "Collection store opened"); Ok(Self { db: Mutex::new(db) }) } @@ -111,6 +112,7 @@ impl CollectionStore { name: &str, query: CollectionQuery, ) -> Result { + info!(name = %name, "Creating collection"); let query_json = serde_json::to_string(&query)?; let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -124,6 +126,7 @@ impl CollectionStore { )?; let id = db.last_insert_rowid(); + debug!(id = id, name = %name, "Collection created"); Ok(SmartCollection { id, @@ -199,6 +202,7 @@ impl CollectionStore { } pub fn delete(&self, name: &str) -> Result<(), CollectionError> { + info!(name = %name, "Deleting collection"); let db = self.db.lock(); db.execute("DELETE FROM collections WHERE name = ?1", [name])?; Ok(()) diff --git a/musicfs/crates/musicfs-search/src/indexer.rs b/musicfs/crates/musicfs-search/src/indexer.rs index ef8bb80..0958df1 100644 --- a/musicfs/crates/musicfs-search/src/indexer.rs +++ b/musicfs/crates/musicfs-search/src/indexer.rs @@ -2,7 +2,7 @@ use crate::index::{SearchError, SearchIndex}; use musicfs_core::{Event, EventBus, FileMeta}; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, info_span, warn, Instrument}; pub trait MetadataLookup: Send + Sync { fn lookup(&self, path: &musicfs_core::VirtualPath) -> Option; @@ -31,43 +31,52 @@ impl Indexer { let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); let mut event_rx = self.event_bus.subscribe(); - tokio::spawn(async move { - let mut pending_commit = false; - let mut commit_timer = tokio::time::interval(std::time::Duration::from_secs(5)); + info!("Search indexer starting"); - loop { - tokio::select! { - result = event_rx.recv() => { - match result { - Ok(event) => { - if let Err(e) = self.handle_event(&event) { - error!("Indexer error: {}", e); + tokio::spawn( + async move { + let mut pending_commit = false; + let mut commit_timer = tokio::time::interval(std::time::Duration::from_secs(5)); + + loop { + tokio::select! { + result = event_rx.recv() => { + match result { + Ok(event) => { + if let Err(e) = self.handle_event(&event) { + error!("Indexer error: {}", e); + } + pending_commit = true; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!(skipped = n, "Indexer lagged, skipped events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + debug!("Event channel closed"); + break; } - pending_commit = true; - } - Err(e) => { - warn!("Event receive error: {}", e); } } - } - _ = commit_timer.tick() => { - if pending_commit { - if let Err(e) = self.index.commit() { - error!("Index commit error: {}", e); + _ = commit_timer.tick() => { + if pending_commit { + if let Err(e) = self.index.commit() { + error!("Index commit error: {}", e); + } + pending_commit = false; } - pending_commit = false; } - } - _ = stop_rx.recv() => { - info!("Indexer stopping"); - if pending_commit { - let _ = self.index.commit(); + _ = stop_rx.recv() => { + info!("Indexer stopping"); + if pending_commit { + let _ = self.index.commit(); + } + break; } - break; } } } - }); + .instrument(info_span!("search_indexer")), + ); IndexerHandle { stop_tx } } diff --git a/musicfs/crates/musicfs-sync/src/delta.rs b/musicfs/crates/musicfs-sync/src/delta.rs index 12e855f..92b9c43 100644 --- a/musicfs/crates/musicfs-sync/src/delta.rs +++ b/musicfs/crates/musicfs-sync/src/delta.rs @@ -4,7 +4,7 @@ use musicfs_origins::Origin; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::time::SystemTime; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; #[derive(Debug, Clone)] pub struct ScannedFile { @@ -66,9 +66,13 @@ impl DeltaDetector { cached: &HashMap, manifests: &HashMap>, ) -> Result { + let origin_id = origin.id().clone(); + info!(origin_id = %origin_id, "Starting delta detection"); + let mut changes = ChangeSet::default(); let origin_files = self.scan_origin(origin).await?; + trace!(origin_id = %origin_id, scanned_count = origin_files.len(), "Completed origin scan"); let cached_by_path: HashMap<_, _> = cached .values() @@ -78,7 +82,7 @@ impl DeltaDetector { for scanned in &origin_files { if let Some(cached_file) = cached_by_path.get(&scanned.path) { if self.is_modified_scan(cached_file, scanned) { - debug!("File modified: {:?}", scanned.path); + debug!(origin_id = %origin_id, path = ?scanned.path, "File modified"); if let Some(old_chunks) = manifests.get(&cached_file.id) { let new_chunks = self.compute_chunks_for_scan(origin, scanned).await?; @@ -87,7 +91,7 @@ impl DeltaDetector { } } } else { - debug!("File added: {:?}", scanned.path); + debug!(origin_id = %origin_id, path = ?scanned.path, "File added"); changes.added.push(scanned.clone()); } } @@ -96,16 +100,17 @@ impl DeltaDetector { for cached_file in cached.values() { if !origin_paths.contains(&cached_file.real_path.path) { - debug!("File removed: {:?}", cached_file.real_path.path); + debug!(origin_id = %origin_id, path = ?cached_file.real_path.path, "File removed"); changes.removed.push(cached_file.id); } } info!( - "Delta detection complete: {} added, {} removed, {} modified", - changes.added.len(), - changes.removed.len(), - changes.modified.len() + origin_id = %origin_id, + files_added = changes.added.len(), + files_removed = changes.removed.len(), + files_modified = changes.modified.len(), + "Delta detection complete" ); Ok(changes) diff --git a/musicfs/crates/musicfs-sync/src/watcher.rs b/musicfs/crates/musicfs-sync/src/watcher.rs index 1c212d8..9c9252b 100644 --- a/musicfs/crates/musicfs-sync/src/watcher.rs +++ b/musicfs/crates/musicfs-sync/src/watcher.rs @@ -5,7 +5,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc; -use tracing::{debug, error, info}; +use tracing::{error, info, info_span, trace, Instrument}; const DEBOUNCE_MS: u64 = 200; @@ -31,11 +31,15 @@ impl OriginWatcher { let root = self.root.clone(); let event_bus = self.event_bus.clone(); - tokio::spawn(async move { - if let Err(e) = Self::watch_loop(&origin_id, &root, &event_bus, &mut stop_rx).await { - error!("Watcher error: {}", e); + let origin_id_str = origin_id.to_string(); + tokio::spawn( + async move { + if let Err(e) = Self::watch_loop(&origin_id, &root, &event_bus, &mut stop_rx).await { + error!("Watcher error: {}", e); + } } - }); + .instrument(info_span!("file_watcher", origin_id = %origin_id_str)), + ); WatchHandle { stop_tx } } @@ -62,7 +66,7 @@ impl OriginWatcher { .watch(root, RecursiveMode::Recursive) .map_err(|e| WatchError::Watch(e.to_string()))?; - info!("Watching origin {} at {:?}", origin_id, root); + info!(origin_id = %origin_id, path = ?root, "Watcher started"); let mut debouncer: HashMap = HashMap::new(); @@ -72,7 +76,7 @@ impl OriginWatcher { Self::handle_notify_event(origin_id, root, event_bus, event, &mut debouncer); } _ = stop_rx.recv() => { - info!("Stopping watcher for {}", origin_id); + info!(origin_id = %origin_id, "Watcher stopped"); break; } } @@ -104,7 +108,7 @@ impl OriginWatcher { if let Some(last_seen) = debouncer.get(&relative) { if now.duration_since(*last_seen).as_millis() < DEBOUNCE_MS as u128 { - debug!("Debouncing event for {:?}", relative); + trace!(origin_id = %origin_id, path = ?relative, "Debouncing event"); continue; } } @@ -114,18 +118,18 @@ impl OriginWatcher { match event.kind { EventKind::Create(_) => { - debug!("File created: {:?}", relative); + trace!(origin_id = %origin_id, path = ?relative, "File created"); event_bus.publish(Event::FileAdded { path: vpath, origin_id: origin_id.clone(), }); } EventKind::Remove(_) => { - debug!("File removed: {:?}", relative); + trace!(origin_id = %origin_id, path = ?relative, "File removed"); event_bus.publish(Event::FileRemoved { path: vpath, file_id: None }); } EventKind::Modify(_) => { - debug!("File modified: {:?}", relative); + trace!(origin_id = %origin_id, path = ?relative, "File modified"); event_bus.publish(Event::FileModified { path: vpath }); } _ => {} @@ -156,6 +160,7 @@ impl WatchHandle { impl Drop for WatchHandle { fn drop(&mut self) { + trace!("WatchHandle dropped"); let _ = self.stop_tx.try_send(()); } } diff --git a/musicfs/dist/config.example.toml b/musicfs/dist/config.example.toml new file mode 100644 index 0000000..2171978 --- /dev/null +++ b/musicfs/dist/config.example.toml @@ -0,0 +1,30 @@ +mount_point = "/mnt/music" +cache_dir = "/var/cache/musicfs" + +[logging] +log_dir = "/var/log/musicfs" +json_output = true +journald = true +level = "musicfs=info,warn" +trace_sample_rate = 1.0 + +[cache] +metadata_cache_mb = 100 +content_cache_gb = 10 + +[health] +check_interval_secs = 30 +timeout_ms = 5000 +unhealthy_threshold = 3 + +[[origins]] +id = "local" +origin_type = "local" +priority = 1 +path = "/srv/music" + +[[origins]] +id = "nas" +origin_type = "nfs" +priority = 2 +mount_point = "/mnt/nas/music" diff --git a/musicfs/dist/logrotate.d/musicfs b/musicfs/dist/logrotate.d/musicfs new file mode 100644 index 0000000..ec37e65 --- /dev/null +++ b/musicfs/dist/logrotate.d/musicfs @@ -0,0 +1,9 @@ +/var/log/musicfs/*.log { + daily + rotate 30 + compress + delaycompress + missingok + notifempty + create 0640 musicfs musicfs +} diff --git a/musicfs/dist/musicfs.service b/musicfs/dist/musicfs.service index 7e54350..a4b957d 100644 --- a/musicfs/dist/musicfs.service +++ b/musicfs/dist/musicfs.service @@ -11,10 +11,17 @@ RestartSec=5 User=musicfs Group=musicfs +Environment="RUST_LOG=musicfs=info,warn" +StandardOutput=journal +StandardError=journal +SyslogIdentifier=musicfs +RateLimitIntervalSec=30s +RateLimitBurst=1000 + NoNewPrivileges=true ProtectSystem=strict ProtectHome=read-only -ReadWritePaths=/var/cache/musicfs /mnt/music +ReadWritePaths=/var/cache/musicfs /var/log/musicfs /mnt/music PrivateTmp=true [Install]