feat: add artist sync flow and stub torrent client

- Add DownloadService to orchestrate metadata → indexer → torrent flow
- Add POST /api/sync/artist endpoint for syncing artist albums
- Add StubTorrentClient for testing (logs requests to file)
- Refactor TorrentConfig to tagged enum (client_type: qbittorrent|stub|none)
- Add POST /api/reload endpoint for hot config reload
- Add chrono dependency for timestamps
This commit is contained in:
Alexander
2026-04-28 21:40:11 +02:00
parent 925c7c3703
commit 3aaeade4d3
13 changed files with 697 additions and 37 deletions
Generated
+116
View File
@@ -11,6 +11,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "1.0.0"
@@ -257,6 +266,20 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chrono"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0"
dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link",
]
[[package]]
name = "clap"
version = "4.6.1"
@@ -332,6 +355,12 @@ dependencies = [
"url",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "deranged"
version = "0.5.8"
@@ -666,6 +695,30 @@ dependencies = [
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "2.2.0"
@@ -961,6 +1014,7 @@ dependencies = [
"async-trait",
"axum 0.8.9",
"base64",
"chrono",
"clap",
"prost",
"reqwest",
@@ -994,6 +1048,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.4"
@@ -2198,12 +2261,65 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "windows-core"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
+1
View File
@@ -23,6 +23,7 @@ thiserror = "2"
url = "2"
roxmltree = "0.20"
base64 = "0.22"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["derive"] }
tonic = "0.12"
prost = "0.13"
+12 -1
View File
@@ -13,8 +13,19 @@ indexers:
url: "http://localhost:9117"
api_key: "your-jackett-api-key"
# Torrent client - choose one of: qbittorrent, stub, none
torrent:
qbittorrent:
client_type: qbittorrent
url: "http://localhost:8080"
username: "admin"
password: "changeme"
# Alternative: stub client for testing
# torrent:
# client_type: stub
# log_path: "/tmp/torrent-stub.log"
# save_path: "/tmp/downloads"
# Alternative: no torrent client
# torrent:
# client_type: none
+24
View File
@@ -1,5 +1,6 @@
mod indexer_controller;
mod metadata_controller;
mod sync_controller;
mod torrent_controller;
use axum::{
@@ -17,6 +18,7 @@ use crate::AppState;
pub fn routes(state: AppState) -> Router {
Router::new()
.route("/health", get(health))
.route("/reload", post(reload))
.route("/tracks", get(list_tracks))
.route("/tracks", post(create_track))
.route("/tracks/{id}", get(get_track))
@@ -26,6 +28,7 @@ pub fn routes(state: AppState) -> Router {
.nest("/indexers", indexer_controller::routes())
.nest("/torrents", torrent_controller::routes())
.nest("/metadata", metadata_controller::routes())
.nest("/sync", sync_controller::routes())
.with_state(state)
}
@@ -60,6 +63,27 @@ async fn health(State(state): State<AppState>) -> Json<Health> {
})
}
#[derive(serde::Serialize)]
struct ReloadResponse {
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
async fn reload(State(state): State<AppState>) -> Json<ReloadResponse> {
let mut state = state.write().await;
match state.reload().await {
Ok(()) => Json(ReloadResponse {
success: true,
error: None,
}),
Err(e) => Json(ReloadResponse {
success: false,
error: Some(e),
}),
}
}
async fn list_tracks(State(state): State<AppState>) -> Json<Vec<Track>> {
let state = state.read().await;
Json(state.aggregator.get_all().to_vec())
+32
View File
@@ -0,0 +1,32 @@
use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
use serde::Deserialize;
use crate::services::{ArtistSyncResult, DownloadService};
use crate::AppState;
pub fn routes() -> Router<AppState> {
Router::new().route("/artist", post(sync_artist))
}
#[derive(Debug, Deserialize)]
pub struct SyncArtistRequest {
pub name: String,
}
async fn sync_artist(
State(state): State<AppState>,
Json(req): Json<SyncArtistRequest>,
) -> Result<Json<ArtistSyncResult>, (StatusCode, String)> {
let state = state.read().await;
let result = DownloadService::sync_artist(
&req.name,
&state.metadata_service,
&state.indexer_service,
&state.torrent_service,
)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
Ok(Json(result))
}
+21 -7
View File
@@ -69,15 +69,29 @@ pub struct IndexerConfig {
}
#[derive(Debug, Clone, Deserialize)]
pub struct TorrentConfig {
pub qbittorrent: Option<QBittorrentConfig>,
#[serde(tag = "client_type", rename_all = "lowercase")]
pub enum TorrentConfig {
QBittorrent {
url: String,
username: String,
password: String,
},
Stub {
log_path: String,
#[serde(default = "default_stub_save_path")]
save_path: String,
},
None,
}
#[derive(Debug, Clone, Deserialize)]
pub struct QBittorrentConfig {
pub url: String,
pub username: String,
pub password: String,
impl Default for TorrentConfig {
fn default() -> Self {
Self::None
}
}
fn default_stub_save_path() -> String {
"/tmp/downloads".to_string()
}
impl Config {
+27
View File
@@ -14,6 +14,7 @@ pub struct AppServices {
pub indexer_service: services::IndexerService,
pub torrent_service: services::TorrentService,
pub metadata_service: services::MetadataService,
config_path: String,
}
impl AppServices {
@@ -21,14 +22,40 @@ impl AppServices {
indexer_service: services::IndexerService,
torrent_service: services::TorrentService,
metadata_service: services::MetadataService,
config_path: String,
) -> Self {
Self {
aggregator: services::Aggregator::new(),
indexer_service,
torrent_service,
metadata_service,
config_path,
}
}
pub async fn reload(&mut self) -> Result<(), String> {
let cfg = config::Config::load(&self.config_path).map_err(|e| e.to_string())?;
self.indexer_service =
services::IndexerService::from_config(&cfg.indexers).map_err(|e| e.to_string())?;
match services::TorrentService::from_config(&cfg.torrent).await {
Ok(svc) => self.torrent_service = svc,
Err(e) => {
tracing::warn!("failed to init torrent client on reload: {}", e);
}
}
let mut metadata = services::MetadataService::new(&cfg.metadata.endpoint);
if metadata.connect().await.is_ok() {
self.metadata_service = metadata;
} else {
tracing::warn!("failed to connect to metadata service on reload");
}
tracing::info!("config reloaded from {}", self.config_path);
Ok(())
}
}
pub type AppState = Arc<RwLock<AppServices>>;
+14 -11
View File
@@ -54,23 +54,25 @@ async fn main() {
}
};
let torrent_service = if let Some(qbit_config) = &config.torrent.qbittorrent {
match TorrentService::from_qbittorrent_config(qbit_config).await {
let torrent_service = match TorrentService::from_config(&config.torrent).await {
Ok(svc) => {
tracing::info!("connected to qBittorrent at {}", qbit_config.url);
match &config.torrent {
config::TorrentConfig::QBittorrent { url, .. } => {
tracing::info!("connected to qBittorrent at {}", url);
}
config::TorrentConfig::Stub { log_path, .. } => {
tracing::info!("using stub torrent client, logging to {}", log_path);
}
config::TorrentConfig::None => {
tracing::info!("no torrent client configured");
}
}
svc
}
Err(e) => {
tracing::warn!(
"failed to connect to qBittorrent: {} (continuing without torrent client)",
e
);
tracing::warn!("failed to init torrent client: {} (continuing without)", e);
TorrentService::new()
}
}
} else {
tracing::info!("no torrent client configured");
TorrentService::new()
};
let mut metadata_service = MetadataService::new(&config.metadata.endpoint);
@@ -93,6 +95,7 @@ async fn main() {
indexer_service,
torrent_service,
metadata_service,
args.config.clone(),
)));
let cors = CorsLayer::new()
+180
View File
@@ -0,0 +1,180 @@
use serde::Serialize;
use crate::indexer::SearchResult;
use super::{IndexerService, MetadataService, TorrentService};
#[derive(Debug, Serialize)]
pub struct AlbumDownloadResult {
pub album_id: String,
pub album_title: String,
pub artist_name: String,
pub status: DownloadStatus,
pub torrent_hash: Option<String>,
pub indexer: Option<String>,
pub error: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum DownloadStatus {
Added,
NoResults,
Failed,
Skipped,
}
#[derive(Debug, Serialize)]
pub struct ArtistSyncResult {
pub artist_id: String,
pub artist_name: String,
pub total_albums: usize,
pub albums_added: usize,
pub albums_failed: usize,
pub albums_no_results: usize,
pub results: Vec<AlbumDownloadResult>,
}
pub struct DownloadService;
impl DownloadService {
pub async fn sync_artist(
artist_name: &str,
metadata: &MetadataService,
indexers: &IndexerService,
torrent: &TorrentService,
) -> Result<ArtistSyncResult, String> {
let search_result = metadata
.search_artists(artist_name, Some(1), None)
.await
.map_err(|e| format!("metadata search failed: {}", e))?;
let artist = search_result
.artists
.first()
.ok_or_else(|| format!("artist '{}' not found", artist_name))?;
let albums_response = metadata
.get_artist_albums(&artist.id, Some(100), None)
.await
.map_err(|e| format!("failed to get albums: {}", e))?;
let mut results = Vec::new();
let mut albums_added = 0;
let mut albums_failed = 0;
let mut albums_no_results = 0;
for album in &albums_response.albums {
let result = Self::download_album(
&artist.name,
&album.id,
&album.title,
album
.release_date
.split('-')
.next()
.and_then(|y| y.parse().ok()),
indexers,
torrent,
)
.await;
match result.status {
DownloadStatus::Added => albums_added += 1,
DownloadStatus::NoResults => albums_no_results += 1,
DownloadStatus::Failed | DownloadStatus::Skipped => albums_failed += 1,
}
results.push(result);
}
Ok(ArtistSyncResult {
artist_id: artist.id.clone(),
artist_name: artist.name.clone(),
total_albums: albums_response.albums.len(),
albums_added,
albums_failed,
albums_no_results,
results,
})
}
async fn download_album(
artist_name: &str,
album_id: &str,
album_title: &str,
year: Option<u32>,
indexers: &IndexerService,
torrent: &TorrentService,
) -> AlbumDownloadResult {
let criteria = crate::indexer::MusicSearchCriteria {
artist: artist_name.to_string(),
album: Some(album_title.to_string()),
year,
limit: 20,
offset: 0,
};
let search_results = match indexers.search(&criteria, None).await {
Ok(r) => r,
Err(e) => {
return AlbumDownloadResult {
album_id: album_id.to_string(),
album_title: album_title.to_string(),
artist_name: artist_name.to_string(),
status: DownloadStatus::Failed,
torrent_hash: None,
indexer: None,
error: Some(format!("indexer search failed: {}", e)),
};
}
};
if search_results.is_empty() {
return AlbumDownloadResult {
album_id: album_id.to_string(),
album_title: album_title.to_string(),
artist_name: artist_name.to_string(),
status: DownloadStatus::NoResults,
torrent_hash: None,
indexer: None,
error: None,
};
}
let best = Self::select_best_result(&search_results);
match torrent.add_torrent_url(&best.download_url, None).await {
Ok(()) => AlbumDownloadResult {
album_id: album_id.to_string(),
album_title: album_title.to_string(),
artist_name: artist_name.to_string(),
status: DownloadStatus::Added,
torrent_hash: best.infohash.clone(),
indexer: Some(best.indexer.clone()),
error: None,
},
Err(e) => AlbumDownloadResult {
album_id: album_id.to_string(),
album_title: album_title.to_string(),
artist_name: artist_name.to_string(),
status: DownloadStatus::Failed,
torrent_hash: None,
indexer: Some(best.indexer.clone()),
error: Some(format!("failed to add torrent: {}", e)),
},
}
}
fn select_best_result(results: &[SearchResult]) -> &SearchResult {
results
.iter()
.max_by_key(|r| {
let seeders = r.seeders.unwrap_or(0);
let is_flac = r.title.to_lowercase().contains("flac");
let score = seeders as i64 + if is_flac { 1000 } else { 0 };
score
})
.unwrap()
}
}
+4
View File
@@ -1,7 +1,11 @@
mod download_service;
mod indexer_service;
mod metadata_service;
mod torrent_service;
pub use download_service::{
AlbumDownloadResult, ArtistSyncResult, DownloadService, DownloadStatus,
};
pub use indexer_service::{IndexerInfo, IndexerService};
pub use metadata_service::MetadataService;
pub use torrent_service::TorrentService;
+25 -7
View File
@@ -1,7 +1,9 @@
use std::sync::Arc;
use crate::config::QBittorrentConfig;
use crate::torrent::{QBittorrentClient, TorrentClient, TorrentClientError, TorrentInfo};
use crate::config::TorrentConfig;
use crate::torrent::{
QBittorrentClient, StubTorrentClient, TorrentClient, TorrentClientError, TorrentInfo,
};
pub struct TorrentService {
client: Option<Arc<dyn TorrentClient>>,
@@ -12,16 +14,32 @@ impl TorrentService {
Self { client: None }
}
pub async fn from_qbittorrent_config(
config: &QBittorrentConfig,
) -> Result<Self, TorrentClientError> {
let mut client = QBittorrentClient::new(&config.url, &config.username, &config.password)?;
pub async fn from_config(config: &TorrentConfig) -> Result<Self, TorrentClientError> {
match config {
TorrentConfig::QBittorrent {
url,
username,
password,
} => {
let mut client = QBittorrentClient::new(url, username, password)?;
client.connect().await?;
Ok(Self {
client: Some(Arc::new(client)),
})
}
TorrentConfig::Stub {
log_path,
save_path,
} => {
let mut client = StubTorrentClient::new(log_path, save_path);
client.connect().await?;
Ok(Self {
client: Some(Arc::new(client)),
})
}
TorrentConfig::None => Ok(Self::new()),
}
}
fn client(&self) -> Result<&Arc<dyn TorrentClient>, TorrentClientError> {
self.client
+2
View File
@@ -1,5 +1,7 @@
mod client;
mod qbittorrent;
mod stub;
pub use client::{TorrentClient, TorrentClientError, TorrentInfo, TorrentState};
pub use qbittorrent::QBittorrentClient;
pub use stub::StubTorrentClient;
+228
View File
@@ -0,0 +1,228 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::client::{TorrentClient, TorrentClientError, TorrentInfo, TorrentState};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action")]
pub enum StubRequest {
AddUrl {
url: String,
save_path: Option<String>,
timestamp: String,
},
AddFile {
size: usize,
save_path: Option<String>,
timestamp: String,
},
Remove {
hash: String,
delete_files: bool,
timestamp: String,
},
Pause {
hash: String,
timestamp: String,
},
Resume {
hash: String,
timestamp: String,
},
}
struct StubTorrent {
info: TorrentInfo,
}
pub struct StubTorrentClient {
torrents: Arc<RwLock<HashMap<String, StubTorrent>>>,
log_path: PathBuf,
save_path: String,
connected: bool,
}
impl StubTorrentClient {
pub fn new(log_path: impl Into<PathBuf>, save_path: impl Into<String>) -> Self {
Self {
torrents: Arc::new(RwLock::new(HashMap::new())),
log_path: log_path.into(),
save_path: save_path.into(),
connected: false,
}
}
fn log_request(&self, request: &StubRequest) {
use std::io::Write;
let json = serde_json::to_string_pretty(request).unwrap_or_default();
let entry = format!("{}\n---\n", json);
let result = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.log_path)
.and_then(|mut f| f.write_all(entry.as_bytes()));
if let Err(e) = result {
tracing::warn!("failed to write stub log: {}", e);
}
}
fn generate_hash(input: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
input.hash(&mut hasher);
format!("{:016x}{:016x}{:08x}", hasher.finish(), hasher.finish(), 0)
}
fn timestamp() -> String {
chrono::Utc::now().to_rfc3339()
}
}
#[async_trait]
impl TorrentClient for StubTorrentClient {
async fn connect(&mut self) -> Result<(), TorrentClientError> {
self.connected = true;
tracing::info!("stub torrent client connected");
Ok(())
}
async fn disconnect(&mut self) -> Result<(), TorrentClientError> {
self.connected = false;
Ok(())
}
async fn list_torrents(&self) -> Result<Vec<TorrentInfo>, TorrentClientError> {
let torrents = self.torrents.read().await;
Ok(torrents.values().map(|t| t.info.clone()).collect())
}
async fn get_torrent(&self, hash: &str) -> Result<TorrentInfo, TorrentClientError> {
let torrents = self.torrents.read().await;
torrents
.get(hash)
.map(|t| t.info.clone())
.ok_or_else(|| TorrentClientError::TorrentNotFound(hash.to_string()))
}
async fn add_torrent_url(
&self,
url: &str,
save_path: Option<&str>,
) -> Result<(), TorrentClientError> {
let request = StubRequest::AddUrl {
url: url.to_string(),
save_path: save_path.map(String::from),
timestamp: Self::timestamp(),
};
self.log_request(&request);
let hash = Self::generate_hash(url);
let name = url.rsplit('/').next().unwrap_or("unknown").to_string();
let info = TorrentInfo {
hash: hash.clone(),
name,
size: 0,
progress: 0.0,
download_speed: 0,
upload_speed: 0,
state: TorrentState::Downloading,
save_path: save_path.unwrap_or(&self.save_path).to_string(),
};
let mut torrents = self.torrents.write().await;
torrents.insert(hash, StubTorrent { info });
Ok(())
}
async fn add_torrent_file(
&self,
torrent_data: &[u8],
save_path: Option<&str>,
) -> Result<(), TorrentClientError> {
let request = StubRequest::AddFile {
size: torrent_data.len(),
save_path: save_path.map(String::from),
timestamp: Self::timestamp(),
};
self.log_request(&request);
let hash = Self::generate_hash(&format!("file:{}", torrent_data.len()));
let info = TorrentInfo {
hash: hash.clone(),
name: format!("torrent-{}.torrent", &hash[..8]),
size: torrent_data.len() as u64,
progress: 0.0,
download_speed: 0,
upload_speed: 0,
state: TorrentState::Downloading,
save_path: save_path.unwrap_or(&self.save_path).to_string(),
};
let mut torrents = self.torrents.write().await;
torrents.insert(hash, StubTorrent { info });
Ok(())
}
async fn remove_torrent(
&self,
hash: &str,
delete_files: bool,
) -> Result<(), TorrentClientError> {
let request = StubRequest::Remove {
hash: hash.to_string(),
delete_files,
timestamp: Self::timestamp(),
};
self.log_request(&request);
let mut torrents = self.torrents.write().await;
torrents
.remove(hash)
.map(|_| ())
.ok_or_else(|| TorrentClientError::TorrentNotFound(hash.to_string()))
}
async fn pause_torrent(&self, hash: &str) -> Result<(), TorrentClientError> {
let request = StubRequest::Pause {
hash: hash.to_string(),
timestamp: Self::timestamp(),
};
self.log_request(&request);
let mut torrents = self.torrents.write().await;
if let Some(t) = torrents.get_mut(hash) {
t.info.state = TorrentState::Paused;
Ok(())
} else {
Err(TorrentClientError::TorrentNotFound(hash.to_string()))
}
}
async fn resume_torrent(&self, hash: &str) -> Result<(), TorrentClientError> {
let request = StubRequest::Resume {
hash: hash.to_string(),
timestamp: Self::timestamp(),
};
self.log_request(&request);
let mut torrents = self.torrents.write().await;
if let Some(t) = torrents.get_mut(hash) {
t.info.state = TorrentState::Downloading;
Ok(())
} else {
Err(TorrentClientError::TorrentNotFound(hash.to_string()))
}
}
}