
Commit

[wip] phys plan ser in progr
rdettai committed Oct 8, 2021
1 parent c595913 commit dd7f599
Showing 14 changed files with 399 additions and 349 deletions.
95 changes: 46 additions & 49 deletions ballista/rust/core/proto/ballista.proto
@@ -239,8 +239,7 @@ message SortExprNode {
 // LogicalPlan is a nested type
 message LogicalPlanNode {
   oneof LogicalPlanType {
-    CsvTableScanNode csv_scan = 1;
-    ParquetTableScanNode parquet_scan = 2;
+    ListingTableScanNode listing_scan = 1;
     ProjectionNode projection = 3;
     SelectionNode selection = 4;
     LimitNode limit = 5;
@@ -254,25 +253,13 @@ message LogicalPlanNode {
     WindowNode window = 13;
     AnalyzeNode analyze = 14;
     CrossJoinNode cross_join = 15;
-    AvroTableScanNode avro_scan = 16;
   }
 }
 
 message ProjectionColumns {
   repeated string columns = 1;
 }
 
-message CsvTableScanNode {
-  string table_name = 1;
-  string path = 2;
-  bool has_header = 3;
-  string delimiter = 4;
-  string file_extension = 5;
-  ProjectionColumns projection = 6;
-  Schema schema = 7;
-  repeated LogicalExprNode filters = 8;
-}
-
 message Statistics {
   int64 num_rows = 1;
   int64 total_byte_size = 2;
@@ -282,30 +269,35 @@ message Statistics {
 
 message PartitionedFile {
   string path = 1;
-  Statistics statistics = 2;
+  uint64 size = 2;
 }
 
-message TableDescriptor {
-  string path = 1;
-  repeated PartitionedFile partition_files = 2;
-  Schema schema = 3;
+message CsvFormat {
+  bool has_header = 1;
+  string delimiter = 2;
 }
 
-message ParquetTableScanNode {
-  string table_name = 1;
-  TableDescriptor table_desc = 2;
-  ProjectionColumns projection = 3;
-  repeated LogicalExprNode filters = 4;
-  uint32 target_partitions = 5;
+message ParquetFormat {
+  bool enable_pruning = 1;
 }
 
-message AvroTableScanNode {
+message AvroFormat {}
+
+message ListingTableScanNode {
   string table_name = 1;
   string path = 2;
   string file_extension = 3;
   ProjectionColumns projection = 4;
   Schema schema = 5;
   repeated LogicalExprNode filters = 6;
+  repeated string partitions = 7;
+  bool collect_stat = 8;
+  uint32 target_partitions = 9;
+  oneof FileFormatType {
+    CsvFormat csv = 10;
+    ParquetFormat parquet = 11;
+    AvroFormat avro = 12;
+  }
 }
 
 message ProjectionNode {
@@ -598,40 +590,45 @@ message FilterExecNode {
   PhysicalExprNode expr = 2;
 }
 
-message ParquetPartition {
-  uint32 index = 1;
-  repeated PartitionedFile files = 2;
+message FilePartition {
+  repeated PartitionedFile files = 1;
 }
 
+message ScanLimit {
+  // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
+  // this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
+  oneof optional_limit {
+    uint32 limit = 1;
+  }
+}
+
 message ParquetScanExecNode {
-  repeated ParquetPartition partitions = 1;
+  repeated FilePartition partitions = 1;
   Schema schema = 2;
-  repeated uint32 projection = 3;
   uint32 batch_size = 4;
+  repeated uint32 projection = 6;
+  ScanLimit limit = 7;
+  Statistics statistics = 8;
 }
 
 message CsvScanExecNode {
-  string path = 1;
-  repeated uint32 projection = 2;
-  Schema schema = 3;
-  string file_extension = 4;
-  bool has_header = 5;
-  uint32 batch_size = 6;
-  string delimiter = 7;
-
-  // partition filenames
-  repeated string filename = 8;
+  repeated PartitionedFile files = 1;
+  Schema schema = 2;
+  bool has_header = 3;
+  uint32 batch_size = 4;
+  string delimiter = 5;
+  repeated uint32 projection = 6;
+  ScanLimit limit = 7;
+  Statistics statistics = 8;
 }
 
 message AvroScanExecNode {
-  string path = 1;
-  repeated uint32 projection = 2;
-  Schema schema = 3;
-  string file_extension = 4;
-  uint32 batch_size = 5;
-
-  // partition filenames
-  repeated string filename = 8;
+  repeated PartitionedFile files = 1;
+  Schema schema = 2;
+  uint32 batch_size = 4;
+  repeated uint32 projection = 6;
+  ScanLimit limit = 7;
+  Statistics statistics = 8;
}
 
 enum PartitionMode {
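
A note on the ScanLimit message above: wrapping a single uint32 in a oneof gives the field presence on the wire, so a consumer can tell "no limit" apart from "limit = 0". The sketch below mirrors the shape prost typically generates for such a oneof (the scan_limit module and OptionalLimit enum follow prost's naming convention, but are written out by hand here so the snippet is self-contained and runnable):

// Hand-written stand-ins for what prost typically generates from
// `message ScanLimit { oneof optional_limit { uint32 limit = 1; } }`.
pub mod scan_limit {
    #[derive(Clone, Copy, Debug, PartialEq)]
    pub enum OptionalLimit {
        Limit(u32),
    }
}

#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct ScanLimit {
    // A oneof surfaces as an Option in Rust, which is what makes this
    // encoding binary compatible with the proto3 `optional` keyword.
    pub optional_limit: Option<scan_limit::OptionalLimit>,
}

/// Collapse the oneof into the plain Option<usize> a scan operator expects.
fn limit_from_proto(proto: &ScanLimit) -> Option<usize> {
    proto.optional_limit.map(|l| match l {
        scan_limit::OptionalLimit::Limit(v) => v as usize,
    })
}

fn main() {
    let capped = ScanLimit {
        optional_limit: Some(scan_limit::OptionalLimit::Limit(10)),
    };
    let uncapped = ScanLimit::default();
    assert_eq!(limit_from_proto(&capped), Some(10));
    assert_eq!(limit_from_proto(&uncapped), None); // absent, not zero
}
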
148 changes: 47 additions & 101 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -21,8 +21,14 @@ use crate::error::BallistaError;
 use crate::serde::{from_proto_binary_op, proto_error, protobuf};
 use crate::{convert_box_required, convert_required};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
-use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
-use datafusion::datasource::{PartitionedFile, TableDescriptor};
+use datafusion::datasource::file_format::avro::AvroFormat;
+use datafusion::datasource::file_format::csv::CsvFormat;
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::{ListingOptions, ListingTable};
+use datafusion::datasource::object_store::local::LocalFileSystem;
+use datafusion::datasource::object_store::{FileMeta, SizedFile};
+use datafusion::datasource::PartitionedFile;
 use datafusion::logical_plan::window_frames::{
     WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
@@ -32,10 +38,10 @@ use datafusion::logical_plan::{
     LogicalPlan, LogicalPlanBuilder, Operator,
 };
 use datafusion::physical_plan::aggregates::AggregateFunction;
-use datafusion::physical_plan::avro::AvroReadOptions;
-use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
+use datafusion::prelude::*;
 use datafusion::scalar::ScalarValue;
+use protobuf::listing_table_scan_node::FileFormatType;
 use protobuf::logical_plan_node::LogicalPlanType;
 use protobuf::{logical_expr_node::ExprType, scalar_type};
 use std::{
Expand Down Expand Up @@ -109,13 +115,8 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
LogicalPlanType::CsvScan(scan) => {
LogicalPlanType::ListingScan(scan) => {
let schema: Schema = convert_required!(scan.schema)?;
let options = CsvReadOptions::new()
.schema(&schema)
.delimiter(scan.delimiter.as_bytes()[0])
.file_extension(&scan.file_extension)
.has_header(scan.has_header);

let mut projection = None;
if let Some(columns) = &scan.projection {
@@ -127,73 +128,48 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     projection = Some(column_indices);
                 }
 
-                LogicalPlanBuilder::scan_csv_with_name(
-                    &scan.path,
+                let file_format: Arc<dyn FileFormat> =
+                    match scan.file_format_type.ok_or_else(|| {
+                        proto_error(format!(
+                            "logical_plan::from_proto() Unsupported file format '{:?}'",
+                            self
+                        ))
+                    })? {
+                        FileFormatType::Parquet(protobuf::ParquetFormat {
+                            enable_pruning,
+                        }) => Arc::new(
+                            ParquetFormat::default().with_enable_pruning(enable_pruning),
+                        ),
+                        FileFormatType::Csv(protobuf::CsvFormat {
+                            has_header,
+                            delimiter,
+                        }) => Arc::new(
+                            CsvFormat::default()
+                                .with_has_header(has_header)
+                                .with_delimiter(delimiter.as_bytes()[0]),
+                        ),
+                        FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
+                    };
+
+                let options = ListingOptions {
+                    file_extension: scan.file_extension,
+                    format: file_format,
+                    partitions: scan.partitions,
+                    collect_stat: scan.collect_stat,
+                    target_partitions: scan.target_partitions as usize,
+                };
+
+                let provider = ListingTable {
+                    object_store: Arc::new(LocalFileSystem {}),
                     options,
-                    projection,
-                    &scan.table_name,
-                )?
-                .build()
-                .map_err(|e| e.into())
-            }
-            LogicalPlanType::ParquetScan(scan) => {
-                let descriptor: TableDescriptor = convert_required!(scan.table_desc)?;
-                let projection = match scan.projection.as_ref() {
-                    None => None,
-                    Some(columns) => {
-                        let schema = descriptor.schema.clone();
-                        let r: Result<Vec<usize>, _> = columns
-                            .columns
-                            .iter()
-                            .map(|col_name| {
-                                schema.fields().iter().position(|field| field.name() == col_name).ok_or_else(|| {
-                                    let column_names: Vec<&String> = schema.fields().iter().map(|f| f.name()).collect();
-                                    proto_error(format!(
-                                        "Parquet projection contains column name that is not present in schema. Column name: {}. Schema columns: {:?}",
-                                        col_name, column_names
-                                    ))
-                                })
-                            })
-                            .collect();
-                        Some(r?)
-                    }
+                    path: scan.path,
+                    schema: Arc::new(schema),
                 };
 
-                let parquet_table = ParquetTable::try_new_with_desc(
-                    Arc::new(ParquetTableDescriptor { descriptor }),
-                    scan.target_partitions as usize,
-                    true,
-                )?;
                 LogicalPlanBuilder::scan(
                     &scan.table_name,
-                    Arc::new(parquet_table),
-                    projection,
-                )?
-                .build()
-                .map_err(|e| e.into())
-            }
-            LogicalPlanType::AvroScan(scan) => {
-                let schema: Schema = convert_required!(scan.schema)?;
-                let options = AvroReadOptions {
-                    schema: Some(Arc::new(schema.clone())),
-                    file_extension: &scan.file_extension,
-                };
-
-                let mut projection = None;
-                if let Some(columns) = &scan.projection {
-                    let column_indices = columns
-                        .columns
-                        .iter()
-                        .map(|name| schema.index_of(name))
-                        .collect::<Result<Vec<usize>, _>>()?;
-                    projection = Some(column_indices);
-                }
-
-                LogicalPlanBuilder::scan_avro_with_name(
-                    &scan.path,
-                    options,
+                    Arc::new(provider),
                     projection,
-                    &scan.table_name,
                 )?
                 .build()
                 .map_err(|e| e.into())
@@ -336,36 +312,6 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
     }
 }
 
-impl TryInto<TableDescriptor> for &protobuf::TableDescriptor {
-    type Error = BallistaError;
-
-    fn try_into(self) -> Result<TableDescriptor, Self::Error> {
-        let partition_files = self
-            .partition_files
-            .iter()
-            .map(|f| f.try_into())
-            .collect::<Result<Vec<PartitionedFile>, _>>()?;
-        let schema = convert_required!(self.schema)?;
-        Ok(TableDescriptor {
-            path: self.path.to_owned(),
-            partition_files,
-            schema: Arc::new(schema),
-        })
-    }
-}
-
-impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
-    type Error = BallistaError;
-
-    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
-        let statistics = convert_required!(self.statistics)?;
-        Ok(PartitionedFile {
-            path: self.path.clone(),
-            statistics,
-        })
-    }
-}
-
 impl From<&protobuf::ColumnStats> for ColumnStatistics {
     fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
         ColumnStatistics {
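
The FileMeta and SizedFile imports added above pair with the slimmed-down PartitionedFile proto message (a path plus a uint64 size instead of full Statistics): the deserializing executor can rebuild file metadata without re-listing the object store. A minimal sketch of that conversion, using hand-written stand-ins rather than the real datafusion types (the field layouts here are assumptions for illustration, not the commit's code):

// Stand-ins for datafusion's SizedFile/FileMeta and the protobuf
// PartitionedFile; layouts are assumed for illustration only.
#[derive(Debug, PartialEq)]
struct SizedFile {
    path: String,
    size: u64,
}

#[derive(Debug, PartialEq)]
struct FileMeta {
    sized_file: SizedFile,
    // datafusion's FileMeta also tracks a last-modified timestamp; elided here.
}

#[derive(Debug)]
struct ProtoPartitionedFile {
    path: String,
    size: u64,
}

// Rebuild the file metadata on the deserializing side from just path + size.
fn file_meta_from_proto(pf: ProtoPartitionedFile) -> FileMeta {
    FileMeta {
        sized_file: SizedFile {
            path: pf.path,
            size: pf.size,
        },
    }
}

fn main() {
    let proto = ProtoPartitionedFile {
        path: "data/part-0.parquet".to_string(),
        size: 1024,
    };
    let meta = file_meta_from_proto(proto);
    assert_eq!(meta.sized_file.size, 1024);
}
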

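
The FileFormatType match in from_proto.rs covers the deserializing half; the physical plan serialization this commit works toward also needs the reverse mapping when writing a plan out. A hypothetical sketch with stand-in enums (the real code would have to inspect the concrete format behind an Arc<dyn FileFormat>; none of these type names come from the commit):

// Stand-in enums; illustrative only.
#[derive(Debug)]
enum FileFormat {
    Csv { has_header: bool, delimiter: u8 },
    Parquet { enable_pruning: bool },
    Avro,
}

#[derive(Debug, PartialEq)]
enum FileFormatType {
    Csv { has_header: bool, delimiter: String },
    Parquet { enable_pruning: bool },
    Avro,
}

fn file_format_to_proto(format: &FileFormat) -> FileFormatType {
    match format {
        FileFormat::Csv { has_header, delimiter } => FileFormatType::Csv {
            has_header: *has_header,
            // the proto stores the delimiter as a string; from_proto reads
            // it back with delimiter.as_bytes()[0]
            delimiter: (*delimiter as char).to_string(),
        },
        FileFormat::Parquet { enable_pruning } => FileFormatType::Parquet {
            enable_pruning: *enable_pruning,
        },
        FileFormat::Avro => FileFormatType::Avro,
    }
}

fn main() {
    let csv = FileFormat::Csv { has_header: true, delimiter: b',' };
    let round_tripped = file_format_to_proto(&csv);
    assert_eq!(
        round_tripped,
        FileFormatType::Csv { has_header: true, delimiter: ",".to_string() }
    );
}
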