perf: Vectorized nested loop join for in-memory engine (#20495)
ritchie46 authored Dec 31, 2024
1 parent dff1ad7 commit 4c14e70
Showing 47 changed files with 557 additions and 193 deletions.
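The headline change is the join strategy itself. As a rough mental model, a vectorized (block) nested loop join materializes the cross product of a chunk of the left table with the right table and then evaluates the join predicate over that whole batch at once, instead of testing one row pair at a time. The sketch below is illustrative and self-contained, with made-up names; it is not the Polars implementation.

```rust
/// Minimal sketch of a vectorized (block) nested loop join over two integer
/// columns with an arbitrary predicate. Illustrative only: the real engine
/// works on DataFrame chunks and a compiled physical expression.
fn nested_loop_join_vectorized(
    left: &[i64],
    right: &[i64],
    chunk_size: usize,
    predicate: impl Fn(&[i64], &[i64]) -> Vec<bool>,
) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    for left_chunk in left.chunks(chunk_size) {
        // Materialize the cross product of this left chunk with the right side.
        let mut l_rep = Vec::with_capacity(left_chunk.len() * right.len());
        let mut r_rep = Vec::with_capacity(left_chunk.len() * right.len());
        for &l in left_chunk {
            for &r in right {
                l_rep.push(l);
                r_rep.push(r);
            }
        }
        // Evaluate the predicate once over the whole batch (the "vectorized"
        // part), then keep only the matching pairs.
        let mask = predicate(&l_rep, &r_rep);
        out.extend(
            l_rep
                .iter()
                .zip(r_rep.iter())
                .zip(mask.iter())
                .filter_map(|((&l, &r), &keep)| keep.then_some((l, r))),
        );
    }
    out
}

fn main() {
    let left = [1, 2, 3, 4];
    let right = [2, 3, 5];
    // Inequality join: keep pairs where l < r.
    let pairs = nested_loop_join_vectorized(&left, &right, 2, |ls, rs| {
        ls.iter().zip(rs).map(|(l, r)| l < r).collect()
    });
    assert_eq!(
        pairs,
        vec![(1, 2), (1, 3), (1, 5), (2, 3), (2, 5), (3, 5), (4, 5)]
    );
}
```

The diff wires this idea into the in-memory engine: `JoinExec` gains an `Option<JoinTypeOptions>`, and the planner compiles the join predicate into a filter closure that is applied to each materialized batch (see the `planner/lp.rs` hunk below).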
2 changes: 2 additions & 0 deletions crates/polars-core/src/frame/horizontal.rs
@@ -41,6 +41,7 @@ impl DataFrame {
}
}

self.clear_schema();
self.columns.extend_from_slice(columns);
self
}
@@ -99,6 +100,7 @@ pub fn concat_df_horizontal(dfs: &[DataFrame], check_duplicates: bool) -> Polars
unsafe { df.get_columns_mut() }.iter_mut().for_each(|c| {
*c = c.extend_constant(AnyValue::Null, diff).unwrap();
});
df.clear_schema();
unsafe {
df.set_height(output_height);
}
50 changes: 39 additions & 11 deletions crates/polars-core/src/frame/mod.rs
@@ -176,6 +176,10 @@ pub struct DataFrame {
}

impl DataFrame {
pub fn clear_schema(&mut self) {
self.cached_schema = OnceLock::new();
}

#[inline]
pub fn materialized_column_iter(&self) -> impl ExactSizeIterator<Item = &Series> {
self.columns.iter().map(Column::as_materialized_series)
@@ -416,6 +420,8 @@ impl DataFrame {
/// # Ok::<(), PolarsError>(())
/// ```
pub fn pop(&mut self) -> Option<Column> {
self.clear_schema();

self.columns.pop()
}

@@ -477,6 +483,7 @@ impl DataFrame {
);
ca.set_sorted_flag(IsSorted::Ascending);

self.clear_schema();
self.columns.insert(0, ca.into_series().into());
self
}
@@ -687,14 +694,22 @@ impl DataFrame {
/// let f2: Field = Field::new("Diameter (m)".into(), DataType::Float64);
/// let sc: Schema = Schema::from_iter(vec![f1, f2]);
///
/// assert_eq!(df.schema(), sc);
/// assert_eq!(&**df.schema(), &sc);
/// # Ok::<(), PolarsError>(())
/// ```
pub fn schema(&self) -> Schema {
self.columns
.iter()
.map(|x| (x.name().clone(), x.dtype().clone()))
.collect()
pub fn schema(&self) -> &SchemaRef {
let out = self.cached_schema.get_or_init(|| {
Arc::new(
self.columns
.iter()
.map(|x| (x.name().clone(), x.dtype().clone()))
.collect(),
)
});

debug_assert_eq!(out.len(), self.width());

out
}

/// Get a reference to the [`DataFrame`] columns.
@@ -723,14 +738,17 @@ impl DataFrame {
///
/// The caller must ensure the length of all [`Series`] remains equal to `height` or
/// [`DataFrame::set_height`] is called afterwards with the appropriate `height`.
/// The caller must ensure that the cached schema is cleared if it modifies the schema by
/// calling [`DataFrame::clear_schema`].
pub unsafe fn get_columns_mut(&mut self) -> &mut Vec<Column> {
&mut self.columns
}

#[inline]
/// Remove all the columns in the [`DataFrame`] but keep the `height`.
pub fn clear_columns(&mut self) {
unsafe { self.get_columns_mut() }.clear()
unsafe { self.get_columns_mut() }.clear();
self.clear_schema();
}

#[inline]
@@ -744,7 +762,8 @@
/// `DataFrame`]s with no columns (ZCDFs), it is important that the height is set afterwards
/// with [`DataFrame::set_height`].
pub unsafe fn column_extend_unchecked(&mut self, iter: impl IntoIterator<Item = Column>) {
unsafe { self.get_columns_mut() }.extend(iter)
unsafe { self.get_columns_mut() }.extend(iter);
self.clear_schema();
}

/// Take ownership of the underlying columns vec.
@@ -834,6 +853,7 @@ impl DataFrame {
s
})
.collect();
self.clear_schema();
Ok(())
}

@@ -1194,6 +1214,7 @@ impl DataFrame {
Ok(())
})?;
self.height += other.height;
self.clear_schema();
Ok(())
}

@@ -1215,6 +1236,7 @@ impl DataFrame {
/// ```
pub fn drop_in_place(&mut self, name: &str) -> PolarsResult<Column> {
let idx = self.check_name_to_idx(name)?;
self.clear_schema();
Ok(self.columns.remove(idx))
}

@@ -1347,6 +1369,7 @@ impl DataFrame {
}

self.columns.insert(index, column);
self.clear_schema();
Ok(self)
}

@@ -1370,6 +1393,7 @@ impl DataFrame {
}

self.columns.push(column);
self.clear_schema();
}
Ok(())
}
@@ -1417,6 +1441,7 @@ impl DataFrame {
unsafe { self.set_height(column.len()) };
}
unsafe { self.get_columns_mut() }.push(column);
self.clear_schema();

self
}
@@ -1433,6 +1458,7 @@ impl DataFrame {
}

self.columns.push(c);
self.clear_schema();
}
// Schema is incorrect fallback to search
else {
@@ -1448,6 +1474,7 @@ impl DataFrame {
}

self.columns.push(c);
self.clear_schema();
}

Ok(())
@@ -1637,7 +1664,7 @@ impl DataFrame {
/// # Ok::<(), PolarsError>(())
/// ```
pub fn get_column_index(&self, name: &str) -> Option<usize> {
let schema = self.cached_schema.get_or_init(|| Arc::new(self.schema()));
let schema = self.schema();
if let Some(idx) = schema.index_of(name) {
if self
.get_columns()
@@ -1775,7 +1802,7 @@ impl DataFrame {
cols: &[PlSmallStr],
schema: &Schema,
) -> PolarsResult<Vec<Column>> {
debug_ensure_matching_schema_names(schema, &self.schema())?;
debug_ensure_matching_schema_names(schema, self.schema())?;

cols.iter()
.map(|name| {
@@ -1984,7 +2011,7 @@ impl DataFrame {
return Ok(self);
}
polars_ensure!(
self.columns.iter().all(|c| c.name() != &name),
!self.schema().contains(&name),
Duplicate: "column rename attempted with already existing name \"{name}\""
);

@@ -2326,6 +2353,7 @@ impl DataFrame {
);
let old_col = &mut self.columns[index];
mem::swap(old_col, &mut new_column);
self.clear_schema();
Ok(self)
}

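Most of the churn in `frame/mod.rs` supports one change: `DataFrame::schema()` now returns a lazily built, cached `&SchemaRef` (an `Arc<Schema>` stored in a `OnceLock`), so every method that adds, removes, renames or swaps a column has to call `clear_schema()` to invalidate the cache; that is also the new contract documented on `get_columns_mut`. Downstream call sites simply drop the extra `&`, since `schema()` already hands out a reference. A minimal sketch of the caching pattern using standard-library stand-ins (the struct and field names are illustrative, not the real Polars layout):

```rust
use std::sync::{Arc, OnceLock};

/// Toy stand-ins for the real polars types.
type Schema = Vec<(String, String)>; // (column name, dtype name)
type SchemaRef = Arc<Schema>;

struct Frame {
    columns: Vec<(String, String)>, // one (name, dtype) entry per column
    cached_schema: OnceLock<SchemaRef>,
}

impl Frame {
    /// Invalidate the cache; every method that adds, removes, renames or
    /// retypes a column must call this, or `schema()` returns stale data.
    fn clear_schema(&mut self) {
        self.cached_schema = OnceLock::new();
    }

    /// Build the schema at most once, then hand out a cheap shared reference.
    fn schema(&self) -> &SchemaRef {
        self.cached_schema
            .get_or_init(|| Arc::new(self.columns.clone()))
    }

    fn push_column(&mut self, name: &str, dtype: &str) {
        self.columns.push((name.to_string(), dtype.to_string()));
        // Without this, `schema()` would keep returning the stale cache.
        self.clear_schema();
    }
}

fn main() {
    let mut df = Frame { columns: Vec::new(), cached_schema: OnceLock::new() };
    df.push_column("a", "i64");
    assert_eq!(df.schema().len(), 1);
    df.push_column("b", "f64");
    assert_eq!(df.schema().len(), 2); // cache was cleared, so both columns show up
}
```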
2 changes: 1 addition & 1 deletion crates/polars-expr/src/expressions/window.rs
@@ -399,7 +399,7 @@ impl PhysicalExpr for WindowExpr {
// 4. select the final column and return

if df.is_empty() {
let field = self.phys_function.to_field(&df.schema())?;
let field = self.phys_function.to_field(df.schema())?;
return Ok(Column::full_null(field.name().clone(), 0, field.dtype()));
}

2 changes: 1 addition & 1 deletion crates/polars-io/src/avro/write.rs
@@ -64,7 +64,7 @@
}

fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
let schema = schema_to_arrow_checked(&df.schema(), CompatLevel::oldest(), "avro")?;
let schema = schema_to_arrow_checked(df.schema(), CompatLevel::oldest(), "avro")?;
let record = write::to_record(&schema, self.name.clone())?;

let mut data = vec![];
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/write.rs
@@ -116,7 +116,7 @@
}

fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
let schema = schema_to_arrow_checked(&df.schema(), self.compat_level, "ipc")?;
let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
let mut ipc_writer = write::FileWriter::try_new(
&mut self.writer,
Arc::new(schema),
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/writer.rs
@@ -124,7 +124,7 @@
/// Write the given DataFrame in the writer `W`. Returns the total size of the file.
pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
let mut batched = self.batched(&chunked_df.schema())?;
let mut batched = self.batched(chunked_df.schema())?;
batched.write_batch(&chunked_df)?;
batched.finish()
}
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/exotic.rs
@@ -25,7 +25,7 @@ pub(crate) fn prepare_expression_for_context(
// type coercion and simplify expression optimizations run.
let column = Series::full_null(name, 0, dtype);
let df = column.into_frame();
let input_schema = Arc::new(df.schema());
let input_schema = df.schema().clone();
let lf = df
.lazy()
.without_optimizations()
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/io.rs
@@ -672,7 +672,7 @@ fn scan_anonymous_fn_with_options() -> PolarsResult<()> {
let function = Arc::new(MyScan {});

let args = ScanArgsAnonymous {
schema: Some(Arc::new(fruits_cars().schema())),
schema: Some(fruits_cars().schema().clone()),
..ScanArgsAnonymous::default()
};

2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/pdsh.rs
@@ -107,7 +107,7 @@ fn test_q2() -> PolarsResult<()> {
Field::new("s_phone".into(), DataType::String),
Field::new("s_comment".into(), DataType::String),
]);
assert_eq!(&out.schema(), &schema);
assert_eq!(&**out.schema(), &schema);

Ok(())
}
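The test updates such as `assert_eq!(&**out.schema(), &schema)` follow from the new return type: `schema()` now yields `&SchemaRef`, that is an `&Arc<Schema>`, so comparing it against an owned `Schema` takes two dereferences. The same pattern with standard-library types only:

```rust
use std::sync::Arc;

fn main() {
    // Stand-in for `&SchemaRef`: a reference to an Arc-wrapped value.
    let cached: &Arc<Vec<i32>> = &Arc::new(vec![1, 2, 3]);
    let expected: Vec<i32> = vec![1, 2, 3];
    // First `*` peels the reference, second `*` peels the Arc, then re-borrow.
    assert_eq!(&**cached, &expected);
}
```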
6 changes: 5 additions & 1 deletion crates/polars-mem-engine/src/executors/join.rs
@@ -9,6 +9,7 @@ pub struct JoinExec {
right_on: Vec<Arc<dyn PhysicalExpr>>,
parallel: bool,
args: JoinArgs,
options: Option<JoinTypeOptions>,
}

impl JoinExec {
@@ -20,6 +21,7 @@ impl JoinExec {
right_on: Vec<Arc<dyn PhysicalExpr>>,
parallel: bool,
args: JoinArgs,
options: Option<JoinTypeOptions>,
) -> Self {
JoinExec {
input_left: Some(input_left),
Expand All @@ -28,6 +30,7 @@ impl JoinExec {
right_on,
parallel,
args,
options,
}
}
}
@@ -75,7 +78,7 @@ impl Executor for JoinExec {
let by = self
.left_on
.iter()
.map(|s| Ok(s.to_field(&df_left.schema())?.name))
.map(|s| Ok(s.to_field(df_left.schema())?.name))
.collect::<PolarsResult<Vec<_>>>()?;
let name = comma_delimited("join".to_string(), &by);
Cow::Owned(name)
@@ -142,6 +145,7 @@ impl Executor for JoinExec {
left_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
right_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
self.args.clone(),
self.options.clone(),
true,
state.verbose(),
);
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/sort.rs
@@ -61,7 +61,7 @@ impl Executor for SortExec {
let by = self
.by_column
.iter()
.map(|s| Ok(s.to_field(&df.schema())?.name))
.map(|s| Ok(s.to_field(df.schema())?.name))
.collect::<PolarsResult<Vec<_>>>()?;
let name = comma_delimited("sort".to_string(), &by);
Cow::Owned(name)
30 changes: 30 additions & 0 deletions crates/polars-mem-engine/src/planner/lp.rs
@@ -1,5 +1,6 @@
use polars_core::prelude::*;
use polars_core::POOL;
use polars_expr::state::ExecutionState;
use polars_plan::global::_set_n_rows_for_scan;
use polars_plan::plans::expr_ir::ExprIR;

@@ -484,6 +485,7 @@ fn create_physical_plan_impl(
left_on,
right_on,
options,
schema,
..
} => {
let parallel = if options.force_parallel {
@@ -521,13 +523,41 @@
&mut ExpressionConversionState::new(true, state.expr_depth),
)?;
let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

// Convert the join options, to the physical join options. This requires the physical
// planner, so we do this last minute.
let join_type_options = options
.options
.map(|o| {
o.compile(|e| {
let phys_expr = create_physical_expr(
e,
Context::Default,
expr_arena,
&schema,
&mut ExpressionConversionState::new(false, state.expr_depth),
)?;

let execution_state = ExecutionState::default();

Ok(Arc::new(move |df: DataFrame| {
let mask = phys_expr.evaluate(&df, &execution_state)?;
let mask = mask.as_materialized_series();
let mask = mask.bool()?;
df._filter_seq(mask)
}))
})
})
.transpose()?;

Ok(Box::new(executors::JoinExec::new(
input_left,
input_right,
left_on,
right_on,
parallel,
options.args,
join_type_options,
)))
},
HStack {
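The planner hunk above is where the nested loop join's predicate becomes executable: the logical predicate carried in the join options is compiled once, at plan-build time, into a closure over a `DataFrame`, and the executor then applies that closure to every materialized batch. Below is a stripped-down sketch of that compile-once, apply-per-chunk shape using placeholder types instead of the real `DataFrame` and `PolarsResult`; the names and signatures are assumptions for illustration only.

```rust
use std::sync::Arc;

// Placeholder types standing in for DataFrame and PolarsResult.
type Batch = Vec<(i64, i64)>;
type Result<T> = std::result::Result<T, String>;

/// The "physical" filter the planner hands to the join executor.
type CompiledFilter = Arc<dyn Fn(Batch) -> Result<Batch> + Send + Sync>;

/// Plan-build time: turn a logical predicate into a closure evaluated per batch.
fn compile_predicate(pred: fn(i64, i64) -> bool) -> CompiledFilter {
    Arc::new(move |batch: Batch| -> Result<Batch> {
        // Evaluate the predicate over the whole batch and keep the matching rows.
        Ok(batch.into_iter().filter(|&(l, r)| pred(l, r)).collect())
    })
}

fn main() -> Result<()> {
    // Compiled once when the physical plan is built...
    let filter = compile_predicate(|l, r| l < r);

    // ...then applied to each cross-joined chunk during execution.
    let chunk: Batch = vec![(1, 2), (3, 2), (2, 5)];
    let kept = (filter.as_ref())(chunk)?;
    assert_eq!(kept, vec![(1, 2), (2, 5)]);
    Ok(())
}
```

Doing the expression conversion during planning keeps it out of the hot join loop; at execution time the join only sees an opaque filter closure over a batch.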