Skip to content

Commit

Permalink
Ticker: apply review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-laplante committed May 10, 2022
1 parent 330980b commit 41e0f3f
Showing 1 changed file with 60 additions and 63 deletions.
123 changes: 60 additions & 63 deletions src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{fmt, mem};
use std::{io, thread};

#[cfg(test)]
use std::sync::atomic::AtomicBool;
use once_cell::sync::Lazy;
#[cfg(test)]
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::draw_target::ProgressDrawTarget;
use crate::state::{AtomicPosition, BarState, ProgressFinish, Reset};
Expand Down Expand Up @@ -143,21 +143,21 @@ impl ProgressBar {
return;
}

self.stop_and_replace_ticker(Some(Ticker::new(interval, &self.state)));
self.stop_and_replace_ticker(Some(interval));
}

/// Undoes [`ProgressBar::enable_steady_tick()`]
pub fn disable_steady_tick(&self) {
self.stop_and_replace_ticker(None);
}

fn stop_and_replace_ticker(&self, new_ticker: Option<Ticker>) {
fn stop_and_replace_ticker(&self, interval: Option<Duration>) {
let mut ticker_state = self.ticker.lock().unwrap();
if let Some(ticker) = ticker_state.take() {
ticker.stop();
}

*ticker_state = new_ticker;
*ticker_state = interval.map(|interval| Ticker::new(interval, &self.state));
}

/// Manually ticks the spinner or progress bar
Expand Down Expand Up @@ -537,8 +537,8 @@ impl WeakProgressBar {
}

pub(crate) struct Ticker {
stop_received: Arc<Mutex<bool>>,
cond_var: Arc<Condvar>,
stopping_flag: Arc<Mutex<bool>>,
stopping_flag_set: Arc<Condvar>,
join_handle: Option<thread::JoinHandle<()>>,
}

Expand All @@ -553,79 +553,76 @@ impl Drop for Ticker {
static TICKER_RUNNING: AtomicBool = AtomicBool::new(false);

impl Ticker {
pub(crate) fn new(interval: Duration, bar_state: &Arc<Mutex<BarState>>) -> Ticker {
pub(crate) fn new(interval: Duration, bar_state: &Arc<Mutex<BarState>>) -> Self {
debug_assert!(!interval.is_zero());

#[allow(clippy::mutex_atomic)]
let cancel_pending = Arc::new(Mutex::new(false));
let cond_var = Arc::new(Condvar::new());

let stopping_flag = Arc::new(Mutex::new(false));
let stopping_flag_set = Arc::new(Condvar::new());
let control = TickerControl {
stop_received: cancel_pending.clone(),
cond_var: cond_var.clone(),
weak_state: Arc::downgrade(bar_state),
stopping_flag: stopping_flag.clone(),
stopping_flag_set: stopping_flag_set.clone(),
state: Arc::downgrade(bar_state),
};

let join_handle = thread::spawn(move || {
#[cfg(test)]
TICKER_RUNNING.store(true, Ordering::SeqCst);

while let Some(arc) = control.weak_state.upgrade() {
let mut state = arc.lock().unwrap();
if state.state.is_finished() {
break;
}

if state.state.tick != 0 {
state.state.tick = state.state.tick.saturating_add(1);
}

state.draw(false, Instant::now()).ok();

drop(state); // Don't forget to drop the lock before sleeping
drop(arc); // Also need to drop Arc otherwise BarState won't be dropped

// Wait for `interval` but return early if we are notified to stop
let result = control
.cond_var
.wait_timeout_while(
control.stop_received.lock().unwrap(),
interval,
|stopped| !*stopped,
)
.unwrap();

// If the wait didn't time out, it means we were notified to stop
if !result.1.timed_out() {
break;
}
}

#[cfg(test)]
TICKER_RUNNING.store(false, Ordering::SeqCst);
});

Ticker {
stop_received: cancel_pending,
cond_var,
let join_handle = thread::spawn(move || control.run(interval));
Self {
stopping_flag,
stopping_flag_set,
join_handle: Some(join_handle),
}
}

pub(crate) fn stop(&self) {
*self.stop_received.lock().unwrap() = true;
self.cond_var.notify_one();
*self.stopping_flag.lock().unwrap() = true;
self.stopping_flag_set.notify_one();
}
}

struct TickerControl {
stop_received: Arc<Mutex<bool>>,
cond_var: Arc<Condvar>,
weak_state: Weak<Mutex<BarState>>,
stopping_flag: Arc<Mutex<bool>>,
stopping_flag_set: Arc<Condvar>,
state: Weak<Mutex<BarState>>,
}

#[cfg(test)]
use once_cell::sync::Lazy;
impl TickerControl {
fn run(&self, interval: Duration) {
#[cfg(test)]
TICKER_RUNNING.store(true, Ordering::SeqCst);

while let Some(arc) = self.state.upgrade() {
let mut state = arc.lock().unwrap();
if state.state.is_finished() {
break;
}

if state.state.tick != 0 {
state.state.tick = state.state.tick.saturating_add(1);
}

state.draw(false, Instant::now()).ok();

drop(state); // Don't forget to drop the lock before sleeping
drop(arc); // Also need to drop Arc otherwise BarState won't be dropped

// Wait for `interval` but return early if we are notified to stop
let result = self
.stopping_flag_set
.wait_timeout_while(self.stopping_flag.lock().unwrap(), interval, |stopped| {
!*stopped
})
.unwrap();

// If the wait didn't time out, it means we were notified to stop
if !result.1.timed_out() {
break;
}
}

#[cfg(test)]
TICKER_RUNNING.store(false, Ordering::SeqCst);
}
}

// Tests using the global TICKER_RUNNING flag need to be serialized
#[cfg(test)]
Expand Down

0 comments on commit 41e0f3f

Please sign in to comment.