
Reorganize table providers by table format #1010

Merged
39 commits merged on Oct 13, 2021

Conversation

rdettai
Contributor

@rdettai rdettai commented Sep 16, 2021

Which issue does this PR close?

Closes #1009.

Rationale for this change

Currently the TableProvider implementations are split by file format (Parquet, CSV...). Another solution would be to organize TableProviders by table format (file system listing, Iceberg, Delta). This is discussed in this design document.

What changes are included in this PR?

  • this change will delete the file-format-specific implementations of TableProvider; they are replaced with:
    • a ListingTable implementation of TableProvider that finds files by using the "list" functionality of the file system
    • a FileFormat abstraction and implementations for each file format
  • this change will remove the TableDescriptor abstraction
  • this change will replace the GetFileMetadata Ballista rpc endpoint with GetSchema

Errata: I propose not to remove the old implementations but only add the new ones. We will remove the old ones in a separate PR!
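
To make the reorganization concrete, here is a rough sketch of how the two new abstractions (ListingTable and FileFormat) could fit together. It is illustrative only: apart from those two names, the signatures and fields below are assumptions rather than the exact API introduced by this PR.

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Per-format logic (Parquet, CSV, JSON, Avro) lives behind a single trait.
#[async_trait]
pub trait FileFormat: Send + Sync {
    /// Open the given files and infer their common schema.
    async fn infer_schema(&self, paths: Vec<String>) -> Result<SchemaRef>;
    /// Build the format-specific physical plan for the selected files.
    async fn create_physical_plan(
        &self,
        schema: SchemaRef,
        files: Vec<String>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

/// A single TableProvider implementation that discovers files by listing a
/// path and delegates decoding to whichever `FileFormat` it was configured with.
pub struct ListingTable {
    path: String,
    format: Arc<dyn FileFormat>,
    schema: SchemaRef,
}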

Are there any user-facing changes?

The current implementations of TableProvider will be replaced, but this will partly be abstracted by methods such as ExecutionContext.read_parquet() or ExecutionContext.read_sql()

@github-actions bot added the ballista and datafusion (Changes in the datafusion crate) labels on Sep 16, 2021
@rdettai
Contributor Author

rdettai commented Sep 16, 2021

  • @yjshen this change is impacting your recent changes as it removes the TableDescriptor that you recently added
  • @houqp @alamb this change is still mostly an empty skeleton of what the new structure would be, but I would like to first discuss this before moving further with the implementation

@alamb
Contributor

alamb commented Sep 16, 2021

TLDR: I wonder "if DataFusion planning was async would you be able to implement the table format as you would like"?

I really like the idea of splitting the details of reading formats from the "metadata management" (called "table format" in the linked doc) so that users of DataFusion can extend DataFusion to manage metadata in ways suited to their use.

I thought, however, we were headed towards a slightly different abstraction where we would still have a ParquetReader that didn't use Path / File directly, but instead would use the ObjectStore abstraction recently added by @yjshen.

In terms of the document, https://docs.google.com/document/d/1Bd4-PLLH-pHj0BquMDsJ6cVr_awnxTuvwNJuWsTHxAQ/edit?usp=sharing, my biggest takeaway was that:

  1. Any TableProvider needs to provide the schema (columns, types) with almost no information from the query, in order to create a LogicalPlan

  2. The details of how statistics() and scan() work will be different based on:

  • The actual file format (parquet, json, etc)
  • The cost of accessing the statistics and creating requested ExecutionPlans (e.g. a bunch of remote files on S3 vs cached in memory copies)

At the moment, the user has to synchronously create a TableProvider for each named table in the query and (synchronously) provide the Schema, as well as synchronously provide statistics. The creation of ExecutionPlans via execute is already async.

Thus, I am wondering: if the fetching of the TableProvider and the statistics creation were async, would that be sufficient?
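
To make the question concrete, here is roughly what an async-ified provider interface could look like. This is only a sketch: the AsyncTableProvider trait and the TableStatistics placeholder below are hypothetical, not the current DataFusion API.

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::ExecutionPlan;

/// Placeholder for whatever statistics type the provider would expose.
pub struct TableStatistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
}

/// Hypothetical async variant of a table provider: the schema stays cheap and
/// synchronous so the LogicalPlan can still be built eagerly, while the
/// potentially expensive calls (statistics, scan) become async.
#[async_trait]
pub trait AsyncTableProvider: Send + Sync {
    fn schema(&self) -> SchemaRef;
    async fn statistics(&self) -> Result<TableStatistics>;
    async fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        filters: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>>;
}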

@rdettai
Contributor Author

rdettai commented Sep 16, 2021

I thought, however, we were headed towards a slightly different abstraction where we would still have a ParquetReader that didn't use Path / File directly, but instead would use the ObjectStore abstraction recently added by @yjshen.

Correct, that will be the next step

TLDR: I wonder "if DataFusion planning was async would you be able to implement the table format as you would like"?

Yes, that would really bring a huge amount of flexibility. A funny example: I have just added a sketch implementation of the partition pruning algorithm. One interesting approach is to load the partitions into a RecordBatch to be able to run the pushed-down filter on it. DataFusion inside DataFusion! But we are stuck because that requires async. So many APIs in the Rust ecosystem are async; we want to be able to use them during planning 😄
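
For illustration, a minimal sketch of that pruning idea, assuming the partition values have already been extracted from the file paths. The prune_files helper and the hard-coded date = '2021-09-16' predicate are made up; a real implementation would evaluate the pushed-down Expr on the batch instead of calling an Arrow kernel directly.

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::compute::kernels::comparison::eq_utf8_scalar;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Keep only the files whose partition value matches the (hard-coded) predicate
/// `date = '2021-09-16'`. Each input tuple is (file path, value of the `date`
/// partition column for that file).
fn prune_files(files: Vec<(String, String)>) -> Result<Vec<String>> {
    let (paths, dates): (Vec<String>, Vec<String>) = files.into_iter().unzip();

    // Load the partition values into a RecordBatch...
    let schema = Arc::new(Schema::new(vec![Field::new("date", DataType::Utf8, false)]));
    let dates: Vec<&str> = dates.iter().map(|s| s.as_str()).collect();
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(dates)) as ArrayRef],
    )?;

    // ...and evaluate the predicate on it. A real implementation would turn the
    // pushed-down `Expr` into a physical expression instead of calling a kernel.
    let date_col = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mask = eq_utf8_scalar(date_col, "2021-09-16")?;

    Ok(paths
        .into_iter()
        .zip(mask.iter())
        .filter_map(|(path, keep)| if keep.unwrap_or(false) { Some(path) } else { None })
        .collect())
}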

I am going to try to make the TableProvider.scan() method async, and if it works I'll submit that in a separate PR.

@alamb
Contributor

alamb commented Sep 16, 2021

So many APIs in the Rust ecosystem are async; we want to be able to use them during planning 😄

Once async has crept into the stack, I think it ends up reaching pretty far down.

@rdettai
Contributor Author

rdettai commented Sep 21, 2021

@houqp @alamb @yjshen this is now getting closer to what I am targeting. Some notes for anyone who would want to go through the code:

  • I have duplicated the TableProvider and the ExecutionPlan implementations for the file formats (parquet, csv, json). The objective is to remove the old ones once this is functional.
  • In general, I am trying to minimize the number of constructor overloads because I find them very hard to read and test
  • The ListingTable does not support partitioning yet, but the stubs are there to show how and where it will be implemented. I don't plan to include implementations in this PR.
  • The ListingTable and FileFormat implementations still use direct file system calls. The next step (another PR) will be to use the ObjectStore.

Note that I am really trying not to add any feature, just re-organize the code. But I think that once this is done, it will be fairly easy to add the Hive partitioning implementation and use the ObjectStore.

I know it is a lot to ask (as this is again a fairly large change) but it would be great if you could take a "quick" look and at least validate the overall approach. Thanks 😄

@alamb
Contributor

alamb commented Sep 22, 2021

I hope to have a look at this PR later today

Comment on lines +2110 to +2246

#[test]
fn combine_zero_filters() {
    let result = combine_filters(&[]);
    assert_eq!(result, None);
}

#[test]
fn combine_one_filter() {
    let filter = binary_expr(col("c1"), Operator::Lt, lit(1));
    let result = combine_filters(&[filter.clone()]);
    assert_eq!(result, Some(filter));
}

#[test]
fn combine_multiple_filters() {
    let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1));
    let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2));
    let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3));
    let result =
        combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]);
    assert_eq!(result, Some(and(and(filter1, filter2), filter3)));
}
Contributor Author

@rdettai rdettai Sep 22, 2021


these tests were in the wrong place (datasource::parquet.rs), moving them here to avoid losing them

@rdettai rdettai marked this pull request as ready for review September 22, 2021 15:50
@rdettai
Contributor Author

rdettai commented Sep 22, 2021

If you agree with this new way of organizing the file format datasources, my feeling would be that we should merge this as is. We can then remove the existing parquet/csv/json/avro TableProvider and ExecutionPlan implementations and rewire all the tests/examples/benchmarks to the new ones in a separate PR:

  • if we realize while trying to migrate all the tests/examples/benchmarks that it won't work out, we can easily revert by simply deleting the datasource::file_format and execution_plan::file_format modules
  • the PRs will be easier to review

Contributor

@alamb alamb left a comment


I did not review all the new code carefully (yet) but instead initially focused on the structure.

TLDR is I (really) like it -- nice work @rdettai

For any other reviewers, I suggest looking at the FileFormat trait and then the organization of ListingTable.

My biggest question is: why not use the ObjectStore interface in ListingTable (and maybe call it `ObjectStoreTable`)?

It is my opinion that if ListingTable used the ObjectStore interface, it would be reasonable to merge this PR: Even though it has substantial duplication with existing code, using the ObjectStore interface would mean it also had new features.

If we are going to leave ListingTable file backed, I think it would make sense to remove the old code paths as part of the same PR (so we can use existing end-to-end tests, etc)

// specific language governing permissions and limitations
// under the License.

//! A table that uses the `ObjectStore` listing capability
Contributor


I think that it is still a "TODO" to use ObjectStore in this module

What do you think about doing that in this PR? The use of ObjectStore seems like one of the key outcomes of this restructuring work to me, so I do wonder about postponing it (especially as this proposed PR is additive and doesn't change the existing providers)

Contributor Author


I think you are right. I didn't want to add ObjectStore usage yet to keep the PR simpler, but it definitely makes sense to do it all at once.

Contributor Author


Interesting turn of events: I just realized that the LocalFileReader from ObjectReader isn't fully implemented yet. 🙃

@houqp
Member

houqp commented Sep 23, 2021

I haven't had the time to take a close look at all the changes yet; I am running out of time, so I plan to do it tomorrow. From a quick glance, I think the design is solid and on the right track, great work @rdettai 👍

Member

@yjshen yjshen left a comment


Hi @rdettai, Thanks a lot for your work! I have several higher-level questions:

  1. How do you deal with a table with mixed file formats? I couldn't see how the current implementation enables this. I perceive it as the biggest benefit of this reorganization.

  2. I agree with @alamb on putting the ObjectStore listing API to use in this PR; I think we have all the prerequisites needed to use it for real, since you have already made scan in TableProvider async and are refactoring the data sources in this one.

  3. (From my own perspective while reading code) Will this refactor wipe out all the git history for each source file, both in the datasource module and in the physical_plan module?

    I think at least we should remove the deprecated ones in the same PR to preserve git history for each file? I regard the git history as precious documentation for understanding code as a newcomer.

    Again, does it make much sense to reorder the ones in physical_plan? I didn't quite get the reason here.

datafusion/src/datasource/file_format/mod.rs (outdated review comment, resolved)
pub trait FileFormat: Send + Sync {
    /// Open the files at the paths provided by iterator and infer the
    /// common schema
    async fn infer_schema(&self, paths: StringStream) -> Result<SchemaRef>;
Member


Maybe take a FileMetaStream as the argument instead?

Contributor Author


Yes, I think it will get to that when I add the ObjectStore abstraction.

/// For example `Vec["a", "b"]` means that the two first levels of
/// partitioning expected should be named "a" and "b":
/// - If there is a third level of partitioning it will be ignored.
/// - Files that don't follow this partitioning will be ignored.
Member


Regarding the comments here, we haven't thought about how to deal with default partition semantics yet.

I.e., while inserting into a table where a partition column's value is NULL, how do we deal with a=20210922/b=NULL/1.parquet? How do we differentiate whether the NULL is a valid string value or denotes a missing value?

I recall Hive has __HIVE_DEFAULT_PARTITION__ for this purpose.

Contributor Author

@rdettai rdettai Sep 23, 2021


From what I know, there isn't an official standard regarding this partitioning. The feature set that I am proposing here is pretty basic:

  • you should know the partitions in advance and specify them
  • files that don't comply are ignored
  • all partition values will be read with type string

I believe this feature set is simple to implement and understand, it is predictable, and it covers most use cases:

  • if your partitioning type is not string, you can cast it later in your query as you want
  • if you have encoded NULL somehow, you can also parse that with a CASE expression in your query (see the example queries below)
  • performance-wise, materialization into an Arrow array can be as efficient as for most other types if we use dictionary encoding
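
To illustrate the cast and CASE workarounds mentioned above (the queries are purely hypothetical; my_table, its columns, and the '__NULL__' sentinel are made up for the example):

/// Cast the string-typed partition column back to a number before filtering on it.
const CAST_EXAMPLE: &str =
    "SELECT * FROM my_table WHERE CAST(year AS INT) >= 2020";

/// Map a sentinel string used for missing partition values back to a real NULL.
const NULL_EXAMPLE: &str =
    "SELECT CASE WHEN year = '__NULL__' THEN NULL ELSE year END AS year, value \
     FROM my_table";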

Member


I recall Hive has __HIVE_DEFAULT_PARTITION__ for this purpose.

This is correct: Hive, and Spark which inherited this from it, use __HIVE_DEFAULT_PARTITION__ to denote a null partition value by default. I feel like we can make this configurable and use __HIVE_DEFAULT_PARTITION__ as the default for compatibility with other systems.

From what I know, there isn't an official standard regarding this partitioning.

I think so too. AFAIK, there is no consensus in the big data ecosystem on a formal partition file path ser/de convention. Most systems are just trying to replicate what hive has.

if your partitioning type is not string, you can cast it later in your query as you want

Could you give us an example of what the UX would look like? How does a user perform partition value type casting in a query? Did you mean the user needs to manually cast the filter value to a string if the filter is applied to a partition column?

Same for the CASE expression for handling NULLs. It would be helpful to have an example.

I think we should probably put more thought into this since this design decision will have a big impact on downstream query UX.

/// - Files that don't follow this partitioning will be ignored.
/// Note that only `DataType::Utf8` is supported for the column type.
/// TODO implement case where partitions.len() > 0
pub partitions: Vec<String>,
Member


Does it make sense to have partitions be a schema instead of Vec<String>, and introduce a cast, if needed, from the path string to its partition_values or Vec<ScalarValue>?


@rdettai
Contributor Author

rdettai commented Sep 23, 2021

Thanks for all your great feedback!

How do you deal with a table with mixed file formats? I couldn't see how the current implementation enables this. I perceive it as the biggest benefit of this reorganization.

Frankly speaking, I have never met a case of mixed file formats, so I wouldn't really know what is important to take into consideration. Can you describe your use case precisely? Do you have an example of an API that supports it?

I agree with @alamb on putting the ObjectStore listing API to use in this PR; I think we have all the prerequisites needed to use it for real, since you have already made scan in TableProvider async and are refactoring the data sources in this one.

Sold! I'm working on it 😄

Will this refactor wipe out all the git history for each source file, both in the datasource module and in the physical_plan module? I think at least we should remove the deprecated ones in the same PR to preserve git history for each file?

I think the change is so massive that git will not recognize the files as the previous ones anyway, especially in the datasource module. This is a bit annoying, but I'm not sure we can do much about it 😬

Again, does it make much sense to reorder the ones in physical_plan? I didn't quite get the reason here.

By "reorder", you mean moving them to a separate folder? The physical_plan folder is pretty huge, so I thought restructuring it a bit wouldn't harm. Also, it creates a nice symmetry with the folder structure in datasource.

I think at least we should remove the deprecated ones in the same PR to preserve git history for each file?

We can, but it would make this PR huuuuuuuuge. From the development side, that's ok, but for the reviewers it might get tricky. @alamb @houqp what do you think about it?

@yjshen
Member

yjshen commented Sep 23, 2021

Frankly speaking, I have never met a case of mixed file formats, so I wouldn't really know what is important to take into consideration. Can you describe your use case precisely? Do you have an example of an API that supports it?

I may have gotten this wrong if you were not implying, in the original doc, that it's possible for a table to have its data in different file formats.

I thought data lake implementations may store regular data in a columnar format such as Parquet, and deltas (adds or removes) in a row-based format like Avro or JSON, which feels similar to SAP HANA (http://www.vldb.org/pvldb/vol5/p061_jenskrueger_vldb2012.pdf) and HyPer (http://db.in.tum.de/downloads/publications/datablocks.pdf).
If that is not the case for existing implementations and not the intention of the PR, please just ignore this one.

By "reorder", you mean moving them to a separate folder? The physical_plan folder is pretty huge, so I thought restructuring it a bit wouldn't harm.

Yes, I find it hard to tell what has been removed and what the additions are for the physical_plan files.

I diffed the files manually and found that try_new_from_readers has been removed from JSON/Avro/CSV; I think the removal deserves some explanation, to avoid something just slipping away silently (I'm not familiar with the use case of these existing readers).

The removal of TableDescriptor I think is fine, since we now have ListingTable's scan method that takes care of listing and partitioning files.

But what I really want to argue is: why should we have to diff manually and give up git diff? How can I find the original PR and related issue while reading code, if the history points to a reorganization PR with little information about the original change?
Do I have to browse through all the discussions in this PR to find the separate PR that contains the removal of the stale code, and then dig again into the history of that removed file to finally find the background I need?

If we are following the rule that each PR should target one problem, why should we split the reorganization into one PR for additions and one for removing stale code? And why do we restructure the physical_plan module for "nice symmetry" while accepting that "git will not recognize it as the previous files"?

@rdettai
Contributor Author

rdettai commented Sep 23, 2021

I may have gotten this wrong if you were not implying, in the original doc, that it's possible for a table to have its data in different file formats

I was not implying anything! 😄 I just didn't have an example in mind. Thanks for the pointers! I think that handling these cases will require creating dedicated ExecutionPlans and doing some further refactoring of the execution plans to allow reuse of the file-format-specific code.

But what I really want to argue is: why should we have to diff manually and give up git diff?

I do know that git history is very important; I am not arguing against that 😃. Do we all agree that we should directly replace the old implementations in this PR, even if it means that we will need to modify a large part of the code base at once? Does someone have a better split for this PR in mind?

PS: just opened a new branch with the addition of the ObjectStore. I will pull it into this one once (and if) I am satisfied with the result.

@alamb
Contributor

alamb commented Sep 23, 2021

I have never met a case of mixed file formats, so I wouldn't really know what is important to take into consideration

For what it is worth, I think the design of this PR makes it easier to support a mixed file format TableProvider in the future, even though ListingProvider may not do so at the moment. Introducing the FileFormat abstraction will allow someone to create something like MixedProvider that then calls the appropriate FileFormats if they so desire.
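
A very rough sketch of what such a provider could start from. MixedProvider and everything inside it are hypothetical; only the FileFormat trait (and its module path) comes from this PR.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::datasource::file_format::FileFormat;

/// Hypothetical provider that routes each file to the `FileFormat` registered
/// for its extension (e.g. "parquet" or "json").
pub struct MixedProvider {
    formats: HashMap<String, Arc<dyn FileFormat>>,
    files: Vec<String>,
}

impl MixedProvider {
    /// Group the table's files by the format that should read them; files with
    /// an unknown extension are simply skipped here.
    fn files_by_format(&self) -> HashMap<String, Vec<String>> {
        let mut groups: HashMap<String, Vec<String>> = HashMap::new();
        for file in &self.files {
            let ext = file.rsplit('.').next().unwrap_or_default();
            if self.formats.contains_key(ext) {
                groups.entry(ext.to_string()).or_default().push(file.clone());
            }
        }
        groups
    }
}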

I do know that git history is very important; I am not arguing against that 😃. Do we all agree that we should directly replace the old implementations in this PR, even if it means that we will need to modify a large part of the code base at once?

I can see both sides here (one massive PR vs multiple smaller PRs) and they both involve tradeoffs. Some thoughts on making reviews easier:

  1. Make separate PRs on to this branch (as you seem to be planning for the ObjectStore use)
  2. Keep the commits separated into logical chunks

Regardless of the route, I wonder if this PR is one we may want to draw some extra attention to

@houqp
Member

houqp commented Sep 24, 2021

To @yjshen 's point of supporting mixed file formats, it's totally a valid use-case and a widely adopted practice. But I agree with @alamb that we don't need to make the list table do everything now. We can keep it simple to only support partitioned table with single file format. For these more complex mixed file format table formats, they typically have their own specific file organization and schema management logics, so it would be hard to come up with one table provider to capture them all. For example, Hudi uses avro and parquet, DeltaLake uses parquet and json, etc. The better approach in my mind is to create table format specific providers for each of these specialized table formats as plugins.

I also think there is value in grouping logically coupled changes into a single commit so it's easier to do git diff. I care less about whether git is smart enough to keep track of the file rename, but at least being able to do a git blame and find all logically related changes in this single commit helps a lot. I totally understand how painful it is to do a large PR because of the need to constantly manage merge conflicts, so I am happy to help if there is anything I can do on my end to alleviate your pain. Just let me know ;) I think it's a good tradeoff to do a bit of extra work now for the peace of mind in the future.

@rdettai
Contributor Author

rdettai commented Sep 24, 2021

Thank you again for your feedback. I don't mind having to manage a large PR; my only concern is the reviewer experience, as it is crucial for carrying this through to the merge 😉. I have created a PR against this branch (rdettai#1) with the addition of the ObjectStore abstraction.

Member

@houqp houqp left a comment


LGTM. I too think we should try to merge this into master as soon as we can because it's now at risk of conflicts.

@houqp added the api change (Changes the API exposed to users of the crate) label on Oct 13, 2021
Member

@yjshen yjshen left a comment


LGTM

@alamb alamb merged commit 2454e46 into apache:master Oct 13, 2021
@alamb
Contributor

alamb commented Oct 13, 2021

🚀 -- thank you so much @rdettai / @houqp / @yjshen and everyone else who worked to make this happen

@Dandandan
Contributor

Wow thank you so much @rdettai truly astonishing work!

@houqp
Member

houqp commented Oct 14, 2021

Indeed, legendary PR @rdettai! Also really cool to see @yjshen's object store in action.

Now the arrow2 branch merge is going to be really fun :D


/// The configurations to be passed when creating a physical plan for
/// a given file format.
pub struct PhysicalPlanConfig {
Member


FileFormatConfig maybe?
