From 0ca063e9272bb3a8c29e7516a13549ae6afe2af6 Mon Sep 17 00:00:00 2001 From: Muaz Ahmad Date: Mon, 9 Dec 2024 14:29:15 +0500 Subject: [PATCH] Parse frames from a decoder and push to pipewire --- src/audio/codec.rs | 57 +++++++++++++++++++++++++++++++++++++----- src/audio/errors.rs | 16 ++++++++++++ src/audio/mod.rs | 2 ++ src/audio/pw.rs | 22 ++++++++-------- src/audio/sound_mgr.rs | 37 ++++++++++++++++++++++++--- src/player/mod.rs | 1 - src/player/player.rs | 1 + src/utils.rs | 21 +++++++++++++--- 8 files changed, 132 insertions(+), 25 deletions(-) create mode 100644 src/audio/errors.rs diff --git a/src/audio/codec.rs b/src/audio/codec.rs index f31e5a0..7bca5b8 100644 --- a/src/audio/codec.rs +++ b/src/audio/codec.rs @@ -1,35 +1,80 @@ use std::{ + collections::VecDeque, io::{Read, Write}, process::{Child, ChildStdin, ChildStdout, Command, Stdio}, + sync::mpsc::{channel, Receiver, Sender, TryRecvError}, + thread, }; use crate::utils::Error; +const MAX_SAMPLE_BUFF: usize = 1_000_000; + pub struct DecoderContext { process: Child, - encoded_in: ChildStdin, - frames_out: ChildStdout, + encoded_in: Sender>, + frames_out: Receiver>, + sample_buf: VecDeque>, } -pub fn init() -> Result { +pub fn init(stride: usize, sample_rate: u32) -> Result { let mut decoder = Command::new("ffmpeg") .args(["-hide_banner", "-i", "-", "-f", "s16le", "-"]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn()?; - let encoded_in = decoder.stdin.take().unwrap(); - let frames_out = decoder.stdout.take().unwrap(); + let (encoded_in, encoded_out) = channel::>(); + let (frames_in, frames_out) = channel(); + let mut ffmpeg_in = decoder.stdin.take().unwrap(); + let mut ffmpeg_out = decoder.stdout.take().unwrap(); + thread::spawn(move || loop { + if let Ok(chunk) = encoded_out.recv() { + ffmpeg_in.write_all(chunk.as_slice()).unwrap(); + } else { + break; + } + }); + thread::spawn(move || loop { + let mut frames = vec![0; stride * sample_rate as usize / 1000 * 10]; + match ffmpeg_out.read_exact(frames.as_mut_slice()) { + Err(_) => break, + Ok(_) => (), + }; + match frames_in.send(frames) { + Err(_) => break, + Ok(_) => (), + }; + }); Ok(DecoderContext { process: decoder, encoded_in, frames_out, + sample_buf: VecDeque::new(), }) } impl DecoderContext { pub fn append_chunk(&mut self, chunk: Vec) -> Result<(), Error> { - self.encoded_in.write_all(chunk.as_slice())?; + self.encoded_in.send(chunk)?; Ok(()) } + + pub fn fetch_samples(&mut self) { + loop { + match self.frames_out.try_recv() { + Err(TryRecvError::Empty) => break, + Ok(frames) => self.sample_buf.push_back(frames), + Err(_) => unimplemented!(), + } + } + } + + pub fn next_sample(&mut self) -> Vec { + if self.sample_buf.len() < 100 { + self.fetch_samples(); + } + + self.sample_buf.pop_front().unwrap_or(vec![0; 1920]) + } } diff --git a/src/audio/errors.rs b/src/audio/errors.rs new file mode 100644 index 0000000..1a1d74e --- /dev/null +++ b/src/audio/errors.rs @@ -0,0 +1,16 @@ +#[derive(Debug)] +pub enum AudioError { + PodUpdateError, +} + +impl std::error::Error for AudioError {} + +impl std::fmt::Display for AudioError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::PodUpdateError => { + write!(f, "Error while reconfiguring the audio stream paramteres") + } + } + } +} diff --git a/src/audio/mod.rs b/src/audio/mod.rs index e933d69..c243131 100644 --- a/src/audio/mod.rs +++ b/src/audio/mod.rs @@ -14,6 +14,7 @@ use crate::{config::Settings, player::PlayerEvent, utils::Error}; pub enum AudioEvent { DecodeChunk(Vec), + DecoderInit(u32, u32), } pub struct SoundManager { @@ -60,5 +61,6 @@ pub fn init( } mod codec; +mod errors; mod pw; mod sound_mgr; diff --git a/src/audio/pw.rs b/src/audio/pw.rs index 54f7b30..5fc15fb 100644 --- a/src/audio/pw.rs +++ b/src/audio/pw.rs @@ -11,7 +11,7 @@ use pipewire::{ stream::{Stream, StreamFlags}, }; -use crate::utils::{default_pw_pod, Error}; +use crate::utils::{new_pw_pod, Error}; pub fn init( samples: std::sync::mpsc::Receiver>, @@ -53,24 +53,24 @@ pub fn init( }; let chunk = data.chunk_mut(); *chunk.offset_mut() = 0; + *chunk.stride_mut() = 4; *chunk.size_mut() = n_iter as _; } }) .register()?; - let default_audio = default_pw_pod(); - let mut params = [Pod::from_bytes(&default_audio).unwrap()]; - audio_source.connect( - Direction::Output, - None, - StreamFlags::AUTOCONNECT | StreamFlags::ALLOC_BUFFERS | StreamFlags::MAP_BUFFERS, - &mut params, - ); - let audio_source_ref = audio_source.clone(); let _receiver = pw_signal.attach(mainloop.loop_(), move |pod_bytes| { let mut params = [Pod::from_bytes(&pod_bytes).unwrap()]; - audio_source_ref.update_params(&mut params); + audio_source_ref.disconnect().unwrap(); + audio_source_ref + .connect( + Direction::Output, + None, + StreamFlags::AUTOCONNECT | StreamFlags::ALLOC_BUFFERS | StreamFlags::MAP_BUFFERS, + &mut params, + ) + .unwrap(); }); mainloop.run(); diff --git a/src/audio/sound_mgr.rs b/src/audio/sound_mgr.rs index 9f831c0..dcb97be 100644 --- a/src/audio/sound_mgr.rs +++ b/src/audio/sound_mgr.rs @@ -4,11 +4,16 @@ use std::{ Arc, }, thread, + time::{Duration, SystemTime}, }; -use crate::{config::Settings, player::PlayerEvent, utils::Error}; +use crate::{ + config::Settings, + player::PlayerEvent, + utils::{new_pw_pod, Error}, +}; -use super::{codec, pw, AudioEvent, SoundManager}; +use super::{codec, errors::AudioError, pw, AudioEvent, SoundManager}; impl SoundManager { pub fn init( @@ -30,7 +35,7 @@ impl SoundManager { Ok(_) => (), }; }); - let decoder_context = codec::init()?; + let decoder_context = codec::init(4, 48000)?; Ok(SoundManager { settings, @@ -45,10 +50,23 @@ impl SoundManager { } pub fn begin(&mut self) -> Result<(), Error> { loop { + let start = SystemTime::now(); self.handle_events()?; + if self.playing { + self.push_samples()?; + } + let time_elapsed = SystemTime::now().duration_since(start)?; + let time_rem = Duration::from_millis(9).saturating_sub(time_elapsed); + thread::sleep(time_rem); } } + fn push_samples(&mut self) -> Result<(), Error> { + let frame = self.decoder_context.next_sample(); + self.sample_in.send(frame)?; + Ok(()) + } + fn handle_events(&mut self) -> Result<(), Error> { let event = match self.audio_controls.try_recv() { Err(err) if err == TryRecvError::Empty => return Ok(()), @@ -57,8 +75,21 @@ impl SoundManager { }; match event { AudioEvent::DecodeChunk(chunk) => self.decoder_context.append_chunk(chunk)?, + AudioEvent::DecoderInit(sample_rate, channel_count) => { + self.decoder_context = codec::init(channel_count as usize * 2, sample_rate)?; + self.update_pw_stream(channel_count, sample_rate)? + } _ => unimplemented!(), } Ok(()) } + + fn update_pw_stream(&mut self, channel_count: u32, sample_rate: u32) -> Result<(), Error> { + let pod_bytes = new_pw_pod(sample_rate, channel_count); + + match self.pw_signal_in.send(pod_bytes) { + Err(_) => Err(Box::new(AudioError::PodUpdateError)), + Ok(_) => Ok(()), + } + } } diff --git a/src/player/mod.rs b/src/player/mod.rs index 7a7d3a1..b9ae20f 100644 --- a/src/player/mod.rs +++ b/src/player/mod.rs @@ -24,7 +24,6 @@ pub enum PlayerEvent { UserQuit, PlayNext, AddAudioChunk(Vec), - AudioChunkQueued(u32), } pub struct Player { diff --git a/src/player/player.rs b/src/player/player.rs index 6fb2888..9bd60cb 100644 --- a/src/player/player.rs +++ b/src/player/player.rs @@ -82,6 +82,7 @@ impl Player { self.player_chan_in .send(PlayerEvent::UpdateCover(default_cover()))?; } + self.audio_chan.send(AudioEvent::DecoderInit(48000, 2))?; self.tui_root.metadata.update_metadata(song); self.fetch_audio_chunk()?; diff --git a/src/utils.rs b/src/utils.rs index 90cc172..20f99f3 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,8 +1,11 @@ use image::DynamicImage; use pipewire::spa::{ - param::audio::AudioInfoRaw, + param::audio::{AudioFormat, AudioInfoRaw, MAX_CHANNELS}, pod::{serialize::PodSerializer, Object, Value}, - sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format}, + sys::{ + SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format, SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR, + SPA_AUDIO_CHANNEL_MONO, + }, }; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use std::io::Cursor; @@ -40,8 +43,18 @@ pub fn format_duration(secs: u32) -> String { } } -pub fn default_pw_pod() -> Vec { - let audio_info = AudioInfoRaw::new(); +pub fn new_pw_pod(sample_rate: u32, channels: u32) -> Vec { + let mut audio_info = AudioInfoRaw::new(); + audio_info.set_format(AudioFormat::S16LE); + audio_info.set_rate(sample_rate); + audio_info.set_channels(channels); + let mut pos = [0; MAX_CHANNELS]; + if channels == 1 { + pos[0] = SPA_AUDIO_CHANNEL_MONO + } else { + (pos[0], pos[1]) = (SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR); + } + audio_info.set_position(pos); PodSerializer::serialize( Cursor::new(Vec::new()), &Value::Object(Object {