From fbe55b73fd1306071466b6f1e950631eb58bade1 Mon Sep 17 00:00:00 2001 From: Guillem Castro Date: Fri, 17 May 2024 17:39:46 +0200 Subject: [PATCH] async encoding --- src/download.rs | 24 +++++++------- src/encoder/flac.rs | 46 +++++++++++++++++--------- src/encoder/mod.rs | 26 +++++++++++---- src/encoder/mp3.rs | 79 ++++++++++++++++++++++++++++----------------- src/lib.rs | 2 +- 5 files changed, 113 insertions(+), 64 deletions(-) diff --git a/src/download.rs b/src/download.rs index 7b3954d..de909a0 100644 --- a/src/download.rs +++ b/src/download.rs @@ -1,6 +1,6 @@ use std::fmt::Write; use std::path::PathBuf; -use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use futures::StreamExt; @@ -100,6 +100,7 @@ impl Downloader { ); let pb = self.progress_bar.add(ProgressBar::new(file_size as u64)); + pb.enable_steady_tick(Duration::from_millis(100)); pb.set_style(ProgressStyle::with_template("{spinner:.green} {msg} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) .progress_chars("#>-")); @@ -122,24 +123,23 @@ impl Downloader { samples.append(&mut content); } SinkEvent::Finished => { - pb.finish_with_message(format!("Downloaded {}", &file_name)); + tracing::info!("Finished downloading track: {:?}", file_name); + break; } } } - tracing::info!("Downloaded track: {:?}", file_name); - + tracing::info!("Encoding track: {:?}", file_name); + pb.set_message(format!("Encoding {}", &file_name)); let samples = Samples::new(samples, 44100, 2, 16); + let encoder = crate::encoder::get_encoder(options.format)?; + let stream = encoder.encode(samples).await?; - let format = options.format.clone(); - tokio_rayon::spawn(move || { - tracing::info!("Encoding track: {:?}", file_name); - let encoder = crate::encoder::get_encoder(format).unwrap(); - let stream = encoder.encode(samples).unwrap(); - tracing::info!("Writing track: {:?} to file: {}", file_name, &path); - stream.write_to_file(&path).unwrap(); - }).await; + pb.set_message(format!("Writing {}", &file_name)); + tracing::info!("Writing track: {:?} to file: {}", file_name, &path); + stream.write_to_file(&path).await?; + pb.finish_with_message(format!("Downloaded {}", &file_name)); Ok(()) } diff --git a/src/encoder/flac.rs b/src/encoder/flac.rs index 2337c56..3b463ca 100644 --- a/src/encoder/flac.rs +++ b/src/encoder/flac.rs @@ -1,28 +1,24 @@ use flacenc::bitsink::ByteSink; use flacenc::component::BitRepr; -use flacenc::error::Verified; use flacenc::error::Verify; +use super::execute_with_result; use super::EncodedStream; use super::Encoder; use super::Samples; #[derive(Debug)] -pub struct FlacEncoder { - config: Verified, -} +pub struct FlacEncoder; impl FlacEncoder { pub fn new() -> anyhow::Result { - let config = flacenc::config::Encoder::default() - .into_verified() - .map_err(|e| anyhow::anyhow!("Failed to verify encoder config: {:?}", e))?; - Ok(FlacEncoder { config }) + Ok(Self) } } +#[async_trait::async_trait] impl Encoder for FlacEncoder { - fn encode(&self, samples: Samples) -> anyhow::Result { + async fn encode(&self, samples: Samples) -> anyhow::Result { let source = flacenc::source::MemSource::from_samples( &samples.samples, samples.channels as usize, @@ -30,15 +26,33 @@ impl Encoder for FlacEncoder { samples.sample_rate as usize, ); - let flac_stream = - flacenc::encode_with_fixed_block_size(&self.config, source, self.config.block_size) + let config = flacenc::config::Encoder::default() + .into_verified() + .map_err(|e| anyhow::anyhow!("Failed to verify encoder config: {:?}", e))?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + rayon::spawn(execute_with_result( + move || { + let flac_stream = flacenc::encode_with_fixed_block_size( + &config, + source, + config.block_size, + ) .map_err(|e| anyhow::anyhow!("Failed to encode flac: {:?}", e))?; - let mut byte_sink = ByteSink::new(); - flac_stream - .write(&mut byte_sink) - .map_err(|e| anyhow::anyhow!("Failed to write flac stream: {:?}", e))?; + let mut byte_sink = ByteSink::new(); + flac_stream + .write(&mut byte_sink) + .map_err(|e| anyhow::anyhow!("Failed to write flac stream: {:?}", e))?; - Ok(EncodedStream::new(byte_sink.into_inner())) + Ok(byte_sink.into_inner()) + }, + tx, + )); + + let byte_sink: Vec = rx.await??; + + Ok(EncodedStream::new(byte_sink)) } } diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs index 19fcdc0..ffb82cd 100644 --- a/src/encoder/mod.rs +++ b/src/encoder/mod.rs @@ -2,10 +2,11 @@ mod flac; #[cfg(feature = "mp3")] mod mp3; -use std::str::FromStr; +use std::{path::Path, str::FromStr}; use anyhow::Result; use lazy_static::lazy_static; +use tokio::sync::oneshot::Sender; #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum Format { @@ -54,8 +55,9 @@ pub fn get_encoder(format: Format) -> anyhow::Result<&'static Box Result; + async fn encode(&self, samples: Samples) -> Result; } pub struct Samples { @@ -85,15 +87,27 @@ impl EncodedStream { EncodedStream { stream } } - pub fn write_to_file>(&self, path: P) -> Result<()> { + pub async fn write_to_file>(&self, path: P) -> Result<()> { if !path.as_ref().exists() { - std::fs::create_dir_all( + tokio::fs::create_dir_all( path.as_ref() .parent() .ok_or(anyhow::anyhow!("Could not create path"))?, - )?; + ).await?; } - std::fs::write(path, &self.stream)?; + tokio::fs::write(path, &self.stream).await?; Ok(()) } } + +pub fn execute_with_result(func: F, tx: Sender>) -> impl FnOnce() +where + F: FnOnce() -> anyhow::Result + Send + 'static, + T: Send + 'static, +{ + move || { + let result = func(); + // Ignore the error if the receiver has been dropped + let _ = tx.send(result); + } +} \ No newline at end of file diff --git a/src/encoder/mp3.rs b/src/encoder/mp3.rs index d920ad9..779d4d3 100644 --- a/src/encoder/mp3.rs +++ b/src/encoder/mp3.rs @@ -4,49 +4,70 @@ use mp3lame_encoder::Builder; use mp3lame_encoder::FlushNoGap; use mp3lame_encoder::InterleavedPcm; -use super::Samples; -use super::Encoder; +use super::execute_with_result; use super::EncodedStream; +use super::Encoder; +use super::Samples; pub struct Mp3Encoder; impl Mp3Encoder { - fn build_encoder(&self, sample_rate: u32, channels: u32) -> anyhow::Result { - let mut builder = Builder::new(). - ok_or(anyhow::anyhow!("Failed to create mp3 encoder"))?; - - builder.set_sample_rate(sample_rate) + fn build_encoder( + &self, + sample_rate: u32, + channels: u32, + ) -> anyhow::Result { + let mut builder = Builder::new().ok_or(anyhow::anyhow!("Failed to create mp3 encoder"))?; + + builder + .set_sample_rate(sample_rate) .map_err(|e| anyhow::anyhow!("Failed to set sample rate for mp3 encoder: {}", e))?; - builder.set_num_channels(channels as u8) - .map_err(|e| anyhow::anyhow!("Failed to set number of channels for mp3 encoder: {}", e))?; - builder.set_brate(mp3lame_encoder::Birtate::Kbps160) + builder.set_num_channels(channels as u8).map_err(|e| { + anyhow::anyhow!("Failed to set number of channels for mp3 encoder: {}", e) + })?; + builder + .set_brate(mp3lame_encoder::Birtate::Kbps160) .map_err(|e| anyhow::anyhow!("Failed to set bitrate for mp3 encoder: {}", e))?; - - builder.build() + + builder + .build() .map_err(|e| anyhow::anyhow!("Failed to build mp3 encoder: {}", e)) } } +#[async_trait::async_trait] impl Encoder for Mp3Encoder { - fn encode(&self, samples: Samples) -> anyhow::Result { + async fn encode(&self, samples: Samples) -> anyhow::Result { let mut mp3_encoder = self.build_encoder(samples.sample_rate, samples.channels)?; - let samples: Vec = samples.samples.iter().map(|&x| x as i16).collect(); - let input = InterleavedPcm(samples.as_slice()); - let mut mp3_out_buffer = Vec::new(); - mp3_out_buffer.reserve(mp3lame_encoder::max_required_buffer_size(samples.len())); - let encoded_size = mp3_encoder.encode(input, mp3_out_buffer.spare_capacity_mut()) - .map_err(|e| anyhow!("Failed to encode mp3: {}", e))?; - unsafe { - mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size)); - } + let (tx, rx) = tokio::sync::oneshot::channel(); + + rayon::spawn(execute_with_result( + move || { + let samples: Vec = samples.samples.iter().map(|&x| x as i16).collect(); + let input = InterleavedPcm(samples.as_slice()); + let mut mp3_out_buffer = Vec::new(); + mp3_out_buffer.reserve(mp3lame_encoder::max_required_buffer_size(samples.len())); + let encoded_size = mp3_encoder + .encode(input, mp3_out_buffer.spare_capacity_mut()) + .map_err(|e| anyhow!("Failed to encode mp3: {}", e))?; + unsafe { + mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size)); + } + + let encoded_size = mp3_encoder + .flush::(mp3_out_buffer.spare_capacity_mut()) + .map_err(|e| anyhow!("Failed to flush mp3 encoder: {}", e))?; + unsafe { + mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size)); + } + Ok(mp3_out_buffer) + }, + tx, + )); + + let mp3_out_buffer = rx.await??; - let encoded_size = mp3_encoder.flush::(mp3_out_buffer.spare_capacity_mut()) - .map_err(|e| anyhow!("Failed to flush mp3 encoder: {}", e))?; - unsafe { - mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size)); - } - Ok(EncodedStream::new(mp3_out_buffer)) } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index f150af9..dac977a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ pub mod download; -pub mod file_sink; +// pub mod file_sink; pub mod channel_sink; pub mod session; pub mod track;