
DataSink Dynamic Execution Time Demux #7791

Merged: 11 commits into apache:main, Oct 18, 2023

Conversation

@devinjdangelo (Contributor, author):

Which issue does this PR close?

Closes #5383
Closes #7767

Progresses towards #7744

Rationale for this change

Currently, we use the partitioning of the input plan to determine how files are written out. This has a number of drawbacks, all stemming from the fact that we have limited information at planning time.

If DataSinks are instead responsible for partitioning a single input stream at execution time, we gain finer-grained control over how data is laid out into files.

What changes are included in this PR?

This PR introduces an execution time demux task, which takes a single SendableRecordBatchStream and divides it into a dynamic number of output streams, which are then serialized to independent files. The division of the input stream is currently determined exclusively by the number of rows we want in each file, but in the future could be made more sophisticated (such as for #7744).

                                                                             ┌───────────┐               ┌────────────┐    ┌─────────────┐
                                                                    ┌──────▶ │  batch 1  ├────▶...──────▶│   Batch a  │    │ Output File1│
                                                                    │        └───────────┘               └────────────┘    └─────────────┘
                                                                    │
                                                ┌──────────┐        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
┌───────────┐               ┌────────────┐      │          │        ├──────▶ │  batch a+1├────▶...──────▶│   Batch b  │    │ Output File2│
│  batch 1  ├────▶...──────▶│   Batch N  ├─────▶│  Demux   ├────────┤ ...    └───────────┘               └────────────┘    └─────────────┘
└───────────┘               └────────────┘      │          │        │
                                                └──────────┘        │        ┌───────────┐               ┌────────────┐    ┌─────────────┐
                                                                    └──────▶ │  batch d  ├────▶...──────▶│   Batch n  │    │ Output FileN│
                                                                             └───────────┘               └────────────┘    └─────────────┘

To accomplish the above, the demux task shares a channel with the caller, over which it sends channels of RecordBatches. These are the key types:

type RecordBatchReceiver = Receiver<RecordBatch>;
type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;

The caller of the demux task is responsible for consuming a variable number of RecordBatchReceivers. DataSinks in general will want to spawn independent tasks to consume each RecordBatchReceiver and serialize/write the batches to an ObjectStore writer corresponding to the Path returned by the demux.
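
To make that contract concrete, here is a minimal, hypothetical sketch of such a caller (not the PR's actual code; write_batch stands in for opening an ObjectStore writer and serializing batches):

use arrow_array::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use object_store::path::Path;
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinSet;

type RecordBatchReceiver = Receiver<RecordBatch>;
type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;

// Hypothetical stand-in for opening an ObjectStore writer at `path` and
// serializing one batch into it.
async fn write_batch(_path: &Path, _batch: RecordBatch) -> Result<()> {
    Ok(())
}

async fn consume_demuxed_streams(mut rx: DemuxedStreamReceiver) -> Result<()> {
    let mut writers = JoinSet::new();
    // Each (path, stream) pair the demuxer sends corresponds to one output
    // file; spawn an independent task to drain it.
    while let Some((path, mut batches)) = rx.recv().await {
        writers.spawn(async move {
            while let Some(batch) = batches.recv().await {
                write_batch(&path, batch).await?;
            }
            Ok::<_, DataFusionError>(())
        });
    }
    // Wait for all per-file writers and surface any error.
    while let Some(joined) = writers.join_next().await {
        joined.map_err(|e| DataFusionError::External(Box::new(e)))??;
    }
    Ok(())
}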

Various config options are added to allow the user to control the tradeoff between buffering more data in memory and higher parallelism.

Are these changes tested?

Yes, by existing tests.

Are there any user-facing changes?

No more empty files are written out.

@github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Oct 11, 2023
/// This is a soft max, so it can be exceeded slightly. There also
/// will be one file smaller than the limit if the total
/// number of rows written is not roughly divisible by the soft max
pub soft_max_rows_per_output_file: usize, default = 50000000
@devinjdangelo (Contributor, author):

The ideal value here is very situational, so definitely need to make this configurable at the statement and table level.
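
For example, assuming this lands under the standard datafusion.execution. config prefix, a session-level override in datafusion-cli would look something like:

❯ set datafusion.execution.soft_max_rows_per_output_file = 1000000;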

let allow_single_file_parallelism =
exec_options.parquet.allow_single_file_parallelism;

// This is a temporary special case until https://github.com/apache/arrow-datafusion/pull/7655
@devinjdangelo (Contributor, author):

The current single file parallelization strategy on main does not work well with this PR (it doesn't error, but it is very slow). The pending one, #7655, should work great though. Once the parquet crate has a new release, I can combine it with this PR.

@metesynnada (Contributor):

I will review this PR at my earliest convenience. Your explanations have been very helpful, making the review process much smoother. Thank you!

@devinjdangelo (Contributor, author):

@metesynnada @alamb I opened a draft, #7801, that extends this PR to support hive-style partitioning inserts. It isn't finished/polished yet, but it may be worth a peek to see how I intend to continue building on this PR.

@alamb (Contributor) left a comment:

Thank you @devinjdangelo -- this is a very nice PR.

I think we need to add some tests of this functionality -- specifically, that setting soft_max_rows_per_output_file and then writing to tables (both empty and appending) actually produces the expected number of files:

  1. For each of the CSV, Parquet, and JSON formats.
  2. For both empty directories / external tables as well as pre-existing tables.

One concern I have with the approach in this PR is that instead of writing the data in parallel, it writes the data serially (in order to preserve the order), resulting in fewer, larger files. It seems like it could be better to write max_parallel_output_files in parallel (even though they might end up with fewer than soft_max_rows_per_output_file rows); this is how the current code works. I think the core issue of #5383 is that several empty files are created even when they have no data. Maybe there could be a "minimum output file size" that ensures that if a new file (other than the first) is created, there are at least that many rows for it 🤔
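
For illustration, a rough sketch of one reading of that idea (all names hypothetical; this is not proposing a concrete implementation):

// Hypothetical rollover policy: after the current file reaches the soft max,
// only open file N+1 once at least `min_rows` further rows are available to
// seed it; otherwise keep appending to the current file. This would prevent
// tiny (or empty) trailing files.
fn should_roll_over(
    rows_in_current_file: usize,
    soft_max_rows: usize,
    rows_available_for_next_file: usize,
    min_rows: usize, // the hypothetical "minimum output file size" in rows
) -> bool {
    rows_in_current_file >= soft_max_rows && rows_available_for_next_file >= min_rows
}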

While I was playing around with it, I found a problem (not introduced by this PR) that would be good to file tickets for (I can do so if you like).

Tested manually

mkdir /tmp/output
datafusion-cli
❯ create table test(x int) as values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
0 rows in set. Query took 0.003 seconds.

❯ create external table output(x int) stored as parquet location '/tmp/output';
0 rows in set. Query took 0.000 seconds.

❯ insert into output select t1.x FROM test t1 CROSS JOIN test t2 CROSS JOIN test t3 CROSS JOIN test t4 CROSS JOIN test t5 CROSS JOIN test t6 CROSS JOIN test t7 CROSS JOIN test t8;
+-----------+
| count     |
+-----------+
| 100000000 |
+-----------+
1 row in set. Query took 18.132 seconds.

This results in a few files, as expected:

alamb@MacBook-Pro-8 ~ % du -s -h /tmp/output/*
 16K	/tmp/output/Lw7GheXPv4qsuuK2_0.parquet
 16K	/tmp/output/Lw7GheXPv4qsuuK2_1.parquet
4.0K	/tmp/output/Lw7GheXPv4qsuuK2_2.parquet

Inserting into an empty directory didn't work for CSV or JSON

I tried to test this feature out by inserting into an empty directory:

mkdir /tmp/output
datafusion-cli

And then tried to insert some data but got an error

❯ create external table output(x int) stored as CSV location '/tmp/output';
0 rows in set. Query took 0.003 seconds.

❯ insert into output values (1),(2),(3);
Error during planning: Cannot append 1 partitions to 0 files!

/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 5000
Reviewer (Contributor):

This value seems far too high to me -- it seems like there is no reason to buffer more than 1-2 batches per output file, the rationale being that it makes no sense to let a producer of data run so far ahead of the consumer (writer).

Adding some buffer makes sense so that a new input batch can be computed concurrently while writing the next output batch, but I don't see why it would make sense to buffer so many.

Maybe a value of 2 here would be good.

@devinjdangelo (Contributor, author):

The reason I set this so high is to allow for the possibility that one file writer cannot keep up with the batches being generated. Eventually enough data is buffered that the 2nd, 3rd, ... file writers kick in and work in parallel. The system will then stabilize and keep up with the speed at which batches are being generated. If the buffer is too small, then only one file can be worked on in parallel.

@devinjdangelo (Contributor, author):

Having a "minimum_parallel_writers" setting as discussed in our main comments is a better solution for this. So, I'll reduce the buffer size as you suggest.

@@ -481,6 +479,81 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}

async fn append_all(
Reviewer (Contributor):

this is a very nice cleanup / refactor

};

let (mut tx_file, mut rx_file) =
tokio::sync::mpsc::channel(max_buffered_recordbatches / 2);
Reviewer (Contributor):

Why is the max buffer size divided by 2?

@devinjdangelo (Contributor, author):

There are actually two buffers that hold RecordBatches, and batches are moved between them; without the division, the effective maximum number of buffered batches would be 2 * max_buffered_recordbatches.

@devinjdangelo (Contributor, author):

> I think we need to add some tests of this functionality -- specifically, that setting soft_max_rows_per_output_file and then writing to tables (both empty and appending) actually produces the expected number of files

Good idea. I adjusted some existing tests to set desired_batch_size and soft_max_rows_per_output_file both to 1, so we can check that the number of written files equals the number of inserted rows. Tests are passing for all three file types, inserting into both empty and pre-existing tables.
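
For illustration, the shape of that check in a datafusion-cli session would be something like this (a hypothetical session, not the actual test code; the config keys assume the standard datafusion.execution. prefix):

❯ set datafusion.execution.batch_size = 1;
❯ set datafusion.execution.soft_max_rows_per_output_file = 1;
❯ insert into output values (1),(2),(3);

With both options set to 1, inserting 3 rows should produce exactly 3 output files in the table's location.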

> One concern I have with the approach in this PR is that instead of writing the data in parallel, it writes the data serially (in order to preserve the order), resulting in fewer, larger files.

This is a concern I shared while working on this, but I believe there are a few mitigating factors:

  1. I have invested significant effort into parallelizing individual file serialization, which reduces the performance benefit of writing multiple independent files in parallel. Right now only Parquet writes are single-threaded, until the current draft PR can be merged.
  2. It is still possible for multiple files to be worked on in parallel in this PR if buffer sizes are set large enough. This is why I set the defaults so high (just keep buffering until the second, third, etc. file writer can kick in).

> It seems like it could be better to write max_parallel_output_files in parallel (even though they might end up with fewer than soft_max_rows_per_output_file rows).

I agree there is a middle ground here, i.e. write at least N files, each with up to the soft maximum number of rows. That way we are guaranteed at least N files being worked on in parallel regardless of buffer size. This will add a good deal of complexity to the demuxer logic, so I'd like to tackle it as an enhancement in a follow-up PR, if that is OK with you.

> Inserting into an empty directory didn't work for CSV or JSON

This is because the "append to existing file" mode (the default for these file types) does not know how to handle empty tables. We could update the logic so that "append to existing file" creates one file if and only if the table is completely empty; I don't expect this to be too complex. The current workaround is to create the table in "append new file" mode, write one file, then drop the table and recreate it in "append to existing file" mode. Not great UX, so agreed that we should fix it.

@metesynnada (Contributor) left a comment:

Thank you for your hard work on the PR. It includes a lot of cool features. However, as I mentioned in my review, the methods can become too long and complex when they contain multiple repeating parts. To make the code more readable, it would be beneficial to divide the logic into smaller, more manageable pieces.

vec![false]
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition; self.children().len()]
Reviewer (Contributor):

Adding a docstring here could be beneficial for maintainers.

.max_buffered_batches_per_output_file;

let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
let (tx_row_cnt, mut rx_row_cnt) = tokio::sync::mpsc::channel(1);
Reviewer (Contributor):

This can be a tokio::sync::oneshot::channel()
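
For reference, a minimal sketch of the suggested pattern (illustrative names): a oneshot channel delivers exactly one value, which matches a row count reported once at completion:

use tokio::sync::oneshot;

async fn demux_with_row_count() -> u64 {
    // The sender half is consumed by `send`, so the value can only be
    // delivered once -- a good fit for a final row count.
    let (tx_row_cnt, rx_row_cnt) = oneshot::channel::<u64>();

    tokio::spawn(async move {
        let total_rows: u64 = 42; // stand-in for the real count
        let _ = tx_row_cnt.send(total_rows);
    });

    // The receiver is a future that resolves when the value arrives.
    rx_row_cnt.await.unwrap_or(0)
}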

@devinjdangelo (Contributor, author):

Nice, that's perfect. I made this change.

)
.await?;

let (tx, rx) = tokio::sync::mpsc::channel(9000);
Reviewer (Contributor):

This can be an unbounded channel.

@devinjdangelo (Contributor, author):

This was actually supposed to be a specific configured value. Thanks for flagging; I fixed it.

let mut part_idx = 0;
let write_id = rand::distributions::Alphanumeric
.sample_string(&mut rand::thread_rng(), 16);
let file_path = if !single_file_output {
Reviewer (Contributor):

file_path can be formatted inside a method like generate_file_path

/// overrides all other settings to force only a single file to be written.
/// partition_by parameter will additionally split the input based on the unique
/// values of a specific column `<https://github.com/apache/arrow-datafusion/issues/7744>`
pub(crate) fn start_demuxer_task(
@metesynnada (Contributor), Oct 12, 2023:

You can improve code readability by dividing the logic into methods. For the start_demuxer_task function, it might be helpful to divide it into smaller parts:

// Assumed imports for this sketch (module paths approximate for the
// datafusion core crate):
use std::sync::Arc;

use arrow_array::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
use futures::StreamExt;
use object_store::path::Path;
use rand::distributions::DistString;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinHandle;

use crate::datasource::listing::ListingTableUrl;
use crate::physical_plan::SendableRecordBatchStream;

pub(crate) fn start_demuxer_task(
    input: SendableRecordBatchStream,
    context: &Arc<TaskContext>,
    _partition_by: Option<&str>,
    base_output_path: ListingTableUrl,
    file_extension: String,
    single_file_output: bool,
) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
    let exec_options = &context.session_config().options().execution;

    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
    let max_parallel_files = exec_options.max_parallel_ouput_files;
    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;

    let (tx, rx) = mpsc::channel(max_parallel_files);

    let task = tokio::spawn(async move {
        demux_stream(
            input,
            base_output_path,
            file_extension,
            single_file_output,
            max_rows_per_file,
            max_buffered_batches,
            tx,
        )
            .await
    });
    (task, rx)
}

fn generate_file_path(
    base_output_path: &ListingTableUrl,
    write_id: &str,
    part_idx: usize,
    file_extension: &str,
    single_file_output: bool,
) -> Path {
    if !single_file_output {
        base_output_path.prefix().child(format!("{}_{}.{}", write_id, part_idx, file_extension))
    } else {
        base_output_path.prefix().to_owned()
    }
}

async fn create_new_file_stream(
    base_output_path: &ListingTableUrl,
    write_id: &str,
    part_idx: usize,
    file_extension: &str,
    single_file_output: bool,
    max_buffered_batches: usize,
    tx: &mut Sender<(Path, Receiver<RecordBatch>)>,
) -> Result<Sender<RecordBatch>> {
    let file_path = generate_file_path(
        base_output_path,
        write_id,
        part_idx,
        file_extension,
        single_file_output,
    );
    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
    tx.send((file_path, rx_file)).await.map_err(|_| {
        DataFusionError::Execution(
            "Error sending RecordBatch to file stream!".into(),
        )
    })?;
    Ok(tx_file)
}

async fn demux_stream(
    mut input: SendableRecordBatchStream,
    base_output_path: ListingTableUrl,
    file_extension: String,
    single_file_output: bool,
    max_rows_per_file: usize,
    max_buffered_batches: usize,
    mut tx: Sender<(Path, Receiver<RecordBatch>)>,
) -> Result<()> {
    let mut total_rows_current_file = 0;
    let mut part_idx = 0;
    let write_id = rand::distributions::Alphanumeric
        .sample_string(&mut rand::thread_rng(), 16);

    let mut tx_file = create_new_file_stream(
        &base_output_path,
        &write_id,
        part_idx,
        &file_extension,
        single_file_output,
        max_buffered_batches,
        &mut tx,
    )
        .await?;
    part_idx += 1;

    while let Some(rb) = input.next().await.transpose()? {
        total_rows_current_file += rb.num_rows();
        tx_file.send(rb).await.map_err(|_| {
            DataFusionError::Execution(
                "Error sending RecordBatch to file stream!".into(),
            )
        })?;

        if total_rows_current_file >= max_rows_per_file && !single_file_output {
            total_rows_current_file = 0;
            tx_file = create_new_file_stream(
                &base_output_path,
                &write_id,
                part_idx,
                &file_extension,
                single_file_output,
                max_buffered_batches,
                &mut tx,
            )
                .await?;
            part_idx += 1;
        }
    }
    Ok(())
}

Btw, this code is tested.

@devinjdangelo (Contributor, author):

Thanks for the help breaking this down. I added this in.

@alamb (Contributor) commented Oct 14, 2023:

Thanks @devinjdangelo and @metesynnada -- I plan to review this PR later today or tomorrow. I apologize; I have been struggling to find the time to review things properly.

@alamb (Contributor) left a comment:

Thank you @devinjdangelo and @metesynnada

I tested this out locally with a release build

rm -rf traces_output
mkdir traces_output
datafusion-cli
DataFusion CLI v32.0.0
❯ copy (SELECT * from traces_source) TO 'traces_output' (FORMAT parquet, SINGLE_FILE_OUTPUT false);

On main, the output looks like:

❯ copy (SELECT * from traces) TO 'traces_output' (FORMAT parquet, SINGLE_FILE_OUTPUT false);
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 3.107 seconds.

❯
\q
alamb@MacBook-Pro-8:~/Downloads$ ls traces_output
3iCEVNa6H5bEw3m0_0.parquet   3iCEVNa6H5bEw3m0_12.parquet  3iCEVNa6H5bEw3m0_2.parquet   3iCEVNa6H5bEw3m0_6.parquet
3iCEVNa6H5bEw3m0_1.parquet   3iCEVNa6H5bEw3m0_13.parquet  3iCEVNa6H5bEw3m0_3.parquet   3iCEVNa6H5bEw3m0_7.parquet
3iCEVNa6H5bEw3m0_10.parquet  3iCEVNa6H5bEw3m0_14.parquet  3iCEVNa6H5bEw3m0_4.parquet   3iCEVNa6H5bEw3m0_8.parquet
3iCEVNa6H5bEw3m0_11.parquet  3iCEVNa6H5bEw3m0_15.parquet  3iCEVNa6H5bEw3m0_5.parquet   3iCEVNa6H5bEw3m0_9.parquet

On this PR, the output looks like this (much slower):

❯ copy (SELECT * from traces) TO 'traces_output' (FORMAT parquet, SINGLE_FILE_OUTPUT false);
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 7.452 seconds.

❯
\q
alamb@MacBook-Pro-8:~/Downloads$ ls traces_output
iL8tYC9HfnaORhHH_0.parquet

However, I think the new parallelization strategy coming soon (as described in https://github.com/apache/arrow-datafusion/pull/7791/files#r1353822968) will make this better.

Thus, what I suggest we do is merge this PR and file follow-on tickets for:

  1. Parallelizing the file output again (to recover the writing performance regression, e.g. Parallelize Serialization of Columns within Parquet RowGroups #7655)
  2. Table / statement level control of max_rows_per_output_file (as described in https://github.com/apache/arrow-datafusion/pull/7791/files#r1353775705)

Unless anyone objects, I'll plan to merge this (and file the follow-on tickets) tomorrow. Thanks again @devinjdangelo and @metesynnada

@devinjdangelo (Contributor, author):

Thank you @alamb that sounds good! I have two open draft PRs which will build on this one:

  1. Allow Setting Minimum Parallelism with RowCount Based Demuxer #7841 addresses the performance regression caused by this PR with a new setting to balance file size and parallelism (does not depend on Parallelize Serialization of Columns within Parquet RowGroups #7655 so can be merged sooner).
  2. Implement Hive-Style Partitioned Write Support #7801 extends this PR to add hive-style partitioned table writing support

I will rebase and mark these ready for review once this PR is merged. I hope that breaking these changes into smaller PRs will help with the review burden!

@alamb merged commit 7c6fdcc into apache:main on Oct 18, 2023 (23 checks passed)
@alamb (Contributor) commented Oct 18, 2023:

Thanks again @devinjdangelo
