diff --git a/musicfs/crates/musicfs-search/Cargo.toml b/musicfs/crates/musicfs-search/Cargo.toml
index d8b8139..a676556 100644
--- a/musicfs/crates/musicfs-search/Cargo.toml
+++ b/musicfs/crates/musicfs-search/Cargo.toml
@@ -4,3 +4,15 @@ version.workspace = true
 edition.workspace = true
 
 [dependencies]
+musicfs-core = { path = "../musicfs-core" }
+
+tantivy.workspace = true
+moka.workspace = true
+parking_lot.workspace = true
+tokio = { workspace = true, features = ["sync", "time"] }
+tracing.workspace = true
+thiserror.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
+tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
diff --git a/musicfs/crates/musicfs-search/src/index.rs b/musicfs/crates/musicfs-search/src/index.rs
new file mode 100644
index 0000000..86c39cc
--- /dev/null
+++ b/musicfs/crates/musicfs-search/src/index.rs
@@ -0,0 +1,476 @@
+//! Tantivy-backed full-text index over music file metadata.
+
+use musicfs_core::{AudioMeta, FileId, FileMeta, VirtualPath};
+use parking_lot::RwLock;
+use std::path::Path;
+use std::sync::Arc;
+use tantivy::collector::TopDocs;
+use tantivy::directory::MmapDirectory;
+use tantivy::query::{BooleanQuery, FuzzyTermQuery, Occur, Query, QueryParser, TermQuery};
+use tantivy::schema::{Field, IndexRecordOption, Schema, Value, INDEXED, STORED, STRING, TEXT};
+use tantivy::{Index, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument, Term};
+use tracing::{debug, info};
+
+/// Schema version reported through `SearchIndex::schema_version`.
+const SCHEMA_VERSION: u32 = 1;
+
+/// Memory budget, in bytes, handed to the tantivy index writer.
+const WRITER_MEMORY_BUDGET: usize = 50_000_000;
+
+/// Full-text search index over the metadata of indexed files.
+///
+/// Additions and removals are buffered by the writer and only become visible
+/// to [`SearchIndex::search`] after [`SearchIndex::commit`].
+pub struct SearchIndex {
+    index: Index,
+    reader: IndexReader,
+    writer: Arc<RwLock<IndexWriter>>,
+    schema: SearchSchema,
+    pub schema_version: u32,
+}
+
+/// The tantivy schema together with the handle of every field in it.
+struct SearchSchema {
+    schema: Schema,
+    file_id: Field,
+    virtual_path: Field,
+    artist: Field,
+    album: Field,
+    album_artist: Field,
+    title: Field,
+    genre: Field,
+    composer: Field,
+    year: Field,
+    duration_ms: Field,
+    bitrate: Field,
+    sample_rate: Field,
+}
+
+impl SearchSchema {
+    /// Registers every field and builds the schema.
+    fn new() -> Self {
+        let mut builder = Schema::builder();
+
+        Self {
+            file_id: builder.add_u64_field("file_id", INDEXED | STORED),
+            // STRING indexes the whole path as a single untokenized term, which
+            // `remove_by_path` relies on for exact matching.
+            virtual_path: builder.add_text_field("virtual_path", STRING | STORED),
+            artist: builder.add_text_field("artist", TEXT | STORED),
+            album: builder.add_text_field("album", TEXT | STORED),
+            album_artist: builder.add_text_field("album_artist", TEXT | STORED),
+            title: builder.add_text_field("title", TEXT | STORED),
+            genre: builder.add_text_field("genre", TEXT | STORED),
+            composer: builder.add_text_field("composer", TEXT | STORED),
+            year: builder.add_u64_field("year", INDEXED | STORED),
+            duration_ms: builder.add_u64_field("duration_ms", STORED),
+            bitrate: builder.add_u64_field("bitrate", STORED),
+            sample_rate: builder.add_u64_field("sample_rate", STORED),
+            // Must come last: it consumes the builder the lines above borrow.
+            schema: builder.build(),
+        }
+    }
+}
+
+/// One search result, carrying the stored fields needed to display it.
+#[derive(Debug, Clone)]
+pub struct SearchHit {
+    pub file_id: FileId,
+    pub virtual_path: VirtualPath,
+    pub artist: Option<String>,
+    pub album: Option<String>,
+    pub title: Option<String>,
+    pub score: f32,
+}
+
+impl SearchIndex {
+    /// Opens the index stored in `index_path`, creating the directory and an
+    /// empty index when none exists yet.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`SearchError::Io`] if the directory cannot be created, and
+    /// [`SearchError::Tantivy`] if the index cannot be opened — including when
+    /// an existing index was built with a different schema.
+    pub fn open(index_path: &Path) -> Result<Self, SearchError> {
+        let schema_obj = SearchSchema::new();
+
+        std::fs::create_dir_all(index_path)?;
+        let directory = MmapDirectory::open(index_path).map_err(tantivy::TantivyError::from)?;
+        // `open_or_create` refuses an existing index whose schema differs from
+        // ours, so the field handles in `schema_obj` always match the index.
+        let index = Index::open_or_create(directory, schema_obj.schema.clone())?;
+
+        let reader = index
+            .reader_builder()
+            .reload_policy(ReloadPolicy::OnCommitWithDelay)
+            .try_into()?;
+
+        let writer = index.writer(WRITER_MEMORY_BUDGET)?;
+
+        info!("Search index opened at {:?}", index_path);
+
+        Ok(Self {
+            index,
+            reader,
+            writer: Arc::new(RwLock::new(writer)),
+            schema: schema_obj,
+            schema_version: SCHEMA_VERSION,
+        })
+    }
+
+    /// Queues `file` for indexing, replacing any document already indexed
+    /// under the same file id.
+    ///
+    /// The change becomes searchable after the next [`SearchIndex::commit`].
+    pub fn index_file(&self, file: &FileMeta) -> Result<(), SearchError> {
+        let mut doc = TantivyDocument::new();
+
+        doc.add_u64(self.schema.file_id, file.id.0 as u64);
+        doc.add_text(self.schema.virtual_path, file.virtual_path.as_str());
+
+        if let Some(ref audio) = file.audio {
+            Self::add_audio_fields(&mut doc, &self.schema, audio);
+        }
+
+        // `delete_term` and `add_document` only need `&IndexWriter`, so a shared
+        // lock suffices. Deleting first keeps re-indexing from piling up
+        // duplicate documents for one file.
+        let writer = self.writer.read();
+        writer.delete_term(Term::from_field_u64(self.schema.file_id, file.id.0 as u64));
+        writer.add_document(doc)?;
+        debug!("Indexed file {:?}", file.id);
+        Ok(())
+    }
+
+    /// Copies every tag present in `audio` into `doc`.
+    ///
+    /// NOTE(review): the schema declares a `composer` field and `search`
+    /// queries it by default, but nothing is written to it here — confirm
+    /// whether `AudioMeta` carries a composer.
+    fn add_audio_fields(doc: &mut TantivyDocument, schema: &SearchSchema, audio: &AudioMeta) {
+        if let Some(ref v) = audio.artist {
+            doc.add_text(schema.artist, v);
+        }
+        if let Some(ref v) = audio.album {
+            doc.add_text(schema.album, v);
+        }
+        if let Some(ref v) = audio.album_artist {
+            doc.add_text(schema.album_artist, v);
+        }
+        if let Some(ref v) = audio.title {
+            doc.add_text(schema.title, v);
+        }
+        if let Some(ref v) = audio.genre {
+            doc.add_text(schema.genre, v);
+        }
+        if let Some(ref v) = audio.year {
+            doc.add_u64(schema.year, *v as u64);
+        }
+        if let Some(v) = audio.duration_ms {
+            doc.add_u64(schema.duration_ms, v);
+        }
+        if let Some(v) = audio.bitrate {
+            doc.add_u64(schema.bitrate, v as u64);
+        }
+        if let Some(v) = audio.sample_rate {
+            doc.add_u64(schema.sample_rate, v as u64);
+        }
+    }
+
+    /// Queues the removal of every document indexed under `file_id`.
+    pub fn remove_file(&self, file_id: FileId) -> Result<(), SearchError> {
+        let term = Term::from_field_u64(self.schema.file_id, file_id.0 as u64);
+        self.writer.read().delete_term(term);
+        debug!("Removed file {:?} from index", file_id);
+        Ok(())
+    }
+
+    /// Queues the removal of the document stored under `path`.
+    ///
+    /// Returns `Ok(true)` if a committed document with that exact path exists.
+    /// Documents added since the last [`SearchIndex::commit`] are not visible
+    /// to this lookup and yield `Ok(false)`.
+    pub fn remove_by_path(&self, path: &VirtualPath) -> Result<bool, SearchError> {
+        // Exact term match instead of the query parser: paths contain
+        // characters (`/`, `:`, quotes) that are syntax to the parser.
+        let path_term = Term::from_field_text(self.schema.virtual_path, path.as_str());
+        let query = TermQuery::new(path_term.clone(), IndexRecordOption::Basic);
+
+        let searcher = self.reader.searcher();
+        if searcher.search(&query, &TopDocs::with_limit(1))?.is_empty() {
+            return Ok(false);
+        }
+
+        self.writer.read().delete_term(path_term);
+        debug!("Removed file by path {:?}", path);
+        Ok(true)
+    }
+
+    /// Flushes all queued additions and removals and refreshes the reader so
+    /// they are visible to subsequent searches.
+    pub fn commit(&self) -> Result<(), SearchError> {
+        self.writer.write().commit()?;
+        self.reader.reload()?;
+        info!("Search index committed");
+        Ok(())
+    }
+
+    /// Runs `query_str` against the text fields and returns up to `limit`
+    /// hits ordered by descending score.
+    ///
+    /// A single word followed by `~N` with `N <= 2` (for example `metalica~1`)
+    /// is run as a fuzzy term query with edit distance `N`; any other input
+    /// goes through tantivy's query parser.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`SearchError::QueryParse`] for a malformed query and
+    /// [`SearchError::CorruptedIndex`] if a hit lacks its file id or path.
+    pub fn search(&self, query_str: &str, limit: usize) -> Result<Vec<SearchHit>, SearchError> {
+        // `TopDocs::with_limit` panics when the limit is zero.
+        if limit == 0 {
+            return Ok(Vec::new());
+        }
+
+        let searcher = self.reader.searcher();
+
+        let default_fields = vec![
+            self.schema.artist,
+            self.schema.album,
+            self.schema.album_artist,
+            self.schema.title,
+            self.schema.genre,
+            self.schema.composer,
+        ];
+
+        let fuzzy_request = Self::parse_fuzzy_query(query_str);
+        let query: Box<dyn Query> = if let Some((term, distance)) = fuzzy_request {
+            let subqueries: Vec<(Occur, Box<dyn Query>)> = default_fields
+                .iter()
+                .map(|&field| {
+                    let term = Term::from_field_text(field, &term);
+                    let fuzzy = FuzzyTermQuery::new(term, distance, true);
+                    (Occur::Should, Box::new(fuzzy) as Box<dyn Query>)
+                })
+                .collect();
+            Box::new(BooleanQuery::new(subqueries))
+        } else {
+            let query_parser = QueryParser::for_index(&self.index, default_fields);
+            query_parser.parse_query(query_str)?
+        };
+
+        let top_docs = searcher.search(&*query, &TopDocs::with_limit(limit))?;
+
+        let mut results = Vec::with_capacity(top_docs.len());
+        for (score, doc_address) in top_docs {
+            let doc: TantivyDocument = searcher.doc(doc_address)?;
+            let text = |field: Field| {
+                doc.get_first(field)
+                    .and_then(|v| v.as_str())
+                    .map(String::from)
+            };
+
+            let file_id = doc
+                .get_first(self.schema.file_id)
+                .and_then(|v| v.as_u64())
+                .map(|id| FileId(id as i64))
+                .ok_or(SearchError::CorruptedIndex)?;
+
+            let virtual_path = doc
+                .get_first(self.schema.virtual_path)
+                .and_then(|v| v.as_str())
+                .map(|s| VirtualPath::new(s))
+                .ok_or(SearchError::CorruptedIndex)?;
+
+            results.push(SearchHit {
+                file_id,
+                virtual_path,
+                artist: text(self.schema.artist),
+                album: text(self.schema.album),
+                title: text(self.schema.title),
+                score,
+            });
+        }
+
+        debug!("Search '{}' returned {} results", query_str, results.len());
+        Ok(results)
+    }
+
+    /// Number of documents visible to the current reader.
+    pub fn count(&self) -> u64 {
+        self.reader.searcher().num_docs()
+    }
+
+    /// Recognises the `term~N` fuzzy syntax.
+    ///
+    /// Returns the lower-cased term and the edit distance when `query_str` is
+    /// a single word without a field prefix followed by `~N` with `N <= 2`,
+    /// and `None` otherwise.
+    fn parse_fuzzy_query(query_str: &str) -> Option<(String, u8)> {
+        let (term, distance) = query_str.trim().rsplit_once('~')?;
+        if term.is_empty() || term.contains(':') || term.contains(' ') {
+            return None;
+        }
+        // Larger distances are rejected here and fall through to the parser.
+        let distance = distance.parse::<u8>().ok().filter(|&d| d <= 2)?;
+        Some((term.to_lowercase(), distance))
+    }
+}
+
+/// Errors returned by [`SearchIndex`] operations.
+#[derive(Debug, thiserror::Error)]
+pub enum SearchError {
+    #[error("tantivy error: {0}")]
+    Tantivy(#[from] tantivy::TantivyError),
+
+    #[error("query parse error: {0}")]
+    QueryParse(#[from] tantivy::query::QueryParserError),
+
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+
+    #[error("corrupted search index")]
+    CorruptedIndex,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use musicfs_core::{AudioFormat, OriginId, RealPath};
+    use std::path::PathBuf;
+    use tempfile::TempDir;
+
+    fn make_file(id: i64, artist: &str, album: &str, title: &str) -> FileMeta {
+        FileMeta {
+            id: FileId(id),
+            virtual_path: VirtualPath::new(format!("/{}/{}/{}.flac", artist, album, title)),
+            real_path: RealPath {
+                origin_id: OriginId::from("test"),
+                path: PathBuf::from("test.flac"),
+            },
+            size: 1000,
+            mtime: std::time::SystemTime::UNIX_EPOCH,
+            content_hash: None,
+            audio: Some(AudioMeta {
+                artist: Some(artist.to_string()),
+                album: Some(album.to_string()),
+                title: Some(title.to_string()),
+                genre: Some("Metal".to_string()),
+                format: AudioFormat::Flac,
+                ..Default::default()
+            }),
+        }
+    }
+
+    #[test]
+    fn test_search_basic() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        index.index_file(&make_file(1, "Metallica", "Black Album", "Enter Sandman")).unwrap();
+        index.index_file(&make_file(2, "Metallica", "Master of Puppets", "Battery")).unwrap();
+        index.index_file(&make_file(3, "Iron Maiden", "Powerslave", "Aces High")).unwrap();
+        index.commit().unwrap();
+
+        let results = index.search("metallica", 10).unwrap();
+        assert_eq!(results.len(), 2);
+
+        let results = index.search("sandman", 10).unwrap();
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].title.as_deref(), Some("Enter Sandman"));
+    }
+
+    #[test]
+    fn test_search_fuzzy() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        index.index_file(&make_file(1, "Metallica", "Black Album", "Enter Sandman")).unwrap();
+        index.commit().unwrap();
+
+        let results = index.search("metalica~1", 10).unwrap();
+        assert_eq!(results.len(), 1);
+    }
+
+    #[test]
+    fn test_search_genre() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        index.index_file(&make_file(1, "Metallica", "Black Album", "Enter Sandman")).unwrap();
+        index.commit().unwrap();
+
+        let results = index.search("genre:Metal", 10).unwrap();
+        assert_eq!(results.len(), 1);
+    }
+
+    #[test]
+    fn test_remove_file() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        index.index_file(&make_file(1, "Test", "Album", "Song")).unwrap();
+        index.commit().unwrap();
+
+        assert_eq!(index.search("test", 10).unwrap().len(), 1);
+
+        index.remove_file(FileId(1)).unwrap();
+        index.commit().unwrap();
+
+        assert_eq!(index.search("test", 10).unwrap().len(), 0);
+    }
+
+    #[test]
+    fn test_remove_by_path() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        let file = make_file(1, "Test", "Album", "Song");
+        index.index_file(&file).unwrap();
+        index.commit().unwrap();
+
+        assert!(index.remove_by_path(&file.virtual_path).unwrap());
+        assert!(!index.remove_by_path(&VirtualPath::new("/missing.flac")).unwrap());
+        index.commit().unwrap();
+
+        assert_eq!(index.count(), 0);
+    }
+
+    #[test]
+    fn test_reindex_replaces_document() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        index.index_file(&make_file(1, "Test", "Album", "Song")).unwrap();
+        index.index_file(&make_file(1, "Test", "Album", "Song")).unwrap();
+        index.commit().unwrap();
+
+        assert_eq!(index.count(), 1);
+    }
+
+    #[test]
+    fn test_search_zero_limit() {
+        let dir = TempDir::new().unwrap();
+        let index = SearchIndex::open(dir.path()).unwrap();
+
+        assert!(index.search("anything", 0).unwrap().is_empty());
+    }
+
+    #[test]
+    fn test_index_persistence() {
+        let dir = TempDir::new().unwrap();
+
+        {
+            let index = SearchIndex::open(dir.path()).unwrap();
+            index.index_file(&make_file(1, "Artist", "Album", "Track")).unwrap();
+            index.commit().unwrap();
+        }
+
+        {
+            let index = SearchIndex::open(dir.path()).unwrap();
+            let results = index.search("artist", 10).unwrap();
+            assert_eq!(results.len(), 1);
+        }
+    }
+}
diff --git a/musicfs/crates/musicfs-search/src/indexer.rs b/musicfs/crates/musicfs-search/src/indexer.rs
new file mode 100644
index 0000000..ef8bb80
--- /dev/null
+++ b/musicfs/crates/musicfs-search/src/indexer.rs
@@ -0,0 +1,268 @@
+//! Background task that keeps the search index in sync with file events.
+
+use crate::index::{SearchError, SearchIndex};
+use musicfs_core::{Event, EventBus, FileMeta};
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+use tracing::{debug, error, info, warn};
+
+/// How often the background task commits pending index changes.
+const COMMIT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
+
+/// Resolves a virtual path to the metadata of the file stored under it.
+pub trait MetadataLookup: Send + Sync {
+    /// Returns the metadata for `path`, or `None` if the path is unknown.
+    fn lookup(&self, path: &musicfs_core::VirtualPath) -> Option<FileMeta>;
+}
+
+/// Applies file events from an [`EventBus`] to a [`SearchIndex`].
+pub struct Indexer {
+    index: Arc<SearchIndex>,
+    event_bus: Arc<EventBus>,
+    metadata_lookup: Arc<dyn MetadataLookup>,
+}
+
+impl Indexer {
+    /// Creates an indexer; nothing runs until [`Indexer::start`] is called.
+    pub fn new(
+        index: Arc<SearchIndex>,
+        event_bus: Arc<EventBus>,
+        metadata_lookup: Arc<dyn MetadataLookup>,
+    ) -> Self {
+        Self {
+            index,
+            event_bus,
+            metadata_lookup,
+        }
+    }
+
+    /// Spawns the indexing task on the current tokio runtime.
+    ///
+    /// The task applies every received event to the index and commits pending
+    /// changes every five seconds. It ends, after a final commit, when
+    /// [`IndexerHandle::stop`] is called or the handle is dropped.
+    pub fn start(self) -> IndexerHandle {
+        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
+        let mut event_rx = self.event_bus.subscribe();
+
+        let task = tokio::spawn(async move {
+            let mut pending_commit = false;
+            let mut commit_timer = tokio::time::interval(COMMIT_INTERVAL);
+
+            loop {
+                tokio::select! {
+                    result = event_rx.recv() => {
+                        match result {
+                            Ok(event) => {
+                                if let Err(e) = self.handle_event(&event) {
+                                    error!("Indexer error: {}", e);
+                                }
+                                pending_commit = true;
+                            }
+                            // NOTE(review): if this error can mean "bus closed",
+                            // `recv` will fail again immediately and this arm will
+                            // spin; confirm against the `EventBus` receiver type.
+                            Err(e) => {
+                                warn!("Event receive error: {}", e);
+                            }
+                        }
+                    }
+                    _ = commit_timer.tick() => {
+                        if pending_commit {
+                            match self.index.commit() {
+                                Ok(()) => pending_commit = false,
+                                // Keep the flag set so the next tick retries.
+                                Err(e) => error!("Index commit error: {}", e),
+                            }
+                        }
+                    }
+                    // Fires on an explicit stop and when the handle is dropped.
+                    _ = stop_rx.recv() => {
+                        info!("Indexer stopping");
+                        if pending_commit {
+                            if let Err(e) = self.index.commit() {
+                                error!("Index commit error: {}", e);
+                            }
+                        }
+                        break;
+                    }
+                }
+            }
+        });
+
+        IndexerHandle { stop_tx, task }
+    }
+
+    /// Applies a single event to the index; events other than file
+    /// additions, removals and modifications are ignored.
+    fn handle_event(&self, event: &Event) -> Result<(), SearchError> {
+        match event {
+            Event::FileAdded { path, .. } => {
+                debug!("Indexing added file: {:?}", path);
+                if let Some(meta) = self.metadata_lookup.lookup(path) {
+                    self.index.index_file(&meta)?;
+                } else {
+                    warn!("No metadata found for added file: {:?}", path);
+                }
+            }
+            Event::FileRemoved { path, file_id } => {
+                debug!("Removing from index: {:?}", path);
+                // Prefer the id carried by the event, then the metadata
+                // store, and only then a lookup in the index itself.
+                if let Some(id) = file_id {
+                    self.index.remove_file(*id)?;
+                } else if let Some(meta) = self.metadata_lookup.lookup(path) {
+                    self.index.remove_file(meta.id)?;
+                } else {
+                    self.index.remove_by_path(path)?;
+                }
+            }
+            Event::FileModified { path } => {
+                debug!("Re-indexing modified file: {:?}", path);
+                if let Some(meta) = self.metadata_lookup.lookup(path) {
+                    // `index_file` replaces the document already stored for this id.
+                    self.index.index_file(&meta)?;
+                }
+            }
+            _ => {}
+        }
+        Ok(())
+    }
+
+    /// Indexes every file in `files` and commits once at the end.
+    ///
+    /// Returns the number of files indexed. On error the files queued so far
+    /// are left uncommitted until the next commit.
+    pub fn index_batch(&self, files: &[FileMeta]) -> Result<usize, SearchError> {
+        for file in files {
+            self.index.index_file(file)?;
+        }
+        self.index.commit()?;
+        info!("Indexed {} files", files.len());
+        Ok(files.len())
+    }
+}
+
+/// Handle used to shut down the task spawned by [`Indexer::start`].
+pub struct IndexerHandle {
+    stop_tx: mpsc::Sender<()>,
+    task: JoinHandle<()>,
+}
+
+impl IndexerHandle {
+    /// Asks the task to stop and waits until it has performed its final
+    /// commit and exited.
+    pub async fn stop(self) {
+        // A send error means the task is already gone; joining still
+        // surfaces a panic if that is why it ended.
+        let _ = self.stop_tx.send(()).await;
+        if let Err(e) = self.task.await {
+            error!("Indexer task failed: {}", e);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use musicfs_core::{AudioFormat, AudioMeta, FileId, OriginId, RealPath, VirtualPath};
+    use std::collections::HashMap;
+    use std::path::PathBuf;
+    use std::sync::RwLock;
+    use tempfile::TempDir;
+
+    struct MockMetadataLookup {
+        files: RwLock<HashMap<String, FileMeta>>,
+    }
+
+    impl MockMetadataLookup {
+        fn new() -> Self {
+            Self {
+                files: RwLock::new(HashMap::new()),
+            }
+        }
+
+        fn insert(&self, meta: FileMeta) {
+            self.files
+                .write()
+                .unwrap()
+                .insert(meta.virtual_path.as_str().to_string(), meta);
+        }
+    }
+
+    impl MetadataLookup for MockMetadataLookup {
+        fn lookup(&self, path: &VirtualPath) -> Option<FileMeta> {
+            self.files.read().unwrap().get(path.as_str()).cloned()
+        }
+    }
+
+    fn make_file(id: i64, path: &str, artist: &str, title: &str) -> FileMeta {
+        FileMeta {
+            id: FileId(id),
+            virtual_path: VirtualPath::new(path),
+            real_path: RealPath {
+                origin_id: OriginId::from("test"),
+                path: PathBuf::from("test.flac"),
+            },
+            size: 1000,
+            mtime: std::time::SystemTime::UNIX_EPOCH,
+            content_hash: None,
+            audio: Some(AudioMeta {
+                artist: Some(artist.to_string()),
+                title: Some(title.to_string()),
+                format: AudioFormat::Flac,
+                ..Default::default()
+            }),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_indexer_handles_file_added() {
+        let dir = TempDir::new().unwrap();
+        let index = Arc::new(SearchIndex::open(dir.path()).unwrap());
+        let event_bus = Arc::new(EventBus::default());
+        let metadata = Arc::new(MockMetadataLookup::new());
+
+        let file = make_file(1, "/Artist/Album/Track.flac", "Artist", "Track");
+        metadata.insert(file.clone());
+
+        let indexer = Indexer::new(index.clone(), event_bus.clone(), metadata);
+        let handle = indexer.start();
+
+        event_bus.publish(Event::FileAdded {
+            path: VirtualPath::new("/Artist/Album/Track.flac"),
+            origin_id: OriginId::from("test"),
+        });
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        index.commit().unwrap();
+
+        let results = index.search("artist", 10).unwrap();
+        assert_eq!(results.len(), 1);
+
+        handle.stop().await;
+    }
+
+    #[test]
+    fn test_index_batch() {
+        let dir = TempDir::new().unwrap();
+        let index = Arc::new(SearchIndex::open(dir.path()).unwrap());
+        let event_bus = Arc::new(EventBus::default());
+        let metadata = Arc::new(MockMetadataLookup::new());
+
+        let indexer = Indexer::new(index.clone(), event_bus, metadata);
+
+        let files = vec![
+            make_file(1, "/a.flac", "Artist1", "Song1"),
+            make_file(2, "/b.flac", "Artist2", "Song2"),
+            make_file(3, "/c.flac", "Artist3", "Song3"),
+        ];
+
+        let count = indexer.index_batch(&files).unwrap();
+        assert_eq!(count, 3);
+
+        let results = index.search("artist1", 10).unwrap();
+        assert_eq!(results.len(), 1);
+    }
+}
diff --git a/musicfs/crates/musicfs-search/src/lib.rs b/musicfs/crates/musicfs-search/src/lib.rs
index f9da2c4..72ed751 100644
--- a/musicfs/crates/musicfs-search/src/lib.rs
+++ b/musicfs/crates/musicfs-search/src/lib.rs
@@ -1 +1,7 @@
-#![allow(dead_code)]
+mod index;
+mod indexer;
+mod query;
+
+pub use index::{SearchError, SearchHit, SearchIndex};
+pub use indexer::{Indexer, IndexerHandle, MetadataLookup};
+pub use query::SearchQueryBuilder;
diff --git a/musicfs/crates/musicfs-search/src/query.rs b/musicfs/crates/musicfs-search/src/query.rs
new file mode 100644
index 0000000..d2ed3a4
--- /dev/null
+++ b/musicfs/crates/musicfs-search/src/query.rs
@@ -0,0 +1,89 @@
+//! Helpers for building fuzzy multi-field queries.
+
+use tantivy::query::{BooleanQuery, FuzzyTermQuery, Occur, Query};
+use tantivy::schema::Field;
+use tantivy::Term;
+
+/// Builds fuzzy queries that match every word of the input in at
+/// least one of a fixed set of fields.
+pub struct SearchQueryBuilder {
+    fields: Vec<Field>,
+    default_fuzziness: u8,
+}
+
+impl SearchQueryBuilder {
+    /// Creates a builder over `fields` with an edit distance of 1.
+    pub fn new(fields: Vec<Field>) -> Self {
+        Self {
+            fields,
+            default_fuzziness: 1,
+        }
+    }
+
+    /// Sets the edit distance used for every term.
+    ///
+    /// NOTE(review): the value is not validated, whereas the `term~N` syntax
+    /// of the index search caps the distance at 2 — confirm larger values work.
+    pub fn with_fuzziness(mut self, fuzziness: u8) -> Self {
+        self.default_fuzziness = fuzziness;
+        self
+    }
+
+    /// Builds a query requiring every whitespace-separated word of
+    /// `query_text` to fuzzily match at least one configured field.
+    ///
+    /// Words are lower-cased before matching. Empty or whitespace-only
+    /// input yields a query matching all documents.
+    pub fn build_fuzzy(&self, query_text: &str) -> Box<dyn Query> {
+        let clauses: Vec<(Occur, Box<dyn Query>)> = query_text
+            .split_whitespace()
+            .map(|word| {
+                // Lower-case once per word rather than once per field.
+                let word = word.to_lowercase();
+                let per_field: Vec<(Occur, Box<dyn Query>)> = self
+                    .fields
+                    .iter()
+                    .map(|&field| {
+                        let fuzzy = FuzzyTermQuery::new(
+                            Term::from_field_text(field, &word),
+                            self.default_fuzziness,
+                            true,
+                        );
+                        (Occur::Should, Box::new(fuzzy) as Box<dyn Query>)
+                    })
+                    .collect();
+                (Occur::Must, Box::new(BooleanQuery::new(per_field)) as Box<dyn Query>)
+            })
+            .collect();
+
+        if clauses.is_empty() {
+            return Box::new(tantivy::query::AllQuery);
+        }
+        Box::new(BooleanQuery::new(clauses))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tantivy::schema::{Schema, TEXT};
+
+    #[test]
+    fn test_query_builder() {
+        let mut schema_builder = Schema::builder();
+        let artist = schema_builder.add_text_field("artist", TEXT);
+        let title = schema_builder.add_text_field("title", TEXT);
+
+        let builder = SearchQueryBuilder::new(vec![artist, title]);
+        let _query = builder.build_fuzzy("metallica sandman");
+    }
+
+    #[test]
+    fn test_empty_query() {
+        let mut schema_builder = Schema::builder();
+        let artist = schema_builder.add_text_field("artist", TEXT);
+
+        let builder = SearchQueryBuilder::new(vec![artist]);
+        let _query = builder.build_fuzzy("");
+    }
+}