GH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasicTest #36499
Conversation
@github-actions crossbow submit -g cpp
Revision: fd949f8 Submitted crossbow builds: ursacomputing/crossbow @ actions-ea59d8ad24
@icexelloss Could you take a look at this PR?
@pitrou, if you take the PR code, does it resolve the failure you observed on your platform?
@rtpsw It does, though of course the failure wasn't deterministic in the first place. It also seems to fix things on CI, which is encouraging :-)
OK, crossing fingers.
Is KeyHasher thread-safe? AFAIU, it is now used from more than one thread.
I also don't understand why you call `Invalidate` explicitly here. However, I see that the key hasher already appears to invalidate itself when it sees a different batch. Am I missing something here?
This doc addresses the thread-safety of the key hasher.
Presumably, by automatic invalidation you mean the invocation that happens when the key hasher sees a batch at a different address. I suspect that with the explicit invalidation in place, we could just drop that automatic check.
The explicit invalidation was introduced to resolve exactly this case, which was observed on macOS.
Hmm, this could probably have been fixed more idiomatically by storing a `std::weak_ptr` to the record batch.
I don't have any problems with the fix, but I don't understand the bug in the first place. How was it a race? The only thread that is looking at these variables is the processing thread, correct?
Or maybe the non-deterministic part was the order in which the batches arrived? E.g., if all the right input batches arrived before any of the left input batches?
Unfortunately, it's not that simple. The concurrency problem is not caused by an invalid/dangling pointer to a record batch that a weak pointer would have avoided. I'll try to answer both your and Weston's questions. Some background first. The key hasher is used by the processing thread and caches hashes corresponding to a particular record batch. Therefore, the key hasher (i.e., its cache of hashes) should be invalidated when the record batch being handled by the processing thread changes. The complexity here is that this change is affected by the input-receiving thread. The post-PR code invalidates the key hasher just before this point using a thread-synchronizing operation. This ensures that the cached hashes at this point would not be used by the processing thread calling `GetKey`. Given this background, we can see that the problem is not about an invalid pointer to a record batch but about invalid cached hashes. In the pre-PR code, the failure could happen with the following order of operations: the input-receiving thread delivers a second record batch that happens to reuse the address of the first, before any invalidation takes place; only thereafter does the processing thread, still holding the hashes cached for the first batch, look up keys for the second batch and get stale hashes.
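To make this failure mode concrete, here is a minimal, hypothetical sketch of address-based hash caching and how allocator address reuse defeats it. The class and method names are illustrative only, not the actual Arrow code:

```cpp
#include <cstdint>
#include <vector>

struct RecordBatch { /* hypothetical stand-in for arrow::RecordBatch */ };

// Sketch of hash caching keyed on the batch's *address*. If a freed batch's
// address is reused by the allocator for the next batch, the pointer
// comparison passes and stale hashes are returned.
class KeyHasher {
 public:
  const std::vector<uint64_t>& HashesFor(const RecordBatch* batch) {
    if (batch != batch_) {  // pointer identity as the cache key
      batch_ = batch;
      Recompute(batch);     // hash each row of `batch`
    }
    return hashes_;  // wrong if a new batch landed at the old address
  }

 private:
  void Recompute(const RecordBatch* batch) { /* compute per-row hashes */ }

  const RecordBatch* batch_ = nullptr;
  std::vector<uint64_t> hashes_;
};
```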
@rtpsw The rationale behind the hash caching / invalidation is unclear to me - if the purpose of the KeyHasher here is to compute the hash for each row in the batch, why don't we always keep an immutable tuple of <RecordBatch, Hash>? Looks like the hashes are computed and consumed in two places.
And I don't see clearly why we cannot use the tuple in both places to avoid the cache invalidation issue. One thing that is not clear to me is why we need rehashing.
The key hasher already encapsulates the batch and its hashes, so I don't see how wrapping the two in a tuple gains anything. As for immutability, you'd need to explain what you have in mind in more detail (or try to implement it), because performance is a major consideration here that greatly impacts the design. The current design has all processing, including for hashes, done on the processing thread; it sounds like the proposed immutability would shift the computation of hashes to the input-receiving thread, as well as add allocation/deallocation cost for the hashes, and these could easily degrade performance.
We can and the code effectively does use the "tuple", encapsulated by the key hasher, in both these places. The main question is when and in which thread to compute the hashes. The current code's answer is: upon initial querying for the hashes (of a given batch) within the processing thread.
This is an orthogonal topic. Rehashing occurs only when the key is a single fixed-width (up to 64 bits, for now) column in which a null value is observed for the first time. So, no rehashing ever occurs if the key spans multiple columns or if the key is a single fixed-width column with no null values. This is a performance optimization - the fast path is when the key is a single fixed-width column without any nulls, which is a common case for which no hashes are computed, since its fixed-width values can be compared directly. The code is being optimistic - it hopes for no null values until it sees the first one.
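As a rough illustration of this optimization (the names and structure are assumptions, not the actual Arrow implementation), the fast path uses the raw fixed-width value as the key and only switches to real hashing once a null is seen:

```cpp
#include <cstdint>
#include <optional>

// Sketch of the optimistic fast path: with a single fixed-width key column
// (<= 64 bits), the raw value itself serves as the key, so no hashes are
// needed -- until the first null appears, forcing a switch to real hashing.
class SingleColumnKey {
 public:
  uint64_t KeyOf(const std::optional<uint64_t>& value) {
    if (!value.has_value()) {
      if (!saw_null_) {
        saw_null_ = true;
        // "Rehash" point: keys memoized so far as raw values must be
        // recomputed as real hashes, since nulls need a distinct encoding.
      }
      return kNullSentinel;
    }
    return saw_null_ ? Hash(*value) : *value;  // fast path: identity
  }

 private:
  static constexpr uint64_t kNullSentinel = ~uint64_t{0};
  static uint64_t Hash(uint64_t v) { return v * 0x9E3779B97F4A7C15ULL; }
  bool saw_null_ = false;
};
```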
Yes, the key hasher contains effectively a tuple of <RecordBatch, Hash>. However, the tuple is mutable, i.e., the class needs to make sure the Hash is indeed the hash of the RecordBatch (via Invalidate), and IIUC that's where the bugs happen. (Either we have a batch and don't have the hash, or we associate the wrong hash with a batch.) What I am proposing as an alternative is to have the key hasher class produce an immutable tuple of <RecordBatch, Hash>, so we avoid this wrong-hash problem altogether: the code basically changes from cache-then-invalidate to compute-once (see the sketch below). In terms of performance, in both cases we compute the hash once for each input batch, so I don't see why performance would degrade. The extra cost of what I am proposing is basically allocating one tuple per input batch, maybe plus some shared-pointer copies per batch; both seem fairly cheap. Do you not agree that this would simplify things?
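A minimal sketch of this proposal, using hypothetical type names rather than actual Arrow APIs:

```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct RecordBatch { /* hypothetical stand-in for arrow::RecordBatch */ };

// Immutable pairing of a batch with its hashes, produced once per input
// batch; there is no cached state to invalidate, so no wrong-hash window.
struct HashedBatch {
  std::shared_ptr<const RecordBatch> batch;
  std::vector<uint64_t> hashes;  // computed exactly once, never mutated
};

HashedBatch ComputeHashes(std::shared_ptr<const RecordBatch> batch) {
  std::vector<uint64_t> hashes;
  // ... compute one hash per row of *batch ...
  return HashedBatch{std::move(batch), std::move(hashes)};
}

// The input queue would then carry HashedBatch values, so any consumer
// always sees hashes that belong to the batch they travel with.
```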
I see. I got misled by the function name.
```diff
@@ -524,7 +524,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;
```
Is the KeyHasher class thread-safe now? Since with this change there are two threads using this class, we should clearly document what the thread-safety model of this class is. (What happens if the hash is invalidated in the middle of processing?)
> Is the KeyHasher class thread-safe now?
I think so - see below for why.
> Since with this change there are two threads using this class, we should clearly document what the thread-safety model of this class is.
This doc describes the supported concurrency.
> What happens if the hash is invalidated in the middle of processing?
Here's what I think happens. The hashes are always computed for the record batch found at the front of the queue. The critical (and probably rare) case is when the input-receiving thread gets a record batch during the computation of hashes, at which point it invalidates the key hasher and pushes the record batch to the queue. In this case, the important point is that the pushed record batch cannot be at the front of the queue, because the queue is not empty and the push is to the back of the queue. This invalidation may only lead to a recomputation of hashes for the same record batch at the front of the queue.
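A minimal sketch of this concurrency model, with illustrative names (not the actual Arrow code), showing why a racing invalidation at worst forces a recomputation for the same front batch:

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

struct RecordBatch { /* hypothetical stand-in for arrow::RecordBatch */ };

// Sketch of the two-thread model described above. The input-receiving
// thread only ever calls Invalidate(); the processing thread only ever
// calls HashesFor() on the batch at the front of the queue.
class KeyHasher {
 public:
  // Input-receiving thread: called before pushing a new batch to the queue.
  void Invalidate() { batch_.store(nullptr, std::memory_order_release); }

  // Processing thread: returns hashes for the front-of-queue batch.
  const std::vector<uint64_t>& HashesFor(const RecordBatch* batch) {
    if (batch_.load(std::memory_order_acquire) != batch) {
      Recompute(batch);  // hash each row of `batch`
      batch_.store(batch, std::memory_order_release);
    }
    // If Invalidate() raced with the computation above, the newly pushed
    // batch went to the *back* of the queue, so the front batch -- and
    // therefore these hashes -- are unchanged; at worst the next call
    // recomputes the same values.
    return hashes_;
  }

 private:
  void Recompute(const RecordBatch* batch) { /* compute per-row hashes */ }

  std::atomic<const RecordBatch*> batch_{nullptr};
  std::vector<uint64_t> hashes_;
};
```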
Yes, this was my understanding of your proposal when I wrote this post, where I noted performance concerns.
There are also additional deallocations, additional memory needed, additional costs due to computing in one thread and using the results in another, and probably more CPU cache misses. It's conceptually simpler, but I don't think we can be sure it'd be more performant. It's hard to predict the performance impact of changes to complex concurrent code. Ultimately, if we want to know, there's no escaping implementing the alternative and trying it out. It's going to take some effort to make that change.
Ok, I think we agree that an immutable-tuple design is simpler but may have performance implications here. For the sake of pushing this PR forward, I will try to understand your current approach.
In the case here, since the input thread doesn't set the new batch on the key hasher (it only invalidates), the effect is just that the processing thread recomputes the hashes on its next query, correct?
This case doesn't always occur, though it is indeed the common one. It occurs when the input-receiving thread invalidates the key hasher while the processing thread is not in the middle of computing hashes. The other case (also explained here) is when the processing thread is in the middle of computing hashes when the invalidation arrives; then the hashes are simply recomputed for the same batch at the front of the queue. So, both cases end up with correct hashes.
I meant a key hasher cache miss, i.e., that the condition in this if-statement evaluates to `true`, triggering a recomputation.
I am trying to understand the issue pre-PR, but you seem to be explaining the behavior after the code change here. Going back to my original question, regarding your original comments on the pre-PR behavior:
Can you explain your comment above w.r.t. my question below?
Correct, sorry.
Sure. See below.
In the pre-PR code, this description is correct for the first batch. For the second batch, however, the invalidation happens too late - within `Advance`, which runs only after `GetLatestKey` has already consulted the cached hashes. Now, taking this a step further, if the two batches have a different address, then things seem OK, because the hashes would then be recomputed. I suspect the bug is triggered when the first and second batches happen to have the same address, which we know is a race condition, leading to the use of incorrect hashes. Granted, I haven't yet nailed down the exact triggering condition in the pre-PR code. It would take some effort - let me know if you'd like this investigated. For the time being, in the current version of the PR, I preferred to find the minimal change that works and that I can explain.
If that is the case, then I would argue this is not a concurrency issue, but rather that the KeyHasher class is broken and fails to cache hashes correctly - you can have the same wrong-hash issue even if everything is on one thread, and therefore I don't think moving things to another thread is the right fix. I remember we had this issue before, and I thought one of your PRs fixed it, but I cannot seem to find the change anymore. (There was one about the allocator reusing the buffer address for the second batch on macOS or something.) Also, are you able to confirm that the failure that happened pre-PR is because the second batch has the same address as the first batch?
I understand that if you take just the key hasher alone, then the caching by address can (be driven to) fail using one thread. However, this argument ignores invalidation. I could make a similar argument that the KeyHasher with proper invalidation is correct, and can be so even when everything is on one thread. I agree that it would be cleaner to completely remove the address checking from the key hasher, since invalidation makes it unneeded, and instead just check whether the hasher has been invalidated.
Presumably, you're referring to the invalidation from the input-receiving thread. Indeed, there is an alternative where the key hasher operations are all done from one thread, which we discussed starting in this post. It may be considered better by some standard, and is certainly doable, yet also more complex. This is because, as discussed, the processing thread would need to detect when a new batch was added to the queue in order to invalidate the key hasher at that time. Note that up until recently, I did not have access to a platform where I could reproduce the kind of failure discussed here. Now that I do, I'm more confident I could locally test changes relevant to this kind of failure.
My understanding of the timeline of relevant changes is:
Right now I don't have debug-print confirmation of this or of the opposite. It would take some effort to get - let me know if you want this done.
@rtpsw #36580 is not exactly what I was asking for. There are still two threads reading/writing the batch_index variable, which can lead to race conditions / a complex threading model. For the sake of moving forward, I am OK with what is implemented in this PR - however, I would request adding a detailed explanation of why the invalidation needs to happen on the input-receiving thread (basically because the KeyHasher class cannot detect that the top-of-queue batch has changed if the address of the next batch happens to be the same as the top-of-queue one). I would still like to simplify the threading model there so that only one thread needs to interact with the KeyHasher class. But that can be left as future work.
I've been able to reproduce errors very reliably with this patch (2516354). The patch accurately simulates what I think our real-world constraints are on ordering and multi-threading.
This commit actually generates a lot of errors in many different asof join node tests. Also, unfortunately, the patch proposed here does not fix all of these errors.
Right, this PR is the alternative I described. We could also try the alternative you described, where the hashes are paired with the batch, and check its performance, as discussed.
There is no batch index variable that is shared between threads. There is a member variable `batch_`, which is an atomic pointer to the current record batch.
I'm also seeing errors with this patch. This should be very helpful, thanks!
For the purpose of moving things forward, I am going to approve this PR.
At minimum, this PR fixes the sporadic test failures. Although I am not a big fan of the approach here, I think it is acceptable.
We can improve the code structure / simplify the concurrency model (e.g., using immutable <RecordBatch, hash> tuples) as a follow-up.
As for the potential issues @westonpace's test util reveals, we can look into those separately as well.
@westonpace Are you OK with merging this one as is?
Yes, I'm fine with this as an incremental addition.
Merged. Authored-by: Yaron Gvili <rtpsw@hotmail.com>. Signed-off-by: Weston Pace <weston.pace@gmail.com>.
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 0339078. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about possible false positives for unstable benchmarks that are known to sometimes produce them.
What changes are included in this PR?
The key hasher is invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happens within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called. The change adds synchronization between the input-receiving and processing threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.
Are these changes tested?
Yes, by existing tests.
Are there any user-facing changes?
No.
This PR contains a "Critical Fix".
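A minimal sketch of where the fix moves the invalidation, under assumed (hypothetical) names for the queue and handler; only `Invalidate`, `GetKey`, `GetLatestKey`, and `Advance` come from the PR discussion:

```cpp
#include <memory>
#include <utility>

struct RecordBatch { /* hypothetical stand-in */ };
struct KeyHasher {
  void Invalidate() { /* atomically clear the cached batch + hashes */ }
};

// Hypothetical input handler; `Queue` and `OnInputReceived` are illustrative.
template <typename Queue>
void OnInputReceived(KeyHasher& hasher, Queue& queue,
                     std::shared_ptr<RecordBatch> batch) {
  // Pre-PR: invalidation happened inside Advance(), after GetLatestKey()
  // may already have consulted stale cached hashes.
  // Post-PR: invalidate first, on the input-receiving thread, so the first
  // GetKey() (via GetLatestKey()) for the new batch must recompute.
  hasher.Invalidate();
  queue.Push(std::move(batch));  // the new batch goes to the back
}
```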