Skip to content

Commit

Permalink
rework Ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-laplante committed Mar 23, 2022
1 parent 5fb1b25 commit a5b571e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 46 deletions.
32 changes: 28 additions & 4 deletions src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{ProgressBarIter, ProgressIterator};
pub struct ProgressBar {
state: Arc<Mutex<BarState>>,
pos: Arc<AtomicPosition>,
ticker: Arc<Mutex<Option<Ticker>>>,
}

impl fmt::Debug for ProgressBar {
Expand Down Expand Up @@ -51,6 +52,7 @@ impl ProgressBar {
ProgressBar {
state: Arc::new(Mutex::new(BarState::new(len, draw_target, pos.clone()))),
pos,
ticker: Arc::new(Mutex::new(None)),
}
}

Expand Down Expand Up @@ -132,19 +134,38 @@ impl ProgressBar {
/// When steady ticks are enabled, calling [`ProgressBar::tick()`] on a progress bar does not
/// have any effect.
pub fn enable_steady_tick(&self, interval: Duration) {
Ticker::spawn(&self.state, interval)
if interval.is_zero() {
return;
}

self.stop_and_replace_ticker(Some(Ticker::new(interval, &self.state)));
}

/// Undoes [`ProgressBar::enable_steady_tick()`]
pub fn disable_steady_tick(&self) {
self.state().ticker = None;
self.stop_and_replace_ticker(None);
}

fn stop_and_replace_ticker(&self, new_ticker: Option<Ticker>) {
let mut ticker_state = self.ticker.lock().unwrap();

// There may be clones of this `ProgressBar` elsewhere, so we can't rely on taking the
// `Ticker` from the `Arc<Mutex<Option<Ticker>>>` to actually drop it.
if let Some(ticker) = ticker_state.take() {
ticker.stop();
}

*ticker_state = new_ticker;
}

/// Manually ticks the spinner or progress bar
///
/// This automatically happens on any other change to a progress bar.
pub fn tick(&self) {
self.state().tick(Instant::now())
// Only tick if a `Ticker` isn't installed
if self.ticker.lock().unwrap().is_none() {
self.state().tick(Instant::now())
}
}

/// Advances the position of the progress bar by `delta`
Expand Down Expand Up @@ -226,6 +247,7 @@ impl ProgressBar {
WeakProgressBar {
state: Arc::downgrade(&self.state),
pos: Arc::downgrade(&self.pos),
ticker: Arc::downgrade(&self.ticker),
}
}

Expand Down Expand Up @@ -489,6 +511,7 @@ impl ProgressBar {
pub struct WeakProgressBar {
state: Weak<Mutex<BarState>>,
pos: Weak<AtomicPosition>,
ticker: Weak<Mutex<Option<Ticker>>>,
}

impl WeakProgressBar {
Expand All @@ -506,7 +529,8 @@ impl WeakProgressBar {
pub fn upgrade(&self) -> Option<ProgressBar> {
let state = self.state.upgrade()?;
let pos = self.pos.upgrade()?;
Some(ProgressBar { state, pos })
let ticker = self.ticker.upgrade()?;
Some(ProgressBar { state, pos, ticker })
}
}

Expand Down
110 changes: 68 additions & 42 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::borrow::Cow;
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::time::{Duration, Instant};
use std::{fmt, thread};

use crate::draw_target::ProgressDrawTarget;
use crate::style::ProgressStyle;
Expand All @@ -14,7 +13,6 @@ pub(crate) struct BarState {
pub(crate) on_finish: ProgressFinish,
pub(crate) style: ProgressStyle,
pub(crate) state: ProgressState,
pub(crate) ticker: Option<(Duration, thread::JoinHandle<()>)>,
}

impl BarState {
Expand All @@ -28,7 +26,6 @@ impl BarState {
on_finish: ProgressFinish::default(),
style: ProgressStyle::default_bar(),
state: ProgressState::new(len, pos),
ticker: None,
}
}

Expand Down Expand Up @@ -107,7 +104,7 @@ impl BarState {
}

pub(crate) fn tick(&mut self, now: Instant) {
if self.ticker.is_none() || self.state.tick == 0 {
if self.state.tick == 0 {
self.state.tick = self.state.tick.saturating_add(1);
}

Expand Down Expand Up @@ -145,7 +142,7 @@ impl BarState {
ret
}

fn draw(&mut self, mut force_draw: bool, now: Instant) -> io::Result<()> {
pub(crate) fn draw(&mut self, mut force_draw: bool, now: Instant) -> io::Result<()> {
let width = self.draw_target.width();
force_draw |= self.state.is_finished();
let mut drawable = match self.draw_target.drawable(force_draw, now) {
Expand Down Expand Up @@ -289,53 +286,82 @@ impl ProgressState {
}

pub(crate) struct Ticker {
weak: Weak<Mutex<BarState>>,
interval: Duration,
stop_received: Arc<Mutex<bool>>,
cond_var: Arc<Condvar>,
join_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for Ticker {
fn drop(&mut self) {
self.stop();
self.join_handle.take().map(|handle| handle.join());
}
}

impl Ticker {
pub(crate) fn spawn(arc: &Arc<Mutex<BarState>>, interval: Duration) {
let mut state = arc.lock().unwrap();
if interval.is_zero() {
return;
} else if let Some((old, _)) = &mut state.ticker {
*old = interval;
return;
}
pub(crate) fn new(interval: Duration, bar_state: &Arc<Mutex<BarState>>) -> Ticker {
debug_assert!(!interval.is_zero());

#[allow(clippy::mutex_atomic)]
let cancel_pending = Arc::new(Mutex::new(false));
let cond_var = Arc::new(Condvar::new());

let ticker = Self {
// Using a weak pointer is required to prevent a potential deadlock. See issue #133
weak: Arc::downgrade(arc),
interval,
let control = TickerControl {
stop_received: cancel_pending.clone(),
cond_var: cond_var.clone(),
weak_state: Arc::downgrade(bar_state),
};

let handle = thread::spawn(move || ticker.run());
state.ticker = Some((interval, handle));
drop(state);
// use the side effect of tick to force the bar to tick.
arc.lock().unwrap().tick(Instant::now());
}
let join_handle = thread::spawn(move || {
while let Some(arc) = control.weak_state.upgrade() {
let mut state = arc.lock().unwrap();
if state.state.is_finished() {
break;
}

fn run(mut self) {
thread::sleep(self.interval);
while let Some(arc) = self.weak.upgrade() {
let mut state = arc.lock().unwrap();
let interval = match state.ticker {
Some((interval, _)) if !state.state.is_finished() => interval,
_ => return,
};
if state.state.tick != 0 {
state.state.tick = state.state.tick.saturating_add(1);
}

state.draw(false, Instant::now()).ok();

drop(state); // Don't forget to drop the lock before sleeping
drop(arc); // Also need to drop Arc otherwise BarState won't be dropped

if state.state.tick != 0 {
state.state.tick = state.state.tick.saturating_add(1);
// Wait for `interval` but return early if we are notified to stop
let result = control
.cond_var
.wait_timeout_while(
control.stop_received.lock().unwrap(),
interval,
|stopped| !*stopped,
)
.unwrap();

// If the wait didn't time out, it means we were notified to stop
if !result.1.timed_out() {
break;
}
}
});

self.interval = interval;
state.draw(false, Instant::now()).ok();
drop(state); // Don't forget to drop the lock before sleeping
drop(arc); // Also need to drop Arc otherwise BarState won't be dropped
thread::sleep(self.interval);
Ticker {
stop_received: cancel_pending,
cond_var,
join_handle: Some(join_handle),
}
}

pub(crate) fn stop(&self) {
*self.stop_received.lock().unwrap() = true;
self.cond_var.notify_one();
}
}

struct TickerControl {
stop_received: Arc<Mutex<bool>>,
cond_var: Arc<Condvar>,
weak_state: Weak<Mutex<BarState>>,
}

/// Estimate the number of seconds per step
Expand Down

0 comments on commit a5b571e

Please sign in to comment.