Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flume channels for RepartitionExec #6929

Closed
wants to merge 4 commits into from
Closed

feat: flume channels for RepartitionExec #6929

wants to merge 4 commits into from

Conversation

YjyJeff
Copy link
Contributor

@YjyJeff YjyJeff commented Jul 12, 2023

Which issue does this PR close?

Closes #6928 #6928 (comment)

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Jul 12, 2023
@Dandandan
Copy link
Contributor

I can replicate the resuls:

Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃    flume ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 303.98ms │ 304.20ms │     no change │
│ QQuery 2     │  71.69ms │  68.18ms │     no change │
│ QQuery 3     │ 134.92ms │ 111.77ms │ +1.21x faster │
│ QQuery 4     │  81.51ms │  51.06ms │ +1.60x faster │
│ QQuery 5     │ 164.34ms │ 121.10ms │ +1.36x faster │
│ QQuery 6     │  80.46ms │  81.17ms │     no change │
│ QQuery 7     │ 229.50ms │ 216.51ms │ +1.06x faster │
│ QQuery 8     │ 187.78ms │ 174.60ms │ +1.08x faster │
│ QQuery 9     │ 274.48ms │ 210.65ms │ +1.30x faster │
│ QQuery 10    │ 217.25ms │ 151.70ms │ +1.43x faster │
│ QQuery 11    │  53.64ms │  52.83ms │     no change │
│ QQuery 12    │ 150.92ms │ 118.99ms │ +1.27x faster │
│ QQuery 13    │ 317.33ms │ 329.28ms │     no change │
│ QQuery 14    │ 112.84ms │ 114.44ms │     no change │
│ QQuery 15    │  88.75ms │  90.07ms │     no change │
│ QQuery 16    │  66.07ms │  61.94ms │ +1.07x faster │
│ QQuery 17    │ 763.97ms │ 756.59ms │     no change │
│ QQuery 18    │ 603.97ms │ 477.65ms │ +1.26x faster │
│ QQuery 19    │ 224.88ms │ 227.46ms │     no change │
│ QQuery 20    │ 247.86ms │ 239.13ms │     no change │
│ QQuery 21    │ 340.86ms │ 234.40ms │ +1.45x faster │
│ QQuery 22    │  53.66ms │  47.97ms │ +1.12x faster │
└──────────────┴──────────┴──────────┴───────────────┘

#[derive(Debug)]
pub(super) struct DistributionSender<T>(Sender<T>);

impl<T> Clone for DistributionSender<T> {
Copy link
Contributor

@Dandandan Dandandan Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can modify the existing DistributionReceiver/DistributionSender instead of creating a new one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this PR does not want to modify the existing code. It adds new code in the new module. When the flume code is approved by the community, we could remove the old distribution_channel.rs directly.

//! Channel based on flume

use flume::r#async::RecvStream;
use flume::{unbounded, Sender};
Copy link
Contributor

@Dandandan Dandandan Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the purpose of the existing code is that preferably we would not using unbounded channels (to avoid high memory usage).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think @crepererum did some work on this recently. It'd be good to get a review from him.

Copy link
Contributor

@Dandandan Dandandan Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we see the same performance improvement with "just" the default unbounded tokio mspc? i.e. performance doesn't improve because of flume, but just because we switch to unbounded buffering here
@YjyJeff

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dandandan Good point. I have compared the flume channel with the tokio channel on tpch. Here is the result:

Comparing main and feature_tokio_unbounded
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ feature_tokio_unbounded ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 317.52ms │                309.05ms │     no change │
│ QQuery 2     │  73.18ms │                 67.94ms │ +1.08x faster │
│ QQuery 3     │ 136.38ms │                114.46ms │ +1.19x faster │
│ QQuery 4     │  84.27ms │                 51.39ms │ +1.64x faster │
│ QQuery 5     │ 170.56ms │                121.67ms │ +1.40x faster │
│ QQuery 6     │  83.52ms │                 83.22ms │     no change │
│ QQuery 7     │ 249.60ms │                220.18ms │ +1.13x faster │
│ QQuery 8     │ 191.66ms │                175.75ms │ +1.09x faster │
│ QQuery 9     │ 282.38ms │                215.22ms │ +1.31x faster │
│ QQuery 10    │ 230.92ms │                152.78ms │ +1.51x faster │
│ QQuery 11    │  52.68ms │                 56.47ms │  1.07x slower │
│ QQuery 12    │ 153.50ms │                120.02ms │ +1.28x faster │
│ QQuery 13    │ 314.86ms │                309.84ms │     no change │
│ QQuery 14    │ 115.02ms │                114.93ms │     no change │
│ QQuery 15    │  90.32ms │                 93.55ms │     no change │
│ QQuery 16    │  67.44ms │                 62.28ms │ +1.08x faster │
│ QQuery 17    │ 785.40ms │                763.14ms │     no change │
│ QQuery 18    │ 636.27ms │                559.21ms │ +1.14x faster │
│ QQuery 19    │ 232.26ms │                231.57ms │     no change │
│ QQuery 20    │ 261.95ms │                247.18ms │ +1.06x faster │
│ QQuery 21    │ 351.81ms │                247.46ms │ +1.42x faster │
│ QQuery 22    │  54.88ms │                 48.02ms │ +1.14x faster │
└──────────────┴──────────┴─────────────────────────┴───────────────┘
Comparing feature_flume and feature_tokio_unbounded
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃ feature_flume ┃ feature_tokio_unbounded ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │      317.86ms │                309.05ms │    no change │
│ QQuery 2     │       70.41ms │                 67.94ms │    no change │
│ QQuery 3     │      113.01ms │                114.46ms │    no change │
│ QQuery 4     │       51.30ms │                 51.39ms │    no change │
│ QQuery 5     │      123.28ms │                121.67ms │    no change │
│ QQuery 6     │       81.93ms │                 83.22ms │    no change │
│ QQuery 7     │      220.84ms │                220.18ms │    no change │
│ QQuery 8     │      175.73ms │                175.75ms │    no change │
│ QQuery 9     │      213.37ms │                215.22ms │    no change │
│ QQuery 10    │      153.20ms │                152.78ms │    no change │
│ QQuery 11    │       54.10ms │                 56.47ms │    no change │
│ QQuery 12    │      119.72ms │                120.02ms │    no change │
│ QQuery 13    │      313.01ms │                309.84ms │    no change │
│ QQuery 14    │      115.82ms │                114.93ms │    no change │
│ QQuery 15    │       89.26ms │                 93.55ms │    no change │
│ QQuery 16    │       61.57ms │                 62.28ms │    no change │
│ QQuery 17    │      786.18ms │                763.14ms │    no change │
│ QQuery 18    │      491.24ms │                559.21ms │ 1.14x slower │
│ QQuery 19    │      231.82ms │                231.57ms │    no change │
│ QQuery 20    │      240.57ms │                247.18ms │    no change │
│ QQuery 21    │      239.96ms │                247.46ms │    no change │
│ QQuery 22    │       49.39ms │                 48.02ms │    no change │
└──────────────┴───────────────┴─────────────────────────┴──────────────┘

From the above result, we can see that

  • changing the custom channel to tokio::mpsc can also improve the performance a lot
  • flume is more efficient in one query

To reproduce the result, you could find the code here.

In my view, avoiding high memory usage is good. But we should not sacrifice the performance 0.0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks for checking @YjyJeff !
Yeah I agree on your point. It would be good if we can recover most of the performance.
Maybe we can introduce some (bounded) buffering again and see if we can get an acceptable trade-off?

FYI @crepererum

@ozankabak
Copy link
Contributor

If it turns out that bounding memory usage inevitably reduces performance in a non-negligible way, I propose we introduce a configuration flag to control this. We can use the high-performance/unbounded behavior the default one, but one should still be able to choose the lower performance/bounded version for memory conscious use cases.

@Dandandan
Copy link
Contributor

If it turns out that bounding memory usage inevitably reduces performance in a non-negligible way, I propose we introduce a configuration flag to control this. We can use the high-performance/unbounded behavior the default one, but one should still be able to choose the lower performance/bounded version for memory conscious use cases.

Sounds reasonable to me like a simple solution.

Slightly more complex: track the memory usage and maybe reserve up to 10-100MB for buffering (configurable). I guess at some point much more buffering won't really help anymore for performance.

@alamb
Copy link
Contributor

alamb commented Jul 14, 2023

If it turns out that bounding memory usage inevitably reduces performance in a non-negligible way, I propose we introduce a configuration flag to control this. We can use the high-performance/unbounded behavior the default one, but one should still be able to choose the lower performance/bounded version for memory conscious use cases.

I don't think we should ever be using unbounded memory ever if we can avoid it -- in this case if the producer goes faster than the consumer it will just buffer a huge amount of data (and eg will eventually OOM with TPCH SF100, or SF1000)

I like @Dandandan 's suggestion to introduce more (but not unbounded) buffering

Perhaps we could extend the existing DistributionSender to have a queue (2 or 3 for example) rather than just a single Option<> so that it was possible to start fetching the next input immediately

https://github.com/apache/arrow-datafusion/blob/d316702722e6c301fdb23a9698f7ec415ef548e9/datafusion/core/src/physical_plan/repartition/distributor_channels.rs#L180-L182

@ozankabak
Copy link
Contributor

ozankabak commented Jul 15, 2023

I don't think we should ever be using unbounded memory ever if we can avoid it

I agree, if we can avoid it without paying a noticeable penalty, we definitely should. Let's explore if we can. We can discuss later on what to do if it turns out we can't.

@ozankabak
Copy link
Contributor

I did some reading on this and it seems like the extra buffering approach could indeed be fruitful. I think we can land near a "sweet spot" in terms of the cost/benefit trade-off w.r.t. peak memory usage vs. performance.

Losing a small amount of performance in extreme cases (which would have had impractical peak memory usages had we used an unbounded channel anyway) is a small price to pay for having the backpressure mechanism in place and always avoiding OOM.

@alamb
Copy link
Contributor

alamb commented Jul 16, 2023

I will file a ticket describing the ideas in this thread, probably tomorrow

@crepererum
Copy link
Contributor

@YjyJeff Please read the reasoning here before experimenting with unbounded constructs:

https://github.com/apache/arrow-datafusion/blob/49583bd5010282ca126e75100dce958aa346e5ee/datafusion/core/src/physical_plan/repartition/distributor_channels.rs#L18-L39

Mainly, the reason that we are NOT using unbounded channels is that this just buffers large (potentially unbounded) quantities of data in many scenarios. Mostly this happens when the repartition producer side (e.g. trivial data reading like highly compressed parquet files) is way faster than the consumer side (e.g. complicated transforms or costly aggregations).

Note that I'm NOT saying that the current impl. is good (or even perfect), there are likley better options. But just slapping an unbounded channel on this problem is not going to solve it. Sure that wins in some micro-benchmarking but it fails to provide a robust foundation for query execution.1

I agree w/ @ozankabak & Co though that SOME buffering is OK. So I think the following config option would be robust, reasonably fast and unsurprising to the user: A option "repartition buffer bytes OR messages" (or similar name) that limits bytes or messages per channel (not per repartition, otherwise the cross-comm overhead is too high) and only if this limit is met we fall back to the cross-channel gating behavior described in the code comment linked above (namely: we let data flow freely if there's at least 1 empty channel).

Ref #4867 and #4865.

Footnotes

  1. If someone finds a good metric, benchmark, or test for robustness, please open a ticket / PR. I am quite unhappy that this is currently mostly based on engineering intuition.

@alamb
Copy link
Contributor

alamb commented Jul 17, 2023

I will file a ticket describing the ideas in this thread, probably tomorrow

I wrote up a somewhat long ticket about my thoughts on this issue here: #7001

I think there are some buffering / adaptivity tricks we could use that might result in some non trivial wins (the "adaptive repartition one in particular I am excited about")

@alamb alamb marked this pull request as draft July 17, 2023 21:14
@alamb
Copy link
Contributor

alamb commented Jul 17, 2023

Marking as draft as we come up with a plan

@YjyJeff YjyJeff closed this by deleting the head repository Mar 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Lock free MPSC channel for RepartitionExec
5 participants