ARROW-12290: [Rust][DataFusion] Add input_file_name function [WIP] #9976
Conversation
@seddonm1 I like where this PR is headed.
It might be worth testing / prototyping the mechanism that gets the `PartitionStatistics` to `input_filename()` before you polish up this approach. I am specifically wondering what would happen if data went through the repartition operator (e.g. when the repartition optimization, #9865 from @Dandandan, is enabled): it seems like the mapping of "output partition" to "input partition" may not be 1:1.
Calculating things like distinct_count per column will be a fun future problem and may require something like HyperLogLog.
I agree calculating distinct counts should be left for some future time
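Even though distinct counts are deferred, the HyperLogLog idea mentioned above can be sketched briefly. This is a hypothetical, self-contained illustration (not DataFusion code): 256 single-byte registers record the longest run of trailing zero bits seen per hash bucket, and the harmonic mean of the registers yields an estimate of the distinct count.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal HyperLogLog sketch (illustrative only): estimates the number of
/// distinct values using 256 single-byte registers.
struct HyperLogLog {
    registers: [u8; 256],
}

impl HyperLogLog {
    fn new() -> Self {
        Self { registers: [0; 256] }
    }

    fn insert<T: Hash>(&mut self, value: &T) {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let h = hasher.finish();
        let bucket = (h & 0xFF) as usize; // low 8 bits pick a register
        // position of the first set bit in the remaining hash bits
        let rank = (h >> 8).trailing_zeros().min(55) as u8 + 1;
        if rank > self.registers[bucket] {
            self.registers[bucket] = rank;
        }
    }

    fn estimate(&self) -> f64 {
        let m = 256.0;
        let alpha = 0.7213 / (1.0 + 1.079 / m);
        let sum: f64 = self.registers.iter().map(|&r| 2f64.powi(-(r as i32))).sum();
        let raw = alpha * m * m / sum;
        // small-range correction: fall back to linear counting
        let zeros = self.registers.iter().filter(|&&r| r == 0).count();
        if raw <= 2.5 * m && zeros > 0 {
            m * (m / zeros as f64).ln()
        } else {
            raw
        }
    }
}

fn main() {
    let mut hll = HyperLogLog::new();
    for i in 0..1000u64 {
        hll.insert(&i);
    }
    // the estimate is approximate; with 256 registers the standard error is ~6.5%
    println!("estimated distinct count: {:.0}", hll.estimate());
}
```

The appeal for per-column statistics is that the 256-byte register array is mergeable across partitions, unlike an exact distinct count.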
This code also assumes a 1:1 mapping of Parquet file to logical partition, but it still has a lot of the initial implementation for a future many:one mapping of files to partition. I think it would be fair to fix a 1:1 mapping of file:partition and then, down the line, use a repartition operator to merge them.
I am a little unclear why we would want to do a many:1 mapping of parquet files --> partition. If the goal is to limit the number of concurrently open / being scanned parquet files based on how fast downstream operators can consume the data and the resources available, we may want to consider other approaches that work for things other than parquet files (e.g. a more sophisticated execution scheduler)
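The 1:1 versus many:1 discussion can be made concrete with a small sketch. These helpers are hypothetical (not part of this PR): each partition records the files that feed it, which shows why `input_file_name` is only well-defined at the partition level when the mapping is exactly 1:1.

```rust
/// Assign input files to execution partitions (illustrative sketch only).
/// With `target_partitions >= files.len()` the mapping is 1:1; otherwise
/// files are distributed round-robin and several files share a partition.
fn assign_files(files: Vec<String>, target_partitions: usize) -> Vec<Vec<String>> {
    let n = target_partitions.min(files.len()).max(1);
    let mut partitions = vec![Vec::new(); n];
    for (i, f) in files.into_iter().enumerate() {
        partitions[i % n].push(f);
    }
    partitions
}

/// A per-partition file name is only unambiguous for 1:1 partitions.
fn input_file_name(partition: &[String]) -> Option<&str> {
    match partition {
        [single] => Some(single.as_str()),
        _ => None, // many files: provenance becomes per-row, not per-partition
    }
}

fn main() {
    let files: Vec<String> =
        vec!["a.parquet".into(), "b.parquet".into(), "c.parquet".into()];

    let one_to_one = assign_files(files.clone(), 3);
    assert_eq!(input_file_name(&one_to_one[0]), Some("a.parquet"));

    let many_to_one = assign_files(files, 2);
    // partition 0 now holds a.parquet and c.parquet, so no single answer exists
    assert_eq!(input_file_name(&many_to_one[0]), None);
}
```

This is also why a repartition operator between scan and projection complicates things: after repartitioning, rows in one output partition may originate from several input partitions.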
```
@@ -271,62 +249,18 @@ impl ParquetExec {
                .collect(),
        );

        // sum the statistics
        let mut num_rows: Option<usize> = None;
```
I really like how you have consolidated the statistics accumulation / cross partition summarization into a central place.
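The accumulation pattern in question can be sketched as follows, assuming a simplified `Statistics` struct with optional counts (the real DataFusion struct has more fields): a count is only known at the table level if it is known for every partition, so `None` is absorbing under addition.

```rust
/// Simplified per-partition statistics (illustrative; DataFusion's real
/// `Statistics` struct differs).
#[derive(Default, Clone, Debug, PartialEq)]
struct Statistics {
    num_rows: Option<usize>,
    total_byte_size: Option<usize>,
}

/// Sum two optional counts; the result is unknown (`None`) if either side is.
fn add_opt(a: Option<usize>, b: Option<usize>) -> Option<usize> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x + y),
        _ => None,
    }
}

/// Fold per-partition statistics into table-level statistics.
fn summarize(parts: &[Statistics]) -> Statistics {
    parts.iter().fold(
        Statistics { num_rows: Some(0), total_byte_size: Some(0) },
        |acc, p| Statistics {
            num_rows: add_opt(acc.num_rows, p.num_rows),
            total_byte_size: add_opt(acc.total_byte_size, p.total_byte_size),
        },
    )
}

fn main() {
    let parts = vec![
        Statistics { num_rows: Some(100), total_byte_size: Some(4096) },
        Statistics { num_rows: Some(50), total_byte_size: None },
    ];
    let total = summarize(&parts);
    assert_eq!(total.num_rows, Some(150));
    assert_eq!(total.total_byte_size, None); // one partition's size is unknown
}
```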
@seddonm1 can you remind me again why adding a non-persisted field to each Rust struct was not viable? Sorry for muddying the waters.
@alamb Thanks for the review. I wanted to raise this early as I appreciate the feedback (even if we end up closing this PR). As this is ultimately a lineage-type capability (achieved by traversing the plan), I have checked how Spark implements it, and it will throw an error. I will have a go at making basic functionality work to ensure I'm not off on a wild goose chase.
@jorgecarleitao The scalar function type alias is:

```rust
/// Scalar function
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
```

which is required for the physical expression:

```rust
/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    name: String,
    args: Vec<Arc<dyn PhysicalExpr>>,
    return_type: DataType,
}
```

I can either change this type alias to accept additional parameters (in this case I could also pass the …). Thoughts?
Isn't the filename that a column came from uniquely identified by the logical plan? If two physical plans arrive at different conclusions about a column's provenance, then those physical plans are using two data origins, which implies different semantics. This is the rationale I was using to recommend addressing this at the DAG level. I do agree that the logical plan may not have all the information about how the source is partitioned and its exact names (as that may even change with time), but I would expect to resolve that as part of query execution (just like we resolve the physical plan when we run).

@seddonm1, I think that a physical expression does not need to be …
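One way to thread per-partition context into a scalar function without changing the `ScalarFunctionImplementation` type alias is to construct the closure per partition, capturing the file name. This is a sketch under simplified stand-in types (the `ColumnarValue` and `Result` below are hypothetical simplifications, not DataFusion's real definitions); the point is that the alias requires `Send + Sync` but says nothing about captured state.

```rust
use std::sync::Arc;

// Stand-ins so the sketch is self-contained; DataFusion's real types differ.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Utf8(String),
}
type Result<T> = std::result::Result<T, String>;

/// Mirrors the shape of DataFusion's `ScalarFunctionImplementation` alias:
/// the closure must be `Send + Sync`, but it may capture state.
type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

/// Build an `input_file_name`-style implementation per partition by capturing
/// the file name at physical-plan time (a sketch, not this PR's actual code).
fn make_input_file_name(file: Arc<str>) -> ScalarFunctionImplementation {
    Arc::new(move |_args| Ok(ColumnarValue::Utf8(file.to_string())))
}

fn main() {
    let f = make_input_file_name(Arc::from("part-0001.parquet"));
    let out = f(&[]).unwrap();
    assert_eq!(out, ColumnarValue::Utf8("part-0001.parquet".into()));
}
```

Under this approach the planner, not the expression type, is responsible for knowing which file backs each partition, which fits the "resolve it at execution time" view above.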
The Apache Arrow Rust community is moving the Rust implementation into its own dedicated GitHub repositories, arrow-rs and arrow-datafusion. It is likely we will not merge this PR into this repository. Please see the mailing-list thread for more details. We expect the process to take a few days and will follow up with a migration plan for the in-flight PRs.
#10096 has removed the Arrow implementation from this repository (it now resides in https://github.com/apache/arrow-rs and https://github.com/apache/arrow-datafusion) in the hopes of streamlining the development process. Please re-target this PR (let us know if you need help doing so) to one or both of the new repositories. Thank you for understanding and helping to make arrow-rs and datafusion better.
@alamb @jorgecarleitao @andygrove

This is a WIP start of work to re-implement the `input_file_name` function by not using the `metadata` of the `RecordBatch` and instead relying on plan traversal. Inspired by @jorgecarleitao's comment (#9944 (review)), I have tried to initially extend the `TableProvider` to capture this information by means of the `Statistics`.

This code collects statistics at the partition level, not the table level, and provides methods for calculating the table-level statistics. It can easily be extended to do things like `TableProvider::file_name(partition: usize)` to get a specific `partition` filename. Calculating things like `distinct_count` per column will be a fun future problem and may require something like `HyperLogLog`.

This code also assumes a 1:1 mapping of Parquet file to logical partition, but it still has a lot of the initial implementation for a future many:one mapping of files to partition. I think it would be fair to fix a 1:1 mapping of file:partition and then, down the line, use a `repartition` operator to merge them. If agreed I can make those changes.

I don't like to make these changes in a vacuum, and the mailing list is not ideal for demonstrating ideas, so I hoped to review here.