From dc20d0d10e73229850eb00442e95f2bf1ecd022f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 1 Nov 2023 13:45:25 -0400
Subject: [PATCH 1/2] Minor: Improve `ExecutionPlan` documentation

---
 datafusion/physical-plan/src/lib.rs | 112 +++++++++++++++++++---------
 1 file changed, 78 insertions(+), 34 deletions(-)

diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 8ae2a8686674..afac5befc64e 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -91,16 +91,26 @@ pub use datafusion_physical_expr::{
 pub use crate::stream::EmptyRecordBatchStream;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};

-/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
+/// Represents nodes in the DataFusion Physical Plan.
 ///
-/// Each `ExecutionPlan` is partition-aware and is responsible for
-/// creating the actual `async` [`SendableRecordBatchStream`]s
-/// of [`RecordBatch`] that incrementally compute the operator's
-/// output from its input partition.
+/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
+/// [`RecordBatch`] that incrementally computes a partition of the
+/// `ExecutionPlan`'s output from its input.
+///
+/// Methods such as [`schema`] and [`output_partitioning`] communicate
+/// properties of this output to the DataFusion optimizer, and methods such as
+/// [`required_input_distribution`] and [`required_input_ordering`] express
+/// requirements that the `ExecutionPlan` places on its input.
 ///
 /// [`ExecutionPlan`] can be displayed in a simplified form using the
 /// return value from [`displayable`] in addition to the (normally
 /// quite verbose) `Debug` output.
+///
+/// [`execute`]: ExecutionPlan::execute
+/// [`schema`]: ExecutionPlan::schema
+/// [`output_partitioning`]: ExecutionPlan::output_partitioning
+/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
+/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
 pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// Returns the execution plan as [`Any`] so that it can be
     /// downcast to a specific implementation.
@@ -109,7 +119,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;

-    /// Specifies the output partitioning scheme of this plan
+    /// Specifies how the output of this `ExecutionPlan` is split into
+    /// partitions.
     fn output_partitioning(&self) -> Partitioning;

     /// Specifies whether this plan generates an infinite stream of records.
@@ -123,7 +134,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         }
     }

-    /// If the output of this operator within each partition is sorted,
+    /// If the output of this `ExecutionPlan` within each partition is sorted,
     /// returns `Some(keys)` with the description of how it was sorted.
     ///
     /// For example, Sort, (obviously) produces sorted output as does
@@ -131,17 +142,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// produces sorted output if its input was sorted as it does not
     /// reorder the input rows,
     ///
-    /// It is safe to return `None` here if your operator does not
+    /// It is safe to return `None` here if your `ExecutionPlan` does not
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;

     /// Specifies the data distribution requirements for all the
-    /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child,
+    /// children for this `ExecutionPlan`. By default this is [`Distribution::UnspecifiedDistribution`] for each child.
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![Distribution::UnspecifiedDistribution; self.children().len()]
     }

-    /// Specifies the ordering requirements for all of the children
+    /// Specifies the ordering required for all of the children of this
+    /// `ExecutionPlan`.
+    ///
     /// For each child, it's the local ordering requirement within
     /// each partition rather than the global ordering
     ///
@@ -152,7 +165,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         vec![None; self.children().len()]
     }

-    /// Returns `false` if this operator's implementation may reorder
+    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
     /// rows within or between partitions.
     ///
     /// For example, Projection, Filter, and Limit maintain the order
@@ -166,19 +179,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// The default implementation returns `false`
     ///
     /// WARNING: if you override this default, you *MUST* ensure that
-    /// the operator's maintains the ordering invariant or else
+    /// the `ExecutionPlan` maintains the ordering invariant or else
     /// DataFusion may produce incorrect results.
     fn maintains_input_order(&self) -> Vec<bool> {
         vec![false; self.children().len()]
     }

-    /// Specifies whether the operator benefits from increased parallelization
-    /// at its input for each child. If set to `true`, this indicates that the
-    /// operator would benefit from partitioning its corresponding child
-    /// (and thus from more parallelism). For operators that do very little work
-    /// the overhead of extra parallelism may outweigh any benefits
+    /// Specifies whether the `ExecutionPlan` benefits from increased
+    /// parallelization at its input for each child.
     ///
-    /// The default implementation returns `true` unless this operator
+    /// If this returns `true`, the `ExecutionPlan` would benefit from partitioning
+    /// its corresponding child (and thus from more parallelism). For
+    /// `ExecutionPlan`s that do very little work, the overhead of extra
+    /// parallelism may outweigh any benefits.
+    ///
+    /// The default implementation returns `true` unless this `ExecutionPlan`
     /// has signalled it requires a single child input partition.
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
         // By default try to maximize parallelism with more CPUs if
@@ -199,12 +214,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         OrderingEquivalenceProperties::new(self.schema())
     }

-    /// Get a list of child execution plans that provide the input for this plan. The returned list
-    /// will be empty for leaf nodes, will contain a single value for unary nodes, or two
-    /// values for binary nodes (such as joins).
+    /// Get a list of `ExecutionPlan`s that provide input for this plan. The
+    /// returned list will be empty for leaf nodes such as scans, will contain a
+    /// single value for unary nodes, or two values for binary nodes (such as
+    /// joins).
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;

-    /// Returns a new plan where all children were replaced by new plans.
+    /// Returns a new `ExecutionPlan` where all existing children were replaced
+    /// by the `children`, in order.
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -235,13 +252,40 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         Ok(None)
     }

-    /// Begin execution of `partition`, returning a stream of [`RecordBatch`]es.
+    /// Begin execution of `partition`, returning a [`Stream`] of
+    /// [`RecordBatch`]es.
+    ///
+    /// # Notes
+    ///
+    /// The `execute` method itself is not `async` but it returns an `async`
+    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
+    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
+    /// Most `ExecutionPlan`s should not do any work before the first
+    /// `RecordBatch` is requested from the stream.
+    ///
+    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
+    /// [`Stream`] into a [`SendableRecordBatchStream`].
+    ///
+    /// Using `async` `Streams` allows for network I/O during execution and
+    /// takes advantage of Rust's built-in support for `async` continuations and
+    /// the broader `async` crate ecosystem.
+    ///
+    /// [`Stream`]: futures::stream::Stream
+    /// [`StreamExt`]: futures::stream::StreamExt
+    /// [`TryStreamExt`]: futures::stream::TryStreamExt
+    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
     ///
     /// # Implementation Examples
     ///
-    /// ## Return Precomputed Batch
+    /// While `async` `Stream`s have a non-trivial learning curve, the
+    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
+    /// which help simplify many common operations.
     ///
-    /// We can return a precomputed batch as a stream
+    /// Here are some common patterns:
+    ///
+    /// ## Return Precomputed `RecordBatch`
+    ///
+    /// We can return a precomputed `RecordBatch` as a `Stream`:
     ///
     /// ```
     /// # use std::sync::Arc;
@@ -261,6 +305,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///         partition: usize,
     ///         context: Arc<TaskContext>
     ///     ) -> Result<SendableRecordBatchStream> {
+    ///         // use functions from the futures crate to convert the batch into a stream
     ///         let fut = futures::future::ready(Ok(self.batch.clone()));
     ///         let stream = futures::stream::once(fut);
     ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
@@ -268,9 +313,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// }
     /// ```
     ///
-    /// ## Async Compute Batch
+    /// ## Lazily (async) Compute `RecordBatch`
     ///
-    /// We can also lazily compute a RecordBatch when the returned stream is polled
+    /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
     ///
     /// ```
     /// # use std::sync::Arc;
@@ -284,6 +329,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     schema: SchemaRef,
     /// }
     ///
+    /// /// Returns a single batch when the returned stream is polled
     /// async fn get_batch() -> Result<RecordBatch> {
     ///     todo!()
     /// }
@@ -301,10 +347,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// }
     /// ```
     ///
-    /// ## Async Compute Batch Stream
+    /// ## Lazily (async) Create a `Stream`
     ///
-    /// We can lazily compute a RecordBatch stream when the returned stream is polled
-    /// flattening the result into a single stream
+    /// If you need to create the returned `Stream` using an `async` function,
+    /// you can do so by flattening the result:
     ///
     /// ```
     /// # use std::sync::Arc;
@@ -319,6 +365,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     schema: SchemaRef,
     /// }
     ///
+    /// /// async function that returns a stream
     /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
     ///     todo!()
     /// }
@@ -337,9 +384,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     }
     /// }
     /// ```
-    ///
-    /// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further
-    /// combinators that can be used with streams
     fn execute(
         &self,
         partition: usize,
@@ -372,7 +416,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
 /// especially for the distributed engine to judge whether need to deal with shuffling.
 /// Currently there are 3 kinds of execution plan which needs data exchange
-/// 1. RepartitionExec for changing the partition number between two operators
+/// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
 /// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
 /// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {

From 6f9d925d703b262a9f9e5bfdd9f4a00fee910443 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 1 Nov 2023 16:43:56 -0400
Subject: [PATCH 2/2] Add link to Partitioning

---
 datafusion/physical-plan/src/lib.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index afac5befc64e..ed795a1cb333 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -95,7 +95,8 @@ pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
 ///
 /// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
 /// [`RecordBatch`] that incrementally computes a partition of the
-/// `ExecutionPlan`'s output from its input.
+/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
+/// details on partitioning.
 ///
 /// Methods such as [`schema`] and [`output_partitioning`] communicate
 /// properties of this output to the DataFusion optimizer, and methods such as
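
For context on the consumer side of the API documented above, here is a minimal, illustrative sketch (not part of the patch) of how a caller might drive `execute` and drain the resulting stream. The helper name `collect_partition` is invented for this example, and it assumes a built `plan: Arc<dyn ExecutionPlan>` and a `TaskContext` are already available.

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::ExecutionPlan;
use futures::TryStreamExt;

/// Collects all `RecordBatch`es from one output partition of `plan`.
/// (Hypothetical helper for illustration only.)
async fn collect_partition(
    plan: Arc<dyn ExecutionPlan>,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    // `execute` is not `async`: it returns the stream immediately, and the
    // actual work happens only as the stream is polled.
    let stream = plan.execute(partition, context)?;

    // Drain the stream, propagating the first error if one occurs.
    stream.try_collect().await
}
```

Because `execute` hands back its `Stream` synchronously, essentially all of the work happens as `try_collect` polls the stream, which is the incremental, streaming execution model the revised documentation describes.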