File partitioning for ListingTable #1141

Merged · 18 commits · Nov 1, 2021
36 changes: 16 additions & 20 deletions ballista/rust/core/proto/ballista.proto
@@ -274,6 +274,7 @@ message PartitionedFile {
string path = 1;
uint64 size = 2;
uint64 last_modified_ns = 3;
repeated ScalarValue partition_values = 4;
}

message CsvFormat {
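The new `partition_values` field carries, for each file, the values of the table partition columns encoded in its path. As an illustration, here is a minimal sketch (a hypothetical helper, not the PR's actual parsing code; the PR stores these values as `ScalarValue`, simplified to `String` here) of how such values can be read from a hive-style path:

```rust
// Hedged sketch: extract partition-column values from a hive-style path
// such as "mytable/date=2021-01-01/part-0.parquet".
fn parse_partition_values(path: &str, partition_cols: &[&str]) -> Vec<String> {
    partition_cols
        .iter()
        .map(|col| {
            let prefix = format!("{}=", col);
            path.split('/')
                .find_map(|segment| segment.strip_prefix(prefix.as_str()))
                .unwrap_or("") // missing column: empty value in this sketch
                .to_string()
        })
        .collect()
}

fn main() {
    let values =
        parse_partition_values("mytable/date=2021-01-01/part-0.parquet", &["date"]);
    assert_eq!(values, vec!["2021-01-01".to_string()]);
}
```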
@@ -294,7 +295,7 @@ message ListingTableScanNode {
ProjectionColumns projection = 4;
Schema schema = 5;
repeated LogicalExprNode filters = 6;
repeated string partitions = 7;
repeated string table_partition_cols = 7;
Contributor Author:
Renamed the partitions field to table_partition_cols to make it more explicit (cf #1141 (comment))

bool collect_stat = 8;
uint32 target_partitions = 9;
oneof FileFormatType {
@@ -613,33 +614,28 @@ message ScanLimit {
uint32 limit = 1;
}

message ParquetScanExecNode {
message FileScanExecConf {
Contributor Author:
Promoted the PhysicalPlanConfig entity to the physical_plan/file_format module, which enables us to factor out a lot of configuration that is the same for all file formats. Here, for instance, we serialize the configuration in one common entity to avoid repeating the same parameters in every XxxScanExecConf.

The rationale for doing this here is that clippy was complaining more and more often that methods had too many arguments 😉

Suggestions on the naming of the config entity (PhysicalPlanConfig) or the serialized version (FileScanExecConf) are welcome 😄

Contributor:
makes sense to me
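To make the factoring concrete, here is a hedged sketch (heavily simplified; the PR's actual `PhysicalPlanConfig` lives in `datafusion::physical_plan::file_format` and carries more fields, such as the object store and file groups) of how one shared config struct replaces repeated positional parameters:

```rust
// Simplified sketch of the shared scan configuration; field subset assumed.
struct PhysicalPlanConfig {
    batch_size: usize,
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
    table_partition_cols: Vec<String>,
}

// Format-specific execs keep only their own knobs and share the rest.
struct CsvExec {
    base: PhysicalPlanConfig,
    has_header: bool,
    delimiter: u8,
}

impl CsvExec {
    // One config argument instead of eight-plus positional parameters,
    // which is what clippy::too_many_arguments was flagging.
    fn new(base: PhysicalPlanConfig, has_header: bool, delimiter: u8) -> Self {
        Self { base, has_header, delimiter }
    }
}

fn main() {
    let base = PhysicalPlanConfig {
        batch_size: 8192,
        projection: None,
        limit: Some(100),
        table_partition_cols: vec!["date".to_string()],
    };
    let _csv = CsvExec::new(base, true, b',');
}
```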

repeated FileGroup file_groups = 1;
Schema schema = 2;
uint32 batch_size = 4;
repeated uint32 projection = 6;
ScanLimit limit = 7;
Statistics statistics = 8;
uint32 batch_size = 3;
Member:
would this be back-compatible?

Contributor Author:
As long as you don't have DataFusion nodes with different versions, it should be ok!
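To illustrate why mixed versions are the problem: protobuf identifies fields on the wire by tag number, so renumbering silently reassigns meaning when an old writer talks to a new reader. A standalone sketch (using the `prost` crate directly with a hypothetical two-field subset, not Ballista's generated code):

```rust
// Hedged sketch of the wire-compatibility hazard: `batch_size` used to be
// tag 4, and tag 4 now belongs to `projection` in the new layout.
use prost::Message;

#[derive(Clone, PartialEq, Message)]
struct OldConf {
    #[prost(uint32, tag = "4")]
    batch_size: u32,
}

#[derive(Clone, PartialEq, Message)]
struct NewConf {
    #[prost(uint32, repeated, tag = "4")]
    projection: Vec<u32>,
}

fn main() {
    let old = OldConf { batch_size: 8192 };
    let mut buf = Vec::new();
    old.encode(&mut buf).unwrap();
    // A new-version reader misinterprets tag 4 as a projection index.
    let new = NewConf::decode(&buf[..]).unwrap();
    assert_eq!(new.projection, vec![8192]);
}
```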

repeated uint32 projection = 4;
ScanLimit limit = 5;
Statistics statistics = 6;
repeated string table_partition_cols = 7;
}

message ParquetScanExecNode {
FileScanExecConf base_conf = 1;
}

message CsvScanExecNode {
repeated FileGroup file_groups = 1;
Schema schema = 2;
bool has_header = 3;
uint32 batch_size = 4;
string delimiter = 5;
repeated uint32 projection = 6;
ScanLimit limit = 7;
Statistics statistics = 8;
FileScanExecConf base_conf = 1;
bool has_header = 2;
string delimiter = 3;
}

message AvroScanExecNode {
repeated FileGroup file_groups = 1;
Schema schema = 2;
uint32 batch_size = 4;
repeated uint32 projection = 6;
ScanLimit limit = 7;
Statistics statistics = 8;
FileScanExecConf base_conf = 1;
}

enum PartitionMode {
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -191,7 +191,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
let options = ListingOptions {
file_extension: scan.file_extension.clone(),
format: file_format,
partitions: scan.partitions.clone(),
table_partition_cols: scan.table_partition_cols.clone(),
collect_stat: scan.collect_stat,
target_partitions: scan.target_partitions as usize,
};
7 changes: 5 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -755,8 +755,11 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
.options()
.file_extension
.clone(),
partitions: listing_table.options().partitions.clone(),
path: listing_table.path().to_owned(),
table_partition_cols: listing_table
.options()
.table_partition_cols
.clone(),
path: listing_table.table_path().to_owned(),
schema: Some(schema),
projection,
filters,
127 changes: 66 additions & 61 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -30,7 +30,7 @@ use crate::serde::protobuf::ShuffleReaderPartition;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{from_proto_binary_op, proto_error, protobuf, str_to_byte};
use crate::{convert_box_required, convert_required, into_required};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use chrono::{TimeZone, Utc};
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
@@ -46,7 +46,9 @@ use datafusion::logical_plan::{
};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::file_format::{AvroExec, CsvExec, ParquetExec};
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, ParquetExec, PhysicalPlanConfig,
};
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -118,64 +120,21 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.try_into()?;
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
}
PhysicalPlanType::CsvScan(scan) => {
let schema = Arc::new(convert_required!(scan.schema)?);
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let statistics = convert_required!(scan.statistics)?;

Ok(Arc::new(CsvExec::new(
Arc::new(LocalFileSystem {}),
scan.file_groups
.iter()
.map(|p| p.into())
.collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
scan.has_header,
str_to_byte(&scan.delimiter)?,
Some(projection),
scan.batch_size as usize,
scan.limit.as_ref().map(|sl| sl.limit as usize),
)))
}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
scan.base_conf.as_ref().unwrap().try_into()?,
scan.has_header,
str_to_byte(&scan.delimiter)?,
))),
PhysicalPlanType::ParquetScan(scan) => {
let schema = Arc::new(convert_required!(scan.schema)?);
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let statistics = convert_required!(scan.statistics)?;

Ok(Arc::new(ParquetExec::new(
Arc::new(LocalFileSystem {}),
scan.file_groups
.iter()
.map(|p| p.into())
.collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
Some(projection),
scan.base_conf.as_ref().unwrap().try_into()?,
// TODO predicate should be de-serialized
None,
scan.batch_size as usize,
scan.limit.as_ref().map(|sl| sl.limit as usize),
)))
}
PhysicalPlanType::AvroScan(scan) => {
let schema = Arc::new(convert_required!(scan.schema)?);
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let statistics = convert_required!(scan.statistics)?;

Ok(Arc::new(AvroExec::new(
Arc::new(LocalFileSystem {}),
scan.file_groups
.iter()
.map(|p| p.into())
.collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
Some(projection),
scan.batch_size as usize,
scan.limit.as_ref().map(|sl| sl.limit as usize),
)))
}
PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
scan.base_conf.as_ref().unwrap().try_into()?,
))),
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(coalesce_batches.input)?;
@@ -738,9 +697,11 @@ pub fn parse_protobuf_hash_partitioning(
}
}

impl From<&protobuf::PartitionedFile> for PartitionedFile {
fn from(val: &protobuf::PartitionedFile) -> Self {
PartitionedFile {
impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
type Error = BallistaError;

fn try_from(val: &protobuf::PartitionedFile) -> Result<Self, Self::Error> {
Ok(PartitionedFile {
file_meta: FileMeta {
sized_file: SizedFile {
path: val.path.clone(),
Expand All @@ -752,13 +713,23 @@ impl From<&protobuf::PartitionedFile> for PartitionedFile {
Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
},
},
}
partition_values: val
.partition_values
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
})
}
}

impl From<&protobuf::FileGroup> for Vec<PartitionedFile> {
fn from(val: &protobuf::FileGroup) -> Self {
val.files.iter().map(|f| f.into()).collect()
impl TryFrom<&protobuf::FileGroup> for Vec<PartitionedFile> {
type Error = BallistaError;

fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
val.files
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<_>, _>>()
}
}
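Switching these conversions from `From` to `TryFrom` lets a failed `ScalarValue` decoding surface as a `BallistaError` instead of a panic; collecting an iterator of `Result`s into `Result<Vec<_>, _>` short-circuits on the first failure. A minimal standalone sketch of that idiom:

```rust
// Hedged sketch of the collect-into-Result idiom used above, with a toy
// fallible conversion in place of the protobuf -> ScalarValue decoding.
fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, std::num::ParseIntError> {
    inputs.iter().map(|s| s.parse::<i32>()).collect()
}

fn main() {
    assert_eq!(parse_all(&["1", "2"]), Ok(vec![1, 2]));
    assert!(parse_all(&["1", "oops"]).is_err()); // stops at the first error
}
```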

@@ -795,3 +766,37 @@ impl TryInto<Statistics> for &protobuf::Statistics {
})
}
}

impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
type Error = BallistaError;

fn try_into(self) -> Result<PhysicalPlanConfig, Self::Error> {
let schema = Arc::new(convert_required!(self.schema)?);
let projection = self
.projection
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>();
let projection = if projection.is_empty() {
None
} else {
Some(projection)
};
let statistics = convert_required!(self.statistics)?;

Ok(PhysicalPlanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_schema: schema,
file_groups: self
.file_groups
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<_>, _>>()?,
statistics,
projection,
batch_size: self.batch_size as usize,
limit: self.limit.as_ref().map(|sl| sl.limit as usize),
table_partition_cols: vec![],
})
}
}
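One subtlety in the conversion above: protobuf cannot distinguish an unset repeated field from an empty one, so an empty `projection` list is decoded as `None` (meaning scan all columns). A micro-sketch of that convention:

```rust
// Hedged sketch of the empty-means-None decoding convention used above.
fn decode_projection(raw: &[u32]) -> Option<Vec<usize>> {
    let projection: Vec<usize> = raw.iter().map(|i| *i as usize).collect();
    if projection.is_empty() {
        None
    } else {
        Some(projection)
    }
}

fn main() {
    assert_eq!(decode_projection(&[]), None);
    assert_eq!(decode_projection(&[0, 2]), Some(vec![0, 2]));
}
```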