Skip to content

Commit

Permalink
refactor: Refactor parts of IR. (#16899)
Browse files (browse the repository at this point in the history)
  • Loading branch information
ritchie46 authored Jun 12, 2024
1 parent 2dba716 commit 6ed624f
Show file tree
Hide file tree
Showing 36 changed files with 73 additions and 92 deletions.
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct CsvReadOptions {
pub n_rows: Option<usize>,
pub row_index: Option<RowIndex>,
// Column-wise options
pub columns: Option<Arc<Vec<String>>>,
pub columns: Option<Arc<[String]>>,
pub projection: Option<Arc<Vec<usize>>>,
pub schema: Option<SchemaRef>,
pub schema_overwrite: Option<SchemaRef>,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl CsvReadOptions {
}

/// Which columns to select.
pub fn with_columns(mut self, columns: Option<Arc<Vec<String>>>) -> Self {
pub fn with_columns(mut self, columns: Option<Arc<[String]>>) -> Self {
self.columns = columns;
self
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl<'a> CoreReader<'a> {
has_header: bool,
ignore_errors: bool,
schema: Option<SchemaRef>,
columns: Option<Arc<Vec<String>>>,
columns: Option<Arc<[String]>>,
encoding: CsvEncoding,
mut n_threads: Option<usize>,
schema_overwrite: Option<SchemaRef>,
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct IpcReaderAsync {
#[derive(Default, Clone)]
pub struct IpcReadOptions {
// Names of the columns to include in the output.
projection: Option<Vec<String>>,
projection: Option<Arc<[String]>>,

// The maximum number of rows to include in the output.
row_limit: Option<usize>,
Expand All @@ -38,8 +38,8 @@ pub struct IpcReadOptions {
}

impl IpcReadOptions {
pub fn with_projection(mut self, indices: impl Into<Option<Vec<String>>>) -> Self {
self.projection = indices.into();
pub fn with_projection(mut self, projection: Option<Arc<[String]>>) -> Self {
self.projection = projection;
self
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ impl Executor for PythonScanExec {

let python_scan_function = self.options.scan_fn.take().unwrap().0;

let with_columns =
with_columns.map(|mut cols| std::mem::take(Arc::make_mut(&mut cols)));
let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());

let out = callable
.call1((
Expand Down
9 changes: 2 additions & 7 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ impl IpcExec {
}

let projection = materialize_projection(
self.file_options
.with_columns
.as_deref()
.map(|cols| cols.deref()),
self.file_options.with_columns.as_deref(),
&self.schema,
None,
self.file_options.row_index.is_some(),
Expand Down Expand Up @@ -193,9 +190,7 @@ impl IpcExec {
}),
)
.with_row_index(this.file_options.row_index.clone())
.with_projection(
this.file_options.with_columns.as_deref().cloned(),
),
.with_projection(this.file_options.with_columns.as_ref().cloned()),
verbose,
)
.await?;
Expand Down
8 changes: 3 additions & 5 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ mod parquet;
#[cfg(feature = "ipc")]
mod support;
use std::mem;
#[cfg(any(feature = "parquet", feature = "ipc", feature = "cse"))]
use std::ops::Deref;

#[cfg(feature = "csv")]
pub(crate) use csv::CsvExec;
Expand All @@ -38,7 +36,7 @@ type Predicate = Option<Arc<dyn PhysicalIoExpr>>;
#[cfg(any(feature = "ipc", feature = "parquet"))]
fn prepare_scan_args(
predicate: Option<Arc<dyn PhysicalExpr>>,
with_columns: &mut Option<Arc<Vec<String>>>,
with_columns: &mut Option<Arc<[String]>>,
schema: &mut SchemaRef,
has_row_index: bool,
hive_partitions: Option<&[Series]>,
Expand All @@ -47,7 +45,7 @@ fn prepare_scan_args(
let schema = mem::take(schema);

let projection = materialize_projection(
with_columns.as_deref().map(|cols| cols.deref()),
with_columns.as_deref(),
&schema,
hive_partitions,
has_row_index,
Expand All @@ -62,7 +60,7 @@ fn prepare_scan_args(
pub struct DataFrameExec {
pub(crate) df: Arc<DataFrame>,
pub(crate) selection: Option<Arc<dyn PhysicalExpr>>,
pub(crate) projection: Option<Arc<Vec<String>>>,
pub(crate) projection: Option<Arc<[String]>>,
pub(crate) predicate_has_windows: bool,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,7 @@ impl ParquetExec {
.unwrap_left();
let first_metadata = &self.metadata;
let cloud_options = self.cloud_options.as_ref();
let with_columns = self
.file_options
.with_columns
.as_ref()
.map(|v| v.as_slice());
let with_columns = self.file_options.with_columns.as_ref().map(|v| v.as_ref());

let mut result = vec![];
let batch_size = get_file_prefetch_size();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ fn create_physical_plan_impl(
DataFrameScan {
df,
projection,
selection: predicate,
filter: predicate,
schema,
..
} => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ fn get_pipeline_node(
schema: Arc::new(Schema::new()),
output_schema: None,
projection: None,
selection: None,
filter: None,
});

IR::MapFunction {
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) fn predicate_at_scan(q: LazyFrame) -> bool {
matches!(
lp,
DataFrameScan {
selection: Some(_),
filter: Some(_),
..
} | Scan {
predicate: Some(_),
Expand All @@ -50,7 +50,7 @@ pub(crate) fn predicate_at_all_scans(q: LazyFrame) -> bool {
matches!(
lp,
DataFrameScan {
selection: Some(_),
filter: Some(_),
..
} | Scan {
predicate: Some(_),
Expand Down Expand Up @@ -482,7 +482,7 @@ fn test_with_column_prune() -> PolarsResult<()> {
match lp {
DataFrameScan { projection, .. } => {
let projection = projection.as_ref().unwrap();
let projection = projection.as_slice();
let projection = projection.as_ref();
assert_eq!(projection.len(), 1);
let name = &projection[0];
assert_eq!(name, "c1");
Expand Down
13 changes: 3 additions & 10 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::VecDeque;
use std::ops::{Deref, Range};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;

Expand Down Expand Up @@ -86,10 +86,7 @@ impl ParquetSource {
.map(|hive| hive.materialize_partition_columns());

let projection = materialize_projection(
file_options
.with_columns
.as_deref()
.map(|cols| cols.deref()),
file_options.with_columns.as_deref(),
&schema,
hive_partitions.as_deref(),
false,
Expand Down Expand Up @@ -144,11 +141,7 @@ impl ParquetSource {

fn finish_init_reader(&mut self, batched_reader: BatchedParquetReader) -> PolarsResult<()> {
if self.processed_paths >= 1 {
let with_columns = self
.file_options
.with_columns
.as_ref()
.map(|v| v.as_slice());
let with_columns = self.file_options.with_columns.as_ref().map(|v| v.as_ref());
check_projected_arrow_schema(
batched_reader.schema().as_ref(),
self.file_info
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
DataFrameScan {
df,
projection,
selection,
filter: selection,
output_schema,
..
} => {
Expand All @@ -68,7 +68,7 @@ where
}
// projection is free
if let Some(projection) = projection {
df = df.select(projection.as_slice())?;
df = df.select(projection.as_ref())?;
}
}
Ok(Box::new(sources::DataFrameSource::from_df(df)) as Box<dyn Source>)
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/dsl/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::fmt::Display;
use std::ops::BitAnd;

use super::*;
use crate::logical_plan::alp::tree_format::TreeFmtVisitor;
use crate::logical_plan::conversion::is_regex_projection;
use crate::logical_plan::ir::tree_format::TreeFmtVisitor;
use crate::logical_plan::visitor::{AexprNode, TreeWalker};

/// Specialized expressions for Categorical dtypes.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/logical_plan/anonymous_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::dsl::Expr;

pub struct AnonymousScanArgs {
pub n_rows: Option<usize>,
pub with_columns: Option<Arc<Vec<String>>>,
pub with_columns: Option<Arc<[String]>>,
pub schema: SchemaRef,
pub output_schema: Option<SchemaRef>,
pub predicate: Option<Expr>,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/logical_plan/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl DslBuilder {
schema,
output_schema: None,
projection: None,
selection: None,
filter: None,
}
.into()
}
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn empty_df() -> IR {
schema: Arc::new(Default::default()),
output_schema: None,
projection: None,
selection: None,
filter: None,
}
}

Expand Down Expand Up @@ -204,13 +204,13 @@ pub fn to_alp_impl(
schema,
output_schema,
projection,
selection,
filter: selection,
} => IR::DataFrameScan {
df,
schema,
output_schema,
projection,
selection: selection.map(|expr| to_expr_ir(expr, expr_arena)),
filter: selection.map(|expr| to_expr_ir(expr, expr_arena)),
},
DslPlan::Select {
expr,
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/logical_plan/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ impl IR {
schema,
output_schema,
projection,
selection,
filter: selection,
} => DslPlan::DataFrameScan {
df,
schema,
output_schema,
projection,
selection: selection.map(|e| e.to_expr(expr_arena)),
filter: selection.map(|e| e.to_expr(expr_arena)),
},
IR::Select {
expr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::PathBuf;

use super::format::ExprIRSliceDisplay;
use crate::constants::UNLIMITED_CACHE;
use crate::prelude::alp::format::ColumnsDisplay;
use crate::prelude::ir::format::ColumnsDisplay;
use crate::prelude::*;

pub struct IRDotDisplay<'a> {
Expand Down Expand Up @@ -153,7 +153,7 @@ impl<'a> IRDotDisplay<'a> {
#[cfg(feature = "python")]
PythonScan { predicate, options } => {
let predicate = predicate.as_ref().map(|e| self.display_expr(e));
let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_slice()));
let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_ref()));
let total_columns = options.schema.len();
let predicate = OptionExprIRDisplay(predicate);

Expand Down Expand Up @@ -228,10 +228,10 @@ impl<'a> IRDotDisplay<'a> {
DataFrameScan {
schema,
projection,
selection,
filter: selection,
..
} => {
let num_columns = NumColumns(projection.as_ref().map(|p| p.as_ref().as_ref()));
let num_columns = NumColumns(projection.as_ref().map(|p| p.as_ref()));
let selection = selection.as_ref().map(|e| self.display_expr(e));
let selection = OptionExprIRDisplay(selection);
let total_columns = schema.len();
Expand All @@ -250,7 +250,7 @@ impl<'a> IRDotDisplay<'a> {
} => {
let name: &str = scan_type.into();
let path = PathsDisplay(paths.as_ref());
let with_columns = options.with_columns.as_ref().map(|cols| cols.as_slice());
let with_columns = options.with_columns.as_ref().map(|cols| cols.as_ref());
let with_columns = NumColumns(with_columns);
let total_columns = file_info.schema.len();
let predicate = predicate.as_ref().map(|e| self.display_expr(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl<'a> IRDisplay<'a> {
DataFrameScan {
schema,
projection,
selection,
filter: selection,
..
} => {
let total_columns = schema.len();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl IR {
schema,
output_schema,
projection,
selection,
filter: selection,
} => {
let mut new_selection = None;
if selection.is_some() {
Expand All @@ -139,7 +139,7 @@ impl IR {
schema: schema.clone(),
output_schema: output_schema.clone(),
projection: projection.clone(),
selection: new_selection,
filter: new_selection,
}
},
MapFunction { function, .. } => MapFunction {
Expand Down Expand Up @@ -188,7 +188,9 @@ impl IR {
container.push(pred.clone())
}
},
DataFrameScan { selection, .. } => {
DataFrameScan {
filter: selection, ..
} => {
if let Some(expr) = selection {
container.push(expr.clone())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub enum IR {
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
projection: Option<Arc<Vec<String>>>,
selection: Option<ExprIR>,
projection: Option<Arc<[String]>>,
filter: Option<ExprIR>,
},
// Only selects columns (semantically only has row access).
// This is a more restricted operation than `Select`.
Expand Down
Loading

0 comments on commit 6ed624f

Please sign in to comment.