-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(swordfish): Parallel expression evaluation #3593
base: main
Are you sure you want to change the base?
Conversation
CodSpeed Performance ReportMerging #3593 will degrade performances by 15.97%Comparing Summary
Benchmarks breakdown
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3593 +/- ##
==========================================
+ Coverage 77.80% 77.83% +0.02%
==========================================
Files 718 718
Lines 88176 88295 +119
==========================================
+ Hits 68607 68726 +119
Misses 19569 19569
|
@@ -46,7 +46,7 @@ impl Session { | |||
let cfg = Arc::new(DaftExecutionConfig::default()); | |||
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?; | |||
|
|||
let mut result_stream = native_executor.run(&pset, cfg, None)?.into_stream(); | |||
let mut result_stream = pin!(native_executor.run(&pset, cfg, None)?.into_stream()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The kanal receiver doesn't provide a way to get an owned stream, so I manually made one using unfold, with a recv.await
inside. Because of this await point I need to pin the stream.
@@ -36,7 +32,7 @@ pub(crate) fn create_ordering_aware_receiver_channel<T: Clone>( | |||
) -> (Vec<Sender<T>>, OrderingAwareReceiver<T>) { | |||
match ordered { | |||
true => { | |||
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(1)).unzip(); | |||
let (senders, receiver) = (0..buffer_size).map(|_| create_channel::<T>(0)).unzip(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting channel sizes to 0 can elide heap allocation.
Addresses: #3389. More generally, this PR optimizes for projections with many expressions, particularly memory intensive expressions like UDFs.
Problem:
Currently, swordfish parallelizes projections across morsels, with 1 CPU per morsel. However, if each projection has many memory intensive expressions, we could experience a massive inflation in memory because we will have many materialized morsels living in memory at once.
Proposed solution:
Instead, we can parallelize the expressions within the projection (but only for expressions that require compute). This way, we still have good CPU utilization, but we keep a lower number of materialized morsels in memory.
In the linked issue above, we see that a 128cpu machine will parallelize morsels across the cores, each doing multiple udfs, resulting in "317GB allocations and duration 351 secs".
This PR reduces that to 7.8GB peak memory and runtime of 66 seconds.
Notes:
async
send to async
receive was not respecting capacity constraints, and was allowing sends even though the receive did not happen. Moved over to https://github.com/fereidani/kanal, which worked much better.Todos for next time: