-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-35838: [C++] Add backpressure test for asof join node #35874
Conversation
@westonpace Are you OK with me merging this to unblock internal issue? @rtpsw Is working on adding tests now but it might take a while. |
@@ -668,13 +668,13 @@ class InputState { | |||
|
|||
static Result<std::unique_ptr<InputState>> Make( | |||
size_t index, TolType tolerance, bool must_hash, bool may_rehash, | |||
KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output, | |||
KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still looking confusing:
BackpressureController takes ExecNode* node, ExecNode* output
and this one nows takes AsofJoinNode* node, ExecNode* input
which is inconsistent
Can we make this consistent between the two?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is also confusing that
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
key_hasher, node, std::move(handler), schema,
time_col_index, key_col_index);
On line 681 passes the asof join node to the input state instead of the input node, why is it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rtpsw I think it is easier to revert this line as to unblock the internal first:
https://github.com/apache/arrow/pull/34392/files#diff-5493b6ae7ea2a4d5cfb581034c076e9c4be7608382168de6d1301ef67b6c01eeR1410
Then work on cleaning up changes introduced in GH-36391. The code is quite confusing now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still looking confusing:
BackpressureController takes
ExecNode* node, ExecNode* output
and this one nows takesAsofJoinNode* node, ExecNode* input
which is inconsistentCan we make this consistent between the two?
This is actually intended to make things clearer. The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The ExecNode
passed to Make
is an input of the as-of-join node while PauseProducing
(and similarly ResumeProducing
) sees the as-of-join node as an output of the ExecNode
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is easier to revert this line
This reversion alone doesn't compile because inputs[i]
is not of type AsofJoinNode*
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably easier for me to do it instead of back and forth, opened:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine with figure out the cleaner way to do this in follow up to GH-36391. But for now I think it's easier to just revert the change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The ExecNode passed to Make is an input of the as-of-join node while PauseProducing (and similarly ResumeProducing) sees the as-of-join node as an output of the ExecNode.
We should probably fix the variable naming in the follow PR to GH-36391 how to call these things then. But for now let's just revert to what was before
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use more specific names like asof_input
and asof_node
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
std::atomic<int32_t>& backpressure_counter, | ||
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index, | ||
const std::vector<col_index_t>& key_col_index) { | ||
constexpr size_t low_threshold = 4, high_threshold = 8; | ||
std::unique_ptr<BackpressureControl> backpressure_control = | ||
std::make_unique<BackpressureController>(node, output, backpressure_counter); | ||
std::make_unique<BackpressureController>(input, node, backpressure_counter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there's been some confusion, can you make parameter names explicit?
std::make_unique<BackpressureController>(input, node, backpressure_counter); | |
std::make_unique<BackpressureController>(/*xxx=*/ input, /*yyy=*/ node, backpressure_counter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@icexelloss, I added test cases. This PR can now be reviewed as a (non-quick) resolution. |
@@ -1406,6 +1458,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size, | |||
total_length += batch->num_rows(); | |||
} | |||
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length); | |||
|
|||
ASSERT_GT(pause_count, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we validate pause resume counter for all sources? i.e the slow table should not have been paused, but the fast tables should?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we validate pause resume counter for all sources?
I added validation ...
i.e the slow table should not have been paused, but the fast tables should?
... but this is not the actual behavior - instead only one fast source gets paused and resumed - so this is what the recent commit validates. @westonpace, is this behavior expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may have crossed - the previous commit already has one pause and one resume counter for each source. The issue is that sometimes (due to non-deterministic timing of operations) no pause/resume is requested on one or both of the fast sources. Because of this, in the recent commit I am attempting this test logic: there is a pause request on a fast source if and only if there is a resume one too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do have a little bit of concern that this test may become flaky on CI machines which have notoriously weird timing. However, we can give it a few runs and see how it goes. If it does become a problem I'd suggest we invent some kind of "gated source" so we can do something like (very rough pseudocode)...
GatedSourceNode gated_left = GatedSourceNode(l_batches);
GatedSourceNode gated_right = GatedSourceNode(r_batches);
gated_right.Ungate();
// Start plan
WaitFor([&] { right_bp_options->pause_count > 0; });
ASSERT_EQ(right_bp_options->resume_count, 0);
ASSERT_EQ(left_bp_options->pause_count, 0);
ASSERT_EQ(left_bp_options->resume_count, 0);
gated_left->Ungate();
AssertPlanFinishes();
I'm off today and Monday but I can sketch up a possible GatedSourceNode
implementation when I get back if needed.
Indeed it is flaky - this and this and this CI jobs have failed on the new check. I'll make a quick attempt to fix this. I agree relying on timing is not good, though that can be said about the pre-PR backpressure test code too, and this PR won't be worse in this respect. |
Thinking about loud here: What we want to test is that if the through put of asof join node is slower than the source, then we would pause the source. Two potential ways that I think we can reliably do this: I think I prefer (2) a bit more because this affects represents a real life case of slow sink. @westonpace I am not sure if the idea of GatedSourceNode is similar or different, but happy to hear |
ASSERT_GT(counters_by_is_fast[true].resume_count, 0); | ||
// runs on some slow machines may not see any pause/resume, but if at least one pause is | ||
// seen then at least one resume must also be seen | ||
ASSERT_EQ(counters_by_is_fast[false].pause_count > 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we really need to check the slow sources, because it is entirely possible that it is not paused at all in the expected behavior (since it is slower than asof join?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we do. My understanding is that the intention of the pre-PR test code was to drive the as-of-join slower using a slower input to it, which should lead to backpressure from the as-of-join node toward a faster source. The additional intention of the post-PR test code is to check that indeed this backpressure happens, and this is observed at the test nodes inserted after each source.
While I'm not sure exactly what Weston has in mind, my understanding is that the GatedSourceNode's goal is to avoid flakiness due to non-deterministic timing. IMO, both (1) and (2) above could be flaky due to non-deterministic timing. Between (1) and (2) I also wouldn't prefer (1) because the debug-options would change the behavior of the as-of-join node being tested, and I prefer to change the code driving it instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rtpsw A few comments:
(1) Please separate out threading related changes to asof join - those should be in separate PR (outside of original scope)
(2) Please update the original iGH ssue to better reflect the change in this PR
(3) other inline comments
Gate* gate; | ||
}; | ||
|
||
struct GatedNode : public ExecNode, public TracedNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add documentation for this class to describe the purpose of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
It's authored by Weston - I don't have permission to edit. I edited the description of this PR instead, |
This CI job failure suggests that the tightening condition (in this commit) isn't always true. We should decide whether to revert the tightened condition or to look for the cause for its violation. |
Ok we can leave it as is then. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Conbench analyzed the 6 benchmark runs on commit There were no benchmark performance regressions. 🎉 The full Conbench report has more details. |
What changes are included in this PR?
Passing the correct nodes to the backpressure controller, along with better parameter naming/doc. Also added reusable gate-classes (
Gate
,GatedNodeOptions
andGatedNode
) that enable holding all input batches until a gate is released, in order to support more robust backpressure testing in this PR.Are these changes tested?
Yes.
Are there any user-facing changes?
No.
This PR contains a "Critical Fix".