async encoding

This commit is contained in:
Guillem Castro 2024-05-17 17:39:46 +02:00
parent c2b636236d
commit fbe55b73fd
5 changed files with 113 additions and 64 deletions

View file

@ -1,6 +1,6 @@
use std::fmt::Write; use std::fmt::Write;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use futures::StreamExt; use futures::StreamExt;
@ -100,6 +100,7 @@ impl Downloader {
); );
let pb = self.progress_bar.add(ProgressBar::new(file_size as u64)); let pb = self.progress_bar.add(ProgressBar::new(file_size as u64));
pb.enable_steady_tick(Duration::from_millis(100));
pb.set_style(ProgressStyle::with_template("{spinner:.green} {msg} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? pb.set_style(ProgressStyle::with_template("{spinner:.green} {msg} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")?
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("#>-")); .progress_chars("#>-"));
@ -122,24 +123,23 @@ impl Downloader {
samples.append(&mut content); samples.append(&mut content);
} }
SinkEvent::Finished => { SinkEvent::Finished => {
pb.finish_with_message(format!("Downloaded {}", &file_name)); tracing::info!("Finished downloading track: {:?}", file_name);
break;
} }
} }
} }
tracing::info!("Downloaded track: {:?}", file_name);
let samples = Samples::new(samples, 44100, 2, 16);
let format = options.format.clone();
tokio_rayon::spawn(move || {
tracing::info!("Encoding track: {:?}", file_name); tracing::info!("Encoding track: {:?}", file_name);
let encoder = crate::encoder::get_encoder(format).unwrap(); pb.set_message(format!("Encoding {}", &file_name));
let stream = encoder.encode(samples).unwrap(); let samples = Samples::new(samples, 44100, 2, 16);
tracing::info!("Writing track: {:?} to file: {}", file_name, &path); let encoder = crate::encoder::get_encoder(options.format)?;
stream.write_to_file(&path).unwrap(); let stream = encoder.encode(samples).await?;
}).await;
pb.set_message(format!("Writing {}", &file_name));
tracing::info!("Writing track: {:?} to file: {}", file_name, &path);
stream.write_to_file(&path).await?;
pb.finish_with_message(format!("Downloaded {}", &file_name));
Ok(()) Ok(())
} }

View file

@ -1,28 +1,24 @@
use flacenc::bitsink::ByteSink; use flacenc::bitsink::ByteSink;
use flacenc::component::BitRepr; use flacenc::component::BitRepr;
use flacenc::error::Verified;
use flacenc::error::Verify; use flacenc::error::Verify;
use super::execute_with_result;
use super::EncodedStream; use super::EncodedStream;
use super::Encoder; use super::Encoder;
use super::Samples; use super::Samples;
#[derive(Debug)] #[derive(Debug)]
pub struct FlacEncoder { pub struct FlacEncoder;
config: Verified<flacenc::config::Encoder>,
}
impl FlacEncoder { impl FlacEncoder {
pub fn new() -> anyhow::Result<Self> { pub fn new() -> anyhow::Result<Self> {
let config = flacenc::config::Encoder::default() Ok(Self)
.into_verified()
.map_err(|e| anyhow::anyhow!("Failed to verify encoder config: {:?}", e))?;
Ok(FlacEncoder { config })
} }
} }
#[async_trait::async_trait]
impl Encoder for FlacEncoder { impl Encoder for FlacEncoder {
fn encode(&self, samples: Samples) -> anyhow::Result<EncodedStream> { async fn encode(&self, samples: Samples) -> anyhow::Result<EncodedStream> {
let source = flacenc::source::MemSource::from_samples( let source = flacenc::source::MemSource::from_samples(
&samples.samples, &samples.samples,
samples.channels as usize, samples.channels as usize,
@ -30,8 +26,19 @@ impl Encoder for FlacEncoder {
samples.sample_rate as usize, samples.sample_rate as usize,
); );
let flac_stream = let config = flacenc::config::Encoder::default()
flacenc::encode_with_fixed_block_size(&self.config, source, self.config.block_size) .into_verified()
.map_err(|e| anyhow::anyhow!("Failed to verify encoder config: {:?}", e))?;
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(execute_with_result(
move || {
let flac_stream = flacenc::encode_with_fixed_block_size(
&config,
source,
config.block_size,
)
.map_err(|e| anyhow::anyhow!("Failed to encode flac: {:?}", e))?; .map_err(|e| anyhow::anyhow!("Failed to encode flac: {:?}", e))?;
let mut byte_sink = ByteSink::new(); let mut byte_sink = ByteSink::new();
@ -39,6 +46,13 @@ impl Encoder for FlacEncoder {
.write(&mut byte_sink) .write(&mut byte_sink)
.map_err(|e| anyhow::anyhow!("Failed to write flac stream: {:?}", e))?; .map_err(|e| anyhow::anyhow!("Failed to write flac stream: {:?}", e))?;
Ok(EncodedStream::new(byte_sink.into_inner())) Ok(byte_sink.into_inner())
},
tx,
));
let byte_sink: Vec<u8> = rx.await??;
Ok(EncodedStream::new(byte_sink))
} }
} }

View file

@ -2,10 +2,11 @@ mod flac;
#[cfg(feature = "mp3")] #[cfg(feature = "mp3")]
mod mp3; mod mp3;
use std::str::FromStr; use std::{path::Path, str::FromStr};
use anyhow::Result; use anyhow::Result;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::sync::oneshot::Sender;
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum Format { pub enum Format {
@ -54,8 +55,9 @@ pub fn get_encoder(format: Format) -> anyhow::Result<&'static Box<dyn Encoder +
} }
} }
#[async_trait::async_trait]
pub trait Encoder { pub trait Encoder {
fn encode(&self, samples: Samples) -> Result<EncodedStream>; async fn encode(&self, samples: Samples) -> Result<EncodedStream>;
} }
pub struct Samples { pub struct Samples {
@ -85,15 +87,27 @@ impl EncodedStream {
EncodedStream { stream } EncodedStream { stream }
} }
pub fn write_to_file<P: std::convert::AsRef<std::path::Path>>(&self, path: P) -> Result<()> { pub async fn write_to_file<P: AsRef<Path>>(&self, path: P) -> Result<()> {
if !path.as_ref().exists() { if !path.as_ref().exists() {
std::fs::create_dir_all( tokio::fs::create_dir_all(
path.as_ref() path.as_ref()
.parent() .parent()
.ok_or(anyhow::anyhow!("Could not create path"))?, .ok_or(anyhow::anyhow!("Could not create path"))?,
)?; ).await?;
} }
std::fs::write(path, &self.stream)?; tokio::fs::write(path, &self.stream).await?;
Ok(()) Ok(())
} }
} }
pub fn execute_with_result<F, T>(func: F, tx: Sender<anyhow::Result<T>>) -> impl FnOnce()
where
F: FnOnce() -> anyhow::Result<T> + Send + 'static,
T: Send + 'static,
{
move || {
let result = func();
// Ignore the error if the receiver has been dropped
let _ = tx.send(result);
}
}

View file

@ -4,48 +4,69 @@ use mp3lame_encoder::Builder;
use mp3lame_encoder::FlushNoGap; use mp3lame_encoder::FlushNoGap;
use mp3lame_encoder::InterleavedPcm; use mp3lame_encoder::InterleavedPcm;
use super::Samples; use super::execute_with_result;
use super::Encoder;
use super::EncodedStream; use super::EncodedStream;
use super::Encoder;
use super::Samples;
pub struct Mp3Encoder; pub struct Mp3Encoder;
impl Mp3Encoder { impl Mp3Encoder {
fn build_encoder(&self, sample_rate: u32, channels: u32) -> anyhow::Result<mp3lame_encoder::Encoder> { fn build_encoder(
let mut builder = Builder::new(). &self,
ok_or(anyhow::anyhow!("Failed to create mp3 encoder"))?; sample_rate: u32,
channels: u32,
) -> anyhow::Result<mp3lame_encoder::Encoder> {
let mut builder = Builder::new().ok_or(anyhow::anyhow!("Failed to create mp3 encoder"))?;
builder.set_sample_rate(sample_rate) builder
.set_sample_rate(sample_rate)
.map_err(|e| anyhow::anyhow!("Failed to set sample rate for mp3 encoder: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to set sample rate for mp3 encoder: {}", e))?;
builder.set_num_channels(channels as u8) builder.set_num_channels(channels as u8).map_err(|e| {
.map_err(|e| anyhow::anyhow!("Failed to set number of channels for mp3 encoder: {}", e))?; anyhow::anyhow!("Failed to set number of channels for mp3 encoder: {}", e)
builder.set_brate(mp3lame_encoder::Birtate::Kbps160) })?;
builder
.set_brate(mp3lame_encoder::Birtate::Kbps160)
.map_err(|e| anyhow::anyhow!("Failed to set bitrate for mp3 encoder: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to set bitrate for mp3 encoder: {}", e))?;
builder.build() builder
.build()
.map_err(|e| anyhow::anyhow!("Failed to build mp3 encoder: {}", e)) .map_err(|e| anyhow::anyhow!("Failed to build mp3 encoder: {}", e))
} }
} }
#[async_trait::async_trait]
impl Encoder for Mp3Encoder { impl Encoder for Mp3Encoder {
fn encode(&self, samples: Samples) -> anyhow::Result<EncodedStream> { async fn encode(&self, samples: Samples) -> anyhow::Result<EncodedStream> {
let mut mp3_encoder = self.build_encoder(samples.sample_rate, samples.channels)?; let mut mp3_encoder = self.build_encoder(samples.sample_rate, samples.channels)?;
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(execute_with_result(
move || {
let samples: Vec<i16> = samples.samples.iter().map(|&x| x as i16).collect(); let samples: Vec<i16> = samples.samples.iter().map(|&x| x as i16).collect();
let input = InterleavedPcm(samples.as_slice()); let input = InterleavedPcm(samples.as_slice());
let mut mp3_out_buffer = Vec::new(); let mut mp3_out_buffer = Vec::new();
mp3_out_buffer.reserve(mp3lame_encoder::max_required_buffer_size(samples.len())); mp3_out_buffer.reserve(mp3lame_encoder::max_required_buffer_size(samples.len()));
let encoded_size = mp3_encoder.encode(input, mp3_out_buffer.spare_capacity_mut()) let encoded_size = mp3_encoder
.encode(input, mp3_out_buffer.spare_capacity_mut())
.map_err(|e| anyhow!("Failed to encode mp3: {}", e))?; .map_err(|e| anyhow!("Failed to encode mp3: {}", e))?;
unsafe { unsafe {
mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size)); mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size));
} }
let encoded_size = mp3_encoder.flush::<FlushNoGap>(mp3_out_buffer.spare_capacity_mut()) let encoded_size = mp3_encoder
.flush::<FlushNoGap>(mp3_out_buffer.spare_capacity_mut())
.map_err(|e| anyhow!("Failed to flush mp3 encoder: {}", e))?; .map_err(|e| anyhow!("Failed to flush mp3 encoder: {}", e))?;
unsafe { unsafe {
mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size)); mp3_out_buffer.set_len(mp3_out_buffer.len().wrapping_add(encoded_size));
} }
Ok(mp3_out_buffer)
},
tx,
));
let mp3_out_buffer = rx.await??;
Ok(EncodedStream::new(mp3_out_buffer)) Ok(EncodedStream::new(mp3_out_buffer))
} }

View file

@ -1,5 +1,5 @@
pub mod download; pub mod download;
pub mod file_sink; // pub mod file_sink;
pub mod channel_sink; pub mod channel_sink;
pub mod session; pub mod session;
pub mod track; pub mod track;