Skip to content

Commit

Permalink
refactor: move all describe, describe_tree and dot-viz code to IR instead of DslPlan (#16237)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored May 15, 2024
1 parent 0270ffa commit d7d720a
Show file tree
Hide file tree
Showing 27 changed files with 918 additions and 1,163 deletions.
56 changes: 6 additions & 50 deletions crates/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,16 @@
use std::fmt::Write;

use polars_core::prelude::*;
use polars_plan::dot::*;
use polars_plan::prelude::*;

use crate::prelude::*;

impl LazyFrame {
/// Get a dot language representation of the LogicalPlan.
pub fn to_dot(&self, optimized: bool) -> PolarsResult<String> {
let mut s = String::with_capacity(512);

let mut logical_plan = self.clone().get_plan_builder().build();
if optimized {
// initialize arenas
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(32);

let lp_top = self.clone().optimize_with_scratch(
&mut lp_arena,
&mut expr_arena,
&mut vec![],
true,
)?;
logical_plan = node_to_lp(lp_top, &expr_arena, &mut lp_arena);
}

let prev_node = DotNode {
branch: 0,
id: 0,
fmt: "",
};

// maps graphviz id to label
// we use this to create this graph
// first we create nodes including ids to make sure they are unique
// A [id] -- B [id]
// B [id] -- C [id]
//
// then later we hide the [id] by adding this to the graph
// A [id] [label="A"]
// B [id] [label="B"]
// C [id] [label="C"]

let mut id_map = PlHashMap::with_capacity(8);
logical_plan
.dot(&mut s, (0, 0), prev_node, &mut id_map)
.expect("io error");
s.push('\n');
let lp = if optimized {
self.clone().to_alp_optimized()
} else {
self.clone().to_alp()
}?;

for (id, label) in id_map {
// the label is wrapped in double quotes
// the id already is wrapped in double quotes
writeln!(s, "{id}[label=\"{label}\"]").unwrap();
}
s.push_str("\n}");
Ok(s)
Ok(lp.display_dot().to_string())
}
}
43 changes: 11 additions & 32 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,49 +208,27 @@ impl LazyFrame {
}

/// Return a String describing the naive (un-optimized) logical plan.
pub fn describe_plan(&self) -> String {
self.logical_plan.describe()
pub fn describe_plan(&self) -> PolarsResult<String> {
Ok(self.clone().to_alp()?.describe())
}

/// Return a String describing the naive (un-optimized) logical plan in tree format.
pub fn describe_plan_tree(&self) -> String {
self.logical_plan.describe_tree_format()
}

fn optimized_plan_ir(&self) -> PolarsResult<IRPlan> {
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(64);
let lp_top = self.clone().optimize_with_scratch(
&mut lp_arena,
&mut expr_arena,
&mut vec![],
true,
)?;

Ok(IRPlan::new(lp_top, lp_arena, expr_arena))
}

fn optimized_plan(&self) -> PolarsResult<DslPlan> {
let IRPlan {
lp_top,
mut lp_arena,
expr_arena,
} = self.optimized_plan_ir()?;
Ok(node_to_lp(lp_top, &expr_arena, &mut lp_arena))
pub fn describe_plan_tree(&self) -> PolarsResult<String> {
Ok(self.clone().to_alp()?.describe_tree_format())
}

/// Return a String describing the optimized logical plan.
///
/// Returns `Err` if optimizing the logical plan fails.
pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
Ok(self.optimized_plan_ir()?.describe())
Ok(self.clone().to_alp_optimized()?.describe())
}

/// Return a String describing the optimized logical plan in tree format.
///
/// Returns `Err` if optimizing the logical plan fails.
pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
Ok(self.optimized_plan()?.describe_tree_format())
Ok(self.clone().to_alp_optimized()?.describe_tree_format())
}

/// Return a String describing the logical plan.
Expand All @@ -261,7 +239,7 @@ impl LazyFrame {
if optimized {
self.describe_optimized_plan()
} else {
Ok(self.describe_plan())
self.describe_plan()
}
}

Expand Down Expand Up @@ -532,15 +510,16 @@ impl LazyFrame {
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
}

pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena<IR>, Arena<AExpr>)> {
pub fn to_alp_optimized(self) -> PolarsResult<IRPlan> {
let mut lp_arena = Arena::with_capacity(16);
let mut expr_arena = Arena::with_capacity(16);
let node =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
Ok((node, lp_arena, expr_arena))

Ok(IRPlan::new(node, lp_arena, expr_arena))
}

pub fn to_alp(self) -> PolarsResult<(Node, Arena<IR>, Arena<AExpr>)> {
pub fn to_alp(self) -> PolarsResult<IRPlan> {
self.logical_plan.to_alp()
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl ApplyExpr {
) -> Self {
#[cfg(debug_assertions)]
if matches!(options.collect_groups, ApplyOptions::ElementWise) && options.returns_scalar {
panic!("expr {} is not implemented correctly. 'returns_scalar' and 'elementwise' are mutually exclusive", expr)
panic!("expr {:?} is not implemented correctly. 'returns_scalar' and 'elementwise' are mutually exclusive", expr)
}

Self {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl<'a> AggregationContext<'a> {
(true, &DataType::List(_)) => {
if series.len() != self.groups.len() {
let fmt_expr = if let Some(e) = expr {
format!("'{e}' ")
format!("'{e:?}' ")
} else {
String::new()
};
Expand Down Expand Up @@ -589,7 +589,7 @@ impl Display for &dyn PhysicalExpr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.as_expression() {
None => Ok(()),
Some(e) => write!(f, "{e}"),
Some(e) => write!(f, "{e:?}"),
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions crates/polars-lazy/src/physical_plan/streaming/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,15 @@ pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<IR>, expr_arena: &Arena<AExp
.unwrap();

println!("SUBPLAN ELIGIBLE FOR STREAMING:");
let lp = node_to_lp(root, expr_arena, &mut (lp_arena.clone()));
println!("{lp:?}\n");
println!(
"{}\n",
IRPlanRef {
lp_top: root,
lp_arena,
expr_arena
}
.display()
);

println!("PIPELINE TREE:");
for (i, branch) in tree.iter().enumerate() {
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ fn cached_before_root(q: LazyFrame) {
}

fn count_caches(q: LazyFrame) -> usize {
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
let IRPlan {
lp_top, lp_arena, ..
} = q.to_alp_optimized().unwrap();
(&lp_arena)
.iter(node)
.iter(lp_top)
.filter(|(_node, lp)| matches!(lp, IR::Cache { .. }))
.count()
}
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,10 @@ fn test_scan_parquet_limit_9001() {
..Default::default()
};
let q = LazyFrame::scan_parquet(path, args).unwrap().limit(3);
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena).iter(node).all(|(_, lp)| match lp {
let IRPlan {
lp_top, lp_arena, ..
} = q.to_alp_optimized().unwrap();
(&lp_arena).iter(lp_top).all(|(_, lp)| match lp {
IR::Union { options, .. } => {
let sliced = options.slice.unwrap();
sliced.1 == 3
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn test_duration() -> PolarsResult<()> {
}

fn print_plans(lf: &LazyFrame) {
println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan());
println!("LOGICAL PLAN\n\n{}\n", lf.describe_plan().unwrap());
println!(
"OPTIMIZED LOGICAL PLAN\n\n{}\n",
lf.describe_optimized_plan().unwrap()
Expand Down
8 changes: 6 additions & 2 deletions crates/polars-lazy/src/tests/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ fn test_schema_update_after_projection_pd() -> PolarsResult<()> {

// run optimizations
// Get the explode node
let (input, lp_arena, _expr_arena) = q.to_alp_optimized()?;
let IRPlan {
lp_top,
lp_arena,
expr_arena: _,
} = q.to_alp_optimized()?;

// assert the schema has been corrected with the projection pushdown run
let lp = lp_arena.get(input);
let lp = lp_arena.get(lp_top);
assert!(matches!(
lp,
IR::MapFunction {
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-lazy/src/tests/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ fn test_q2() -> PolarsResult<()> {
.limit(100)
.with_comm_subplan_elim(true);

let (node, lp_arena, _) = q.clone().to_alp_optimized().unwrap();
let IRPlan {
lp_top, lp_arena, ..
} = q.clone().to_alp_optimized().unwrap();
assert_eq!(
(&lp_arena)
.iter(node)
.iter(lp_top)
.filter(|(_, alp)| matches!(alp, IR::Cache { .. }))
.count(),
2
Expand Down
Loading

0 comments on commit d7d720a

Please sign in to comment.