From 6988827407f867c433b1a292ff5fd47b3d4f1335 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 19 Dec 2024 14:10:20 -0800 Subject: [PATCH 1/4] build bar only on first update --- daft/runners/progress_bar.py | 37 +++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index ed4c91c9a3..b53df4d3bf 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -114,27 +114,38 @@ def __init__(self) -> None: self._maxinterval = 5.0 self.tqdm_mod = get_tqdm(False) self.pbars: dict[int, Any] = dict() + self.bar_configs: dict[int, tuple[str, str]] = dict() + self.next_id = 0 def make_new_bar(self, bar_format: str, initial_message: str) -> int: - pbar_id = len(self.pbars) - self.pbars[pbar_id] = self.tqdm_mod( - bar_format=bar_format, - desc=initial_message, - position=pbar_id, - leave=False, - mininterval=1.0, - maxinterval=self._maxinterval, - ) + pbar_id = self.next_id + self.next_id += 1 + self.bar_configs[pbar_id] = (bar_format, initial_message) return pbar_id def update_bar(self, pbar_id: int, message: str) -> None: + if pbar_id not in self.pbars: + if pbar_id not in self.bar_configs: + raise ValueError(f"No bar configuration found for id {pbar_id}") + bar_format, initial_message = self.bar_configs[pbar_id] + self.pbars[pbar_id] = self.tqdm_mod( + bar_format=bar_format, + desc=initial_message, + position=pbar_id, + leave=False, + mininterval=1.0, + maxinterval=self._maxinterval, + ) + del self.bar_configs[pbar_id] self.pbars[pbar_id].set_description_str(message) def close_bar(self, pbar_id: int) -> None: - self.pbars[pbar_id].close() - del self.pbars[pbar_id] + if pbar_id in self.pbars: + self.pbars[pbar_id].close() + del self.pbars[pbar_id] def close(self) -> None: - for p in self.pbars.values(): + for p in list(self.pbars.values()): p.close() - del p + self.pbars.clear() + self.bar_configs.clear() From 653bc114da9e8f1ad5d0c33d34a5acd6993b9459 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 19 Dec 2024 14:12:32 -0800 Subject: [PATCH 2/4] cleanup --- daft/runners/progress_bar.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index b53df4d3bf..e1498c242a 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -145,7 +145,6 @@ def close_bar(self, pbar_id: int) -> None: del self.pbars[pbar_id] def close(self) -> None: - for p in list(self.pbars.values()): + for p in self.pbars.values(): p.close() - self.pbars.clear() - self.bar_configs.clear() + del p From 4ba1200d5177233cbca6fae5a9f51a56a2bb9709 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 19 Dec 2024 14:39:26 -0800 Subject: [PATCH 3/4] still clear it, but up the update interval --- src/daft-local-execution/src/progress_bar.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/daft-local-execution/src/progress_bar.rs b/src/daft-local-execution/src/progress_bar.rs index cf7fa17e3a..36824e833a 100644 --- a/src/daft-local-execution/src/progress_bar.rs +++ b/src/daft-local-execution/src/progress_bar.rs @@ -52,8 +52,8 @@ pub struct OperatorProgressBar { } impl OperatorProgressBar { - // 100ms = 100_000_000ns - const UPDATE_INTERVAL: u64 = 100_000_000; + // 500ms = 500_000_000ns + const UPDATE_INTERVAL: u64 = 500_000_000; pub fn new( progress_bar: Box, From 4404309ed0ef9114055865eab2cfa2c12a63be4f Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Fri, 20 Dec 2024 19:57:54 -0800 Subject: [PATCH 4/4] no initial message --- daft/runners/progress_bar.py | 9 ++++----- src/daft-local-execution/src/lib.rs | 4 +--- src/daft-local-execution/src/progress_bar.rs | 21 ++------------------ 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/daft/runners/progress_bar.py b/daft/runners/progress_bar.py index e1498c242a..220fc7a9bf 100644 --- a/daft/runners/progress_bar.py +++ b/daft/runners/progress_bar.py @@ -114,23 +114,22 @@ def __init__(self) -> None: self._maxinterval = 5.0 self.tqdm_mod = get_tqdm(False) self.pbars: dict[int, Any] = dict() - self.bar_configs: dict[int, tuple[str, str]] = dict() + self.bar_configs: dict[int, str] = dict() self.next_id = 0 - def make_new_bar(self, bar_format: str, initial_message: str) -> int: + def make_new_bar(self, bar_format: str) -> int: pbar_id = self.next_id self.next_id += 1 - self.bar_configs[pbar_id] = (bar_format, initial_message) + self.bar_configs[pbar_id] = bar_format return pbar_id def update_bar(self, pbar_id: int, message: str) -> None: if pbar_id not in self.pbars: if pbar_id not in self.bar_configs: raise ValueError(f"No bar configuration found for id {pbar_id}") - bar_format, initial_message = self.bar_configs[pbar_id] + bar_format = self.bar_configs[pbar_id] self.pbars[pbar_id] = self.tqdm_mod( bar_format=bar_format, - desc=initial_message, position=pbar_id, leave=False, mininterval=1.0, diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 654deea901..cd0c146879 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -166,9 +166,7 @@ impl ExecutionRuntimeContext { runtime_stats: Arc, ) -> Option> { if let Some(ref pb_manager) = self.progress_bar_manager { - let pb = pb_manager - .make_new_bar(color, prefix, show_received) - .unwrap(); + let pb = pb_manager.make_new_bar(color, prefix).unwrap(); Some(Arc::new(OperatorProgressBar::new( pb, runtime_stats, diff --git a/src/daft-local-execution/src/progress_bar.rs b/src/daft-local-execution/src/progress_bar.rs index 36824e833a..c5c59b3e06 100644 --- a/src/daft-local-execution/src/progress_bar.rs +++ b/src/daft-local-execution/src/progress_bar.rs @@ -21,7 +21,6 @@ pub trait ProgressBarManager { &self, color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult>; fn close_all(&self) -> DaftResult<()>; @@ -146,27 +145,19 @@ impl ProgressBarManager for IndicatifProgressBarManager { &self, color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult> { let template_str = format!( "🗡️ 🐟 {{spinner:.green}} {{prefix:.{color}/bold}} | [{{elapsed_precise}}] {{msg}}", color = color.to_str(), ); - let initial_message = if show_received { - "0 rows received, 0 rows emitted".to_string() - } else { - "0 rows emitted".to_string() - }; - let pb = indicatif::ProgressBar::new_spinner() .with_style( ProgressStyle::default_spinner() .template(template_str.as_str()) .unwrap(), ) - .with_prefix(prefix.to_string()) - .with_message(initial_message); + .with_prefix(prefix.to_string()); self.multi_progress.add(pb.clone()); DaftResult::Ok(Box::new(IndicatifProgressBar(pb))) @@ -263,18 +254,10 @@ mod python { &self, _color: ProgressBarColor, prefix: &str, - show_received: bool, ) -> DaftResult> { let bar_format = format!("🗡️ 🐟 {prefix}: {{elapsed}} {{desc}}", prefix = prefix); - let initial_message = if show_received { - "0 rows received, 0 rows emitted".to_string() - } else { - "0 rows emitted".to_string() - }; let pb_id = Python::with_gil(|py| { - let pb_id = - self.inner - .call_method1(py, "make_new_bar", (bar_format, initial_message))?; + let pb_id = self.inner.call_method1(py, "make_new_bar", (bar_format,))?; let pb_id = pb_id.extract::(py)?; DaftResult::Ok(pb_id) })?;