Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation #1526

Merged · 14 commits · Jan 13, 2022
4 changes: 3 additions & 1 deletion ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -39,6 +39,7 @@ use datafusion::physical_plan::{
 };

 use async_trait::async_trait;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use futures::future;
 use futures::StreamExt;
 use log::{error, info};
@@ -99,7 +100,8 @@ impl ExecutionPlan for DistributedQueryExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> datafusion::error::Result<SendableRecordBatchStream> {
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);

         info!("Connecting to Ballista scheduler at {}", self.scheduler_url);
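The signature change above is the heart of the PR: every ExecutionPlan::execute now receives an Arc<RuntimeEnv>, giving operators access to the new MemoryManager and DiskManager. A minimal caller-side sketch of the new contract against the post-PR API (EmptyExec is used purely as a stand-in plan; any ExecutionPlan works the same way):

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::{collect, empty::EmptyExec};

#[tokio::main]
async fn main() -> Result<()> {
    // A default RuntimeEnv wires up the default memory and disk managers.
    let runtime = Arc::new(RuntimeEnv::default());
    let plan = Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())));
    // execute()/collect() now take the runtime as an extra argument.
    let batches = collect(plan, runtime).await?;
    println!("got {} record batches", batches.len());
    Ok(())
}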
7 changes: 5 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,11 +28,13 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::metrics::{
     ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Metric, Partitioning, Statistics,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream,
+    Statistics,
 };
 use datafusion::{
     error::{DataFusionError, Result},
@@ -100,7 +102,8 @@ impl ExecutionPlan for ShuffleReaderExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         info!("ShuffleReaderExec::execute({})", partition);

         let fetch_time =
76 changes: 18 additions & 58 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -44,14 +44,17 @@ use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_plan::common::IPCWriter;
 use datafusion::physical_plan::hash_utils::create_hashes;
 use datafusion::physical_plan::metrics::{
     self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::Partitioning::RoundRobinBatch;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream, Statistics,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -139,10 +142,11 @@ impl ShuffleWriterExec {
     pub async fn execute_shuffle_write(
         &self,
         input_partition: usize,
+        runtime: Arc<RuntimeEnv>,
     ) -> Result<Vec<ShuffleWritePartition>> {
         let now = Instant::now();

-        let mut stream = self.plan.execute(input_partition).await?;
+        let mut stream = self.plan.execute(input_partition, runtime).await?;

         let mut path = PathBuf::from(&self.work_dir);
         path.push(&self.job_id);
@@ -197,7 +201,7 @@

             // we won't necessary produce output for every possible partition, so we
             // create writers on demand
-            let mut writers: Vec<Option<ShuffleWriter>> = vec![];
+            let mut writers: Vec<Option<IPCWriter>> = vec![];
             for _ in 0..num_output_partitions {
                 writers.push(None);
             }
@@ -265,7 +269,7 @@
                     info!("Writing results to {}", path);

                     let mut writer =
-                        ShuffleWriter::new(path, stream.schema().as_ref())?;
+                        IPCWriter::new(path, stream.schema().as_ref())?;

                     writer.write(&output_batch)?;
                     writers[output_partition] = Some(writer);
@@ -350,9 +354,10 @@ impl ExecutionPlan for ShuffleWriterExec {

     async fn execute(
         &self,
-        input_partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
-        let part_loc = self.execute_shuffle_write(input_partition).await?;
+        partition: usize,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
+        let part_loc = self.execute_shuffle_write(partition, runtime).await?;

         // build metadata result batch
         let num_writers = part_loc.len();
@@ -432,55 +437,6 @@ fn result_schema() -> SchemaRef {
     ]))
 }

-struct ShuffleWriter {
-    path: String,
-    writer: FileWriter<File>,
-    num_batches: u64,
-    num_rows: u64,
-    num_bytes: u64,
-}
-
-impl ShuffleWriter {
-    fn new(path: &str, schema: &Schema) -> Result<Self> {
-        let file = File::create(path)
-            .map_err(|e| {
-                BallistaError::General(format!(
-                    "Failed to create partition file at {}: {:?}",
-                    path, e
-                ))
-            })
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-        Ok(Self {
-            num_batches: 0,
-            num_rows: 0,
-            num_bytes: 0,
-            path: path.to_owned(),
-            writer: FileWriter::try_new(file, schema)?,
-        })
-    }
-
-    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        self.writer.write(batch)?;
-        self.num_batches += 1;
-        self.num_rows += batch.num_rows() as u64;
-        let num_bytes: usize = batch
-            .columns()
-            .iter()
-            .map(|array| array.get_array_memory_size())
-            .sum();
-        self.num_bytes += num_bytes as u64;
-        Ok(())
-    }
-
-    fn finish(&mut self) -> Result<()> {
-        self.writer.finish().map_err(DataFusionError::ArrowError)
-    }
-
-    fn path(&self) -> &str {
-        &self.path
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -493,6 +449,8 @@ mod tests {

     #[tokio::test]
     async fn test() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+
         let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
         let work_dir = TempDir::new()?;
         let query_stage = ShuffleWriterExec::try_new(
@@ -502,7 +460,7 @@ mod tests {
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0).await?;
+        let mut stream = query_stage.execute(0, runtime).await?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -545,6 +503,8 @@ mod tests {

     #[tokio::test]
     async fn test_partitioned() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+
         let input_plan = create_input_plan()?;
         let work_dir = TempDir::new()?;
         let query_stage = ShuffleWriterExec::try_new(
@@ -554,7 +514,7 @@ mod tests {
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0).await?;
+        let mut stream = query_stage.execute(0, runtime).await?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
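The Ballista-local ShuffleWriter above is deleted in favor of DataFusion's new IPCWriter (datafusion::physical_plan::common), which carries the same surface: new(path, schema), write(&batch), finish(), and the num_batches/num_rows/num_bytes counters. A standalone sketch of the writer in isolation, assuming those counters stay public as they were in the removed code:

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::common::IPCWriter;

fn write_partition_file() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Same call shape as the removed ShuffleWriter::new.
    let mut writer = IPCWriter::new("/tmp/shuffle-part-0.arrow", schema.as_ref())?;
    writer.write(&batch)?;
    writer.finish()?;
    println!(
        "wrote {} batches / {} rows ({} bytes)",
        writer.num_batches, writer.num_rows, writer.num_bytes
    );
    Ok(())
}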
6 changes: 4 additions & 2 deletions ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,8 +23,9 @@ use crate::serde::scheduler::PartitionLocation;

 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use datafusion::{
     error::{DataFusionError, Result},
@@ -102,7 +103,8 @@ impl ExecutionPlan for UnresolvedShuffleExec {
     async fn execute(
         &self,
         _partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         Err(DataFusionError::Plan(
             "Ballista UnresolvedShuffleExec does not support execution".to_owned(),
         ))
4 changes: 3 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -41,6 +41,7 @@ use datafusion::datasource::PartitionedFile;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::{
     window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
 };
@@ -53,6 +54,7 @@ use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
+use datafusion::physical_plan::sorts::sort::{SortExec, SortOptions};
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
 };
@@ -72,7 +74,6 @@ use datafusion::physical_plan::{
     limit::{GlobalLimitExec, LocalLimitExec},
     projection::ProjectionExec,
     repartition::RepartitionExec,
-    sort::{SortExec, SortOptions},
     Partitioning,
 };
 use datafusion::physical_plan::{
@@ -626,6 +627,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
             object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            runtime_env: Arc::new(RuntimeEnv::default()),
         };

         let fun_expr = functions::create_physical_fun(
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -22,6 +22,7 @@ pub mod to_proto;
 mod roundtrip_tests {
     use std::{convert::TryInto, sync::Arc};

+    use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::{
         arrow::{
             compute::kernels::sort::SortOptions,
@@ -36,7 +37,6 @@ mod roundtrip_tests {
         hash_aggregate::{AggregateMode, HashAggregateExec},
         hash_join::{HashJoinExec, PartitionMode},
         limit::{GlobalLimitExec, LocalLimitExec},
-        sort::SortExec,
         AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
         PhysicalExpr,
     },
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -29,7 +29,7 @@ use std::{
 use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sort::SortExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{cross_join::CrossJoinExec, ColumnStatistics};
 use datafusion::physical_plan::{
     expressions::{
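Several files in this PR make the same mechanical change: the sort code was reorganized into a sorts module to sit alongside the new external-sort implementation, so SortExec moved. The import migration is just:

// Before this PR:
// use datafusion::physical_plan::sort::SortExec;

// After this PR, sort lives under the sorts module:
use datafusion::physical_plan::sorts::sort::SortExec;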
9 changes: 3 additions & 6 deletions ballista/rust/core/src/utils.rs
@@ -52,14 +52,15 @@ use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sort::SortExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{
     metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
 };
@@ -88,11 +89,7 @@ pub async fn write_stream_to_disk(
     while let Some(result) = stream.next().await {
         let batch = result?;

-        let batch_size_bytes: usize = batch
-            .columns()
-            .iter()
-            .map(|array| array.get_array_memory_size())
-            .sum();
+        let batch_size_bytes: usize = batch_byte_size(&batch);
         num_batches += 1;
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;
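write_stream_to_disk now calls the shared batch_byte_size helper instead of inlining the computation. Judging from the removed lines, the helper is equivalent to this sketch:

use datafusion::arrow::record_batch::RecordBatch;

// Sum of the in-memory sizes of a batch's arrays; exactly what the
// deleted inline code computed.
fn batch_byte_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum()
}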
6 changes: 4 additions & 2 deletions ballista/rust/executor/src/collect.rs
@@ -27,6 +27,7 @@ use datafusion::arrow::{
     datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
 };
 use datafusion::error::DataFusionError;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
@@ -75,11 +76,12 @@ impl ExecutionPlan for CollectExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);
         let num_partitions = self.plan.output_partitioning().partition_count();

-        let futures = (0..num_partitions).map(|i| self.plan.execute(i));
+        let futures = (0..num_partitions).map(|i| self.plan.execute(i, runtime.clone()));
         let streams = futures::future::join_all(futures)
             .await
             .into_iter()
7 changes: 6 additions & 1 deletion ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
 use datafusion::error::DataFusionError;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};

@@ -71,7 +72,11 @@ impl Executor {
             ))
         }?;

-        let partitions = exec.execute_shuffle_write(part).await?;
+        let runtime_config =
+            RuntimeConfig::new().with_local_dirs(vec![self.work_dir.clone()]);
+        let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+
+        let partitions = exec.execute_shuffle_write(part, runtime).await?;

         println!(
             "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
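This is the one place in Ballista where a non-default RuntimeEnv is built: the executor points the runtime's local (spill) directories at its own work directory. A sketch of that construction in isolation (the make_runtime wrapper is illustrative, not part of the PR):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn make_runtime(work_dir: &str) -> Result<Arc<RuntimeEnv>> {
    // Spill files produced by memory-limited operators (e.g. the new
    // external sort) will be created under the executor's work_dir.
    let config = RuntimeConfig::new().with_local_dirs(vec![work_dir.to_owned()]);
    Ok(Arc::new(RuntimeEnv::new(config)?))
}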
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/planner.rs
@@ -254,7 +254,7 @@ mod test {
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
     use datafusion::physical_plan::hash_join::HashJoinExec;
-    use datafusion::physical_plan::sort::SortExec;
+    use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::physical_plan::{
         coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
     };
3 changes: 2 additions & 1 deletion benchmarks/src/bin/nyctaxi.rs
@@ -116,13 +116,14 @@ async fn datafusion_sql_benchmarks(
 }

 async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
+    let runtime = ctx.state.lock().unwrap().runtime_env.clone();
     let plan = ctx.create_logical_plan(sql)?;
     let plan = ctx.optimize(&plan)?;
     if debug {
         println!("Optimized logical plan:\n{:?}", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan).await?;
-    let result = collect(physical_plan).await?;
+    let result = collect(physical_plan, runtime).await?;
     if debug {
         pretty::print_batches(&result)?;
     }
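The benchmark follows the same pattern every caller adopts in this PR: fetch the context's RuntimeEnv and pass it to collect. For a caller that owns its own context, the full flow looks like this (a sketch against the post-PR API):

use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::collect;

async fn run_query(ctx: &mut ExecutionContext, sql: &str) -> Result<()> {
    // The context owns a RuntimeEnv; clone the Arc and hand it to collect().
    let runtime = ctx.state.lock().unwrap().runtime_env.clone();
    let plan = ctx.create_logical_plan(sql)?;
    let plan = ctx.optimize(&plan)?;
    let physical = ctx.create_physical_plan(&plan).await?;
    let results = collect(physical, runtime).await?;
    println!("{} batches returned", results.len());
    Ok(())
}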