Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move all describe, describe_tree and dot-viz code to IR instead of DslPlan #16237

Merged
merged 11 commits into from
May 15, 2024
56 changes: 6 additions & 50 deletions crates/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,16 @@
use std::fmt::Write;

use polars_core::prelude::*;
use polars_plan::dot::*;
use polars_plan::prelude::*;

use crate::prelude::*;

impl LazyFrame {
/// Get a dot language representation of the LogicalPlan.
pub fn to_dot(&self, optimized: bool) -> PolarsResult<String> {
let mut s = String::with_capacity(512);

let mut logical_plan = self.clone().get_plan_builder().build();
if optimized {
// initialize arena's
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(32);

let lp_top = self.clone().optimize_with_scratch(
&mut lp_arena,
&mut expr_arena,
&mut vec![],
true,
)?;
logical_plan = node_to_lp(lp_top, &expr_arena, &mut lp_arena);
}

let prev_node = DotNode {
branch: 0,
id: 0,
fmt: "",
};

// maps graphviz id to label
// we use this to create this graph
// first we create nodes including ids to make sure they are unique
// A [id] -- B [id]
// B [id] -- C [id]
//
// then later we hide the [id] by adding this to the graph
// A [id] [label="A"]
// B [id] [label="B"]
// C [id] [label="C"]

let mut id_map = PlHashMap::with_capacity(8);
logical_plan
.dot(&mut s, (0, 0), prev_node, &mut id_map)
.expect("io error");
s.push('\n');
let lp = if optimized {
self.clone().to_alp_optimized()
} else {
self.clone().to_alp()
}?;

for (id, label) in id_map {
// the label is wrapped in double quotes
// the id already is wrapped in double quotes
writeln!(s, "{id}[label=\"{label}\"]").unwrap();
}
s.push_str("\n}");
Ok(s)
Ok(lp.display_dot().to_string())
}
}
43 changes: 11 additions & 32 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,49 +208,27 @@ impl LazyFrame {
}

/// Return a String describing the naive (un-optimized) logical plan.
pub fn describe_plan(&self) -> String {
self.logical_plan.describe()
pub fn describe_plan(&self) -> PolarsResult<String> {
Ok(self.clone().to_alp()?.describe())
}

/// Return a String describing the naive (un-optimized) logical plan in tree format.
pub fn describe_plan_tree(&self) -> String {
self.logical_plan.describe_tree_format()
}

fn optimized_plan_ir(&self) -> PolarsResult<IRPlan> {
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(64);
let lp_top = self.clone().optimize_with_scratch(
&mut lp_arena,
&mut expr_arena,
&mut vec![],
true,
)?;

Ok(IRPlan::new(lp_top, lp_arena, expr_arena))
}

fn optimized_plan(&self) -> PolarsResult<DslPlan> {
let IRPlan {
lp_top,
mut lp_arena,
expr_arena,
} = self.optimized_plan_ir()?;
Ok(node_to_lp(lp_top, &expr_arena, &mut lp_arena))
pub fn describe_plan_tree(&self) -> PolarsResult<String> {
Ok(self.clone().to_alp()?.describe_tree_format())
}

/// Return a String describing the optimized logical plan.
///
/// Returns `Err` if optimizing the logical plan fails.
pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
Ok(self.optimized_plan_ir()?.describe())
Ok(self.clone().to_alp_optimized()?.describe())
}

/// Return a String describing the optimized logical plan in tree format.
///
/// Returns `Err` if optimizing the logical plan fails.
pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
Ok(self.optimized_plan()?.describe_tree_format())
Ok(self.clone().to_alp_optimized()?.describe_tree_format())
}

/// Return a String describing the logical plan.
Expand All @@ -261,7 +239,7 @@ impl LazyFrame {
if optimized {
self.describe_optimized_plan()
} else {
Ok(self.describe_plan())
self.describe_plan()
}
}

Expand Down Expand Up @@ -532,15 +510,16 @@ impl LazyFrame {
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
}

pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena<IR>, Arena<AExpr>)> {
pub fn to_alp_optimized(self) -> PolarsResult<IRPlan> {
let mut lp_arena = Arena::with_capacity(16);
let mut expr_arena = Arena::with_capacity(16);
let node =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
Ok((node, lp_arena, expr_arena))

Ok(IRPlan::new(node, lp_arena, expr_arena))
}

pub fn to_alp(self) -> PolarsResult<(Node, Arena<IR>, Arena<AExpr>)> {
pub fn to_alp(self) -> PolarsResult<IRPlan> {
self.logical_plan.to_alp()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ApplyExpr {
) -> Self {
#[cfg(debug_assertions)]
if matches!(options.collect_groups, ApplyOptions::ElementWise) && options.returns_scalar {
panic!("expr {} is not implemented correctly. 'returns_scalar' and 'elementwise' are mutually exclusive", expr)
panic!("expr {:?} is not implemented correctly. 'returns_scalar' and 'elementwise' are mutually exclusive", expr)
}

Self {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl<'a> AggregationContext<'a> {
(true, &DataType::List(_)) => {
if series.len() != self.groups.len() {
let fmt_expr = if let Some(e) = expr {
format!("'{e}' ")
format!("'{e:?}' ")
} else {
String::new()
};
Expand Down Expand Up @@ -589,7 +589,7 @@ impl Display for &dyn PhysicalExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.as_expression() {
None => Ok(()),
Some(e) => write!(f, "{e}"),
Some(e) => write!(f, "{e:?}"),
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions crates/polars-lazy/src/physical_plan/streaming/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,15 @@ pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<IR>, expr_arena: &Arena<AExp
.unwrap();

println!("SUBPLAN ELIGIBLE FOR STREAMING:");
let lp = node_to_lp(root, expr_arena, &mut (lp_arena.clone()));
println!("{lp:?}\n");
println!(
"{}\n",
IRPlanRef {
lp_top: root,
lp_arena,
expr_arena
}
.display()
);

println!("PIPELINE TREE:");
for (i, branch) in tree.iter().enumerate() {
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ fn cached_before_root(q: LazyFrame) {
}

fn count_caches(q: LazyFrame) -> usize {
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
let IRPlan {
lp_top, lp_arena, ..
} = q.to_alp_optimized().unwrap();
(&lp_arena)
.iter(node)
.iter(lp_top)
.filter(|(_node, lp)| matches!(lp, IR::Cache { .. }))
.count()
}
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,10 @@ fn test_scan_parquet_limit_9001() {
..Default::default()
};
let q = LazyFrame::scan_parquet(path, args).unwrap().limit(3);
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena).iter(node).all(|(_, lp)| match lp {
let IRPlan {
lp_top, lp_arena, ..
} = q.to_alp_optimized().unwrap();
(&lp_arena).iter(lp_top).all(|(_, lp)| match lp {
IR::Union { options, .. } => {
let sliced = options.slice.unwrap();
sliced.1 == 3
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn test_duration() -> PolarsResult<()> {
}

fn print_plans(lf: &LazyFrame) {
println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan());
println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan().unwrap());
println!(
"OPTIMIZED LOGICAL PLAN\n\n{}\n",
lf.describe_optimized_plan().unwrap()
Expand Down
8 changes: 6 additions & 2 deletions crates/polars-lazy/src/tests/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ fn test_schema_update_after_projection_pd() -> PolarsResult<()> {

// run optimizations
// Get the explode node
let (input, lp_arena, _expr_arena) = q.to_alp_optimized()?;
let IRPlan {
lp_top,
lp_arena,
expr_arena: _,
} = q.to_alp_optimized()?;

// assert the schema has been corrected with the projection pushdown run
let lp = lp_arena.get(input);
let lp = lp_arena.get(lp_top);
assert!(matches!(
lp,
IR::MapFunction {
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/tests/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ fn test_q2() -> PolarsResult<()> {
.limit(100)
.with_comm_subplan_elim(true);

let (node, lp_arena, _) = q.clone().to_alp_optimized().unwrap();
let IRPlan {
lp_top, lp_arena, ..
} = q.clone().to_alp_optimized().unwrap();
assert_eq!(
(&lp_arena)
.iter(node)
.iter(lp_top)
.filter(|(_, alp)| matches!(alp, IR::Cache { .. }))
.count(),
2
Expand Down
Loading