Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show optimized physical and logical plans in EXPLAIN #744

Merged
merged 3 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ impl LogicalPlanBuilder {
/// Create an expression to represent the explanation of the plan
pub fn explain(&self, verbose: bool) -> Result<Self> {
let stringified_plans = vec![StringifiedPlan::new(
PlanType::LogicalPlan,
PlanType::InitialLogicalPlan,
format!("{:#?}", self.plan.clone()),
)];

Expand Down Expand Up @@ -740,14 +740,24 @@ mod tests {
#[test]
fn stringified_plan() {
let stringified_plan =
StringifiedPlan::new(PlanType::LogicalPlan, "...the plan...");
StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
assert!(stringified_plan.should_display(true));
assert!(!stringified_plan.should_display(false)); // not in non verbose mode

let stringified_plan =
StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
assert!(stringified_plan.should_display(true));
assert!(stringified_plan.should_display(false)); // display in non verbose mode too

let stringified_plan =
StringifiedPlan::new(PlanType::PhysicalPlan, "...the plan...");
StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
assert!(stringified_plan.should_display(true));
assert!(!stringified_plan.should_display(false));
assert!(!stringified_plan.should_display(false)); // not in non verbose mode

let stringified_plan =
StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
assert!(stringified_plan.should_display(true));
assert!(stringified_plan.should_display(false)); // display in non verbose mode

let stringified_plan = StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
Expand Down
24 changes: 17 additions & 7 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,28 +805,35 @@ impl fmt::Debug for LogicalPlan {
}
}

/// Represents which type of plan
/// Represents which type of plan, when storing multiple
/// for use in EXPLAIN plans
#[derive(Debug, Clone, PartialEq)]
pub enum PlanType {
/// The initial LogicalPlan provided to DataFusion
LogicalPlan,
InitialLogicalPlan,
/// The LogicalPlan which results from applying an optimizer pass
OptimizedLogicalPlan {
/// The name of the optimizer which produced this plan
optimizer_name: String,
},
/// The physical plan, prepared for execution
PhysicalPlan,
/// The final, fully optimized LogicalPlan that was converted to a physical plan
FinalLogicalPlan,
/// The initial physical plan, prepared for execution
InitialPhysicalPlan,
/// The final, fully optimized physical plan which would be executed
FinalPhysicalPlan,
}

/// Writes the short textual label for each kind of plan.
///
/// The final logical and physical plans use the plain `logical_plan` /
/// `physical_plan` labels, the initial plans carry an `initial_` prefix,
/// and a plan produced by an optimizer pass is labelled with that
/// optimizer's name.
impl fmt::Display for PlanType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PlanType::LogicalPlan => write!(f, "logical_plan"),
PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
PlanType::OptimizedLogicalPlan { optimizer_name } => {
write!(f, "logical_plan after {}", optimizer_name)
}
PlanType::PhysicalPlan => write!(f, "physical_plan"),
PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
}
}
}
Expand Down Expand Up @@ -854,7 +861,10 @@ impl StringifiedPlan {
/// returns true if this plan should be displayed. Generally
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
self.plan_type == PlanType::LogicalPlan || verbose_mode
match self.plan_type {
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
_ => verbose_mode,
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ mod tests {
&optimizer,
true,
&empty_plan,
&[StringifiedPlan::new(PlanType::LogicalPlan, "...")],
&[StringifiedPlan::new(PlanType::InitialLogicalPlan, "...")],
schema.as_ref(),
&ExecutionProps::new(),
)?;
Expand All @@ -556,7 +556,7 @@ mod tests {
assert!(*verbose);

let expected_stringified_plans = vec![
StringifiedPlan::new(PlanType::LogicalPlan, "..."),
StringifiedPlan::new(PlanType::InitialLogicalPlan, "..."),
StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
optimizer_name: "test_optimizer".into(),
Expand Down
18 changes: 15 additions & 3 deletions datafusion/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,21 @@ pub struct ExplainExec {
schema: SchemaRef,
/// The strings to be printed
stringified_plans: Vec<StringifiedPlan>,
/// control which plans to print
verbose: bool,
}

impl ExplainExec {
/// Create a new ExplainExec
pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self {
pub fn new(
schema: SchemaRef,
stringified_plans: Vec<StringifiedPlan>,
verbose: bool,
) -> Self {
ExplainExec {
schema,
stringified_plans,
verbose,
}
}

Expand Down Expand Up @@ -103,8 +110,13 @@ impl ExecutionPlan for ExplainExec {
let mut type_builder = StringBuilder::new(self.stringified_plans.len());
let mut plan_builder = StringBuilder::new(self.stringified_plans.len());

for p in &self.stringified_plans {
type_builder.append_value(&p.plan_type.to_string())?;
let plans_to_print = self
.stringified_plans
.iter()
.filter(|s| s.should_display(self.verbose));

for p in plans_to_print {
type_builder.append_value(p.plan_type.to_string())?;
plan_builder.append_value(&*p.plan)?;
}

Expand Down
94 changes: 65 additions & 29 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
match self.handle_explain(logical_plan, ctx_state)? {
Some(plan) => Ok(plan),
None => {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
}
}
}

/// Create a physical expression from a logical expression
Expand Down Expand Up @@ -280,7 +285,7 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}

/// Optimize a physical plan
/// Optimize a physical plan by applying each physical optimizer
fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -749,32 +754,9 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
LogicalPlan::Explain {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously the physical planner could handle a LogicalPlan::Explain anywhere in the tree. However, I can't think of cases where this can actually occur, and it made adding the final physical plan challenging.

So I have changed the code to special-case handling Explain as the root.

verbose,
plan,
stringified_plans,
schema,
} => {
let input = self.create_initial_plan(plan, ctx_state)?;

let mut stringified_plans = stringified_plans
.iter()
.filter(|s| s.should_display(*verbose))
.cloned()
.collect::<Vec<_>>();

// add in the physical plan if requested
if *verbose {
stringified_plans.push(StringifiedPlan::new(
PlanType::PhysicalPlan,
displayable(input.as_ref()).indent().to_string(),
));
}
Ok(Arc::new(ExplainExec::new(
SchemaRef::new(schema.as_ref().to_owned().into()),
stringified_plans,
)))
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Extension { node } => {
let physical_inputs = node
.inputs()
Expand Down Expand Up @@ -1315,6 +1297,60 @@ impl DefaultPhysicalPlanner {
options,
})
}

/// Builds the physical plan for an EXPLAIN query, recording a textual
/// snapshot of the plan at each planning stage along the way.
///
/// Returns `Ok(Some(plan))` wrapping an `ExplainExec` when `logical_plan`
/// is a `LogicalPlan::Explain`, and `Ok(None)` for any other node, in
/// which case the caller is expected to plan and optimize it as normal.
fn handle_explain(
    &self,
    logical_plan: &LogicalPlan,
    ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // Anything that is not an Explain node is left to the regular path.
    let (verbose, plan, stringified_plans, schema) = match logical_plan {
        LogicalPlan::Explain {
            verbose,
            plan,
            stringified_plans,
            schema,
        } => (verbose, plan, stringified_plans, schema),
        _ => return Ok(None),
    };

    // Snapshot the logical plan before it is turned into a physical plan.
    let logical_snapshot = StringifiedPlan::new(
        PlanType::FinalLogicalPlan,
        plan.display_indent().to_string(),
    );

    // Start from the plans captured earlier, then append one entry per
    // stage in the order the stages run.
    let mut all_plans = stringified_plans.to_vec();
    all_plans.push(logical_snapshot);

    let unoptimized = self.create_initial_plan(plan, ctx_state)?;
    all_plans.push(StringifiedPlan::new(
        PlanType::InitialPhysicalPlan,
        displayable(unoptimized.as_ref()).indent().to_string(),
    ));

    let optimized = self.optimize_plan(unoptimized, ctx_state)?;
    all_plans.push(StringifiedPlan::new(
        PlanType::FinalPhysicalPlan,
        displayable(optimized.as_ref()).indent().to_string(),
    ));

    // `verbose` is handed to ExplainExec, which decides what to print.
    let explain_exec = ExplainExec::new(
        SchemaRef::new(schema.as_ref().to_owned().into()),
        all_plans,
        *verbose,
    );
    Ok(Some(Arc::new(explain_exec)))
}
}

fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.sql_statement_to_plan(statement)?;

let stringified_plans = vec![StringifiedPlan::new(
PlanType::LogicalPlan,
format!("{:#?}", plan),
PlanType::InitialLogicalPlan,
plan.display_indent().to_string(),
)];

let schema = LogicalPlan::explain_schema();
Expand Down
45 changes: 40 additions & 5 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1972,17 +1972,28 @@ async fn csv_explain() {
register_aggregate_csv_by_sql(&mut ctx).await;
let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
let actual = execute(&mut ctx, sql).await;
let expected = vec![vec![
"logical_plan",
"Projection: #aggregate_test_100.c1\
\n Filter: #aggregate_test_100.c2 Gt Int64(10)\
\n TableScan: aggregate_test_100 projection=None",
let actual = normalize_vec_for_explain(actual);
let expected = vec![
vec![
"logical_plan",
"Projection: #aggregate_test_100.c1\
\n Filter: #aggregate_test_100.c2 Gt Int64(10)\
\n TableScan: aggregate_test_100 projection=Some([0, 1])"
],
vec!["physical_plan",
"ProjectionExec: expr=[c1@0 as c1]\
\n CoalesceBatchesExec: target_batch_size=4096\
\n FilterExec: CAST(c2@1 AS Int64) > 10\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
\n CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true\
\n"
]];
assert_eq!(expected, actual);

// Also, expect same result with lowercase explain
let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
let actual = execute(&mut ctx, sql).await;
let actual = normalize_vec_for_explain(actual);
assert_eq!(expected, actual);
}

Expand Down Expand Up @@ -3921,3 +3932,27 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
Ok(())
}

/// Replaces the parts of an explain plan that differ between machines or
/// runs (the test data path and the core count) with fixed placeholders.
fn normalize_for_explain(s: &str) -> String {
    // The arrow test data location, e.g. <checkout>/arrow/testing/data,
    // becomes ARROW_TEST_DATA so paths read ARROW_TEST_DATA/csv/...
    let test_data_dir = datafusion::test_util::arrow_test_data();

    // partitioning=RoundRobinBatch(16) becomes
    // partitioning=RoundRobinBatch(NUM_CORES)
    let core_count_marker = format!("RoundRobinBatch({})", num_cpus::get());

    s.replace(&test_data_dir, "ARROW_TEST_DATA")
        .replace(&core_count_marker, "RoundRobinBatch(NUM_CORES)")
}

/// Runs `normalize_for_explain` over every cell of every row, keeping the
/// row and column layout unchanged.
fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
    let mut normalized_rows = Vec::with_capacity(v.len());
    for row in v {
        let cells: Vec<String> = row
            .iter()
            .map(|cell| normalize_for_explain(cell))
            .collect();
        normalized_rows.push(cells);
    }
    normalized_rows
}
6 changes: 3 additions & 3 deletions datafusion/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> {
let mut ctx = setup_table(make_topk_context()).await?;

let expected = vec![
"| logical_plan after topk | TopK: k=3 |",
"| | Projection: #sales.customer_id, #sales.revenue |",
"| | TableScan: sales projection=Some([0, 1]) |",
"| logical_plan after topk | TopK: k=3 |",
"| | Projection: #sales.customer_id, #sales.revenue |",
"| | TableScan: sales projection=Some([0, 1]) |",
].join("\n");

let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
Expand Down