Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation (#1526)

* Simplified memory management

* External sorter as an example

* Document more

* Use std::sync::Mutex instead of futures::lock::Mutex

* Resolve comments

* Add tests for ExternalSorter as well as DiskManager

* Add test for memory manager

* Prevent allocating more memory than we actually have

* Remove unnecessary unsafe

* Fix lint

* Fix non-atomic requesters_total update

* Fix lint

* Resolve comments
yjshen authored Jan 13, 2022
1 parent b4c77e5 commit d7e465a
Showing 84 changed files with 2,817 additions and 468 deletions.
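The change that threads through nearly every file below is a new second argument to ExecutionPlan::execute: a shared RuntimeEnv, which owns the MemoryManager and DiskManager this commit introduces. A minimal sketch of the before/after shape, using stand-in types rather than the full DataFusion trait (which has many more methods):

use std::sync::Arc;

// Stand-ins for datafusion::execution::runtime_env::RuntimeEnv and the
// SendableRecordBatchStream alias; the real RuntimeEnv owns the new
// MemoryManager and DiskManager.
struct RuntimeEnv;
struct SendableRecordBatchStream;
type Result<T> = std::result::Result<T, String>;

trait ExecutionPlanSketch {
    // Before this commit: fn execute(&self, partition: usize) -> ...
    // After it, every operator also receives the shared runtime handle.
    fn execute(
        &self,
        partition: usize,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<SendableRecordBatchStream>;
}

struct NoopExec;

impl ExecutionPlanSketch for NoopExec {
    fn execute(
        &self,
        _partition: usize,
        _runtime: Arc<RuntimeEnv>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(SendableRecordBatchStream)
    }
}

fn main() {
    // One runtime per task, shared by all operators via the Arc handle.
    let runtime = Arc::new(RuntimeEnv);
    NoopExec.execute(0, runtime).unwrap();
}

Per the commit message, the intent is that memory-hungry operators such as the new external sorter can reserve memory through this shared runtime and spill to disk rather than allocating more memory than is actually available.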
2 changes: 1 addition & 1 deletion ballista/rust/core/src/error.rs
@@ -139,7 +139,7 @@ impl From<tokio::task::JoinError> for BallistaError {
 }
 
 impl Display for BallistaError {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         match self {
             BallistaError::NotImplemented(ref desc) => {
                 write!(f, "Not implemented: {}", desc)
4 changes: 3 additions & 1 deletion ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -39,6 +39,7 @@ use datafusion::physical_plan::{
 };
 
 use async_trait::async_trait;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use futures::future;
 use futures::StreamExt;
 use log::{error, info};
@@ -99,7 +100,8 @@ impl ExecutionPlan for DistributedQueryExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> datafusion::error::Result<SendableRecordBatchStream> {
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);
 
         info!("Connecting to Ballista scheduler at {}", self.scheduler_url);
7 changes: 5 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,11 +28,13 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::metrics::{
     ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Metric, Partitioning, Statistics,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream,
+    Statistics,
 };
 use datafusion::{
     error::{DataFusionError, Result},
@@ -100,7 +102,8 @@ impl ExecutionPlan for ShuffleReaderExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         info!("ShuffleReaderExec::execute({})", partition);
 
         let fetch_time =
76 changes: 18 additions & 58 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -44,14 +44,17 @@ use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_plan::common::IPCWriter;
 use datafusion::physical_plan::hash_utils::create_hashes;
 use datafusion::physical_plan::metrics::{
     self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::Partitioning::RoundRobinBatch;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream, Statistics,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -139,10 +142,11 @@ impl ShuffleWriterExec {
     pub async fn execute_shuffle_write(
         &self,
         input_partition: usize,
+        runtime: Arc<RuntimeEnv>,
    ) -> Result<Vec<ShuffleWritePartition>> {
         let now = Instant::now();
 
-        let mut stream = self.plan.execute(input_partition).await?;
+        let mut stream = self.plan.execute(input_partition, runtime).await?;
 
         let mut path = PathBuf::from(&self.work_dir);
         path.push(&self.job_id);
@@ -197,7 +201,7 @@ impl ShuffleWriterExec {
 
         // we won't necessary produce output for every possible partition, so we
         // create writers on demand
-        let mut writers: Vec<Option<ShuffleWriter>> = vec![];
+        let mut writers: Vec<Option<IPCWriter>> = vec![];
         for _ in 0..num_output_partitions {
             writers.push(None);
         }
@@ -265,7 +269,7 @@
                     info!("Writing results to {}", path);
 
                     let mut writer =
-                        ShuffleWriter::new(path, stream.schema().as_ref())?;
+                        IPCWriter::new(path, stream.schema().as_ref())?;
 
                     writer.write(&output_batch)?;
                     writers[output_partition] = Some(writer);
@@ -350,9 +354,10 @@ impl ExecutionPlan for ShuffleWriterExec {
 
     async fn execute(
         &self,
-        input_partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
-        let part_loc = self.execute_shuffle_write(input_partition).await?;
+        partition: usize,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
+        let part_loc = self.execute_shuffle_write(partition, runtime).await?;
 
         // build metadata result batch
         let num_writers = part_loc.len();
@@ -432,55 +437,6 @@ fn result_schema() -> SchemaRef {
     ]))
 }
 
-struct ShuffleWriter {
-    path: String,
-    writer: FileWriter<File>,
-    num_batches: u64,
-    num_rows: u64,
-    num_bytes: u64,
-}
-
-impl ShuffleWriter {
-    fn new(path: &str, schema: &Schema) -> Result<Self> {
-        let file = File::create(path)
-            .map_err(|e| {
-                BallistaError::General(format!(
-                    "Failed to create partition file at {}: {:?}",
-                    path, e
-                ))
-            })
-            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-        Ok(Self {
-            num_batches: 0,
-            num_rows: 0,
-            num_bytes: 0,
-            path: path.to_owned(),
-            writer: FileWriter::try_new(file, schema)?,
-        })
-    }
-
-    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-        self.writer.write(batch)?;
-        self.num_batches += 1;
-        self.num_rows += batch.num_rows() as u64;
-        let num_bytes: usize = batch
-            .columns()
-            .iter()
-            .map(|array| array.get_array_memory_size())
-            .sum();
-        self.num_bytes += num_bytes as u64;
-        Ok(())
-    }
-
-    fn finish(&mut self) -> Result<()> {
-        self.writer.finish().map_err(DataFusionError::ArrowError)
-    }
-
-    fn path(&self) -> &str {
-        &self.path
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -493,6 +449,8 @@ mod tests {
 
     #[tokio::test]
     async fn test() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+
         let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
         let work_dir = TempDir::new()?;
         let query_stage = ShuffleWriterExec::try_new(
@@ -502,7 +460,7 @@
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0).await?;
+        let mut stream = query_stage.execute(0, runtime).await?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -545,6 +503,8 @@
 
     #[tokio::test]
     async fn test_partitioned() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+
         let input_plan = create_input_plan()?;
         let work_dir = TempDir::new()?;
         let query_stage = ShuffleWriterExec::try_new(
@@ -554,7 +514,7 @@
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0).await?;
+        let mut stream = query_stage.execute(0, runtime).await?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
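The Vec<Option<IPCWriter>> change above preserves the existing create-on-demand pattern: one optional writer slot per output partition, filled in the first time a batch hashes to that partition. A self-contained sketch of the pattern, with a stand-in writer type in place of datafusion::physical_plan::common::IPCWriter (which supersedes the local ShuffleWriter removed above):

// Stand-in for the shared IPCWriter; only the slot-filling pattern is the
// point here.
struct Writer {
    batches_written: usize,
}

impl Writer {
    fn new() -> Self {
        Writer { batches_written: 0 }
    }
    fn write(&mut self, _batch: &[i32]) {
        self.batches_written += 1;
    }
}

fn main() {
    let num_output_partitions = 4;
    // Not every partition necessarily receives output, so writers are
    // created lazily rather than up front.
    let mut writers: Vec<Option<Writer>> = Vec::new();
    for _ in 0..num_output_partitions {
        writers.push(None);
    }

    // Suppose a batch hashes to output partition 2: materialize its writer.
    let output_partition = 2;
    if writers[output_partition].is_none() {
        writers[output_partition] = Some(Writer::new());
    }
    writers[output_partition].as_mut().unwrap().write(&[1, 2, 3]);

    assert_eq!(writers.iter().filter(|w| w.is_some()).count(), 1);
}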
6 changes: 4 additions & 2 deletions ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,8 +23,9 @@ use crate::serde::scheduler::PartitionLocation;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, Statistics,
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use datafusion::{
     error::{DataFusionError, Result},
@@ -102,7 +103,8 @@ impl ExecutionPlan for UnresolvedShuffleExec {
     async fn execute(
         &self,
         _partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         Err(DataFusionError::Plan(
             "Ballista UnresolvedShuffleExec does not support execution".to_owned(),
         ))
4 changes: 3 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -41,6 +41,7 @@ use datafusion::datasource::PartitionedFile;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::{
     window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
 };
@@ -53,6 +54,7 @@ use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec
 use datafusion::physical_plan::hash_join::PartitionMode;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
+use datafusion::physical_plan::sorts::sort::{SortExec, SortOptions};
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
 };
@@ -72,7 +74,6 @@ use datafusion::physical_plan::{
     limit::{GlobalLimitExec, LocalLimitExec},
     projection::ProjectionExec,
     repartition::RepartitionExec,
-    sort::{SortExec, SortOptions},
     Partitioning,
 };
 use datafusion::physical_plan::{
@@ -626,6 +627,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
             config: ExecutionConfig::new(),
             execution_props: ExecutionProps::new(),
             object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            runtime_env: Arc::new(RuntimeEnv::default()),
         };
 
         let fun_expr = functions::create_physical_fun(
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -22,6 +22,7 @@ pub mod to_proto;
 mod roundtrip_tests {
     use std::{convert::TryInto, sync::Arc};
 
+    use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::{
         arrow::{
             compute::kernels::sort::SortOptions,
@@ -36,7 +37,6 @@ mod roundtrip_tests {
             hash_aggregate::{AggregateMode, HashAggregateExec},
             hash_join::{HashJoinExec, PartitionMode},
             limit::{GlobalLimitExec, LocalLimitExec},
-            sort::SortExec,
             AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
             PhysicalExpr,
         },
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -29,7 +29,7 @@ use std::{
 use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sort::SortExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{cross_join::CrossJoinExec, ColumnStatistics};
 use datafusion::physical_plan::{
     expressions::{
9 changes: 3 additions & 6 deletions ballista/rust/core/src/utils.rs
@@ -52,14 +52,15 @@ use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sort::SortExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{
     metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
 };
@@ -88,11 +89,7 @@ pub async fn write_stream_to_disk(
     while let Some(result) = stream.next().await {
         let batch = result?;
 
-        let batch_size_bytes: usize = batch
-            .columns()
-            .iter()
-            .map(|array| array.get_array_memory_size())
-            .sum();
+        let batch_size_bytes: usize = batch_byte_size(&batch);
         num_batches += 1;
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;
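The inline fold removed here now lives in datafusion::physical_plan::common as batch_byte_size. Judging from the deleted lines, it sums get_array_memory_size over the batch's columns; a small self-contained illustration of that computation (the helper's exact internals are assumed to match the removed code):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

// Equivalent of the removed inline fold: the total in-memory size of a
// batch, summed across its column arrays.
fn batch_size_bytes(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|array| array.get_array_memory_size())
        .sum()
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    // batch_byte_size(&batch) should report the same figure.
    println!("{} bytes", batch_size_bytes(&batch));
    Ok(())
}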
6 changes: 4 additions & 2 deletions ballista/rust/executor/src/collect.rs
@@ -27,6 +27,7 @@ use datafusion::arrow::{
     datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
 };
 use datafusion::error::DataFusionError;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
@@ -75,11 +76,12 @@ impl ExecutionPlan for CollectExec {
     async fn execute(
         &self,
         partition: usize,
-    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+        runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);
         let num_partitions = self.plan.output_partitioning().partition_count();
 
-        let futures = (0..num_partitions).map(|i| self.plan.execute(i));
+        let futures = (0..num_partitions).map(|i| self.plan.execute(i, runtime.clone()));
         let streams = futures::future::join_all(futures)
             .await
             .into_iter()
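CollectExec's execute fans out one child execute call per input partition, each future receiving its own clone of the Arc handle (a cheap refcount bump, not a copy of the runtime). A minimal runnable sketch of that fan-out, with stand-in types:

use std::sync::Arc;

use futures::future;

// Stand-in; the real code awaits SendableRecordBatchStreams from
// ExecutionPlan::execute.
struct RuntimeEnv;

async fn execute(partition: usize, _runtime: Arc<RuntimeEnv>) -> usize {
    partition
}

#[tokio::main]
async fn main() {
    let runtime = Arc::new(RuntimeEnv);
    let num_partitions = 4;

    // Each partition's future gets its own clone of the shared handle.
    let futures = (0..num_partitions).map(|i| execute(i, runtime.clone()));
    let streams = future::join_all(futures).await;

    assert_eq!(streams, vec![0, 1, 2, 3]);
}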
7 changes: 6 additions & 1 deletion ballista/rust/executor/src/executor.rs
@@ -23,6 +23,7 @@ use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
 use datafusion::error::DataFusionError;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 
@@ -71,7 +72,11 @@ impl Executor {
             ))
         }?;
 
-        let partitions = exec.execute_shuffle_write(part).await?;
+        let runtime_config =
+            RuntimeConfig::new().with_local_dirs(vec![self.work_dir.clone()]);
+        let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+
+        let partitions = exec.execute_shuffle_write(part, runtime).await?;
 
         println!(
             "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
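Here the executor builds a task-scoped RuntimeEnv whose local (spill) directories point at its work directory. A hedged sketch of that construction; only the with_local_dirs builder call appears in this diff, so other RuntimeConfig options are omitted:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

// Mirror of the construction above: a runtime whose DiskManager is rooted
// at the executor's work directory.
fn runtime_for_task(work_dir: &str) -> Result<Arc<RuntimeEnv>> {
    let config = RuntimeConfig::new().with_local_dirs(vec![work_dir.to_owned()]);
    Ok(Arc::new(RuntimeEnv::new(config)?))
}

fn main() -> Result<()> {
    let runtime = runtime_for_task("/tmp/ballista-work-dir")?;
    // The handle is then passed to exec.execute_shuffle_write(part, runtime).
    assert_eq!(Arc::strong_count(&runtime), 1);
    Ok(())
}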
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/planner.rs
@@ -254,7 +254,7 @@ mod test {
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
     use datafusion::physical_plan::hash_join::HashJoinExec;
-    use datafusion::physical_plan::sort::SortExec;
+    use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::physical_plan::{
         coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
     };
3 changes: 2 additions & 1 deletion benchmarks/src/bin/nyctaxi.rs
@@ -116,13 +116,14 @@ async fn datafusion_sql_benchmarks(
 }
 
 async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
+    let runtime = ctx.state.lock().unwrap().runtime_env.clone();
     let plan = ctx.create_logical_plan(sql)?;
     let plan = ctx.optimize(&plan)?;
     if debug {
         println!("Optimized logical plan:\n{:?}", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan).await?;
-    let result = collect(physical_plan).await?;
+    let result = collect(physical_plan, runtime).await?;
     if debug {
         pretty::print_batches(&result)?;
     }
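For callers such as this benchmark, the practical upshot is fetching the context's runtime handle before collecting results. A sketch of the updated calling convention, assuming collect is the helper re-exported from datafusion::physical_plan:

use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::{collect, ExecutionPlan};

// After this commit, collect() threads the runtime into every operator's
// execute() call rather than each operator supplying its own.
async fn run_plan(
    plan: Arc<dyn ExecutionPlan>,
    runtime: Arc<RuntimeEnv>,
) -> Result<Vec<RecordBatch>> {
    collect(plan, runtime).await
}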