Use arrow row format in SortPreservingMerge ~50-70% faster #3386
@@ -321,10 +318,13 @@ pub(crate) struct SortPreservingMergeStream {
    next_batch_id: usize,

    /// min heap for record comparison
    min_heap: BinaryHeap<SortKeyCursor>,
    max_heap: BinaryHeap<Reverse<SortKeyCursor>>,
This was a somewhat amusing surprise: `BinaryHeap` is a max heap, not a min heap; the comparator for `SortKeyCursor` was just backwards.
But with `Reverse`, it's a "min heap" again, so I think the variable name should read `min_heap`.
It's a max heap of reversed elements, no?
Well, we get into philosophical discussions here, but IMHO the variable should describe the entire construct (`BinaryHeap<Reverse<SortKeyCursor>>`), not just the outer shell (`BinaryHeap<...>`).
Should you decide to keep the name, then at least adjust the docstring, which still reads `min heap`.
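For anyone following the naming debate, here is a minimal standalone sketch of the behavior under discussion. It uses plain `u32` values rather than `SortKeyCursor`, and the helper names `drain_max_first` and `drain_min_first` are made up for illustration; they are not part of this PR.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Pops everything from a plain `BinaryHeap`, which yields the largest element first.
fn drain_max_first(values: &[u32]) -> Vec<u32> {
    let mut heap: BinaryHeap<u32> = values.iter().copied().collect();
    let mut out = Vec::with_capacity(heap.len());
    while let Some(v) = heap.pop() {
        out.push(v);
    }
    out
}

/// Wrapping each element in `Reverse` flips its ordering, so `pop` yields the
/// smallest underlying value first.
fn drain_min_first(values: &[u32]) -> Vec<u32> {
    let mut heap: BinaryHeap<Reverse<u32>> =
        values.iter().copied().map(Reverse).collect();
    let mut out = Vec::with_capacity(heap.len());
    while let Some(Reverse(v)) = heap.pop() {
        out.push(v);
    }
    out
}

fn main() {
    let values = [3, 1, 2];
    println!("{:?}", drain_max_first(&values)); // [3, 2, 1]
    println!("{:?}", drain_min_first(&values)); // [1, 2, 3]
}
```

Both readings in the thread are consistent with this: the container is still a max heap over `Reverse<T>`, but it behaves as a min heap over `T`.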
😍 where this is headed
    // their batch_idx.
    batch_comparators: RwLock<HashMap<usize, Vec<DynComparator>>>,
    sort_options: Arc<Vec<SortOptions>>,
    rows: Rows,
that certainly looks nicer
The speed-up is fantastic, love it!
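As background on why a row format can make comparisons cheap, here is a rough sketch of the general idea: encode each multi-column sort key as one byte string whose lexicographic order matches the key's order, so a comparison becomes a single byte-slice compare instead of a per-column comparator call. This is not the arrow implementation; `encode_row` is a hypothetical helper, it handles only an `(i32, &str)` key in ascending order, and it assumes the string contains no NUL bytes.

```rust
/// Encodes a `(i32, &str)` sort key as bytes whose lexicographic order matches
/// the tuple's natural order. Assumes `text` contains no NUL (0x00) bytes.
fn encode_row(num: i32, text: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + text.len() + 1);
    // Flipping the sign bit makes negative values sort before positive ones
    // when the big-endian bytes are compared as unsigned.
    out.extend_from_slice(&((num as u32) ^ 0x8000_0000).to_be_bytes());
    out.extend_from_slice(text.as_bytes());
    // A 0x00 terminator makes a string sort before any longer string that
    // it is a prefix of.
    out.push(0);
    out
}

fn main() {
    let mut rows = vec![(2, "b"), (-1, "z"), (2, "a"), (2, "ab")];
    let mut encoded: Vec<Vec<u8>> =
        rows.iter().map(|(n, s)| encode_row(*n, s)).collect();

    // Sorting the tuples and sorting the encoded bytes give the same order.
    rows.sort();
    encoded.sort();
    let expected: Vec<Vec<u8>> =
        rows.iter().map(|(n, s)| encode_row(*n, s)).collect();
    assert_eq!(encoded, expected);
}
```

The trade-off is that the encoded keys are a copy of the sort columns, so they take extra memory.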
let rows = self.row_converter.convert(&cols);

let cursor = SortKeyCursor::new(
We need to track the total memory used by all cursors, since each cursor now holds `Rows`. We could do this as a follow-up, but I'm noting it here as it came to me.
I agree that memory usage is a potential concern (as we are effectively copying data into the `Rows` format).
A follow-on PR would be good, I think. I filed #3609
8143bb9 to 68ff05b
let _timer = elapsed_compute.timer();
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let elapsed_compute = self.tracking_metrics.elapsed_compute().clone();
This simply reduces the overhead of timing, right?
Yes, which turned out to be a major bottleneck, as `Instant::now` is a syscall.
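To illustrate the timing overhead being discussed, here is a small sketch of a drop-based timer guard, comparing one timer per item against one timer per call. `ElapsedCompute` and `Timer` here are hypothetical stand-ins written for this example, not the DataFusion metrics types, and the sketch assumes the change amounts to reading the clock less often.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;

/// Hypothetical stand-in for a compute-time metric.
#[derive(Default)]
struct ElapsedCompute {
    nanos: AtomicU64,
    samples: AtomicUsize,
}

/// Guard that adds the elapsed time to the metric when it is dropped.
struct Timer<'a> {
    metric: &'a ElapsedCompute,
    start: Instant,
}

impl ElapsedCompute {
    fn timer(&self) -> Timer<'_> {
        Timer { metric: self, start: Instant::now() }
    }
}

impl Drop for Timer<'_> {
    fn drop(&mut self) {
        let nanos = self.start.elapsed().as_nanos() as u64;
        self.metric.nanos.fetch_add(nanos, Ordering::Relaxed);
        self.metric.samples.fetch_add(1, Ordering::Relaxed);
    }
}

/// Starts and stops a timer for every item: two clock reads per item.
fn sum_timing_each_item(items: &[u64], metric: &ElapsedCompute) -> u64 {
    let mut total = 0;
    for item in items {
        let _timer = metric.timer();
        total += item;
    }
    total
}

/// Starts one timer for the whole call: two clock reads in total.
fn sum_timing_once(items: &[u64], metric: &ElapsedCompute) -> u64 {
    let _timer = metric.timer();
    items.iter().sum()
}

fn main() {
    let items: Vec<u64> = (1..=1000).collect();
    let (per_item, once) = (ElapsedCompute::default(), ElapsedCompute::default());
    sum_timing_each_item(&items, &per_item);
    sum_timing_once(&items, &once);
    println!("timers started per item: {}", per_item.samples.load(Ordering::Relaxed));
    println!("timers started per call: {}", once.samples.load(Ordering::Relaxed));
}
```

Because the guard records on drop, neither variant needs an explicit `done()` call; the only difference is how often the clock is read.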
Benchmark runs are scheduled for baseline = 15c19c3 and contender = 451e441. 451e441 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Real nice 🎉
This reverts commit 451e441.
Which issue does this PR close?
Part of #416
Rationale for this change
It is also worth highlighting that these benchmarks are in many ways the worst case, as the rows are distributed randomly across streams, instead of large contiguous slices, which increases the cost of reassembly, i.e. the non-comparison portion of the operator.
What changes are included in this PR?
Are there any user-facing changes?