From 4aee356486941ec3c0fba99035cc70316dc1f6c4 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 12 May 2026 23:23:41 +0200 Subject: [PATCH] Add gRPC Search service with Search and SearchStream RPCs - Proto definitions for SearchRequest/SearchResponse/SearchResult - SearchService implementing MusicFs trait - Query validation (empty check, 256 char max, limit cap at 10k) - Streaming support via tokio mpsc channel Ultraworked with [Sisyphus](https://github.com/code-yeongyu/claude-agent) Co-authored-by: Sisyphus --- musicfs/crates/musicfs-grpc/Cargo.toml | 13 ++ musicfs/crates/musicfs-grpc/build.rs | 4 + .../crates/musicfs-grpc/proto/musicfs.proto | 31 ++++ musicfs/crates/musicfs-grpc/src/lib.rs | 13 +- .../crates/musicfs-grpc/src/search_service.rs | 160 ++++++++++++++++++ 5 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 musicfs/crates/musicfs-grpc/build.rs create mode 100644 musicfs/crates/musicfs-grpc/proto/musicfs.proto create mode 100644 musicfs/crates/musicfs-grpc/src/search_service.rs diff --git a/musicfs/crates/musicfs-grpc/Cargo.toml b/musicfs/crates/musicfs-grpc/Cargo.toml index c249865..842b22b 100644 --- a/musicfs/crates/musicfs-grpc/Cargo.toml +++ b/musicfs/crates/musicfs-grpc/Cargo.toml @@ -4,3 +4,16 @@ version.workspace = true edition.workspace = true [dependencies] +musicfs-search = { path = "../musicfs-search" } +musicfs-core = { path = "../musicfs-core" } +tonic.workspace = true +prost.workspace = true +tokio.workspace = true +tokio-stream.workspace = true +tracing.workspace = true + +[build-dependencies] +tonic-build.workspace = true + +[dev-dependencies] +tempfile.workspace = true diff --git a/musicfs/crates/musicfs-grpc/build.rs b/musicfs/crates/musicfs-grpc/build.rs new file mode 100644 index 0000000..d84192f --- /dev/null +++ b/musicfs/crates/musicfs-grpc/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/musicfs.proto")?; + Ok(()) +} diff --git a/musicfs/crates/musicfs-grpc/proto/musicfs.proto b/musicfs/crates/musicfs-grpc/proto/musicfs.proto new file mode 100644 index 0000000..816d957 --- /dev/null +++ b/musicfs/crates/musicfs-grpc/proto/musicfs.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package musicfs.v1; + +service MusicFS { + rpc Search(SearchRequest) returns (SearchResponse); + rpc SearchStream(SearchRequest) returns (stream SearchResult); +} + +message SearchRequest { + string query = 1; + optional uint32 limit = 2; + optional uint32 offset = 3; + optional string origin_id = 4; +} + +message SearchResponse { + repeated SearchResult results = 1; + uint64 total_matches = 2; + uint32 query_time_ms = 3; +} + +message SearchResult { + int64 file_id = 1; + string virtual_path = 2; + optional string artist = 3; + optional string album = 4; + optional string title = 5; + float score = 6; + map highlights = 7; +} diff --git a/musicfs/crates/musicfs-grpc/src/lib.rs b/musicfs/crates/musicfs-grpc/src/lib.rs index f9da2c4..51d90ca 100644 --- a/musicfs/crates/musicfs-grpc/src/lib.rs +++ b/musicfs/crates/musicfs-grpc/src/lib.rs @@ -1 +1,12 @@ -#![allow(dead_code)] +pub mod proto { + pub mod musicfs { + pub mod v1 { + tonic::include_proto!("musicfs.v1"); + } + } +} + +mod search_service; + +pub use proto::musicfs::v1::music_fs_server::{MusicFs, MusicFsServer}; +pub use search_service::SearchService; diff --git a/musicfs/crates/musicfs-grpc/src/search_service.rs b/musicfs/crates/musicfs-grpc/src/search_service.rs new file mode 100644 index 0000000..b7fb88e --- /dev/null +++ b/musicfs/crates/musicfs-grpc/src/search_service.rs @@ -0,0 +1,160 @@ +use crate::proto::musicfs::v1::{ + music_fs_server::MusicFs, SearchRequest, SearchResponse, SearchResult, +}; +use musicfs_search::SearchIndex; +use std::sync::Arc; +use std::time::Instant; +use tonic::{Request, Response, Status}; +use tracing::debug; + +pub struct SearchService { + index: Arc, +} + +impl SearchService { + pub fn new(index: Arc) -> Self { + Self { index } + } +} + +#[tonic::async_trait] +impl MusicFs for SearchService { + async fn search( + &self, + request: Request, + ) -> Result, Status> { + let start = Instant::now(); + let req = request.into_inner(); + + if req.query.is_empty() { + return Err(Status::invalid_argument("Query cannot be empty")); + } + + if req.query.len() > 256 { + return Err(Status::invalid_argument("Query exceeds maximum length (256)")); + } + + let limit = req.limit.unwrap_or(100).min(10000) as usize; + let offset = req.offset.unwrap_or(0) as usize; + + let results = self + .index + .search(&req.query, limit + offset) + .map_err(|e| Status::internal(format!("Search failed: {}", e)))?; + + let hits: Vec = results + .into_iter() + .skip(offset) + .take(limit) + .map(|hit| SearchResult { + file_id: hit.file_id.0, + virtual_path: hit.virtual_path.as_str().to_string(), + artist: hit.artist, + album: hit.album, + title: hit.title, + score: hit.score, + highlights: Default::default(), + }) + .collect(); + + let total_matches = self.index.count(); + let query_time_ms = start.elapsed().as_millis() as u32; + + debug!( + "Search '{}' returned {} results in {}ms", + req.query, + hits.len(), + query_time_ms + ); + + Ok(Response::new(SearchResponse { + results: hits, + total_matches, + query_time_ms, + })) + } + + type SearchStreamStream = tokio_stream::wrappers::ReceiverStream>; + + async fn search_stream( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + if req.query.is_empty() { + return Err(Status::invalid_argument("Query cannot be empty")); + } + + let limit = req.limit.unwrap_or(1000).min(10000) as usize; + + let results = self + .index + .search(&req.query, limit) + .map_err(|e| Status::internal(format!("Search failed: {}", e)))?; + + let (tx, rx) = tokio::sync::mpsc::channel(100); + + tokio::spawn(async move { + for hit in results { + let result = SearchResult { + file_id: hit.file_id.0, + virtual_path: hit.virtual_path.as_str().to_string(), + artist: hit.artist, + album: hit.album, + title: hit.title, + score: hit.score, + highlights: Default::default(), + }; + if tx.send(Ok(result)).await.is_err() { + break; + } + } + }); + + Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new( + rx, + ))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_grpc_search_empty_query() { + let dir = TempDir::new().unwrap(); + let index = Arc::new(SearchIndex::open(dir.path()).unwrap()); + let service = SearchService::new(index); + + let request = Request::new(SearchRequest { + query: String::new(), + limit: Some(10), + offset: None, + origin_id: None, + }); + + let result = service.search(request).await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().code(), tonic::Code::InvalidArgument); + } + + #[tokio::test] + async fn test_grpc_search_returns_response() { + let dir = TempDir::new().unwrap(); + let index = Arc::new(SearchIndex::open(dir.path()).unwrap()); + let service = SearchService::new(index); + + let request = Request::new(SearchRequest { + query: "test".to_string(), + limit: Some(10), + offset: None, + origin_id: None, + }); + + let response = service.search(request).await.unwrap(); + assert!(response.get_ref().results.is_empty()); + } +}