Skip to content

Commit

Permalink
Merge pull request #567 from RustyDAW/jack-alloc
Browse files Browse the repository at this point in the history
Update to rust-jack 0.7 and simplify implementation
  • Loading branch information
est31 authored Jun 7, 2021
2 parents b628b9d + 833acba commit 47ef62d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ alsa = "0.5"
nix = "0.20"
libc = "0.2.65"
parking_lot = "0.11"
jack = { version = "0.6.5", optional = true }
jack = { version = "0.7.0", optional = true }

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
core-foundation-sys = "0.6.2" # For linking to CoreFoundation.framework and handling device name `CFString`s.
Expand Down
166 changes: 74 additions & 92 deletions src/host/jack/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use crate::{
};

use super::JACK_SAMPLE_FORMAT;

type ErrorCallbackPtr = Arc<Mutex<dyn FnMut(StreamError) + Send + 'static>>;

pub struct Stream {
// TODO: It might be faster to send a message when playing/pausing than to check this every iteration
playing: Arc<AtomicBool>,
Expand Down Expand Up @@ -58,17 +61,20 @@ impl Stream {

let playing = Arc::new(AtomicBool::new(true));

let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr;

let input_process_handler = LocalProcessHandler::new(
vec![],
ports,
SampleRate(client.sample_rate() as u32),
client.buffer_size() as usize,
Some(Box::new(data_callback)),
None,
playing.clone(),
client.buffer_size() as usize,
Arc::clone(&error_callback_ptr),
);

let notification_handler = JackNotificationHandler::new(error_callback);
let notification_handler = JackNotificationHandler::new(error_callback_ptr);

let async_client = client
.activate_async(notification_handler, input_process_handler)
Expand Down Expand Up @@ -120,17 +126,20 @@ impl Stream {

let playing = Arc::new(AtomicBool::new(true));

let error_callback_ptr = Arc::new(Mutex::new(error_callback)) as ErrorCallbackPtr;

let output_process_handler = LocalProcessHandler::new(
ports,
vec![],
SampleRate(client.sample_rate() as u32),
client.buffer_size() as usize,
None,
Some(Box::new(data_callback)),
playing.clone(),
client.buffer_size() as usize,
Arc::clone(&error_callback_ptr),
);

let notification_handler = JackNotificationHandler::new(error_callback);
let notification_handler = JackNotificationHandler::new(error_callback_ptr);

let async_client = client
.activate_async(notification_handler, output_process_handler)
Expand Down Expand Up @@ -215,67 +224,53 @@ struct LocalProcessHandler {
in_ports: Vec<jack::Port<jack::AudioIn>>,

sample_rate: SampleRate,
buffer_size: usize,
input_data_callback: Option<Box<dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static>>,
output_data_callback: Option<Box<dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static>>,

temp_input_buffer: Vec<f32>,

// JACK audio samples are 32 bit float (unless you do some custom dark magic)
temp_input_buffer: Vec<f32>,
temp_output_buffer: Vec<f32>,
/// The number of frames in the temp_output_buffer
temp_output_buffer_size_in_frames: usize,
temp_output_buffer_frames_index: usize,
playing: Arc<AtomicBool>,
creation_timestamp: std::time::Instant,
/// This should not be called on `process`, only on `buffer_size` because it can block.
error_callback_ptr: ErrorCallbackPtr,
}

impl LocalProcessHandler {
fn new(
out_ports: Vec<jack::Port<jack::AudioOut>>,
in_ports: Vec<jack::Port<jack::AudioIn>>,
sample_rate: SampleRate,
buffer_size: usize,
input_data_callback: Option<Box<dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static>>,
output_data_callback: Option<
Box<dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static>,
>,
playing: Arc<AtomicBool>,
buffer_size: usize,
error_callback_ptr: ErrorCallbackPtr,
) -> Self {
// buffer_size is the maximum number of samples per port JACK can request/provide in a single call
// If it can be fewer than that per call the temp_input_buffer needs to be the smallest multiple of that.
// These may be reallocated in the `buffer_size` callback.
let temp_input_buffer = vec![0.0; in_ports.len() * buffer_size];

let temp_output_buffer = vec![0.0; out_ports.len() * buffer_size];

// let out_port_buffers = Vec::with_capacity(out_ports.len());
// let in_port_buffers = Vec::with_capacity(in_ports.len());

LocalProcessHandler {
out_ports,
in_ports,
// out_port_buffers,
// in_port_buffers,
sample_rate,
buffer_size,
input_data_callback,
output_data_callback,
temp_input_buffer,
temp_output_buffer,
temp_output_buffer_size_in_frames: buffer_size,
temp_output_buffer_frames_index: 0,
playing,
creation_timestamp: std::time::Instant::now(),
error_callback_ptr,
}
}
}

fn temp_output_buffer_to_data(temp_output_buffer: &mut Vec<f32>) -> Data {
let data = temp_output_buffer.as_mut_ptr() as *mut ();
let len = temp_output_buffer.len();
let data = unsafe { Data::from_parts(data, len, JACK_SAMPLE_FORMAT) };
data
}

fn temp_input_buffer_to_data(temp_input_buffer: &mut Vec<f32>, total_buffer_size: usize) -> Data {
fn temp_buffer_to_data(temp_input_buffer: &mut Vec<f32>, total_buffer_size: usize) -> Data {
let slice = &temp_input_buffer[0..total_buffer_size];
let data = slice.as_ptr() as *mut ();
let len = total_buffer_size;
Expand All @@ -289,6 +284,8 @@ impl jack::ProcessHandler for LocalProcessHandler {
return jack::Control::Continue;
}

// This should be equal to self.buffer_size, but the implementation will
// work even if it is less. Will panic in `temp_buffer_to_data` if greater.
let current_frame_count = process_scope.n_frames() as usize;

// Get timestamp data
Expand Down Expand Up @@ -325,7 +322,7 @@ impl jack::ProcessHandler for LocalProcessHandler {
}
}
// Create a slice of exactly current_frame_count frames
let data = temp_input_buffer_to_data(
let data = temp_buffer_to_data(
&mut self.temp_input_buffer,
current_frame_count * num_in_channels,
);
Expand All @@ -345,51 +342,58 @@ impl jack::ProcessHandler for LocalProcessHandler {
if let Some(output_callback) = &mut self.output_data_callback {
let num_out_channels = self.out_ports.len();

// Run the output callback on the temporary output buffer until we have filled the output ports
// JACK ports each provide a mutable slice to be filled with samples whereas CPAL uses interleaved
// channels. The formats therefore have to be bridged.
for i in 0..current_frame_count {
// Check if we have gotten all of the frames from the temp_output_buffer
if self.temp_output_buffer_frames_index == self.temp_output_buffer_size_in_frames {
// Get new samples if the temporary buffer is depleted. This can theoretically happen
// several times per cycle or once every few cycles if the buffer size changes, but in practice
// it should generally happen once per cycle if the buffer size is not changed.
let mut data = temp_output_buffer_to_data(&mut self.temp_output_buffer);
// Create timestamp
let frames_since_cycle_start =
process_scope.frames_since_cycle_start() as usize;
let duration_since_cycle_start =
frames_to_duration(frames_since_cycle_start, self.sample_rate);
let callback = start_callback_instant
.add(duration_since_cycle_start)
.expect(
"`playback` occurs beyond representation supported by `StreamInstant`",
);
let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate);
let playback = start_cycle_instant.add(buffer_duration).expect(
"`playback` occurs beyond representation supported by `StreamInstant`",
);
let timestamp = crate::OutputStreamTimestamp { callback, playback };
let info = crate::OutputCallbackInfo { timestamp };
output_callback(&mut data, &info);
self.temp_output_buffer_frames_index = 0;
}
// Write the interleaved samples e.g. [l0, r0, l1, r1, ..] to each output buffer
for ch_ix in 0..num_out_channels {
// TODO: It should be marginally faster to store pointers to these slices, but I don't know how
// to avoid lifetime issues and allocation
let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope);
output_channel[i] = self.temp_output_buffer
[ch_ix + self.temp_output_buffer_frames_index * num_out_channels];
// Create a slice of exactly current_frame_count frames
let mut data = temp_buffer_to_data(
&mut self.temp_output_buffer,
current_frame_count * num_out_channels,
);
// Create timestamp
let frames_since_cycle_start = process_scope.frames_since_cycle_start() as usize;
let duration_since_cycle_start =
frames_to_duration(frames_since_cycle_start, self.sample_rate);
let callback = start_callback_instant
.add(duration_since_cycle_start)
.expect("`playback` occurs beyond representation supported by `StreamInstant`");
let buffer_duration = frames_to_duration(current_frame_count, self.sample_rate);
let playback = start_cycle_instant
.add(buffer_duration)
.expect("`playback` occurs beyond representation supported by `StreamInstant`");
let timestamp = crate::OutputStreamTimestamp { callback, playback };
let info = crate::OutputCallbackInfo { timestamp };
output_callback(&mut data, &info);

// Deinterlace
for ch_ix in 0..num_out_channels {
let output_channel = &mut self.out_ports[ch_ix].as_mut_slice(process_scope);
for i in 0..current_frame_count {
output_channel[i] = self.temp_output_buffer[ch_ix + i * num_out_channels];
}
// Count the number of frames that have been read from the temp buffer
self.temp_output_buffer_frames_index += 1;
}
}

// Continue as normal
jack::Control::Continue
}

fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control {
// The `buffer_size` callback is actually called on the process thread, but
// it does not need to be suitable for real-time use. Thus we can simply allocate
// new buffers here. It is also fine to call the error callback.
// Details: https://github.com/RustAudio/rust-jack/issues/137
let new_size = size as usize;
if new_size != self.buffer_size {
self.buffer_size = new_size;
self.temp_input_buffer = vec![0.0; self.in_ports.len() * new_size];
self.temp_output_buffer = vec![0.0; self.out_ports.len() * new_size];
let description = format!("buffer size changed to: {}", new_size);
if let Ok(mut mutex_guard) = self.error_callback_ptr.lock() {
let err = &mut *mutex_guard;
err(BackendSpecificError { description }.into());
}
}

jack::Control::Continue
}
}

fn micros_to_stream_instant(micros: u64) -> crate::StreamInstant {
Expand All @@ -410,19 +414,14 @@ fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Dura
/// Receives notifications from the JACK server. It is unclear if this may be run concurrent with itself under JACK2 specs
/// so it needs to be Sync.
struct JackNotificationHandler {
error_callback_ptr: Arc<Mutex<Box<dyn FnMut(StreamError) + Send + 'static>>>,
init_block_size_flag: Arc<AtomicBool>,
error_callback_ptr: ErrorCallbackPtr,
init_sample_rate_flag: Arc<AtomicBool>,
}

impl JackNotificationHandler {
pub fn new<E>(error_callback: E) -> Self
where
E: FnMut(StreamError) + Send + 'static,
{
pub fn new(error_callback_ptr: ErrorCallbackPtr) -> Self {
JackNotificationHandler {
error_callback_ptr: Arc::new(Mutex::new(Box::new(error_callback))),
init_block_size_flag: Arc::new(AtomicBool::new(false)),
error_callback_ptr,
init_sample_rate_flag: Arc::new(AtomicBool::new(false)),
}
}
Expand Down Expand Up @@ -457,23 +456,6 @@ impl jack::NotificationHandler for JackNotificationHandler {
}
}

fn buffer_size(&mut self, _: &jack::Client, size: jack::Frames) -> jack::Control {
match self.init_block_size_flag.load(Ordering::SeqCst) {
false => {
// One of these notifications is sent every time a client is started.
self.init_block_size_flag.store(true, Ordering::SeqCst)
}
true => {
self.send_error(format!("buffer size changed to: {}", size));
}
}

// The current implementation should work even if the buffer size changes, although
// potentially with poorer performance. However, reallocating the temporary processing
// buffers would be expensive so we choose to just continue in this case.
jack::Control::Continue
}

fn xrun(&mut self, _: &jack::Client) -> jack::Control {
self.send_error(String::from("xrun (buffer over or under run)"));
jack::Control::Continue
Expand Down

0 comments on commit 47ef62d

Please sign in to comment.