From 34126cac12d5a69de32f7b6ad017303b92043ba9 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Fri, 5 Jul 2024 20:11:18 +0200 Subject: [PATCH] perf: Respect allow_threading in some more operators (#17450) --- crates/polars-expr/src/expressions/binary.rs | 13 ++++--------- crates/polars-expr/src/planner.rs | 5 +++-- crates/polars-expr/src/state/execution_state.rs | 14 -------------- .../physical_plan/streaming/construct_pipeline.rs | 3 +-- 4 files changed, 8 insertions(+), 27 deletions(-) diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index e5d455655e89..0d4634d6eeaf 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -14,6 +14,7 @@ pub struct BinaryExpr { right: Arc, expr: Expr, has_literal: bool, + allow_threading: bool, } impl BinaryExpr { @@ -23,6 +24,7 @@ impl BinaryExpr { right: Arc, expr: Expr, has_literal: bool, + allow_threading: bool, ) -> Self { Self { left, @@ -30,6 +32,7 @@ impl BinaryExpr { right, expr, has_literal, + allow_threading, } } } @@ -175,21 +178,13 @@ impl PhysicalExpr for BinaryExpr { // they also saturate the thread pool by themselves, so that's fine. let has_window = state.has_window(); - // Streaming takes care of parallelism, don't parallelize here, as it - // increases contention. - #[cfg(feature = "streaming")] - let in_streaming = state.in_streaming_engine(); - - #[cfg(not(feature = "streaming"))] - let in_streaming = false; - let (lhs, rhs); if has_window { let mut state = state.split(); state.remove_cache_window_flag(); lhs = self.left.evaluate(df, &state)?; rhs = self.right.evaluate(df, &state)?; - } else if in_streaming || self.has_literal { + } else if !self.allow_threading || self.has_literal { // Literals are free, don't pay par cost. lhs = self.left.evaluate(df, state)?; rhs = self.right.evaluate(df, state)?; diff --git a/crates/polars-expr/src/planner.rs b/crates/polars-expr/src/planner.rs index cf716477d20b..85968c74e77c 100644 --- a/crates/polars-expr/src/planner.rs +++ b/crates/polars-expr/src/planner.rs @@ -321,6 +321,7 @@ fn create_physical_expr_inner( rhs, node_to_expr(expression, expr_arena), state.local.has_lit, + state.allow_threading, ))) }, Column(column) => Ok(Arc::new(ColumnExpr::new( @@ -516,7 +517,7 @@ fn create_physical_expr_inner( function.clone(), node_to_expr(expression, expr_arena), *options, - true, + state.allow_threading, schema.cloned(), output_dtype, ))) @@ -554,7 +555,7 @@ fn create_physical_expr_inner( function.clone().into(), node_to_expr(expression, expr_arena), *options, - true, + state.allow_threading, schema.cloned(), output_dtype, ))) diff --git a/crates/polars-expr/src/state/execution_state.rs b/crates/polars-expr/src/state/execution_state.rs index 90798ef6b8ee..07c571e26653 100644 --- a/crates/polars-expr/src/state/execution_state.rs +++ b/crates/polars-expr/src/state/execution_state.rs @@ -229,20 +229,6 @@ impl ExecutionState { flags }); } - - #[cfg(feature = "streaming")] - pub fn set_in_streaming_engine(&mut self) { - self.set_flags(&|mut flags| { - flags.insert(StateFlags::IN_STREAMING); - flags - }); - } - - #[cfg(feature = "streaming")] - pub fn in_streaming_engine(&self) -> bool { - let flags: StateFlags = self.flags.load(Ordering::Relaxed).into(); - flags.contains(StateFlags::IN_STREAMING) - } } impl Default for ExecutionState { diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 2f9183f18cd6..19ce681d0523 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -248,12 +248,11 @@ fn get_pipeline_node( IR::MapFunction { function: FunctionNode::Pipeline { function: Arc::new(Mutex::new(move |_df: DataFrame| { - let mut state = ExecutionState::new(); + let state = ExecutionState::new(); if state.verbose() { eprintln!("RUN STREAMING PIPELINE"); eprintln!("{:?}", &pipelines) } - state.set_in_streaming_engine(); execute_pipeline(state, std::mem::take(&mut pipelines)) })), schema,