
Replace file format providers #2

Merged · 4 commits merged into refacto-providers on Oct 12, 2021
Conversation

@rdettai (Owner) commented Oct 8, 2021

Rationale for this change

As discussed in apache#1010, this PR replaces the old file format datasources with the new ones (both table providers and execution plans)

What changes are included in this PR?

These are mostly mechanical changes that come from the replacement of the file format datasources. A few changes are noteworthy:

  • the way the default values are provided to the table provider is somewhat chaotic: some parameters come from the context, some from the struct XxxReadOptions<'a> objects. This PR mimics the current behavior as much as possible, but separate work should be conducted to make the configuration system more consistent and intuitive
  • the ExecutionContext.read_xxx and ExecutionContext.register_xxx methods are now async because they might need to infer the schema (see the sketch after this list)
  • the get_file_metadata service in Ballista no longer returns the list of files
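
For illustration, a minimal sketch of the new async call pattern (assuming a tokio runtime, as in the examples in this repo):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // schema inference may now go through the object store, hence the .await
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
    let _batches = df.collect().await?;
    Ok(())
}
```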

Are there any user-facing changes?

  • ExecutionContext.read_xxx and ExecutionContext.register_xxx are now async

What changes are left for later?

Where possible, the behavior was kept the same. For this reason, some TODOs were left:

  • in Ballista, the schema should be resolved on the scheduler (calling a service like get_file_metadata) instead of locally
  • the way the configuration is passed around between the context and the logical plan builder lacks consistency (in particular in terms of default management). This should be improved.
  • the usage of PartitionedFile, ParquetPartition, and FilePartition is not very clear; the structures should be simplified a bit.
  • the LocalFileSystem ObjectStore is hardcoded in Ballista

@github-actions bot added the datafusion and documentation labels Oct 8, 2021
@rdettai changed the title from "[fix] replace file format providers in datafusion" to "Replace file format providers in datafusion" Oct 8, 2021
@rdettai changed the title from "Replace file format providers in datafusion" to "Replace file format providers" Oct 8, 2021
@alamb left a comment:

I think this is an epic PR @rdettai -- I like it a lot 👍 Really nice work; thank you so much!

I made it about half way through reviewing, but ran out of time for today -- I will try and finish this first thing tomorrow. So far it is looking great.

Given what I have seen so far, I think we should get this merged into apache#1010 and then get that merged onto master.

@@ -76,10 +76,10 @@ use datafusion::arrow::record_batch::RecordBatch;
async fn main() -> datafusion::error::Result<()> {
// register the table
let mut ctx = ExecutionContext::new();
- ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
+ ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

Yeah I think this kind of change is inevitable to support remote object stores.

      Some(projection),
      scan.batch_size as usize,
-     None,
+     scan.limit.as_ref().map(|sl| sl.limit as usize),
  )?))

is this a drive-by bug fix?

@rdettai (Owner Author):

My goal here was to serialize the whole object; I wasn't aiming at any specific bugfix. I don't think this was a bug earlier, in the sense that the final result would still be correct because the limit is also enforced by the LimitExec. But it was surprising indeed that the CsvExec was different before and after serialization 😄

Ok(Arc::new(ParquetExec::new(
partitions,
Arc::new(LocalFileSystem {}),

Here is the use of the hard-coded LocalFileSystem


Same question here: are we planning to incorporate object_store in both DataFusion and Ballista, or just in the DataFusion core to limit scope? I hit on your design doc on how to incorporate object_store into Ballista several days ago; is that in the scope of another PR, due to a separate registry-passing mechanism for object stores?


I am also guessing that generalizing to support all object stores would be handled as a follow-up PR after 1010 gets merged. I think hard coding LocalFileSystem here is perfectly fine for this refactor because it doesn't break any existing feature.

@rdettai (Owner Author):

For now, the simplest solution I have to handle ObjectStore in Ballista is to do it exactly the same way as for TableProvider: define the implementations we support in a predefined enum in the protobuf and serialize/deserialize to/from that. I haven't created that enum yet as we have only 1 variant. A sketch of that approach follows below.
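
A minimal sketch of the enum-based serde idea, with hypothetical stand-ins for the real DataFusion/Ballista types (only the single existing variant is shown):

```rust
use std::sync::Arc;

// Stand-ins for illustration; the real trait and struct live in DataFusion.
trait ObjectStore {}
struct LocalFileSystem {}
impl ObjectStore for LocalFileSystem {}

/// Protobuf-side enum of supported stores (a single variant today).
enum ProtoObjectStore {
    LocalFileSystem,
}

fn deserialize_store(proto: &ProtoObjectStore) -> Arc<dyn ObjectStore> {
    match proto {
        // every plan deserializes back to the local file system for now
        ProtoObjectStore::LocalFileSystem => Arc::new(LocalFileSystem {}),
    }
}
```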

If we want to move away from that, we will need:

  • to reference the ExecutionContext in the serde code (requires some piping on the Ballista side)
  • to reorganize the ExecutionContext into tiers, as discussed in "Expose a static object store registry" apache/datafusion#1072 (comment), to avoid confusion
  • to find a way to dynamically load the same static configs across all components of Ballista (with the challenge of adding new object stores / table providers at build time)

@@ -272,6 +278,10 @@ impl SchedulerGrpc for SchedulerServer {
&self,
request: Request<GetFileMetadataParams>,
) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> {
// TODO support multiple object stores

Is this still a TODO?


looks like it, because it's hard-coded to LocalFileSystem below :)

};
let schema = Arc::new(get_schema(table));

let options = ListingOptions {

this is a cool interface 👍

@@ -31,7 +31,7 @@ async fn query(ctx: &mut ExecutionContext, sql: &str) {
let rt = Runtime::new().unwrap();

// execute the query
- let df = ctx.sql(sql).unwrap();
+ let df = rt.block_on(ctx.sql(sql)).unwrap();

Does this change mean the benchmark time will now include the planning time (whereas before it did not)?


Given that criterion::black_box still only wraps the collect call, I think the SQL planning shouldn't be included?

@rdettai (Owner Author):

This shouldn't change anything: to my knowledge, rt.block_on is independent of Criterion. Also, sql() only blocks for CreateExternalTable queries, so for normal queries it will actually never yield back to the runtime.
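
For reference, a sketch of the benchmark helper after this change, with the collect call (elided in the diff above) filled in, and the signature simplified to a plain fn for a self-contained example:

```rust
use criterion::black_box;
use datafusion::prelude::ExecutionContext;
use tokio::runtime::Runtime;

fn query(ctx: &mut ExecutionContext, sql: &str) {
    let rt = Runtime::new().unwrap();

    // planning: ctx.sql is now async and driven to completion by block_on;
    // per the reply above, it only truly awaits for CreateExternalTable
    let df = rt.block_on(ctx.sql(sql)).unwrap();

    // execution: collect the results; black_box keeps the work observable
    black_box(rt.block_on(df.collect()).unwrap());
}
```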

@yjshen commented Oct 12, 2021

the usage of PartitionedFile, ParquetPartition, FilePartition is not very clear; the structures should be simplified a bit.

I introduced PartitionedFile and FilePartition here: apache#932

PartitionedFile -> a single file (for the moment) or, later, part of a file (some of its row groups or rows). We may even extend this to include the partition value and partition schema (see below) to support partitioned tables:
/path/to/table/root/p_date=20210813/p_hour=1200/xxxxx.parquet

FilePartition -> the basic unit of parallel processing: each task is responsible for processing one FilePartition, which is composed of several PartitionedFiles.

I think ParquetPartition is a legacy abstraction, and we could use FilePartition directly in the various scan physical plans instead, since FilePartition and PartitionedFile are meant to be format-agnostic.
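
A minimal sketch of the two abstractions as described here; the fields are illustrative, not the exact DataFusion definitions:

```rust
/// A single file for now; later possibly a slice of one (row groups or rows),
/// optionally extended with partition values for layouts like
/// /path/to/table/root/p_date=20210813/p_hour=1200/xxxxx.parquet
struct PartitionedFile {
    path: String,
}

/// The basic unit of parallel processing: each task scans one FilePartition,
/// which groups several PartitionedFiles.
struct FilePartition {
    index: usize,
    files: Vec<PartitionedFile>,
}
```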

@yjshen commented Oct 12, 2021

Let's merge it into apache#1010 and make this epic happen. Great work @rdettai ! 👍👍

@houqp left a comment:

truly amazing work @rdettai, ready to go from me 👍 I can't wait to see this land in master. The new listing table provider abstraction looks solid in practice :)

Schema schema = 3;
message CsvFormat {
bool has_header = 1;
string delimiter = 2;

not a blocker, and something we can improve after everything gets merged into DataFusion master, but I just wanted to point out that I think it would be better to use the bytes type here, since string in Rust is more expensive than bytes due to the UTF-8 validation overhead. The overhead is not going to matter for this particular use-case, but it's just a good practice to keep. It would also simplify the byte_to_string and string_to_byte logic you have in the plan ser/de code.

@rdettai (Owner Author):

good point! As it was a string already, I didn't even look into it 😄. Personally, what I find rather surprising is that the delimiter is bound to be a single char and can't be an arbitrary string 😅
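
To illustrate the point, a hedged sketch of the round-trip that a protobuf string field forces; byte_to_string / string_to_byte are named in the comment above, but these signatures are assumptions:

```rust
// With a `string` field, decoding pays a UTF-8 validation; with `bytes`,
// the delimiter would map straight to a Vec<u8> / u8 without that check.
fn byte_to_string(b: u8) -> String {
    String::from_utf8(vec![b]).expect("delimiter must be valid UTF-8")
}

fn string_to_byte(s: &str) -> u8 {
    assert_eq!(s.as_bytes().len(), 1, "delimiter is bound to a single byte");
    s.as_bytes()[0]
}

fn main() {
    let wire = byte_to_string(b'|'); // what the protobuf `string` field carries
    assert_eq!(string_to_byte(&wire), b'|');
}
```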


path: impl Into<String>,
projection: Option<Vec<usize>>,
target_partitions: usize,
table_name: impl Into<String>,
) -> Result<Self> {
let provider = Arc::new(ParquetTable::try_new(path, target_partitions)?);
Self::scan(table_name, provider, projection)
// TODO remove hard coded enable_pruning

👍 this is not a breaking change and preserves the current behavior.

@rdettai (Owner Author) commented Oct 12, 2021

Thank you all for reviewing this so quickly! 🚀

@rdettai rdettai merged commit 593c2f2 into refacto-providers Oct 12, 2021
rdettai added a commit that referenced this pull request Oct 12, 2021
* [fix] replace file format providers in datafusion

* [lint] clippy

* [fix] replace file format providers in ballista

* [fix] await in python wrapper
@alamb left a comment:

This looks epic -- really nice work @rdettai 👍

/// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
pub fn get_statistics_with_limit(
    table_desc: &TableDescriptor,

FWIW this appears to have been done

@rdettai (Owner Author):

cleaned up in apache#1010, sorry about that!

#[derive(Debug, Clone)]
/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
/// TODO move back to crate::datasource::mod.rs once legacy cleaned up

again, this is done

@rdettai (Owner Author):

cleaned up in apache#1010, sorry about that!


#[derive(Debug, Clone)]
/// A collection of files that should be read in a single task
/// TODO move back to crate::datasource::mod.rs once legacy cleaned up

and here

@rdettai (Owner Author):

cleaned up in apache#1010, sorry about that!

))),
}?;

let options = ListingOptions {

this is so cool


/// CSV file read option
#[derive(Copy, Clone)]
pub struct CsvReadOptions<'a> {

I wonder if it would be better to put these files into the same modules as the file formats that use them: file_format::avro::Avro etc.

@rdettai (Owner Author):

Yes, I was kind of hesitant about this! These option structs are really meant to be specific to the context.read_xxx and context.register_xxx APIs, to serve as documentation and default holders (see the sketch below). I will probably do a separate PR to propose something a bit more coherent.
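
For illustration, a minimal sketch of how the option structs act as documentation and default holders, using the builder methods CsvReadOptions exposes:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // the struct carries the format-specific defaults (header, delimiter,
    // file extension, optional schema), keeping register_csv a one-liner
    let opts = CsvReadOptions::new().has_header(true).delimiter(b'|');
    ctx.register_csv("example", "tests/example.csv", opts).await?;
    Ok(())
}
```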

rdettai added a commit that referenced this pull request Oct 12, 2021
* [fix] replace file format providers in datafusion

* [lint] clippy

* [fix] replace file format providers in ballista

* [fix] await in python wrapper
rdettai pushed a commit that referenced this pull request Oct 15, 2021
* Add Display for Expr::BinaryExpr

Update logical_plan/operators tests

rebase and debug display for non binary expr

Add Display for Expr::BinaryExpr

Update logical_plan/operators tests

Updating tests

Update aggregate display

Updating tests without aggregate

More tests

Working on agg/scalar functions

Fix binary_expr in create_name function and attendant tests

More tests

More tests

Doc tests

Rebase and update new tests

* Submodule update

* Restore submodule references from master

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
rdettai added a commit that referenced this pull request Oct 15, 2021
* [feat] stubs for provider re-organization

* [feat] implement infer_schema to make test pass

* [wip] trying to implement pruned_partition_list

* [typo]

* [fix] replace enum with trait for extensibility

* [fix] add partition cols to infered schema

* [feat] forked file format executors

avro still missing

* [doc] comments about why we are flattening

* [test] migrated tests to file formats

* [test] improve listing test

* [feat] add avro to refactored format providers

* [fix] remove try from new when unnecessary

* [fix] remove try_ from ListingTable new

* [refacto] renamed format module to file_format

also removed statistics from the PartitionedFile abstraction

* [fix] removed Ballista stubs

* [fix] rename create_executor

* [feat] added store

* [fix] Clippy

* [test] improve file_format tests with limit

* [fix] limit file system read size

* [fix] avoid fetching unnecessary stats after limit

* [fix] improve readability

* [doc] improve comments

* [refacto] keep async reader stub

* [doc] cleanup comments

* [test] test file listing

* [fix] add last_modified back

* [refacto] simplify csv reader exec

* [refacto] change SizedFile back to FileMeta

* [doc] comment clarification

* [fix] avoid keeping object store as field

* [refacto] grouped params to avoid too_many_arguments

* [fix] get_by_uri also returns path

* [fix] ListingTable at store level instead of registry

* [fix] builder take self and not ref to self

* Replace file format providers (#2)

* [fix] replace file format providers in datafusion

* [lint] clippy

* [fix] replace file format providers in ballista

* [fix] await in python wrapper

* [doc] clearer doc about why sql() is async

* [doc] typos and clarity

* [fix] missing await after rebase