Parse frames from a decoder and push to pipewire

This commit is contained in:
Muaz Ahmad 2024-12-09 14:29:15 +05:00
parent 58d33b8868
commit 0ca063e927
8 changed files with 132 additions and 25 deletions

View file

@ -1,35 +1,80 @@
use std::{ use std::{
collections::VecDeque,
io::{Read, Write}, io::{Read, Write},
process::{Child, ChildStdin, ChildStdout, Command, Stdio}, process::{Child, ChildStdin, ChildStdout, Command, Stdio},
sync::mpsc::{channel, Receiver, Sender, TryRecvError},
thread,
}; };
use crate::utils::Error; use crate::utils::Error;
const MAX_SAMPLE_BUFF: usize = 1_000_000;
pub struct DecoderContext { pub struct DecoderContext {
process: Child, process: Child,
encoded_in: ChildStdin, encoded_in: Sender<Vec<u8>>,
frames_out: ChildStdout, frames_out: Receiver<Vec<u8>>,
sample_buf: VecDeque<Vec<u8>>,
} }
pub fn init() -> Result<DecoderContext, Error> { pub fn init(stride: usize, sample_rate: u32) -> Result<DecoderContext, Error> {
let mut decoder = Command::new("ffmpeg") let mut decoder = Command::new("ffmpeg")
.args(["-hide_banner", "-i", "-", "-f", "s16le", "-"]) .args(["-hide_banner", "-i", "-", "-f", "s16le", "-"])
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::null()) .stderr(Stdio::null())
.spawn()?; .spawn()?;
let encoded_in = decoder.stdin.take().unwrap(); let (encoded_in, encoded_out) = channel::<Vec<u8>>();
let frames_out = decoder.stdout.take().unwrap(); let (frames_in, frames_out) = channel();
let mut ffmpeg_in = decoder.stdin.take().unwrap();
let mut ffmpeg_out = decoder.stdout.take().unwrap();
thread::spawn(move || loop {
if let Ok(chunk) = encoded_out.recv() {
ffmpeg_in.write_all(chunk.as_slice()).unwrap();
} else {
break;
}
});
thread::spawn(move || loop {
let mut frames = vec![0; stride * sample_rate as usize / 1000 * 10];
match ffmpeg_out.read_exact(frames.as_mut_slice()) {
Err(_) => break,
Ok(_) => (),
};
match frames_in.send(frames) {
Err(_) => break,
Ok(_) => (),
};
});
Ok(DecoderContext { Ok(DecoderContext {
process: decoder, process: decoder,
encoded_in, encoded_in,
frames_out, frames_out,
sample_buf: VecDeque::new(),
}) })
} }
impl DecoderContext { impl DecoderContext {
pub fn append_chunk(&mut self, chunk: Vec<u8>) -> Result<(), Error> { pub fn append_chunk(&mut self, chunk: Vec<u8>) -> Result<(), Error> {
self.encoded_in.write_all(chunk.as_slice())?; self.encoded_in.send(chunk)?;
Ok(()) Ok(())
} }
pub fn fetch_samples(&mut self) {
loop {
match self.frames_out.try_recv() {
Err(TryRecvError::Empty) => break,
Ok(frames) => self.sample_buf.push_back(frames),
Err(_) => unimplemented!(),
}
}
}
pub fn next_sample(&mut self) -> Vec<u8> {
if self.sample_buf.len() < 100 {
self.fetch_samples();
}
self.sample_buf.pop_front().unwrap_or(vec![0; 1920])
}
} }

16
src/audio/errors.rs Normal file
View file

@ -0,0 +1,16 @@
#[derive(Debug)]
pub enum AudioError {
PodUpdateError,
}
impl std::error::Error for AudioError {}
impl std::fmt::Display for AudioError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::PodUpdateError => {
write!(f, "Error while reconfiguring the audio stream paramteres")
}
}
}
}

View file

@ -14,6 +14,7 @@ use crate::{config::Settings, player::PlayerEvent, utils::Error};
pub enum AudioEvent { pub enum AudioEvent {
DecodeChunk(Vec<u8>), DecodeChunk(Vec<u8>),
DecoderInit(u32, u32),
} }
pub struct SoundManager { pub struct SoundManager {
@ -60,5 +61,6 @@ pub fn init(
} }
mod codec; mod codec;
mod errors;
mod pw; mod pw;
mod sound_mgr; mod sound_mgr;

View file

@ -11,7 +11,7 @@ use pipewire::{
stream::{Stream, StreamFlags}, stream::{Stream, StreamFlags},
}; };
use crate::utils::{default_pw_pod, Error}; use crate::utils::{new_pw_pod, Error};
pub fn init( pub fn init(
samples: std::sync::mpsc::Receiver<Vec<u8>>, samples: std::sync::mpsc::Receiver<Vec<u8>>,
@ -53,24 +53,24 @@ pub fn init(
}; };
let chunk = data.chunk_mut(); let chunk = data.chunk_mut();
*chunk.offset_mut() = 0; *chunk.offset_mut() = 0;
*chunk.stride_mut() = 4;
*chunk.size_mut() = n_iter as _; *chunk.size_mut() = n_iter as _;
} }
}) })
.register()?; .register()?;
let default_audio = default_pw_pod();
let mut params = [Pod::from_bytes(&default_audio).unwrap()];
audio_source.connect(
Direction::Output,
None,
StreamFlags::AUTOCONNECT | StreamFlags::ALLOC_BUFFERS | StreamFlags::MAP_BUFFERS,
&mut params,
);
let audio_source_ref = audio_source.clone(); let audio_source_ref = audio_source.clone();
let _receiver = pw_signal.attach(mainloop.loop_(), move |pod_bytes| { let _receiver = pw_signal.attach(mainloop.loop_(), move |pod_bytes| {
let mut params = [Pod::from_bytes(&pod_bytes).unwrap()]; let mut params = [Pod::from_bytes(&pod_bytes).unwrap()];
audio_source_ref.update_params(&mut params); audio_source_ref.disconnect().unwrap();
audio_source_ref
.connect(
Direction::Output,
None,
StreamFlags::AUTOCONNECT | StreamFlags::ALLOC_BUFFERS | StreamFlags::MAP_BUFFERS,
&mut params,
)
.unwrap();
}); });
mainloop.run(); mainloop.run();

View file

@ -4,11 +4,16 @@ use std::{
Arc, Arc,
}, },
thread, thread,
time::{Duration, SystemTime},
}; };
use crate::{config::Settings, player::PlayerEvent, utils::Error}; use crate::{
config::Settings,
player::PlayerEvent,
utils::{new_pw_pod, Error},
};
use super::{codec, pw, AudioEvent, SoundManager}; use super::{codec, errors::AudioError, pw, AudioEvent, SoundManager};
impl SoundManager { impl SoundManager {
pub fn init( pub fn init(
@ -30,7 +35,7 @@ impl SoundManager {
Ok(_) => (), Ok(_) => (),
}; };
}); });
let decoder_context = codec::init()?; let decoder_context = codec::init(4, 48000)?;
Ok(SoundManager { Ok(SoundManager {
settings, settings,
@ -45,10 +50,23 @@ impl SoundManager {
} }
pub fn begin(&mut self) -> Result<(), Error> { pub fn begin(&mut self) -> Result<(), Error> {
loop { loop {
let start = SystemTime::now();
self.handle_events()?; self.handle_events()?;
if self.playing {
self.push_samples()?;
}
let time_elapsed = SystemTime::now().duration_since(start)?;
let time_rem = Duration::from_millis(9).saturating_sub(time_elapsed);
thread::sleep(time_rem);
} }
} }
fn push_samples(&mut self) -> Result<(), Error> {
let frame = self.decoder_context.next_sample();
self.sample_in.send(frame)?;
Ok(())
}
fn handle_events(&mut self) -> Result<(), Error> { fn handle_events(&mut self) -> Result<(), Error> {
let event = match self.audio_controls.try_recv() { let event = match self.audio_controls.try_recv() {
Err(err) if err == TryRecvError::Empty => return Ok(()), Err(err) if err == TryRecvError::Empty => return Ok(()),
@ -57,8 +75,21 @@ impl SoundManager {
}; };
match event { match event {
AudioEvent::DecodeChunk(chunk) => self.decoder_context.append_chunk(chunk)?, AudioEvent::DecodeChunk(chunk) => self.decoder_context.append_chunk(chunk)?,
AudioEvent::DecoderInit(sample_rate, channel_count) => {
self.decoder_context = codec::init(channel_count as usize * 2, sample_rate)?;
self.update_pw_stream(channel_count, sample_rate)?
}
_ => unimplemented!(), _ => unimplemented!(),
} }
Ok(()) Ok(())
} }
fn update_pw_stream(&mut self, channel_count: u32, sample_rate: u32) -> Result<(), Error> {
let pod_bytes = new_pw_pod(sample_rate, channel_count);
match self.pw_signal_in.send(pod_bytes) {
Err(_) => Err(Box::new(AudioError::PodUpdateError)),
Ok(_) => Ok(()),
}
}
} }

View file

@ -24,7 +24,6 @@ pub enum PlayerEvent {
UserQuit, UserQuit,
PlayNext, PlayNext,
AddAudioChunk(Vec<u8>), AddAudioChunk(Vec<u8>),
AudioChunkQueued(u32),
} }
pub struct Player { pub struct Player {

View file

@ -82,6 +82,7 @@ impl Player {
self.player_chan_in self.player_chan_in
.send(PlayerEvent::UpdateCover(default_cover()))?; .send(PlayerEvent::UpdateCover(default_cover()))?;
} }
self.audio_chan.send(AudioEvent::DecoderInit(48000, 2))?;
self.tui_root.metadata.update_metadata(song); self.tui_root.metadata.update_metadata(song);
self.fetch_audio_chunk()?; self.fetch_audio_chunk()?;

View file

@ -1,8 +1,11 @@
use image::DynamicImage; use image::DynamicImage;
use pipewire::spa::{ use pipewire::spa::{
param::audio::AudioInfoRaw, param::audio::{AudioFormat, AudioInfoRaw, MAX_CHANNELS},
pod::{serialize::PodSerializer, Object, Value}, pod::{serialize::PodSerializer, Object, Value},
sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format}, sys::{
SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format, SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR,
SPA_AUDIO_CHANNEL_MONO,
},
}; };
use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::io::Cursor; use std::io::Cursor;
@ -40,8 +43,18 @@ pub fn format_duration(secs: u32) -> String {
} }
} }
pub fn default_pw_pod() -> Vec<u8> { pub fn new_pw_pod(sample_rate: u32, channels: u32) -> Vec<u8> {
let audio_info = AudioInfoRaw::new(); let mut audio_info = AudioInfoRaw::new();
audio_info.set_format(AudioFormat::S16LE);
audio_info.set_rate(sample_rate);
audio_info.set_channels(channels);
let mut pos = [0; MAX_CHANNELS];
if channels == 1 {
pos[0] = SPA_AUDIO_CHANNEL_MONO
} else {
(pos[0], pos[1]) = (SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR);
}
audio_info.set_position(pos);
PodSerializer::serialize( PodSerializer::serialize(
Cursor::new(Vec::new()), Cursor::new(Vec::new()),
&Value::Object(Object { &Value::Object(Object {