Fix build errors
Co-authored-by: Yijie Shen <henry.yijieshen@gmail.com>
houqp and yjshen committed Sep 17, 2021
1 parent 3218759 commit a035200
Showing 40 changed files with 1,162 additions and 862 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -29,3 +29,6 @@ members = [
 ]
 
 exclude = ["python"]
+
+[patch.crates-io]
+arrow2 = { path = "/home/houqp/Documents/code/arrow/arrow2" }
43 changes: 20 additions & 23 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -34,14 +34,11 @@ use crate::utils;
 use crate::serde::protobuf::ShuffleWritePartition;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 use async_trait::async_trait;
-use datafusion::arrow::array::{
-    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
-    UInt64Builder,
-};
+use datafusion::arrow::array::*;
 use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::ipc::reader::FileReader;
-use datafusion::arrow::ipc::writer::FileWriter;
+use datafusion::arrow::io::ipc::read::FileReader;
+use datafusion::arrow::io::ipc::write::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_utils::create_hashes;
@@ -244,7 +241,7 @@ impl ShuffleWriterExec {
                 .collect::<Result<Vec<Arc<dyn Array>>>>()?;
 
             let output_batch =
-                RecordBatch::try_new(input_batch.schema(), columns)?;
+                RecordBatch::try_new(input_batch.schema().clone(), columns)?;
 
             // write non-empty batch out
 
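The added `.clone()` reflects an API difference between the two Arrow crates: arrow-rs's `RecordBatch::schema()` returns an owned `Arc<Schema>`, while arrow2's returns a reference to it, so callers clone the `Arc` themselves (a cheap refcount bump, not a deep copy of the schema). A minimal sketch of the pattern, using hypothetical `Batch`/`Schema` types rather than the arrow2 API:

```rust
use std::sync::Arc;

struct Schema {
    fields: Vec<String>,
}

struct Batch {
    schema: Arc<Schema>,
}

impl Batch {
    // arrow2-style accessor: returns a borrow, so callers clone
    // when they need ownership
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }
}

fn main() {
    let batch = Batch {
        schema: Arc::new(Schema { fields: vec!["a".into()] }),
    };
    // clone() bumps the Arc refcount; the schema itself is not copied
    let owned: Arc<Schema> = batch.schema().clone();
    assert_eq!(owned.fields.len(), 1);
}
```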
@@ -356,18 +353,18 @@ impl ExecutionPlan for ShuffleWriterExec {
 
         // build metadata result batch
         let num_writers = part_loc.len();
-        let mut partition_builder = UInt32Builder::new(num_writers);
-        let mut path_builder = StringBuilder::new(num_writers);
-        let mut num_rows_builder = UInt64Builder::new(num_writers);
-        let mut num_batches_builder = UInt64Builder::new(num_writers);
-        let mut num_bytes_builder = UInt64Builder::new(num_writers);
+        let mut partition_builder = UInt32Vec::with_capacity(num_writers);
+        let mut path_builder = MutableUtf8Array::with_capacity(num_writers);
+        let mut num_rows_builder = UInt64Vec::with_capacity(num_writers);
+        let mut num_batches_builder = UInt64Vec::with_capacity(num_writers);
+        let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers);
 
         for loc in &part_loc {
-            path_builder.append_value(loc.path.clone())?;
-            partition_builder.append_value(loc.partition_id as u32)?;
-            num_rows_builder.append_value(loc.num_rows)?;
-            num_batches_builder.append_value(loc.num_batches)?;
-            num_bytes_builder.append_value(loc.num_bytes)?;
+            path_builder.push(Some(loc.path.clone()));
+            partition_builder.push(Some(loc.partition_id as u32));
+            num_rows_builder.push(Some(loc.num_rows));
+            num_batches_builder.push(Some(loc.num_batches));
+            num_bytes_builder.push(Some(loc.num_bytes));
         }
 
         // build arrays
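This hunk is the core of the builder migration: arrow-rs builders (`UInt32Builder`, `StringBuilder`, ...) become arrow2 mutable arrays, capacity moves from `new(n)` to `with_capacity(n)`, and `push` takes an `Option` and cannot fail, which is why every trailing `?` disappears. A minimal sketch of the pattern, assuming arrow2 0.5 and that `UInt32Vec`/`UInt64Vec` are the crate's aliases for `MutablePrimitiveArray<u32>`/`MutablePrimitiveArray<u64>`:

```rust
use arrow2::array::{
    Array, MutablePrimitiveArray, MutableUtf8Array, PrimitiveArray, Utf8Array,
};

fn main() {
    // reserve capacity up front, as the arrow-rs builders did
    let mut partitions = MutablePrimitiveArray::<u32>::with_capacity(4);
    let mut paths = MutableUtf8Array::<i32>::with_capacity(4);

    // push takes Option<T>: Some(v) appends a value, None appends a null,
    // and neither returns a Result
    partitions.push(Some(0u32));
    paths.push(Some("/tmp/shuffle/part-0"));
    partitions.push(None);
    paths.push(None::<&str>);

    // freeze into immutable arrays once building is done
    let partitions: PrimitiveArray<u32> = partitions.into();
    let paths: Utf8Array<i32> = paths.into();
    assert_eq!(partitions.len(), 2);
    assert_eq!(paths.null_count(), 1);
}
```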
@@ -428,17 +425,17 @@ fn result_schema() -> SchemaRef {
     ]))
 }
 
-struct ShuffleWriter {
+struct ShuffleWriter<'a> {
     path: String,
-    writer: FileWriter<File>,
+    writer: FileWriter<'a, File>,
     num_batches: u64,
     num_rows: u64,
     num_bytes: u64,
 }
 
-impl ShuffleWriter {
+impl<'a> ShuffleWriter<'a> {
     fn new(path: &str, schema: &Schema) -> Result<Self> {
-        let file = File::create(path)
+        let mut file = File::create(path)
             .map_err(|e| {
                 BallistaError::General(format!(
                     "Failed to create partition file at {}: {:?}",
@@ -451,7 +448,7 @@ impl ShuffleWriter {
             num_rows: 0,
             num_bytes: 0,
             path: path.to_owned(),
-            writer: FileWriter::try_new(file, schema)?,
+            writer: FileWriter::try_new(&mut file, schema)?,
         })
     }
 
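The lifetime added to `ShuffleWriter` follows mechanically from the writer change above: the arrow2 IPC `FileWriter` here borrows its output (`try_new(&mut file, schema)`), and any struct that stores a borrowing writer must surface that borrow's lifetime. A generic illustration of the rule, with hypothetical types rather than the arrow2 API:

```rust
use std::io::Write;

// A writer that borrows its output sink...
struct BorrowingWriter<'a, W: Write> {
    out: &'a mut W,
}

impl<'a, W: Write> BorrowingWriter<'a, W> {
    fn write_record(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.out.write_all(bytes)
    }
}

// ...forces every struct that stores it to carry the lifetime too,
// just as ShuffleWriter gained <'a> in this commit.
struct Wrapper<'a, W: Write> {
    writer: BorrowingWriter<'a, W>,
    num_bytes: u64,
}

fn main() {
    let mut sink: Vec<u8> = Vec::new();
    let mut w = Wrapper {
        writer: BorrowingWriter { out: &mut sink },
        num_bytes: 0,
    };
    w.writer.write_record(b"abc").unwrap();
    w.num_bytes += 3;
    assert_eq!(w.num_bytes, 3);
}
```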
@@ -480,7 +477,7 @@ impl ShuffleWriter {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+    use datafusion::arrow::array::{Utf8Array, StructArray, UInt32Array, UInt64Array};
     use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use datafusion::physical_plan::expressions::Column;
     use datafusion::physical_plan::limit::GlobalLimitExec;
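The test imports change for the same reason as the builders: arrow2 has no `StringArray`. UTF-8 columns are `Utf8Array<O>`, generic over the offset width, with `Utf8Array<i32>` filling the role of arrow-rs's `StringArray` (and `Utf8Array<i64>` that of `LargeStringArray`). A quick sketch, assuming arrow2 0.5's `from_slice` constructor:

```rust
use arrow2::array::{Array, Utf8Array};

fn main() {
    // Utf8Array<i32> plays the role of arrow-rs's StringArray;
    // the offset type is explicit rather than baked into the name
    let names = Utf8Array::<i32>::from_slice(["houqp", "yjshen"]);
    assert_eq!(names.value(1), "yjshen");
    assert_eq!(names.len(), 2);
}
```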
2 changes: 0 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -61,7 +61,6 @@ use datafusion::physical_plan::{
     expressions::{
         col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
         IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr,
-        DEFAULT_DATAFUSION_CAST_OPTIONS,
     },
     filter::FilterExec,
     functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
@@ -620,7 +619,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
             ExprType::Cast(e) => Arc::new(CastExpr::new(
                 convert_box_required!(e.expr)?,
                 convert_required!(e.arrow_type)?,
-                DEFAULT_DATAFUSION_CAST_OPTIONS,
             )),
             ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
                 convert_box_required!(e.expr)?,
1 change: 1 addition & 0 deletions ballista/rust/core/src/utils.rs
@@ -31,6 +31,7 @@ use crate::serde::scheduler::PartitionStats;
 
 use crate::config::BallistaConfig;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
     array::*,
3 changes: 2 additions & 1 deletion datafusion/Cargo.toml
@@ -49,7 +49,8 @@ force_hash_collisions = []
 [dependencies]
 ahash = "0.7"
 hashbrown = { version = "0.11", features = ["raw"] }
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "43d8cf5c54805aa437a1c7ee48f80e90f07bc553", features = ["io_csv", "io_json", "io_parquet", "io_ipc", "io_print", "ahash", "merge_sort", "compute", "regex"] }
+arrow = { package = "arrow2", version="0.5", features = ["io_csv", "io_json", "io_parquet", "io_ipc", "io_print", "ahash", "merge_sort", "compute", "regex"] }
+parquet = { package = "parquet2", version = "0.4", default_features = false, features = ["stream"] }
 sqlparser = "0.10"
 paste = "^1.0"
 num_cpus = "1.13.0"
2 changes: 1 addition & 1 deletion datafusion/benches/physical_plan.rs
@@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
 
     let exec = MemoryExec::try_new(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
-        schema,
+        schema.clone(),
         None,
     )
     .unwrap();
(Diff truncated: only 6 of the 40 changed files are shown above.)
