Skip to content

Commit

Permalink
Fix job hangs when partition count of plan is zero (apache#1024)
Browse files Browse the repository at this point in the history
* Fix job hangs when partition count of plan is zero

* fix compilation issues

---------

Co-authored-by: Andy Grove <agrove@apache.org>
  • Loading branch information
lewiszlw and andygrove authored Jul 15, 2024
1 parent 1648bbe commit 4fa5b5f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
1 change: 0 additions & 1 deletion ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// This file is @generated by prost-build.
/// /////////////////////////////////////////////////////////////////////////////////////////////////
/// Ballista Physical Plan
/// /////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
24 changes: 22 additions & 2 deletions ballista/scheduler/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
use datafusion::datasource::source_as_provider;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlanProperties;
use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -39,6 +40,7 @@ use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
Expand Down Expand Up @@ -408,8 +410,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);

let plan = plan.transform_down(&|node| {
if node.output_partitioning().partition_count() == 0 {
Ok(Transformed::yes(Arc::new(EmptyExec::new(node.schema()))))
} else {
Ok(Transformed::no(node))
}
})?;
debug!(
"Transformed physical plan: {}",
DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false)
);

self.task_manager
.submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
.submit_job(
job_id,
job_name,
&session_ctx.session_id(),
plan.data,
queued_at,
)
.await?;

let elapsed = start.elapsed();
Expand Down

0 comments on commit 4fa5b5f

Please sign in to comment.