Skip to content

Commit

Permalink
perf: Respect allow_threading in some more operators (#17450)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Jul 5, 2024
1 parent a539d88 commit 34126ca
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 27 deletions.
13 changes: 4 additions & 9 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct BinaryExpr {
right: Arc<dyn PhysicalExpr>,
expr: Expr,
has_literal: bool,
allow_threading: bool,
}

impl BinaryExpr {
Expand All @@ -23,13 +24,15 @@ impl BinaryExpr {
right: Arc<dyn PhysicalExpr>,
expr: Expr,
has_literal: bool,
allow_threading: bool,
) -> Self {
Self {
left,
op,
right,
expr,
has_literal,
allow_threading,
}
}
}
Expand Down Expand Up @@ -175,21 +178,13 @@ impl PhysicalExpr for BinaryExpr {
// they also saturate the thread pool by themselves, so that's fine.
let has_window = state.has_window();

// Streaming takes care of parallelism, don't parallelize here, as it
// increases contention.
#[cfg(feature = "streaming")]
let in_streaming = state.in_streaming_engine();

#[cfg(not(feature = "streaming"))]
let in_streaming = false;

let (lhs, rhs);
if has_window {
let mut state = state.split();
state.remove_cache_window_flag();
lhs = self.left.evaluate(df, &state)?;
rhs = self.right.evaluate(df, &state)?;
} else if in_streaming || self.has_literal {
} else if !self.allow_threading || self.has_literal {
// Literals are free, don't pay par cost.
lhs = self.left.evaluate(df, state)?;
rhs = self.right.evaluate(df, state)?;
Expand Down
5 changes: 3 additions & 2 deletions crates/polars-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ fn create_physical_expr_inner(
rhs,
node_to_expr(expression, expr_arena),
state.local.has_lit,
state.allow_threading,
)))
},
Column(column) => Ok(Arc::new(ColumnExpr::new(
Expand Down Expand Up @@ -516,7 +517,7 @@ fn create_physical_expr_inner(
function.clone(),
node_to_expr(expression, expr_arena),
*options,
true,
state.allow_threading,
schema.cloned(),
output_dtype,
)))
Expand Down Expand Up @@ -554,7 +555,7 @@ fn create_physical_expr_inner(
function.clone().into(),
node_to_expr(expression, expr_arena),
*options,
true,
state.allow_threading,
schema.cloned(),
output_dtype,
)))
Expand Down
14 changes: 0 additions & 14 deletions crates/polars-expr/src/state/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,6 @@ impl ExecutionState {
flags
});
}

#[cfg(feature = "streaming")]
pub fn set_in_streaming_engine(&mut self) {
self.set_flags(&|mut flags| {
flags.insert(StateFlags::IN_STREAMING);
flags
});
}

#[cfg(feature = "streaming")]
pub fn in_streaming_engine(&self) -> bool {
let flags: StateFlags = self.flags.load(Ordering::Relaxed).into();
flags.contains(StateFlags::IN_STREAMING)
}
}

impl Default for ExecutionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,11 @@ fn get_pipeline_node(
IR::MapFunction {
function: FunctionNode::Pipeline {
function: Arc::new(Mutex::new(move |_df: DataFrame| {
let mut state = ExecutionState::new();
let state = ExecutionState::new();
if state.verbose() {
eprintln!("RUN STREAMING PIPELINE");
eprintln!("{:?}", &pipelines)
}
state.set_in_streaming_engine();
execute_pipeline(state, std::mem::take(&mut pipelines))
})),
schema,
Expand Down

0 comments on commit 34126ca

Please sign in to comment.