
ARROW-12290: [Rust][DataFusion] Add input_file_name function [WIP] #9976

Closed
wants to merge 2 commits

Conversation

@seddonm1 (Contributor) commented:

@alamb @jorgecarleitao @andygrove

This is a WIP start of work to re-implement the input_file_name function without using the RecordBatch metadata, relying on plan traversal instead. Inspired by @jorgecarleitao's comment (#9944 (review)), I have initially tried to extend the TableProvider to capture this information by means of the Statistics.

This code collects statistics at the partition level, not the table level, and provides methods for calculating the table-level statistics; a rough sketch follows below. It can easily be extended to do things like TableProvider::file_name(partition: usize) to get a specific partition's filename. Calculating things like distinct_count per column will be a fun future problem and may require something like HyperLogLog.
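
As a rough illustration of the shape of that extension, here is a minimal sketch; the method names (partition_count, partition_statistics, file_name) are hypothetical, the Statistics struct is stubbed down, and none of this is the PR's final API:

/// Stand-in for DataFusion's Statistics struct (sketch only).
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
}

/// Hypothetical additions to the TableProvider trait.
pub trait TableProvider {
    /// Number of partitions backing this table.
    fn partition_count(&self) -> usize;
    /// Statistics for a single partition.
    fn partition_statistics(&self, partition: usize) -> Statistics;
    /// The file backing a given partition, while the 1:1
    /// file:partition mapping holds.
    fn file_name(&self, partition: usize) -> Option<String>;
}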

This code also assumes a 1:1 mapping of Parquet file to logical partition, but it still contains much of the initial implementation for a future many:1 mapping of files to partitions. I think it would be fair to fix a 1:1 file:partition mapping now and, down the line, use a repartition operator to merge them. If agreed, I can make those changes.

I don't like making these changes in a vacuum, and the mailing list is not ideal for demonstrating ideas, so I hoped to get review here.

@seddonm1 marked this pull request as draft on April 10, 2021.
@alamb (Contributor) left a comment:


@seddonm1 I like where this PR is headed.

It might be worth testing / prototyping the mechanism that gets the PartitionStatistics to input_filename() before you polish up this approach. I am specifically wondering what would happen if data went through the repartition operator (e.g. when the repartition optimization, #9865 from @Dandandan, is enabled): it seems like the mapping of "output partition" to "input partition" may not be 1:1.

> Calculating things like distinct_count per column will be a fun future problem and may require something like HyperLogLog.

I agree that calculating distinct counts should be left for some future time.

> This code also assumes a 1:1 mapping of Parquet file to logical partition, but it still contains much of the initial implementation for a future many:1 mapping of files to partitions. I think it would be fair to fix a 1:1 file:partition mapping now and, down the line, use a repartition operator to merge them.

I am a little unclear on why we would want a many:1 mapping of Parquet files to partitions. If the goal is to limit the number of Parquet files that are concurrently open / being scanned, based on how fast downstream operators can consume the data and on the resources available, we may want to consider other approaches that also work for sources other than Parquet files (e.g. a more sophisticated execution scheduler).

@@ -271,62 +249,18 @@ impl ParquetExec {
.collect(),
);

// sum the statistics
let mut num_rows: Option<usize> = None;

I really like how you have consolidated the statistics accumulation / cross partition summarization into a central place.
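
One plausible reading of that accumulation (my own sketch, not the PR's exact code) is that a single partition with unknown statistics makes the table-level value unknown:

// Sum per-partition row counts; any unknown (None) partition
// makes the total unknown.
fn sum_num_rows(parts: &[Option<usize>]) -> Option<usize> {
    parts.iter().copied().try_fold(0usize, |acc, n| n.map(|v| acc + v))
}

fn main() {
    assert_eq!(sum_num_rows(&[Some(3), Some(4)]), Some(7));
    assert_eq!(sum_num_rows(&[Some(3), None]), None);
}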

@alamb (Contributor) commented Apr 10, 2021:

@seddonm1 can you remind me again why adding a non-persisted field to each Rust RecordBatch (for example RecordBatch::runtime_extension) was rejected? I can't help thinking that implementing an accurate input_filename, given all the data reorganization a plan can introduce, is going to require runtime (not just plan-time) support.

Sorry for muddying the waters,

@seddonm1 (Contributor, Author) commented:

@alamb thanks for the review. I wanted to raise this early as I appreciate the feedback (even if we end up closing this PR).

The RecordBatch metadata is definitely the easiest mechanism for attaching this data, but I do agree with Jorge that the modifications to the Arrow crate are undesirable.

As this is ultimately a lineage-type capability (achieved by traversing the plan), I have checked how Spark implements it: outside the simple use case (i.e. a direct select against a TableProvider), for example in a SQL query with multiple tables, it throws an error: 'input_file_name' does not support more than one sources.

I will have a go at making the basic functionality work to ensure I'm not off on a wild goose chase.

@seddonm1 (Contributor, Author) commented:

@jorgecarleitao
So my challenge is this function signature:

/// Scalar function
pub type ScalarFunctionImplementation = Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

which is required for the physical expression:

/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    name: String,
    args: Vec<Arc<dyn PhysicalExpr>>,
    return_type: DataType,
}

I can either change this type alias to accept additional parameters (in this case I could also pass the LogicalPlan), which would require changes to all the BuiltinScalarFunctions, or I can create a second implementation of ScalarFunctionExpr, something like a ScalarFunctionPlanExpr, which passes in the plan via a modified type signature (sketched below). Both are doable but have drawbacks.

Thoughts?
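
To make the second option concrete, here is a minimal sketch; ScalarFunctionPlanImplementation and ScalarFunctionPlanExpr are hypothetical names, and LogicalPlan, ColumnarValue, and Result are stubbed stand-ins rather than the real DataFusion types:

use std::sync::Arc;

// Stand-ins for the real DataFusion types (sketch only).
pub struct LogicalPlan;
pub struct ColumnarValue;
pub type Result<T> = std::result::Result<T, String>;

/// Scalar function that is also handed the plan it runs under.
pub type ScalarFunctionPlanImplementation =
    Arc<dyn Fn(&LogicalPlan, &[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

/// Sibling of ScalarFunctionExpr that carries the plan to its function.
pub struct ScalarFunctionPlanExpr {
    fun: ScalarFunctionPlanImplementation,
    name: String,
    plan: Arc<LogicalPlan>,
}

The drawback, as noted, is that this duplicates most of ScalarFunctionExpr just to thread one extra argument through.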

@jorgecarleitao (Member) commented:

Isn't the filename that a column came from uniquely identified by the logical plan? If two physical plans arrive at different conclusions about a column's provenance, then those physical plans are using two different data origins, which implies different semantics.

This is the rationale I was using to recommend addressing this at the DAG level. I do agree that the logical plan may not have all the information about how the source is partitioned and its exact file names (as those may even change over time), but I would expect to resolve that as part of query execution (just like we resolve the physical plan when we run SHOW PLAN).

@seddonm1, I think that a physical expression does not need to be a ScalarFunctionExpr: ScalarFunctionExpr is useful for the cases where the physical operation can be described by a simple function and signature. Check, e.g., how the binary operators are defined: they have their own custom struct that implements PhysicalExpr.
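
For illustration, a dedicated expression along those lines might look like the sketch below; the PhysicalExpr trait is reduced to a single method here (the real trait has more), InputFileNameExpr is a hypothetical name, and the other types are stubs:

use std::sync::Arc;

// Stand-ins for the real types (sketch only).
pub struct RecordBatch;
pub struct ColumnarValue;
pub struct LogicalPlan;
pub type Result<T> = std::result::Result<T, String>;

// Reduced version of the PhysicalExpr trait for this sketch.
pub trait PhysicalExpr: Send + Sync {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
}

/// Hypothetical dedicated expression: it carries the plan it needs
/// directly, leaving the shared ScalarFunctionImplementation alias untouched.
pub struct InputFileNameExpr {
    plan: Arc<LogicalPlan>,
}

impl PhysicalExpr for InputFileNameExpr {
    fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
        // Traverse self.plan here to resolve the source file name.
        Err("not implemented in this sketch".to_string())
    }
}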

@alamb (Contributor) commented Apr 19, 2021:

The Apache Arrow Rust community is moving the Rust implementation into its own dedicated GitHub repositories, arrow-rs and arrow-datafusion. It is likely we will not merge this PR into this repository.

Please see the mailing-list thread for more details.

We expect the process to take a few days and will follow up with a migration plan for the in-flight PRs.

@alamb (Contributor) commented May 3, 2021:

#10096 has removed the Arrow implementation from this repository (it now resides in https://github.com/apache/arrow-rs and https://github.com/apache/arrow-datafusion) in the hope of streamlining the development process.

Please re-target this PR to one or both of the new repositories (let us know if you need help doing so).

Thank you for understanding and for helping to make arrow-rs and datafusion better.
