Allow Inserts to Partitioned Listing Table #7744

Closed
devinjdangelo opened this issue Oct 5, 2023 · 5 comments · Fixed by #7801
Labels
enhancement New feature or request

Comments

@devinjdangelo
Contributor

Is your feature request related to a problem or challenge?

Running an `INSERT INTO` query against a listing table that is partitioned by a column is currently unsupported.

Describe the solution you'd like

  1. ListingTable must be able to inject into FileSinkExec a distribution requirement that the incoming RecordBatchStreams are partitioned by the correct columns
  2. Each FileSink must be able to identify which stream belongs to which partition and create the appropriate ObjectStore writer

For 2, unless there is a slicker solution, FileSink could simply peek at each stream before initializing a writer.
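As a rough illustration of the peek idea, a stream (modeled here as a plain iterator rather than a real `RecordBatchStream`) could be inspected without consuming it to decide which ObjectStore path the writer should target. All names here (`Batch`, `partition_value`, `writer_path_for_stream`) are hypothetical, not DataFusion API:

```rust
// Hypothetical stand-in for a RecordBatch that carries a partition column value.
#[derive(Debug, Clone)]
struct Batch {
    partition_value: String,
    rows: usize,
}

// Peek at the first batch of a stream (modeled as an iterator) to pick the
// writer's output path before consuming anything from the stream.
fn writer_path_for_stream<I>(
    stream: &mut std::iter::Peekable<I>,
    table_root: &str,
) -> Option<String>
where
    I: Iterator<Item = Batch>,
{
    stream
        .peek()
        .map(|first| format!("{table_root}/date={}/part-0.parquet", first.partition_value))
}

fn main() {
    let batches = vec![
        Batch { partition_value: "2023-10-01".into(), rows: 100 },
        Batch { partition_value: "2023-10-01".into(), rows: 50 },
    ];
    let mut stream = batches.into_iter().peekable();
    let path = writer_path_for_stream(&mut stream, "s3://bucket/table").unwrap();
    println!("{path}"); // s3://bucket/table/date=2023-10-01/part-0.parquet
    // Peeking did not consume anything: both batches remain to be written.
    assert_eq!(stream.count(), 2);
}
```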

Describe alternatives you've considered

No response

Additional context

Progress on inserts to sorted tables may be relevant: https://github.com/apache/arrow-datafusion/pull/7743/files

@devinjdangelo devinjdangelo added the enhancement New feature or request label Oct 5, 2023
@devinjdangelo
Contributor Author

For 2, rather than peeking, I am thinking about extending the RecordBatchStream trait like so:

/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    /// Returns the schema of this `RecordBatchStream`.
    ///
    /// Implementations of this trait should guarantee that all `RecordBatch`es
    /// returned by this stream have the same schema as returned from this method.
    fn schema(&self) -> SchemaRef;

    /// Returns information identifying which partition this stream belongs to.
    fn partition_info(&self) -> &PartitionInfo;
}

PartitionInfo would contain the information needed to identify which Hive-style partition the stream belongs to, and should also be general enough to work for non-Hive-style partitioning. Whichever execution plan partitions its output should provide each stream with its partitioning information.
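One possible shape for the proposed PartitionInfo, sketched with std only. None of these names exist in DataFusion; this is just pairing partition column names with the value the stream holds for each, which is enough to build a Hive-style path while leaving room for other schemes:

```rust
/// Hypothetical PartitionInfo: (column name, value) pairs in path order.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionInfo {
    pub columns: Vec<(String, String)>,
}

impl PartitionInfo {
    /// Render as a Hive-style relative path segment, e.g. `date=2023-10-01/region=us`.
    pub fn hive_path(&self) -> String {
        self.columns
            .iter()
            .map(|(k, v)| format!("{k}={v}"))
            .collect::<Vec<_>>()
            .join("/")
    }
}

fn main() {
    let info = PartitionInfo {
        columns: vec![
            ("date".into(), "2023-10-01".into()),
            ("region".into(), "us".into()),
        ],
    };
    println!("{}", info.hive_path()); // date=2023-10-01/region=us
}
```

A flat list of name/value pairs keeps the type agnostic to the layout: a non-Hive scheme can ignore `hive_path` and interpret the pairs however it likes.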

@alamb
Contributor

alamb commented Oct 5, 2023

I am not sure about extending RecordBatchStream -- given that we don't know what partition column values are in the input, it will not be possible to know how many output files (one for each partition that has data) are needed before the plan is run.

I think actually writing to a partitioned datasource will require a more dynamic approach, with something similar to a RepartitionExec that dynamically creates a FileSink whenever new values are seen. I tried to sketch out what this might look like here:

                                                            rows from the input batch                       
                                                           that belong to partition 1  ─ ─ ─ ─ ─ ┐          
                                                                                                            
                                                                                                 │          
                                                                                                 ▼          
                                                                            ┌──────┐        Partition 1     
                                                                    ┌──────▶│Batch │     (date=2023-10-01)  
                                                                    │       └──────┘                        
                                                                    │       ┌──────┐                        
                                                ┌────────────┐      ├──────▶│Batch │        Partition 2     
┌───────────┐            ┌────────────┐         │            │▒     │       └──────┘     (date=2023-10-02)  
│  batch N  │────▶...────▶  Batch 1   │────────▶│  Demux ?   │──────┤ ...                                   
└───────────┘            └────────────┘         │            │▒     │                                       
                                                └────────────┘▒     └──────▶    ▲                           
                                                 ▒▒▒▒▒▒▒▒▒▒▒▒▒▒                                             
                                                                                └ ─ ─ ─ ─ ─                 
                                                                                           │                
                                                                                                            
                                   Input batches are repartitioned                         │                
                                   according to partition column's            As new partition values are   
                                 values with a "Demux" operator (like        seen, new output streams are   
                                           RepartitionExec)                                                 
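The dynamic "Demux" in the diagram could be sketched with plain std channels. Here the demux keeps a map from partition value to sender, and the first time a value is seen it creates a fresh channel and hands the receiving end to whoever opens FileSinks. All names (`Batch`, `demux`, the `date` field) are illustrative, not DataFusion API:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, Clone)]
struct Batch {
    date: String, // partition column value for this slice of rows
    rows: usize,
}

/// Route each incoming batch to a per-partition channel, creating a new
/// channel (and announcing it on `new_stream_tx`) the first time a
/// partition value is seen -- the run-time discovery the diagram shows.
fn demux(input: Vec<Batch>, new_stream_tx: Sender<(String, Receiver<Batch>)>) {
    let mut outputs: HashMap<String, Sender<Batch>> = HashMap::new();
    for batch in input {
        let tx = outputs.entry(batch.date.clone()).or_insert_with(|| {
            let (tx, rx) = channel();
            // Hand the receiving end to whoever creates FileSinks.
            new_stream_tx.send((batch.date.clone(), rx)).unwrap();
            tx
        });
        tx.send(batch).unwrap();
    }
    // Dropping `outputs` closes every per-partition channel.
}

fn main() {
    let (new_stream_tx, new_stream_rx) = channel();
    demux(
        vec![
            Batch { date: "2023-10-01".into(), rows: 3 },
            Batch { date: "2023-10-02".into(), rows: 5 },
            Batch { date: "2023-10-01".into(), rows: 2 },
        ],
        new_stream_tx,
    );
    // Two partitions were discovered at run time, not planned up front.
    for (date, rx) in new_stream_rx {
        let total: usize = rx.iter().map(|b| b.rows).sum();
        println!("partition date={date}: {total} rows");
    }
}
```

Unlike RepartitionExec, whose output partition count is fixed at plan time, the number of channels here grows as new values arrive.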
                                                                                                            
                                                                                                            

@devinjdangelo
Contributor Author

This description makes sense, and I agree that we can't know the number of partitions during planning. I'll spend some more time thinking on this.

Perhaps FileSink could consume a Stream<Item = RecordBatchStream>. I.e., unlike with Vec<RecordBatchStream>, we don't know a fixed number of partitions up front, but rather consume some unknown number of streams as they are created.

Will have to think on this more... 🤔

@devinjdangelo
Contributor Author

Perhaps FileSink could consume a Stream<Item = RecordBatchStream>. I.e., unlike with Vec<RecordBatchStream>, we don't know a fixed number of partitions up front, but rather consume some unknown number of streams as they are created.

@alamb I went with receivers of receivers rather than streams of streams; that approach is implemented in #7791. So far I am not attempting Hive-style partitioning, but I think this PR sets it up to be much easier.
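The receiver-of-receivers idea can be sketched with std channels (the real PR uses tokio channels; `sink` and the row type here are illustrative). The outer channel delivers a new inner receiver whenever the demux side opens a stream, so the sink never needs to know the stream count up front:

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// A sink that consumes a receiver of receivers: each inner receiver is one
// per-partition stream, discovered only when the outer channel yields it.
fn sink(streams: Receiver<Receiver<String>>) -> Vec<String> {
    let mut written = Vec::new();
    for stream in streams {
        // In a real FileSink, a new ObjectStore writer would be opened here.
        for row in stream {
            written.push(row);
        }
    }
    written
}

fn main() {
    let (outer_tx, outer_rx) = channel();
    let producer = thread::spawn(move || {
        for part in ["a", "b"] {
            let (tx, rx) = channel();
            outer_tx.send(rx).unwrap(); // the sink learns of a new stream
            tx.send(format!("row for partition {part}")).unwrap();
        }
        // Dropping `outer_tx` here tells the sink no more streams are coming.
    });
    let written = sink(outer_rx);
    producer.join().unwrap();
    assert_eq!(written.len(), 2);
}
```

Closing the outer channel is the natural end-of-input signal, which is awkward to express with a fixed `Vec` of streams.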

@alamb
Contributor

alamb commented Oct 11, 2023

Thank you @devinjdangelo -- #7791 looks great. I plan to check it out carefully tomorrow.
