MusicFS/docs/v2/plans/week-08-search-index.md
Alexander 3cb6dfcaf8 Add Week 8 Search API docs and Week 8-9 plans with Oracle fixes
- docs/api/search.md: FUSE and gRPC search API documentation
- Week 8 plan: Oracle fixes for IndexWriter pattern, moka cache, gRPC API
- Week 9 plan: Oracle fixes for artwork schema, spawn_blocking, access_log
- Week 7 performance review

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-05-12 23:23:49 +02:00


Week 8: Search Index

Phase: 3 (Search & Smart Features)
Prerequisites: Week 7 (Remote Origins)
Estimated effort: 5 days


Objective

Implement full-text search using tantivy with a virtual /.search/ directory interface. Users can browse search results as symlinks to matching files, enabling integration with any file manager or media player.


Architecture Reference

From architecture.md section 4.2:

Search Engine | Full-text metadata search | tantivy

From architecture.md section 3.2.1:

Search query (1M files) | <500ms | 1000ms | FR-14

From architecture.md section 8.3:

tantivy | 0.21+ | Full-text search


Requirements Covered

ID | Requirement | Priority
FR-14.1 | Index metadata for full-text search | P1
FR-14.2 | Expose search via virtual directory (/.search/query/) | P1
FR-14.3 | Support fuzzy matching | P1
FR-14.4 | Support search by audio fingerprint | P1 (DEFER)
G7 | Sub-second search across 1M+ tracks | Goal

Note: FR-14.4 (audio fingerprint) requires chromaprint dependency - deferred to Phase 5.


Deliverables

Task | Crate | Files | Est.
tantivy schema & index | musicfs-search | index.rs | 1d
Query parser (fuzzy) | musicfs-search | query.rs | 0.5d
Incremental indexer | musicfs-search | indexer.rs | 1d
Search virtual directory | musicfs-fuse | ops/search.rs | 1d
FUSE integration | musicfs-fuse | filesystem.rs | 0.5d
gRPC Search API | musicfs-grpc | search_service.rs | 0.5d
API Documentation | docs | api/search.md | 0.5d
Integration tests | tests | search_test.rs | 0.5d
Benchmark (1M tracks) | benches | search_bench.rs | 0.5d

Task 1: tantivy Schema & Index

1.1 Add dependencies to musicfs-search/Cargo.toml

[package]
name = "musicfs-search"
version.workspace = true
edition.workspace = true

[dependencies]
musicfs-core = { path = "../musicfs-core" }

tantivy = "0.22"
tokio = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
moka = { version = "0.12", features = ["sync"] }  # TTL-based LRU for result cache

[dev-dependencies]
tempfile = { workspace = true }

1.2 Create musicfs-search/src/index.rs

use musicfs_core::{FileId, FileMeta, VirtualPath};
use std::path::Path;
use std::sync::Arc;
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, STORED, TEXT, INDEXED};
use tantivy::{Document, Index, IndexReader, IndexWriter, ReloadPolicy};
use tokio::sync::mpsc;
use tracing::{debug, info, error};

/// Commands sent to the single-writer task
pub enum IndexCommand {
    Add(FileMeta),
    Remove(FileId),
    Commit,
    Shutdown,
}

pub struct SearchIndex {
    index: Index,
    reader: IndexReader,
    /// Command channel to the writer task, which is the sole owner of the IndexWriter
    cmd_tx: mpsc::UnboundedSender<IndexCommand>,
    schema: SearchSchema,
    /// Schema version for migration detection
    pub schema_version: u32,
}

const SCHEMA_VERSION: u32 = 1;

struct SearchSchema {
    schema: Schema,
    file_id: Field,
    virtual_path: Field,
    artist: Field,
    album: Field,
    album_artist: Field,  // FR-6.4 requires album_artist
    title: Field,
    genre: Field,
    composer: Field,
    year: Field,
    duration_ms: Field,   // Additional fields from architecture SQL schema
    bitrate: Field,
    sample_rate: Field,
}

impl SearchSchema {
    fn new() -> Self {
        let mut builder = Schema::builder();
        
        Self {
            // INDEXED is required so that delete_term() on file_id can match documents
            file_id: builder.add_u64_field("file_id", INDEXED | STORED),
            virtual_path: builder.add_text_field("virtual_path", STORED),
            artist: builder.add_text_field("artist", TEXT | STORED),
            album: builder.add_text_field("album", TEXT | STORED),
            album_artist: builder.add_text_field("album_artist", TEXT | STORED),
            title: builder.add_text_field("title", TEXT | STORED),
            genre: builder.add_text_field("genre", TEXT | STORED),  // Now searchable
            composer: builder.add_text_field("composer", TEXT | STORED),
            year: builder.add_u64_field("year", INDEXED | STORED),  // Indexed for range queries
            duration_ms: builder.add_u64_field("duration_ms", STORED),
            bitrate: builder.add_u64_field("bitrate", STORED),
            sample_rate: builder.add_u64_field("sample_rate", STORED),
            schema: builder.build(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct SearchHit {
    pub file_id: FileId,
    pub virtual_path: VirtualPath,
    pub artist: Option<String>,
    pub album: Option<String>,
    pub title: Option<String>,
    pub score: f32,
}

impl SearchIndex {
    /// Opens the search index and spawns a single-writer background task.
    /// tantivy allows one IndexWriter per index and `commit()` needs `&mut self`,
    /// so all writes are funneled through the channel.
    pub fn open(index_path: &Path) -> Result<Self, SearchError> {
        let schema = SearchSchema::new();
        
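        // An existing but empty directory (for example a fresh TempDir) holds no
        // index yet, so check for tantivy's meta.json rather than the directory.
        let index = if index_path.join("meta.json").exists() {
            Index::open_in_dir(index_path)?
        } else {
            std::fs::create_dir_all(index_path)?;
            Index::create_in_dir(index_path, schema.schema.clone())?
        };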

        let reader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::OnCommit)
            .try_into()?;

        // Single-writer pattern: IndexWriter lives in dedicated task
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let writer = index.writer(50_000_000)?; // 50MB heap
        
        // Spawn writer task - owns IndexWriter exclusively
        Self::spawn_writer_task(writer, cmd_rx, schema.file_id);

        info!("Search index opened at {:?}", index_path);

        Ok(Self {
            index,
            reader,
            cmd_tx,
            schema,
            schema_version: SCHEMA_VERSION,
        })
    }

    /// Spawns background task that owns IndexWriter exclusively.
    /// All index mutations go through the channel.
    fn spawn_writer_task(
        mut writer: IndexWriter,
        mut cmd_rx: mpsc::UnboundedReceiver<IndexCommand>,
        file_id_field: Field,
    ) {
        tokio::spawn(async move {
            while let Some(cmd) = cmd_rx.recv().await {
                match cmd {
                    IndexCommand::Add(file) => {
                        if let Err(e) = Self::add_document(&mut writer, &file, file_id_field) {
                            error!("Index add failed: {}", e);
                        }
                    }
                    IndexCommand::Remove(id) => {
                        let term = tantivy::Term::from_field_u64(file_id_field, id.0 as u64);
                        writer.delete_term(term);
                    }
                    IndexCommand::Commit => {
                        if let Err(e) = writer.commit() {
                            error!("Index commit failed: {}", e);
                        } else {
                            info!("Search index committed");
                        }
                    }
                    IndexCommand::Shutdown => {
                        let _ = writer.commit();
                        info!("Index writer shutdown");
                        break;
                    }
                }
            }
        });
    }

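    fn add_document(writer: &mut IndexWriter, file: &FileMeta, file_id_field: Field) -> Result<(), SearchError> {
        // Only file_id_field is passed in, so the other fields are resolved by name
        // from the writer's schema. Covers the AudioMeta fields used in the tests.
        let schema = writer.index().schema();
        let mut doc = tantivy::TantivyDocument::default();
        doc.add_u64(file_id_field, file.id.0 as u64);
        doc.add_text(schema.get_field("virtual_path")?, file.virtual_path.as_str());
        if let Some(audio) = &file.audio {
            if let Some(v) = &audio.artist { doc.add_text(schema.get_field("artist")?, v); }
            if let Some(v) = &audio.album { doc.add_text(schema.get_field("album")?, v); }
            if let Some(v) = &audio.title { doc.add_text(schema.get_field("title")?, v); }
        }
        writer.add_document(doc)?;
        debug!("Indexed file {:?}", file.id);
        Ok(())
    }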

    /// Queue a file for indexing (non-blocking)
    pub fn index_file(&self, file: &FileMeta) -> Result<(), SearchError> {
        self.cmd_tx.send(IndexCommand::Add(file.clone()))
            .map_err(|_| SearchError::WriterShutdown)?;
        Ok(())
    }

    /// Queue file removal (non-blocking)
    pub fn remove_file(&self, file_id: FileId) -> Result<(), SearchError> {
        self.cmd_tx.send(IndexCommand::Remove(file_id))
            .map_err(|_| SearchError::WriterShutdown)?;
        Ok(())
    }

    /// Request commit (non-blocking)
    pub fn commit(&self) -> Result<(), SearchError> {
        self.cmd_tx.send(IndexCommand::Commit)
            .map_err(|_| SearchError::WriterShutdown)?;
        Ok(())
    }

    /// Shutdown the writer task gracefully
    pub fn shutdown(&self) -> Result<(), SearchError> {
        self.cmd_tx.send(IndexCommand::Shutdown)
            .map_err(|_| SearchError::WriterShutdown)?;
        Ok(())
    }

    pub fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchHit>, SearchError> {
        let searcher = self.reader.searcher();
        
        // Include genre in searchable fields (Oracle fix)
        let query_parser = QueryParser::for_index(
            &self.index,
            vec![
                self.schema.artist,
                self.schema.album,
                self.schema.album_artist,
                self.schema.title,
                self.schema.genre,      // Now searchable
                self.schema.composer,
            ],
        );

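        // Keep the parsed query under its own name: `query` is logged below,
        // and the boxed tantivy query does not implement Display.
        let parsed_query = query_parser.parse_query(query)?;
        let top_docs = searcher.search(&parsed_query, &TopDocs::with_limit(limit))?;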

        // tantivy 0.22: `Searcher::doc` is generic over the document type, and the
        // value accessors (`as_u64`, `as_str`) come from the `Value` trait.
        use tantivy::schema::Value;

        let mut results = Vec::with_capacity(top_docs.len());
        for (score, doc_address) in top_docs {
            let doc: tantivy::TantivyDocument = searcher.doc(doc_address)?;
            
            let file_id = doc
                .get_first(self.schema.file_id)
                .and_then(|v| v.as_u64())
                .map(|id| FileId(id as i64))
                .ok_or(SearchError::CorruptedIndex)?;

            let virtual_path = doc
                .get_first(self.schema.virtual_path)
                .and_then(|v| v.as_str())
                .map(|s| VirtualPath::new(s))
                .ok_or(SearchError::CorruptedIndex)?;

            results.push(SearchHit {
                file_id,
                virtual_path,
                artist: doc.get_first(self.schema.artist).and_then(|v| v.as_str()).map(String::from),
                album: doc.get_first(self.schema.album).and_then(|v| v.as_str()).map(String::from),
                title: doc.get_first(self.schema.title).and_then(|v| v.as_str()).map(String::from),
                score,
            });
        }

        debug!("Search '{}' returned {} results", query, results.len());
        Ok(results)
    }

    pub fn count(&self) -> u64 {
        self.reader.searcher().num_docs()
    }
}

#[derive(Debug, thiserror::Error)]
pub enum SearchError {
    #[error("tantivy error: {0}")]
    Tantivy(#[from] tantivy::TantivyError),
    
    #[error("query parse error: {0}")]
    QueryParse(#[from] tantivy::query::QueryParserError),
    
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    
    #[error("corrupted search index")]
    CorruptedIndex,
    
    #[error("index writer shutdown")]
    WriterShutdown,
}

#[cfg(test)]
mod tests {
    use super::*;
    use musicfs_core::{AudioMeta, RealPath, OriginId};
    use std::path::PathBuf;
    use tempfile::TempDir;

    fn make_file(id: i64, artist: &str, album: &str, title: &str) -> FileMeta {
        FileMeta {
            id: FileId(id),
            virtual_path: VirtualPath::new(&format!("/{}/{}/{}.flac", artist, album, title)),
            real_path: RealPath {
                origin_id: OriginId::from("test"),
                path: PathBuf::from("test.flac"),
            },
            size: 1000,
            mtime: std::time::SystemTime::UNIX_EPOCH,
            content_hash: None,
            audio: Some(AudioMeta {
                artist: Some(artist.to_string()),
                album: Some(album.to_string()),
                title: Some(title.to_string()),
                track_number: Some(1),
                duration_ms: Some(180000),
                format: musicfs_core::AudioFormat::Flac,
            }),
        }
    }

    /// Commits are only queued to the writer task, so give it time to run and
    /// reload the reader before asserting on search results.
    async fn commit_and_settle(index: &SearchIndex) {
        index.commit().unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        index.reader.reload().unwrap();
    }

    // SearchIndex::open() calls tokio::spawn, so these tests need a Tokio runtime.
    #[tokio::test]
    async fn test_search_basic() {
        let dir = TempDir::new().unwrap();
        let index = SearchIndex::open(dir.path()).unwrap();

        index.index_file(&make_file(1, "Metallica", "Black Album", "Enter Sandman")).unwrap();
        index.index_file(&make_file(2, "Metallica", "Master of Puppets", "Battery")).unwrap();
        index.index_file(&make_file(3, "Iron Maiden", "Powerslave", "Aces High")).unwrap();
        commit_and_settle(&index).await;

        let results = index.search("metallica", 10).unwrap();
        assert_eq!(results.len(), 2);

        let results = index.search("sandman", 10).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].title.as_deref(), Some("Enter Sandman"));
    }

    #[tokio::test]
    async fn test_search_fuzzy() {
        let dir = TempDir::new().unwrap();
        let index = SearchIndex::open(dir.path()).unwrap();

        index.index_file(&make_file(1, "Metallica", "Black Album", "Enter Sandman")).unwrap();
        commit_and_settle(&index).await;

        // Fuzzy match with typo
        let results = index.search("metalica~1", 10).unwrap();
        assert_eq!(results.len(), 1);
    }
}
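
The single-writer pattern above sends `Commit` without any acknowledgement, so a caller cannot tell when a commit has actually completed. The following is a minimal std-only sketch of one way to add that, assuming a reply channel carried inside the command. `Command`, `FakeIndex`, `spawn_writer` and `commit_and_wait` are hypothetical names for illustration and are not part of musicfs-search or tantivy; a plain thread stands in for the Tokio task.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Commands accepted by the writer thread (mirrors IndexCommand in spirit).
enum Command {
    Add(String),
    /// Carries a reply channel so the caller can wait for the commit.
    Commit(Sender<usize>),
    Shutdown,
}

/// Hypothetical stand-in for the index: pending docs become visible on commit.
#[derive(Default)]
struct FakeIndex {
    pending: Vec<String>,
    committed: Vec<String>,
}

/// Spawns the single writer; every mutation goes through the returned sender.
fn spawn_writer() -> (Sender<Command>, JoinHandle<FakeIndex>) {
    let (tx, rx) = mpsc::channel::<Command>();
    let handle = thread::spawn(move || {
        let mut index = FakeIndex::default();
        for cmd in rx {
            match cmd {
                Command::Add(doc) => index.pending.push(doc),
                Command::Commit(ack) => {
                    index.committed.append(&mut index.pending);
                    // Ignore a dropped receiver: the caller stopped waiting.
                    let _ = ack.send(index.committed.len());
                }
                Command::Shutdown => break,
            }
        }
        index
    });
    (tx, handle)
}

/// Sends a commit and blocks until the writer confirms it.
/// Returns None if the writer has already shut down.
fn commit_and_wait(tx: &Sender<Command>) -> Option<usize> {
    let (ack_tx, ack_rx) = mpsc::channel();
    tx.send(Command::Commit(ack_tx)).ok()?;
    ack_rx.recv().ok()
}
```

With an acknowledgement like this, tests and batch imports could wait for a commit instead of sleeping; in the Tokio version a `oneshot` channel would presumably play the role of the reply sender.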

Task 2: Query Parser

2.1 Create musicfs-search/src/query.rs

use tantivy::query::{BooleanQuery, FuzzyTermQuery, Occur, Query, TermQuery};
use tantivy::schema::{Field, IndexRecordOption};
use tantivy::Term;

pub struct SearchQueryBuilder {
    fields: Vec<Field>,
    default_fuzziness: u8,
}

impl SearchQueryBuilder {
    pub fn new(fields: Vec<Field>) -> Self {
        Self {
            fields,
            default_fuzziness: 1,
        }
    }

    pub fn with_fuzziness(mut self, fuzziness: u8) -> Self {
        self.default_fuzziness = fuzziness;
        self
    }

    pub fn build_fuzzy(&self, query_text: &str) -> Box<dyn Query> {
        let terms: Vec<_> = query_text
            .split_whitespace()
            .filter(|t| !t.is_empty())
            .collect();

        if terms.is_empty() {
            return Box::new(tantivy::query::AllQuery);
        }

        let mut clauses: Vec<(Occur, Box<dyn Query>)> = Vec::new();

        for term in terms {
            let mut field_queries: Vec<(Occur, Box<dyn Query>)> = Vec::new();

            for field in &self.fields {
                let fuzzy = FuzzyTermQuery::new(
                    Term::from_field_text(*field, &term.to_lowercase()),
                    self.default_fuzziness,
                    true,
                );
                field_queries.push((Occur::Should, Box::new(fuzzy)));
            }

            let field_union = BooleanQuery::new(field_queries);
            clauses.push((Occur::Must, Box::new(field_union)));
        }

        Box::new(BooleanQuery::new(clauses))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tantivy::schema::{Schema, TEXT};

    #[test]
    fn test_query_builder() {
        let mut schema_builder = Schema::builder();
        let artist = schema_builder.add_text_field("artist", TEXT);
        let title = schema_builder.add_text_field("title", TEXT);

        let builder = SearchQueryBuilder::new(vec![artist, title]);
        let _query = builder.build_fuzzy("metallica sandman");
    }
}
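
To make the fuzziness setting concrete, here is a small std-only sketch of the edit distance that a fuzziness of 1 with transpositions counted as a single edit corresponds to. It is an illustration of the matching rule, not tantivy's implementation (which works on the term dictionary rather than comparing strings pairwise); `edit_distance` and `fuzzy_matches` are hypothetical helper names.

```rust
/// Optimal-string-alignment edit distance: insertions, deletions,
/// substitutions and adjacent transpositions each cost 1.
fn edit_distance(a: &str, b: &str) -> usize {
    let a: Vec<char> = a.chars().collect();
    let b: Vec<char> = b.chars().collect();
    let (n, m) = (a.len(), b.len());
    // d[i][j] = distance between the first i chars of a and the first j chars of b
    let mut d = vec![vec![0usize; m + 1]; n + 1];
    for i in 0..=n {
        d[i][0] = i;
    }
    for j in 0..=m {
        d[0][j] = j;
    }
    for i in 1..=n {
        for j in 1..=m {
            let cost = if a[i - 1] == b[j - 1] { 0 } else { 1 };
            d[i][j] = (d[i - 1][j] + 1)
                .min(d[i][j - 1] + 1)
                .min(d[i - 1][j - 1] + cost);
            // Adjacent transposition, e.g. "la" <-> "al"
            if i > 1 && j > 1 && a[i - 1] == b[j - 2] && a[i - 2] == b[j - 1] {
                d[i][j] = d[i][j].min(d[i - 2][j - 2] + 1);
            }
        }
    }
    d[n][m]
}

/// Case-insensitive check that two terms are within `max_distance` edits.
fn fuzzy_matches(query_term: &str, indexed_term: &str, max_distance: usize) -> bool {
    edit_distance(&query_term.to_lowercase(), &indexed_term.to_lowercase()) <= max_distance
}
```

Under this rule `metalica` is one insertion away from `metallica`, so it matches at fuzziness 1, while an unrelated term such as `megadeth` does not.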

Task 3: Incremental Indexer

3.1 Create musicfs-search/src/indexer.rs

use crate::index::{SearchError, SearchIndex};
use musicfs_cache::MetadataCache;
use musicfs_core::{Event, EventBus, FileId, FileMeta, VirtualPath};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};

pub struct Indexer {
    index: Arc<SearchIndex>,
    event_bus: Arc<EventBus>,
    /// MetadataCache for fetching FileMeta on events (Oracle fix - not placeholder)
    metadata_cache: Arc<MetadataCache>,
}

impl Indexer {
    pub fn new(
        index: Arc<SearchIndex>,
        event_bus: Arc<EventBus>,
        metadata_cache: Arc<MetadataCache>,
    ) -> Self {
        Self { index, event_bus, metadata_cache }
    }

    pub fn start(self) -> IndexerHandle {
        let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
        let mut event_rx = self.event_bus.subscribe();

        tokio::spawn(async move {
            let mut pending_commit = false;
            let mut commit_timer = tokio::time::interval(std::time::Duration::from_secs(5));

            loop {
                tokio::select! {
                    Ok(event) = event_rx.recv() => {
                        if let Err(e) = self.handle_event(&event).await {
                            error!("Indexer error: {}", e);
                        }
                        pending_commit = true;
                    }
                    _ = commit_timer.tick() => {
                        if pending_commit {
                            if let Err(e) = self.index.commit() {
                                error!("Index commit error: {}", e);
                            }
                            pending_commit = false;
                        }
                    }
                    _ = stop_rx.recv() => {
                        info!("Indexer stopping");
                        if pending_commit {
                            let _ = self.index.commit();
                        }
                        break;
                    }
                }
            }
        });

        IndexerHandle { stop_tx }
    }

    async fn handle_event(&self, event: &Event) -> Result<(), SearchError> {
        match event {
            Event::FileAdded { path, file_id } => {
                debug!("Indexing added file: {:?}", path);
                // Fetch FileMeta from MetadataCache (Oracle fix - real integration)
                if let Some(meta) = self.metadata_cache.get_by_path(path).await {
                    self.index.index_file(&meta)?;
                } else {
                    warn!("No metadata found for added file: {:?}", path);
                }
            }
            Event::FileRemoved { path, file_id } => {
                debug!("Removing from index: {:?}", path);
                // Lookup FileId and remove from index
                if let Some(id) = file_id {
                    self.index.remove_file(*id)?;
                } else if let Some(meta) = self.metadata_cache.get_by_path(path).await {
                    self.index.remove_file(meta.id)?;
                }
            }
            Event::FileModified { path, file_id } => {
                debug!("Re-indexing modified file: {:?}", path);
                // Re-index with updated metadata
                if let Some(meta) = self.metadata_cache.get_by_path(path).await {
                    self.index.remove_file(meta.id)?;
                    self.index.index_file(&meta)?;
                }
            }
            _ => {}
        }
        Ok(())
    }

    pub fn index_batch(&self, files: &[FileMeta]) -> Result<usize, SearchError> {
        let mut count = 0;
        for file in files {
            self.index.index_file(file)?;
            count += 1;
        }
        self.index.commit()?;
        info!("Indexed {} files", count);
        Ok(count)
    }
}

pub struct IndexerHandle {
    stop_tx: mpsc::Sender<()>,
}

impl IndexerHandle {
    pub async fn stop(self) {
        let _ = self.stop_tx.send(()).await;
    }
}
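
The commit batching in `Indexer::start` (mark changes as pending, commit at most once per interval) can be isolated into a small state machine that is easy to unit test without a runtime. This is a sketch under that assumption; `CommitBatcher` is a hypothetical name and the caller supplies the clock.

```rust
use std::time::{Duration, Instant};

/// Tracks pending index changes and decides when a commit is due.
struct CommitBatcher {
    interval: Duration,
    last_commit: Instant,
    pending: usize,
}

impl CommitBatcher {
    fn new(interval: Duration, now: Instant) -> Self {
        Self { interval, last_commit: now, pending: 0 }
    }

    /// Call once per successfully queued add/remove.
    fn record_change(&mut self) {
        self.pending += 1;
    }

    /// Returns how many changes the commit covers if one is due at `now`,
    /// and resets the batch. Returns None when nothing is pending or the
    /// interval has not elapsed yet.
    fn due(&mut self, now: Instant) -> Option<usize> {
        let elapsed = now.saturating_duration_since(self.last_commit);
        if self.pending == 0 || elapsed < self.interval {
            return None;
        }
        let count = self.pending;
        self.pending = 0;
        self.last_commit = now;
        Some(count)
    }
}
```

Counting changes rather than keeping a boolean also gives the commit log line something useful to report.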

Task 4: Search Virtual Directory

4.1 Create musicfs-fuse/src/ops/search.rs

use fuser::{FileType, ReplyDirectory, ReplyEntry, ReplyData};
use moka::sync::Cache;
use musicfs_search::{SearchHit, SearchIndex};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::debug;

const SEARCH_DIR_INODE: u64 = 0xFFFF_FFFF_0000_0001;
const SEARCH_RESULT_BASE: u64 = 0xFFFF_FFFF_1000_0000;

/// Result cache config - prevents unbounded memory growth (Oracle fix)
const RESULT_CACHE_MAX_ENTRIES: u64 = 1000;
const RESULT_CACHE_TTL_SECS: u64 = 300;  // 5 minutes

pub struct SearchOps {
    index: Arc<SearchIndex>,
    /// TTL-based LRU cache for search results (moka) - prevents OOM
    result_cache: Cache<String, Vec<SearchHit>>,
    inode_to_result: parking_lot::RwLock<HashMap<u64, (String, usize)>>,
    /// Mount point for absolute symlink targets
    mount_point: String,
}

impl SearchOps {
    pub fn new(index: Arc<SearchIndex>, mount_point: &str) -> Self {
        // moka cache with TTL and max entries (Oracle fix for unbounded growth)
        let result_cache = Cache::builder()
            .max_capacity(RESULT_CACHE_MAX_ENTRIES)
            .time_to_live(Duration::from_secs(RESULT_CACHE_TTL_SECS))
            .build();
        
        Self {
            index,
            result_cache,
            inode_to_result: parking_lot::RwLock::new(HashMap::new()),
            mount_point: mount_point.to_string(),
        }
    }

    pub fn is_search_path(path: &str) -> bool {
        path.starts_with("/.search/")
    }

    pub fn is_search_inode(inode: u64) -> bool {
        inode == SEARCH_DIR_INODE || inode >= SEARCH_RESULT_BASE
    }

    pub fn lookup_search_dir(&self, reply: ReplyEntry) {
        let attr = Self::dir_attr(SEARCH_DIR_INODE);
        reply.entry(&Duration::from_secs(60), &attr, 0);
    }

    pub fn lookup_query_dir(&self, query: &str, reply: ReplyEntry) {
        let results = self.execute_query(query);
        if results.is_empty() {
            reply.error(libc::ENOENT);
            return;
        }

        let attr = Self::dir_attr(SEARCH_DIR_INODE + 1);
        reply.entry(&Duration::from_secs(1), &attr, 0);
    }

    pub fn readdir_search_root(&self, reply: &mut ReplyDirectory) {
        reply.add(SEARCH_DIR_INODE, 1, FileType::Directory, ".");
        reply.add(1, 2, FileType::Directory, "..");
    }

    pub fn readdir_query(&self, query: &str, offset: i64, reply: &mut ReplyDirectory) {
        let results = self.execute_query(query);
        
        for (i, hit) in results.iter().enumerate().skip(offset as usize) {
            let inode = SEARCH_RESULT_BASE + i as u64;
            let name = self.result_filename(hit, i);
            
            {
                let mut inode_map = self.inode_to_result.write();
                inode_map.insert(inode, (query.to_string(), i));
            }

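            // The offset reported for entry i is where the next readdir call
            // resumes, so it must be i + 1 to line up with skip(offset) above.
            if reply.add(inode, (i + 1) as i64, FileType::Symlink, &name) {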
                break;
            }
        }
    }

    pub fn readlink(&self, inode: u64, reply: ReplyData) {
        let (query, index) = {
            let inode_map = self.inode_to_result.read();
            match inode_map.get(&inode) {
                Some((q, i)) => (q.clone(), *i),
                None => {
                    reply.error(libc::ENOENT);
                    return;
                }
            }
        };

        let results = self.execute_query(&query);
        if let Some(hit) = results.get(index) {
            // Use ABSOLUTE path for reliable symlink resolution (Oracle fix)
            let target = format!("{}{}", self.mount_point, hit.virtual_path.as_str());
            reply.data(target.as_bytes());
        } else {
            reply.error(libc::ENOENT);
        }
    }

    fn execute_query(&self, query: &str) -> Vec<SearchHit> {
        // moka cache handles TTL/LRU automatically
        if let Some(results) = self.result_cache.get(query) {
            return results;
        }

        let results = self.index.search(query, 1000).unwrap_or_default();
        self.result_cache.insert(query.to_string(), results.clone());
        results
    }

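    fn result_filename(&self, hit: &SearchHit, index: usize) -> String {
        // '/' and NUL cannot appear in a directory entry name
        let clean = |s: &str| s.replace(['/', '\0'], "_");
        let artist = clean(hit.artist.as_deref().unwrap_or("Unknown"));
        let title = clean(hit.title.as_deref().unwrap_or("Unknown"));
        // Keep the target's real extension instead of assuming FLAC
        let ext = std::path::Path::new(hit.virtual_path.as_str())
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("flac");
        format!("{:03}. {} - {}.{}", index + 1, artist, title, ext)
    }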

    fn dir_attr(inode: u64) -> fuser::FileAttr {
        fuser::FileAttr {
            ino: inode,
            size: 0,
            blocks: 0,
            atime: SystemTime::UNIX_EPOCH,
            mtime: SystemTime::UNIX_EPOCH,
            ctime: SystemTime::UNIX_EPOCH,
            crtime: SystemTime::UNIX_EPOCH,
            kind: FileType::Directory,
            perm: 0o555,
            nlink: 2,
            uid: 1000,
            gid: 1000,
            rdev: 0,
            blksize: 512,
            flags: 0,
        }
    }
}
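
In `readdir_query` the inode for result `i` is `SEARCH_RESULT_BASE + i` regardless of the query, so two different queries map their first result to the same inode and the later one overwrites the earlier entry in `inode_to_result`. One possible way around this, sketched here with std only, is to allocate inodes per `(query, index)` pair. `ResultInodes` and its methods are hypothetical names, not part of the plan's API.

```rust
use std::collections::HashMap;

const SEARCH_RESULT_BASE: u64 = 0xFFFF_FFFF_1000_0000;

/// Allocates a stable, unique inode for each (query, result index) pair.
#[derive(Default)]
struct ResultInodes {
    by_key: HashMap<(String, usize), u64>,
    by_inode: HashMap<u64, (String, usize)>,
    next: u64,
}

impl ResultInodes {
    /// Returns the inode for this result, allocating one on first use.
    fn inode_for(&mut self, query: &str, index: usize) -> u64 {
        let key = (query.to_string(), index);
        if let Some(&ino) = self.by_key.get(&key) {
            return ino;
        }
        let ino = SEARCH_RESULT_BASE + self.next;
        self.next += 1;
        self.by_key.insert(key.clone(), ino);
        self.by_inode.insert(ino, key);
        ino
    }

    /// Reverse lookup used by readlink: inode -> (query, result index).
    fn resolve(&self, inode: u64) -> Option<(&str, usize)> {
        self.by_inode.get(&inode).map(|(q, i)| (q.as_str(), *i))
    }
}
```

This table grows without bound as written; in practice it would need eviction tied to the result cache's TTL so that it does not reintroduce the memory growth the moka cache was added to prevent.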

Task 5: FUSE Integration

5.1 Update musicfs-fuse/src/filesystem.rs

Add search handling to FUSE operations:

// In lookup()
if name == ".search" && parent == 1 {
    self.search_ops.lookup_search_dir(reply);
    return;
}

// A query directory is a direct child of /.search, so `name` is the query itself
if parent == SEARCH_DIR_INODE {
    match name.to_str() {
        Some(query) => self.search_ops.lookup_query_dir(query, reply),
        None => reply.error(libc::EINVAL), // non-UTF-8 query
    }
    return;
}

// In readdir()
if ino == SEARCH_DIR_INODE {
    self.search_ops.readdir_search_root(&mut reply);
    reply.ok();
    return;
}

// In readlink()
if SearchOps::is_search_inode(ino) {
    self.search_ops.readlink(ino, reply);
    return;
}
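
The three levels of the virtual tree (the `/.search` root, a query directory, and a result symlink inside it) can be told apart with a small path classifier. The sketch below is std-only and independent of fuser; `SearchPath` and `parse_search_path` are hypothetical names used for illustration.

```rust
/// The kinds of path that live under the virtual /.search tree.
#[derive(Debug, PartialEq)]
enum SearchPath<'a> {
    /// "/.search" itself
    Root,
    /// "/.search/{query}"
    Query(&'a str),
    /// "/.search/{query}/{entry}"
    Result { query: &'a str, entry: &'a str },
}

/// Classifies a virtual path; returns None for anything outside /.search
/// or nested deeper than a result entry.
fn parse_search_path(path: &str) -> Option<SearchPath<'_>> {
    let rest = path.strip_prefix("/.search")?;
    let rest = match rest.strip_prefix('/') {
        Some(r) => r,
        None if rest.is_empty() => return Some(SearchPath::Root),
        // Sibling such as "/.searchable"
        None => return None,
    };
    let rest = rest.trim_end_matches('/');
    if rest.is_empty() {
        return Some(SearchPath::Root);
    }
    match rest.split_once('/') {
        None => Some(SearchPath::Query(rest)),
        Some((query, entry))
            if !query.is_empty() && !entry.is_empty() && !entry.contains('/') =>
        {
            Some(SearchPath::Result { query, entry })
        }
        Some(_) => None,
    }
}
```

A classifier like this avoids fixed-offset slicing such as `&path[9..]`, which silently depends on the exact prefix length.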

Task 6: gRPC Search API

Oracle fix: Architecture 4.3.7 defines Search and SearchStream RPCs that must be implemented.

6.1 Create musicfs-grpc/src/search_service.rs

use musicfs_proto::musicfs::v1::{
    SearchRequest, SearchResponse, SearchResult,
    music_fs_server::MusicFs,
};
use musicfs_search::{SearchIndex, SearchHit};
use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};
use tracing::{debug, info};

pub struct SearchService {
    index: Arc<SearchIndex>,
}

impl SearchService {
    pub fn new(index: Arc<SearchIndex>) -> Self {
        Self { index }
    }
}

#[tonic::async_trait]
impl MusicFs for SearchService {
    async fn search(
        &self,
        request: Request<SearchRequest>,
    ) -> Result<Response<SearchResponse>, Status> {
        let start = Instant::now();
        let req = request.into_inner();
        
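        // Validate up front so callers get INVALID_ARGUMENT as documented in docs/api/search.md
        if req.query.trim().is_empty() {
            return Err(Status::invalid_argument("Query cannot be empty"));
        }
        let limit = req.limit.unwrap_or(100) as usize;
        if limit > 10_000 {
            return Err(Status::invalid_argument("Limit exceeds maximum (10000)"));
        }
        let offset = req.offset.unwrap_or(0) as usize;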
        
        // Execute search
        let results = self.index
            .search(&req.query, limit + offset)
            .map_err(|e| Status::internal(format!("Search failed: {}", e)))?;
        
        // Apply offset and convert to proto
        let hits: Vec<SearchResult> = results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|hit| SearchResult {
                file_id: hit.file_id.0,
                virtual_path: hit.virtual_path.to_string(),
                artist: hit.artist,
                album: hit.album,
                title: hit.title,
                score: hit.score,
                highlights: Default::default(), // TODO: implement highlighting
            })
            .collect();
        
        // Placeholder: this is the number of documents in the whole index,
        // not the number of documents matching the query.
        let total_matches = self.index.count();
        let query_time_ms = start.elapsed().as_millis() as u32;
        
        debug!("Search '{}' returned {} results in {}ms", req.query, hits.len(), query_time_ms);
        
        Ok(Response::new(SearchResponse {
            results: hits,
            total_matches,
            query_time_ms,
        }))
    }

    type SearchStreamStream = tokio_stream::wrappers::ReceiverStream<Result<SearchResult, Status>>;

    async fn search_stream(
        &self,
        request: Request<SearchRequest>,
    ) -> Result<Response<Self::SearchStreamStream>, Status> {
        let req = request.into_inner();
        let limit = req.limit.unwrap_or(1000) as usize;
        
        let results = self.index
            .search(&req.query, limit)
            .map_err(|e| Status::internal(format!("Search failed: {}", e)))?;
        
        let (tx, rx) = tokio::sync::mpsc::channel(100);
        
        tokio::spawn(async move {
            for hit in results {
                let result = SearchResult {
                    file_id: hit.file_id.0,
                    virtual_path: hit.virtual_path.to_string(),
                    artist: hit.artist,
                    album: hit.album,
                    title: hit.title,
                    score: hit.score,
                    highlights: Default::default(),
                };
                if tx.send(Ok(result)).await.is_err() {
                    break; // Client disconnected
                }
            }
        });
        
        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_grpc_search() {
        let dir = TempDir::new().unwrap();
        let index = Arc::new(SearchIndex::open(dir.path()).unwrap());
        let service = SearchService::new(index);
        
        let request = Request::new(SearchRequest {
            query: "test".to_string(),
            limit: Some(10),
            offset: None,
            origin_id: None,
        });
        
        let response = service.search(request).await.unwrap();
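        // The index is empty, so no hits are expected. query_time_ms is not
        // asserted because a fast query can legitimately round down to 0 ms.
        assert!(response.get_ref().results.is_empty());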
    }
}
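
The offset and limit handling in `search` (fetch `limit + offset` hits, skip `offset`, take `limit`) together with the documented bounds can be expressed as a pure function that is testable without an index. This is a sketch only; `plan_page`, `Page` and `RequestError` are hypothetical names, and the default of 100 and maximum of 10000 are taken from the request schema in docs/api/search.md.

```rust
const DEFAULT_LIMIT: u32 = 100;
const MAX_LIMIT: u32 = 10_000;

#[derive(Debug, PartialEq)]
enum RequestError {
    EmptyQuery,
    LimitTooLarge(u32),
}

/// How many hits to request from the index and which slice to return.
#[derive(Debug, PartialEq)]
struct Page {
    /// Number of top hits to ask the index for (offset + limit).
    fetch: usize,
    /// Hits to drop from the front.
    skip: usize,
    /// Hits to return after skipping.
    take: usize,
}

/// Validates a request and turns limit/offset into a fetch plan.
fn plan_page(query: &str, limit: Option<u32>, offset: Option<u32>) -> Result<Page, RequestError> {
    if query.trim().is_empty() {
        return Err(RequestError::EmptyQuery);
    }
    let limit = limit.unwrap_or(DEFAULT_LIMIT);
    if limit > MAX_LIMIT {
        return Err(RequestError::LimitTooLarge(limit));
    }
    let skip = offset.unwrap_or(0) as usize;
    let take = limit as usize;
    Ok(Page { fetch: skip.saturating_add(take), skip, take })
}
```

Both error variants would map to `INVALID_ARGUMENT` in the service. Note that the fetch size grows with the offset, so deep pagination gets more expensive with each page.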

Task 7: API Documentation

All APIs must be fully documented with happy and non-happy paths.

7.1 Create docs/api/search.md

# Search API Documentation

## Overview

MusicFS provides two search interfaces:
1. **FUSE Virtual Directory** - `/.search/query/` for file manager integration
2. **gRPC API** - `Search` and `SearchStream` RPCs for programmatic access

---

## FUSE Search Interface

### Endpoint: `/.search/{query}/`

Browse search results as symlinks in a virtual directory.

### Happy Path

1. User navigates to `/.search/metallica/`
2. FUSE returns directory listing of symlinks
3. Each symlink points to an absolute path under the mount point: `/mnt/musicfs/Metallica/Album/Track.flac`
4. User can open symlink directly in media player

**Example:**
```bash
$ ls -la /mnt/musicfs/.search/metallica/
001. Metallica - Enter Sandman.flac -> /mnt/musicfs/Metallica/Black Album/Enter Sandman.flac
002. Metallica - Battery.flac -> /mnt/musicfs/Metallica/Master of Puppets/Battery.flac
```

### Error Cases

| Scenario | Behavior | FUSE Error |
|----------|----------|------------|
| Empty query | Empty directory | (none) |
| No results | Empty directory | (none) |
| Query too long (>256 chars) | Truncated | (none) |
| Invalid UTF-8 in query | EINVAL | `libc::EINVAL` |
| Index corrupted | ENOENT | `libc::ENOENT` |
| Index writer shutdown | EIO | `libc::EIO` |

### Cache Behavior

- Results cached for 5 minutes (TTL)
- Maximum 1000 cached queries (LRU eviction)
- Cache miss triggers tantivy query

## gRPC Search API

### `Search(SearchRequest) -> SearchResponse`

Single request/response search.

#### Request Schema

message SearchRequest {
  string query = 1;           // Required: tantivy query string
  optional uint32 limit = 2;  // Default: 100, max: 10000
  optional uint32 offset = 3; // Default: 0, for pagination
  optional string origin_id = 4; // Filter by origin (optional)
}

#### Response Schema

message SearchResponse {
  repeated SearchResult results = 1;
  uint64 total_matches = 2;      // Approximate total
  uint32 query_time_ms = 3;      // Query execution time
}

message SearchResult {
  int64 file_id = 1;
  string virtual_path = 2;
  optional string artist = 3;
  optional string album = 4;
  optional string title = 5;
  float score = 6;               // Relevance score
  map<string, string> highlights = 7; // Matched fragments
}

#### Happy Path

Client                          Server
  |                               |
  |-- SearchRequest ------------->|
  |   query: "metallica"          |
  |   limit: 10                   |
  |                               |-- Query tantivy index
  |                               |-- Collect top 10 results
  |<-- SearchResponse ------------|
  |   results: [...]              |
  |   total_matches: 42           |
  |   query_time_ms: 12           |

Error Cases

| Scenario | gRPC Status | Details |
|----------|-------------|---------|
| Empty query | INVALID_ARGUMENT | "Query cannot be empty" |
| Malformed query syntax | INVALID_ARGUMENT | tantivy parse error message |
| limit > 10000 | INVALID_ARGUMENT | "Limit exceeds maximum (10000)" |
| Index unavailable | UNAVAILABLE | "Search index not ready" |
| Index corrupted | INTERNAL | "Search index corrupted" |
| Writer shutdown | INTERNAL | "Index writer shutdown" |
| Timeout (>5s) | DEADLINE_EXCEEDED | Client-specified deadline |
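
The argument checks in this table, together with the defaults from the request schema, might be validated up front along these lines. The types here are simplified stand-ins (the real service would use the generated `SearchRequest` and the gRPC library's status type), and treating a whitespace-only query as empty is an assumption of this sketch.

```rust
const DEFAULT_LIMIT: u32 = 100;
const MAX_LIMIT: u32 = 10_000;

/// Simplified stand-in for a gRPC status; only the code used here is modeled.
#[derive(Debug, PartialEq)]
enum Code {
    InvalidArgument,
}

#[derive(Debug, PartialEq)]
struct Status {
    code: Code,
    message: String,
}

/// Simplified stand-in for the generated request message.
struct SearchRequest {
    query: String,
    limit: Option<u32>,
    offset: Option<u32>,
}

/// Request with defaults applied, ready to hand to the index.
#[derive(Debug, PartialEq)]
struct Validated {
    query: String,
    limit: u32,
    offset: u32,
}

fn validate(req: &SearchRequest) -> Result<Validated, Status> {
    // Assumption: whitespace-only queries count as empty.
    let query = req.query.trim();
    if query.is_empty() {
        return Err(Status {
            code: Code::InvalidArgument,
            message: "Query cannot be empty".to_string(),
        });
    }
    let limit = req.limit.unwrap_or(DEFAULT_LIMIT);
    if limit > MAX_LIMIT {
        return Err(Status {
            code: Code::InvalidArgument,
            message: format!("Limit exceeds maximum ({MAX_LIMIT})"),
        });
    }
    Ok(Validated {
        query: query.to_string(),
        limit,
        offset: req.offset.unwrap_or(0),
    })
}
```

Rejecting bad arguments before touching the index keeps the INVALID_ARGUMENT cases cheap and leaves only parse errors to be reported by tantivy itself.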

Retry Strategy

| Error | Retryable | Backoff |
|-------|-----------|---------|
| UNAVAILABLE | Yes | Exponential (100ms, 200ms, 400ms) |
| DEADLINE_EXCEEDED | Yes | None (immediate) |
| INTERNAL | No | - |
| INVALID_ARGUMENT | No | - |
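
A client could encode this retry policy in one small function. The sketch below is illustrative: `Code` is a local stand-in for the gRPC status code, and the `max_attempts` parameter is an assumption (the table lists three backoff steps but does not state a retry limit).

```rust
use std::time::Duration;

/// Local stand-in for the gRPC status codes listed in the table.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Code {
    Unavailable,
    DeadlineExceeded,
    Internal,
    InvalidArgument,
}

/// Delay to wait before retry number `attempt` (0-based),
/// or None when the call should not be retried.
fn retry_delay(code: Code, attempt: u32, max_attempts: u32) -> Option<Duration> {
    if attempt >= max_attempts {
        return None;
    }
    match code {
        // 100ms, 200ms, 400ms, ... doubling on each attempt
        Code::Unavailable => Some(Duration::from_millis(100 * 2u64.pow(attempt))),
        // Retry immediately, with no backoff
        Code::DeadlineExceeded => Some(Duration::ZERO),
        // Not retryable
        Code::Internal | Code::InvalidArgument => None,
    }
}
```

Calling `retry_delay(Code::Unavailable, n, 3)` for `n = 0, 1, 2` yields the 100ms, 200ms, and 400ms steps from the table, after which the client gives up.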

SearchStream(SearchRequest) -> stream SearchResult

Streaming search for large result sets.

Happy Path

Client                          Server
  |                               |
  |-- SearchRequest ------------->|
  |   query: "rock"               |
  |   limit: 10000                |
  |                               |-- Query tantivy index
  |<-- SearchResult (stream) -----|
  |<-- SearchResult --------------|
  |<-- SearchResult --------------|
  |   ... (continues)             |
  |<-- (stream ends) -------------|

Error Cases

Same as Search, plus:

| Scenario | Behavior |
|----------|----------|
| Client disconnects mid-stream | Server stops sending, cleans up |
| Backpressure (slow client) | Server buffers up to 100 results |
| Buffer overflow | Server drops connection |
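
The buffering rules above can be modeled with a bounded channel. The sketch uses `std::sync::mpsc::sync_channel` purely as a stand-in for whatever async channel the gRPC server ends up using; `StreamEnd`, `open_stream`, and `send_results` are hypothetical names, and results are reduced to bare file IDs.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Maximum number of results buffered for a slow client.
const STREAM_BUFFER: usize = 100;

/// Why the server stopped producing results.
#[derive(Debug, PartialEq)]
enum StreamEnd {
    Completed,
    ClientDisconnected,
    BufferOverflow,
}

/// Create the bounded channel between the query task and the client stream.
fn open_stream() -> (SyncSender<u64>, Receiver<u64>) {
    sync_channel(STREAM_BUFFER)
}

/// Push results without blocking; stop as soon as the client is gone
/// or the buffer is full.
fn send_results(tx: &SyncSender<u64>, results: impl IntoIterator<Item = u64>) -> StreamEnd {
    for file_id in results {
        match tx.try_send(file_id) {
            Ok(()) => {}
            // Slow client: the 100-slot buffer is exhausted, so drop the connection.
            Err(TrySendError::Full(_)) => return StreamEnd::BufferOverflow,
            // Receiver dropped: the client disconnected mid-stream.
            Err(TrySendError::Disconnected(_)) => return StreamEnd::ClientDisconnected,
        }
    }
    StreamEnd::Completed
}
```

Using a non-blocking send means a stalled client can never hold the query task or the index reader hostage; the cost is that clients slower than the buffer allows lose their stream and must retry.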

Query Syntax

MusicFS uses tantivy query syntax.

Supported Operators

| Operator | Example | Description |
|----------|---------|-------------|
| Term | `metallica` | Match in any field |
| Field | `artist:metallica` | Match specific field |
| Phrase | `"enter sandman"` | Exact phrase match |
| Fuzzy | `metalica~1` | 1-character edit distance |
| Boolean | `metallica AND 1991` | Combine conditions |
| Range | `year:[1980 TO 1989]` | Numeric range |
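
As a worked example of what "1-character edit distance" means for the fuzzy operator, the following sketch computes plain Levenshtein distance. It only illustrates the matching rule; tantivy performs fuzzy matching internally with its own machinery, and the lowercasing in `fuzzy_matches` assumes terms are lowercased at index time.

```rust
/// Levenshtein distance: the minimum number of single-character
/// insertions, deletions, or substitutions turning `a` into `b`.
fn edit_distance(a: &str, b: &str) -> usize {
    let a: Vec<char> = a.chars().collect();
    let b: Vec<char> = b.chars().collect();
    // prev[j] = distance between the processed prefix of `a` and b[..j]
    let mut prev: Vec<usize> = (0..=b.len()).collect();
    for (i, ca) in a.iter().enumerate() {
        let mut curr = vec![i + 1];
        for (j, cb) in b.iter().enumerate() {
            let substitution = prev[j] + usize::from(ca != cb);
            let deletion = prev[j + 1] + 1;
            let insertion = curr[j] + 1;
            curr.push(substitution.min(deletion).min(insertion));
        }
        prev = curr;
    }
    prev[b.len()]
}

/// True when `candidate` is within `max_edits` of `term`, ignoring case.
fn fuzzy_matches(term: &str, candidate: &str, max_edits: usize) -> bool {
    edit_distance(&term.to_lowercase(), &candidate.to_lowercase()) <= max_edits
}
```

Here `metalica` is one insertion away from `metallica`, so `metalica~1` matches it, whereas `metal` is four edits away and does not.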

Searchable Fields

| Field | Type | Notes |
|-------|------|-------|
| `artist` | TEXT | Full-text searchable |
| `album` | TEXT | Full-text searchable |
| `album_artist` | TEXT | Full-text searchable |
| `title` | TEXT | Full-text searchable |
| `genre` | TEXT | Full-text searchable |
| `composer` | TEXT | Full-text searchable |
| `year` | u64 | Range queries only |

Performance

| Metric | Target | Measured |
|--------|--------|----------|
| Query latency (1M tracks) | <500ms | TBD |
| Index throughput | >1000 files/sec | TBD |
| Memory per 1M tracks | <500MB | TBD |

Integration Examples

# Using grpcurl
grpcurl -plaintext -d '{"query": "metallica", "limit": 5}' \
  localhost:50051 musicfs.v1.MusicFS/Search

# Using musicfs-cli
musicfs search "artist:metallica AND year:[1980 TO 1990]"

Programmatic (Rust)

use musicfs_client::MusicFsClient;

let mut client = MusicFsClient::connect("http://localhost:50051").await?;

let response = client.search(SearchRequest {
    query: "metallica".to_string(),
    limit: Some(10),
    ..Default::default()
}).await?;

for result in response.results {
    println!("{} - {}", result.artist.unwrap_or_default(), result.title.unwrap_or_default());
}

---

## Tests

| Test | Type | Validates |
|------|------|-----------|
| `test_search_basic` | Unit | Basic search returns results |
| `test_search_fuzzy` | Unit | Typo tolerance (FR-14.3) |
| `test_search_multi_field` | Unit | Searches artist+album+title |
| `test_search_empty` | Unit | Empty query returns nothing |
| `test_index_persistence` | Integration | Index survives restart |
| `test_incremental_index` | Integration | New files indexed via events |
| `test_search_virtual_dir` | E2E | `ls /.search/metallica/` works |
| `test_search_symlinks` | E2E | Results are valid symlinks |
| `test_search_1m_tracks` | Benchmark | <500ms for 1M tracks (G7) |

---

## Benchmark

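```rust
// benches/search_bench.rs
use std::hint::black_box;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
// Assumed import path. `create_index_with_n_tracks` and `generate_test_files`
// are test helpers expected to be defined alongside this benchmark.
use musicfs_search::SearchIndex;

fn bench_search_1m(c: &mut Criterion) {
    // Pre-populate index with 1M synthetic tracks (once, outside the timed loop)
    let index = create_index_with_n_tracks(1_000_000);

    c.bench_function("search_1m_tracks", |b| {
        b.iter(|| {
            // black_box keeps the optimizer from discarding the query or its result
            black_box(index.search(black_box("metallica master puppets"), 100).unwrap())
        })
    });
}

fn bench_index_throughput(c: &mut Criterion) {
    let files = generate_test_files(1000);

    c.bench_function("index_1000_tracks", |b| {
        b.iter_batched(
            // Fresh, empty index per iteration, so every sample indexes the same
            // 1000 files instead of growing one shared index
            || {
                let dir = tempfile::TempDir::new().unwrap();
                let index = SearchIndex::open(dir.path()).unwrap();
                (dir, index)
            },
            |(dir, index)| {
                for file in &files {
                    index.index_file(file).unwrap();
                }
                index.commit().unwrap();
                // Returned so the temp dir is deleted outside the timed section
                (dir, index)
            },
            BatchSize::PerIteration,
        )
    });
}

criterion_group!(benches, bench_search_1m, bench_index_throughput);
criterion_main!(benches);
```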

Exit Criteria

  • tantivy index opens/creates successfully
  • Files are indexed with artist/album/album_artist/title/genre/composer
  • Search returns relevant results in <500ms for 1M tracks
  • Fuzzy matching handles typos (e.g., "metalica" finds "Metallica")
  • /.search/query/ directory shows symlinks to results
  • Symlinks resolve to actual files (absolute paths)
  • Index persists across daemon restarts
  • New files are indexed via event bus + MetadataCache integration
  • gRPC Search and SearchStream RPCs functional
  • Result cache uses TTL-based LRU (moka), max 1000 entries
  • IndexWriter uses single-writer channel pattern (thread-safe)
  • API documentation covers happy/error paths for FUSE and gRPC

Architecture Compliance

Architecture Section Requirement Status
4.2 Search Engine: tantivy
4.3.7 Search RPC (Search, SearchStream) Task 6
3.2.1 Search <500ms for 1M files Benchmark
FR-14.1 Index metadata for full-text search
FR-14.2 Expose via virtual directory
FR-14.3 Support fuzzy matching
FR-6.4 album_artist field indexed Schema
G7 Sub-second search 1M+ tracks Benchmark

Oracle Fixes Applied

| Issue | Fix | Location |
|-------|-----|----------|
| IndexWriter thread-safety | Single-writer channel pattern | index.rs |
| Unbounded result cache | moka TTL-based LRU (1000 max, 5min TTL) | search.rs |
| gRPC Search API missing | Task 6 added | search_service.rs |
| Event handler incomplete | MetadataCache integration | indexer.rs |
| Genre not searchable | Added to QueryParser fields | index.rs |
| Missing fields | `album_artist`, `composer`, `duration_ms`, `bitrate`, `sample_rate` | index.rs |
| Relative symlinks | Absolute paths with mount_point | search.rs |