diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 85c4aea99ff5..0335e29127ab 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -399,7 +399,7 @@ impl LogicalPlanBuilder { /// Create an expression to represent the explanation of the plan pub fn explain(&self, verbose: bool) -> Result { let stringified_plans = vec![StringifiedPlan::new( - PlanType::LogicalPlan, + PlanType::InitialLogicalPlan, format!("{:#?}", self.plan.clone()), )]; @@ -740,14 +740,24 @@ mod tests { #[test] fn stringified_plan() { let stringified_plan = - StringifiedPlan::new(PlanType::LogicalPlan, "...the plan..."); + StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan..."); + assert!(stringified_plan.should_display(true)); + assert!(!stringified_plan.should_display(false)); // not in non verbose mode + + let stringified_plan = + StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan..."); assert!(stringified_plan.should_display(true)); assert!(stringified_plan.should_display(false)); // display in non verbose mode too let stringified_plan = - StringifiedPlan::new(PlanType::PhysicalPlan, "...the plan..."); + StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan..."); assert!(stringified_plan.should_display(true)); - assert!(!stringified_plan.should_display(false)); + assert!(!stringified_plan.should_display(false)); // not in non verbose mode + + let stringified_plan = + StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan..."); + assert!(stringified_plan.should_display(true)); + assert!(stringified_plan.should_display(false)); // display in non verbose mode let stringified_plan = StringifiedPlan::new( PlanType::OptimizedLogicalPlan { diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 42eaf8e559c9..9a4daae27ff5 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -805,28 +805,35 @@ 
impl fmt::Debug for LogicalPlan { } } -/// Represents which type of plan +/// Represents which type of plan, when storing multiple +/// for use in EXPLAIN plans #[derive(Debug, Clone, PartialEq)] pub enum PlanType { /// The initial LogicalPlan provided to DataFusion - LogicalPlan, + InitialLogicalPlan, /// The LogicalPlan which results from applying an optimizer pass OptimizedLogicalPlan { /// The name of the optimizer which produced this plan optimizer_name: String, }, - /// The physical plan, prepared for execution - PhysicalPlan, + /// The final, fully optimized LogicalPlan that was converted to a physical plan + FinalLogicalPlan, + /// The initial physical plan, prepared for execution + InitialPhysicalPlan, + /// The final, fully optimized physical plan which would be executed + FinalPhysicalPlan, } impl fmt::Display for PlanType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - PlanType::LogicalPlan => write!(f, "logical_plan"), + PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"), PlanType::OptimizedLogicalPlan { optimizer_name } => { write!(f, "logical_plan after {}", optimizer_name) } - PlanType::PhysicalPlan => write!(f, "physical_plan"), + PlanType::FinalLogicalPlan => write!(f, "logical_plan"), + PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"), + PlanType::FinalPhysicalPlan => write!(f, "physical_plan"), } } } @@ -854,7 +861,10 @@ impl StringifiedPlan { /// returns true if this plan should be displayed. 
Generally /// `verbose_mode = true` will display all available plans pub fn should_display(&self, verbose_mode: bool) -> bool { - self.plan_type == PlanType::LogicalPlan || verbose_mode + match self.plan_type { + PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true, + _ => verbose_mode, + } } } diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 1d19f0681b35..88380ea17c87 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -542,7 +542,7 @@ mod tests { &optimizer, true, &empty_plan, - &[StringifiedPlan::new(PlanType::LogicalPlan, "...")], + &[StringifiedPlan::new(PlanType::InitialLogicalPlan, "...")], schema.as_ref(), &ExecutionProps::new(), )?; @@ -556,7 +556,7 @@ mod tests { assert!(*verbose); let expected_stringified_plans = vec![ - StringifiedPlan::new(PlanType::LogicalPlan, "..."), + StringifiedPlan::new(PlanType::InitialLogicalPlan, "..."), StringifiedPlan::new( PlanType::OptimizedLogicalPlan { optimizer_name: "test_optimizer".into(), diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index c838ce4a94d4..195a7a518370 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -40,14 +40,21 @@ pub struct ExplainExec { schema: SchemaRef, /// The strings to be printed stringified_plans: Vec, + /// control which plans to print + verbose: bool, } impl ExplainExec { /// Create a new ExplainExec - pub fn new(schema: SchemaRef, stringified_plans: Vec) -> Self { + pub fn new( + schema: SchemaRef, + stringified_plans: Vec, + verbose: bool, + ) -> Self { ExplainExec { schema, stringified_plans, + verbose, } } @@ -103,8 +110,13 @@ impl ExecutionPlan for ExplainExec { let mut type_builder = StringBuilder::new(self.stringified_plans.len()); let mut plan_builder = StringBuilder::new(self.stringified_plans.len()); - for p in &self.stringified_plans { - type_builder.append_value(&p.plan_type.to_string())?; 
+ let plans_to_print = self + .stringified_plans + .iter() + .filter(|s| s.should_display(self.verbose)); + + for p in plans_to_print { + type_builder.append_value(p.plan_type.to_string())?; plan_builder.append_value(&*p.plan)?; } diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index df4168370003..5163e4b425b4 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -240,8 +240,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result> { - let plan = self.create_initial_plan(logical_plan, ctx_state)?; - self.optimize_plan(plan, ctx_state) + match self.handle_explain(logical_plan, ctx_state)? { + Some(plan) => Ok(plan), + None => { + let plan = self.create_initial_plan(logical_plan, ctx_state)?; + self.optimize_plan(plan, ctx_state) + } + } } /// Create a physical expression from a logical expression @@ -280,7 +285,7 @@ impl DefaultPhysicalPlanner { Self { extension_planners } } - /// Optimize a physical plan + /// Optimize a physical plan by applying each physical optimizer fn optimize_plan( &self, plan: Arc, @@ -749,32 +754,9 @@ impl DefaultPhysicalPlanner { "Unsupported logical plan: CreateExternalTable".to_string(), )) } - LogicalPlan::Explain { - verbose, - plan, - stringified_plans, - schema, - } => { - let input = self.create_initial_plan(plan, ctx_state)?; - - let mut stringified_plans = stringified_plans - .iter() - .filter(|s| s.should_display(*verbose)) - .cloned() - .collect::>(); - - // add in the physical plan if requested - if *verbose { - stringified_plans.push(StringifiedPlan::new( - PlanType::PhysicalPlan, - displayable(input.as_ref()).indent().to_string(), - )); - } - Ok(Arc::new(ExplainExec::new( - SchemaRef::new(schema.as_ref().to_owned().into()), - stringified_plans, - ))) - } + LogicalPlan::Explain { .. 
} => Err(DataFusionError::Internal( + "Unsupported logical plan: Explain must be root of the plan".to_string(), + )), LogicalPlan::Extension { node } => { let physical_inputs = node .inputs() @@ -1315,6 +1297,60 @@ impl DefaultPhysicalPlanner { options, }) } + + /// Handles capturing the various plans for EXPLAIN queries + /// + /// Returns + /// Some(plan) if optimized, and None if logical_plan was not an + /// explain (and thus needs to be optimized as normal) + fn handle_explain( + &self, + logical_plan: &LogicalPlan, + ctx_state: &ExecutionContextState, + ) -> Result>> { + if let LogicalPlan::Explain { + verbose, + plan, + stringified_plans, + schema, + } = logical_plan + { + let final_logical_plan = StringifiedPlan::new( + PlanType::FinalLogicalPlan, + plan.display_indent().to_string(), + ); + + let input = self.create_initial_plan(plan, ctx_state)?; + + let initial_physical_plan = StringifiedPlan::new( + PlanType::InitialPhysicalPlan, + displayable(input.as_ref()).indent().to_string(), + ); + + let input = self.optimize_plan(input, ctx_state)?; + + let final_physical_plan = StringifiedPlan::new( + PlanType::FinalPhysicalPlan, + displayable(input.as_ref()).indent().to_string(), + ); + + let stringified_plans = stringified_plans + .iter() + .cloned() + .chain(std::iter::once(final_logical_plan)) + .chain(std::iter::once(initial_physical_plan)) + .chain(std::iter::once(final_physical_plan)) + .collect::>(); + + Ok(Some(Arc::new(ExplainExec::new( + SchemaRef::new(schema.as_ref().to_owned().into()), + stringified_plans, + *verbose, + )))) + } else { + Ok(None) + } + } } fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 1437346fccd3..a4bb02cf0f9a 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -234,8 +234,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = self.sql_statement_to_plan(statement)?; let stringified_plans = 
vec![StringifiedPlan::new( - PlanType::LogicalPlan, - format!("{:#?}", plan), + PlanType::InitialLogicalPlan, + plan.display_indent().to_string(), )]; let schema = LogicalPlan::explain_schema(); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 9c7d0795edb9..875e98255118 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1972,17 +1972,28 @@ async fn csv_explain() { register_aggregate_csv_by_sql(&mut ctx).await; let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10"; let actual = execute(&mut ctx, sql).await; - let expected = vec![vec![ - "logical_plan", - "Projection: #aggregate_test_100.c1\ - \n Filter: #aggregate_test_100.c2 Gt Int64(10)\ - \n TableScan: aggregate_test_100 projection=None", + let actual = normalize_vec_for_explain(actual); + let expected = vec![ + vec![ + "logical_plan", + "Projection: #aggregate_test_100.c1\ + \n Filter: #aggregate_test_100.c2 Gt Int64(10)\ + \n TableScan: aggregate_test_100 projection=Some([0, 1])" + ], + vec!["physical_plan", + "ProjectionExec: expr=[c1@0 as c1]\ + \n CoalesceBatchesExec: target_batch_size=4096\ + \n FilterExec: CAST(c2@1 AS Int64) > 10\ + \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ + \n CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true\ + \n" ]]; assert_eq!(expected, actual); // Also, expect same result with lowercase explain let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10"; let actual = execute(&mut ctx, sql).await; + let actual = normalize_vec_for_explain(actual); assert_eq!(expected, actual); } @@ -3921,3 +3932,27 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> { assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'"); Ok(()) } + +// Normalizes parts of an explain plan that vary from run to run (such as path) +fn normalize_for_explain(s: &str) -> String 
{ + // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv + // to ARROW_TEST_DATA/csv/aggregate_test_100.csv + let data_path = datafusion::test_util::arrow_test_data(); + let s = s.replace(&data_path, "ARROW_TEST_DATA"); + + // convert things like partitioning=RoundRobinBatch(16) + // to partitioning=RoundRobinBatch(NUM_CORES) + let needle = format!("RoundRobinBatch({})", num_cpus::get()); + s.replace(&needle, "RoundRobinBatch(NUM_CORES)") +} + +/// Applies normalize_for_explain to every line +fn normalize_vec_for_explain(v: Vec>) -> Vec> { + v.into_iter() + .map(|l| { + l.into_iter() + .map(|s| normalize_for_explain(&s)) + .collect::>() + }) + .collect::>() +} diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 21b49638d23a..e1f8c767bd8d 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> { let mut ctx = setup_table(make_topk_context()).await?; let expected = vec![ - "| logical_plan after topk | TopK: k=3 |", - "| | Projection: #sales.customer_id, #sales.revenue |", - "| | TableScan: sales projection=Some([0, 1]) |", + "| logical_plan after topk | TopK: k=3 |", + "| | Projection: #sales.customer_id, #sales.revenue |", + "| | TableScan: sales projection=Some([0, 1]) |", ].join("\n"); let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);