Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasicTest #36499

Merged
merged 1 commit into from
Jul 12, 2023

Conversation

rtpsw
Copy link
Contributor

@rtpsw rtpsw commented Jul 6, 2023

What changes are included in this PR?

The key hasher is invalidated before the first invocation of GetKey (via GetLatestKey) after a new batch arrives. In the pre-PR code, this invalidation happens within Advance, which is called from AdvanceAndMemoize only after GetLatestKey is called. The change adds synchronization between the input-receiving- and processing- threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.

Are these changes tested?

Yes, by existing tests.

Are there any user-facing changes?

No.

This PR contains a "Critical Fix".

@rtpsw rtpsw requested a review from westonpace as a code owner July 6, 2023 09:12
@github-actions
Copy link

github-actions bot commented Jul 6, 2023

⚠️ GitHub issue #36482 has been automatically assigned in GitHub to PR creator.

@pitrou
Copy link
Member

pitrou commented Jul 6, 2023

@github-actions crossbow submit -g cpp

@github-actions
Copy link

github-actions bot commented Jul 6, 2023

Revision: fd949f8

Submitted crossbow builds: ursacomputing/crossbow @ actions-ea59d8ad24

Task Status
test-alpine-linux-cpp Github Actions
test-build-cpp-fuzz Github Actions
test-conda-cpp Github Actions
test-conda-cpp-valgrind Azure
test-cuda-cpp Github Actions
test-debian-11-cpp-amd64 Github Actions
test-debian-11-cpp-i386 Github Actions
test-fedora-35-cpp Github Actions
test-ubuntu-20.04-cpp Github Actions
test-ubuntu-20.04-cpp-bundled Github Actions
test-ubuntu-20.04-cpp-minimal-with-formats Github Actions
test-ubuntu-20.04-cpp-thread-sanitizer Github Actions
test-ubuntu-22.04-cpp Github Actions
test-ubuntu-22.04-cpp-20 Github Actions

@pitrou
Copy link
Member

pitrou commented Jul 6, 2023

@icexelloss Could you take a look at this PR?

@pitrou pitrou added this to the 13.0.0 milestone Jul 6, 2023
@pitrou pitrou added the Priority: Blocker Marks a blocker for the release label Jul 6, 2023
@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 6, 2023

@pitrou, if you take the PR-code, does it resolve the failure you observed on your platform?

@pitrou
Copy link
Member

pitrou commented Jul 6, 2023

@rtpsw It does, though of course the failure wasn't deterministic in the first place.

It also seems to fix things on CI, which is encouraging :-)

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 6, 2023

OK, crossing fingers.

@pitrou
Copy link
Member

pitrou commented Jul 6, 2023

Is KeyHasher thread-safe? AFAIU, key_hasher_ can be used from two threads at a time... GetKey calling key_hasher_->HashesFor and Push calling key_hasher_->Invalidate.

@pitrou
Copy link
Member

pitrou commented Jul 6, 2023

I also don't understand why you call Invalidate explicitly, while the KeyHasher is supposed to invalidate automatically.

However, I see that KeyHasher is storing a raw RecordBatch pointer, which is problematic. If the previous std::shared_ptr<RecordBatch> was destroyed, another std::shared_ptr<RecordBatch> could re-allocate the same underlying RecordBatch pointer, which is unlikely but not impossible.

Am I missing something here?

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 6, 2023

Is KeyHasher thread-safe? AFAIU, key_hasher_ can be used from two threads at a time... GetKey calling key_hasher_->HashesFor and Push calling key_hasher_->Invalidate.

This doc addresses thread-safety of Invalidate and HashesFor.

I also don't understand why you call Invalidate explicitly, while the KeyHasher is supposed to invalidate automatically.

Presumably, by automatic invalidation you mean this invocation of Invalidate. Its for safety - it keeps the KeyHasher invalid if any code (now or in the future) fails between that point and the clean return. As for the explicit invalidation, it's for the case you described:

I suspect that with the explicit invalidation in place, we could just have if (batch_) here and then assert that batch_ equals batch. I could look into this separately. Given past experience, I'd prefer to make small steps when dealing with as-of-join-node's concurrency.

However, I see that KeyHasher is storing a raw RecordBatch pointer, which is problematic. If the previous std::shared_ptr was destroyed, another std::shared_ptr could re-allocate the same underlying RecordBatch pointer, which is unlikely but not impossible.

The explicit invalidation was introduced to resolve exactly this case, which was observed on macOS.

@pitrou
Copy link
Member

pitrou commented Jul 6, 2023

The explicit invalidation was introduced to resolve exactly this case, which was observed on macOS.

Hmm, this could probably have been fixed more idiomatically by storing a std::weak_ptr<RecordBatch> in KeyHasher. And you wouldn't have had to worry about concurrency.

@pitrou pitrou requested a review from icexelloss July 6, 2023 15:24
@westonpace
Copy link
Member

I don't have any problems with the fix but I don't understand the bug in the first place. How was it a race? The only thread that is looking at these variables is the processing thread correct?

@westonpace
Copy link
Member

Or maybe the non-determinsitic part was the order in which the batches arrived? E.g. if all the right input batches arrived before any of the left input batches?

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 6, 2023

Hmm, this could probably have been fixed more idiomatically by storing a std::weak_ptr in KeyHasher. And you wouldn't have had to worry about concurrency.

Unfortunately, it's not that simple. The concurrency problem is not caused by an invalid/dangling pointer to a record batch that a weak pointer would have avoided. I'll try to answer both your and Westons's questions.

Some background first. The key hasher is used by the processing thread and caches hashes corresponding to a particular record batch. Therefore, the key hasher (i.e., its cache of hashes) should be invalidated when the record batch being handled by the processing thread changes. The complexity here is that this change is affected by the input-receiving thread. The post-PR code invalidates the key hasher just before this point using a thread-synchronizing operation. This ensures that the cached hashes at this point would not be used by the processing thread calling HashesFor; instead, HashesFor will compute the hashes for the new record batch the next time HashesFor would be invoked.

Given this background, we can see that the problem is not about an invalid/pointer to a record batch but about invalid cached hashes. In the pre-PR code, the failure could happen with the following order of operations:

  1. The input-receiving thread pushes a record batch to an empty queue, placing it at the front, thus making it the current record batch being processed.
  2. The processing thread invokes AdvanceAndMemoize, which invokes GetLatestKey, which invokes GetKey, which invokes HashesFor, which finds hashes that are incorrect for the above batch. This causes the problem, which is a concurrency one because it is driven by a non-deterministic order of operations.

Only thereafter, the processing thread, still in AdvanceAndMemoize, invokes Advance, which deals with the queue and could detect a new record batch in order to then invalidate the key hasher, but it is too late due to item 2 above.

@icexelloss
Copy link
Contributor

icexelloss commented Jul 7, 2023

@rtpsw The rationale behind the hash caching / invalidation is unclear to me - if the purpose of the KeyHasher here is to compute the hash for each row in the batch, why don't we always keep a immutable tuple of <RecordBatch, std::vector<HashType>> in the input queues / input state to avoid dealing cache invalidation / mutation here? There has been multiple bugs around cache invalidation here so I wonder if we can come up with a simpler design with the immutable.
<RecordBatch, vector<HashType>> tuple.

Looks like GetKey is called:

And I don't see clearly why we cannot use the tuple in both places to avoid the cache invalidation issue.

One thing that is not clear to me is why do we need rehash (row hash for each batch is deterministic so not clear to me why we need rehash at all) , but it seems to be probably can be avoided by using the recordbatch/hash tuple as well?

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 7, 2023

@rtpsw The rationale behind the hash caching / invalidation is unclear to me - if the purpose of the KeyHasher here is to compute the hash for each row in the batch, why don't we always keep a immutable tuple of <RecordBatch, std::vector> in the input queues / input state to avoid dealing cache invalidation / mutation here?

The key hasher already encapsulates the batch and its hashes, so I don't see how wrapping the two in a tuple gains anything. As for immutability, you'd need to explain what you have in mind in more detail (or try to implement it) because it could easily degrade performance, which is a major consideration here that greatly impacts the design. The current design has all processing, including for hashes, done on the processing thread; it sounds like the proposed immutability would shift the computation of hashes to the input-receiving thread, as well as add allocation/deallocation cost for the hashes, and these could easily degrade performance.

And I don't see clearly why we cannot use the tuple in both places to avoid the cache invalidation issue.

We can and the code effectively does use the "tuple", encapsulated by the key hasher, in both these places. The main question is when and in which thread to compute the hashes. The current code's answer is: upon initial querying for the hashes (of a given batch) within the processing thread.

One thing that is not clear to me is why do we need rehash (row hash for each batch is deterministic so not clear to me why we need rehash at all) , but it seems to be probably can be avoided by using the recordbatch/hash tuple as well?

This is an orthogonal topic. Rehashing occurs only when the key is a single fixed-width (up to 64 bits, for now) column in which a null value is observed for the first time. So, no rehashing ever occurs if the key spans multiple columns or if the key is a single fixed-width column with no null values. This is a performance optimization - the fast path is when the key is a single fixed-width column without any nulls, which is a common case for which no hashes are computed, since its fixed-width values can be compared directly. The code is being optimistic - it hopes for no null values until it sees the first one.

@icexelloss
Copy link
Contributor

icexelloss commented Jul 7, 2023

The key hasher already encapsulates the batch and its hashes, so I don't see how wrapping the two in a tuple gains anything. As for immutability, you'd need to explain what you have in mind in more detail (or try to implement it) because it could easily degrade performance, which is a major consideration here that greatly impacts the design. The current design has all processing, including for hashes, done on the processing thread; it sounds like the proposed immutability would shift the computation of hashes to the input-receiving thread, as well as add allocation/deallocation cost for the hashes, and these could easily degrade performance.

Yes the key hasher contains effectively a tuple of <RecordBatch, Hash>. However the tuple is mutable, i.e., the class needs to make sure the Hash is indeed the hash of the RecordBatch (via Invalidate) and IIUC that's where bugs happen. (Either we have a batch and doesn't have the hash, or that we associate the wrong hash with a batch.)

What I am proposing as an alternative is to have key hasher class produces an immutable tuple of <RecordBatch, Hash> so we avoid this wrong hash problem all together. So the code basically becomes:

Currently:
We have queues of RecordBatches, when hash for a RecordBatch is needed, we call key hasher to get the hash (and that's where the bug comes when we cache the wrong hash)

What I am proposing:
We have queues of immutable <RecordBatch, Hash>, the tuple of created when a batch is received (either in the input received thread or processing thread, probably simpler in the input received thread so we don't need to add extra queues). Since we only popping batches from queue in the processing thread, we should always have the hash for the batch that we want to process if we store the tuple in the queue.

In terms of performance, in both cases, we compute hash once for each input batches, so I don't see why performance would degrade. The extra cost of what I proposing is basically allocating one tuple for each input batches, maybe + some shared pointer copies per batch, both seems fairly cheap.

Do you not agree that this would simplify things?

This is an orthogonal topic. Rehashing occurs only when the key is a single fixed-width (up to 64 bits, for now) column in which a null value is observed for the first time. So, no rehashing ever occurs if the key spans multiple columns or if the key is a single fixed-width column with no null values. This is a performance optimization - the fast path is when the key is a single fixed-width column without any nulls, which is a common case for which no hashes are computed, since its fixed-width values can be compared directly. The code is being optimistic - it hopes for no null values until it sees the first one.

I see. I got misguided by the function name rehash here and it reads to me as "recomputing the hash for a batch"

@@ -524,7 +524,7 @@ class KeyHasher {
size_t index_;
std::vector<col_index_t> indices_;
std::vector<KeyColumnMetadata> metadata_;
const RecordBatch* batch_;
std::atomic<const RecordBatch*> batch_;
Copy link
Contributor

@icexelloss icexelloss Jul 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the KeyHasher class thread safe now? Since with this change there are two thread using this class, we should clearly document what the thread safety model of this class is. (What happens if the hash is invalidated in the middle of processing?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the KeyHasher class thread safe now?

I think so - see below for why.

Since with this change there are two thread using this class, we should clearly document what the thread safety model of this class is.

This doc describes the supported concurrency.

What happens if the hash is invalidated in the middle of processing?

Here's what I think happens. The hashes are always computed for the record batch found at the front of the queue. The critical (and probably rare) case is when the input-receiving thread gets a record batch during the computation of hashes, whence it invalidates the key hasher and pushes the record batch to the queue. In this case, the important point is that this pushed record batch cannot be at the front of the queue because the queue is not empty and the push is to the back of the queue. This invalidation may only lead to a recomputation of hashes for the same record batch at the front of the queue.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Jul 7, 2023
@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 7, 2023

What I am proposing as an alternative is to have key hasher class produces an immutable tuple ...

Yes, this was my understanding of your proposal when I wrote this post, where I noted performance concerns.

The extra cost of what I proposing is basically allocating one tuple for each input batches, maybe + some shared pointer copies per batch, both seems fairly cheap.

There's also additional deallocations, additional memory needed, additional costs due to computing in one thread and using the results in another thread, and probably more CPU cache misses. It's conceptually simpler but I don't think we can be sure it'd be more performant.

It's hard to predict performance impact of changes to complex concurrent code. Ultimately, if we want to know, there's no escaping from implementing the alternative and trying it out. It's going to take some effort to change RecordBatch to a tuple in a bunch of places, so whether to go down this path is a decision, I guess for you, to make - let me know your call. Note that in this PR, I was aiming for a minimal change; in fact, it's not even new code but a reversion of part of a previous PR.

@icexelloss
Copy link
Contributor

icexelloss commented Jul 7, 2023

What I am proposing as an alternative is to have key hasher class produces an immutable tuple ...

Yes, this was my understanding of your proposal when I wrote this post, where I noted performance concerns.

The extra cost of what I proposing is basically allocating one tuple for each input batches, maybe + some shared pointer copies per batch, both seems fairly cheap.

There's also additional deallocations, additional memory needed, additional costs due to computing in one thread and using the results in another thread, and probably more CPU cache misses. It's conceptually simpler but I don't think we can be sure it'd be more performant.

It's hard to predict performance impact of changes to complex concurrent code. Ultimately, if we want to know, there's no escaping from implementing the alternative and trying it out. It's going to take some effort to change RecordBatch to a tuple in a bunch of places, so whether to go down this path is a decision, I guess for you, to make - let me know your call. Note that in this PR, I was aiming for a minimal change; in fact, it's not even new code but a reversion of part of a previous PR.

Ok I think we agree that an immutable tuple design is simpler but maybe have performance implications here. For the sake of pushing this PR forward, I will try to understand your current approach.

Given this background, we can see that the problem is not about an invalid/pointer to a record batch but about invalid cached hashes. In the pre-PR code, the failure could happen with the following order of operations:

The input-receiving thread pushes a record batch to an empty queue, placing it at the front, thus making it the current record batch being processed.
The processing thread invokes AdvanceAndMemoize, which invokes GetLatestKey, which invokes GetKey, which invokes HashesFor, which finds hashes that are incorrect for the above batch. This causes the problem, which is a concurrency one because it is driven by a non-deterministic order of operations.

In the case here, since the input thread doesn't not set batch_ variable in side KeyHasher. I would expect when the processing thread invokes AdvanceAndMemoize, it would find the batch_ is NULLPTR and then compute the hash for the first batch. What do you mean by " finds hashes that are incorrect for the above batch" here? (Similar to what @westonpace is asking above, IIUC, the input thread doesn't not touch the KeyHasher at all so there is there an concurrency/racing issue?)

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 7, 2023

I would expect when the processing thread invokes AdvanceAndMemoize, it would find the batch_ is NULLPTR

This case doesn't always occur though it is indeed the common one. It occurs when the input-receiving thread invalidates the key hasher while the processing thread is not executing HashesFor concurrently. In this case, the invalidation sets KeyHasher::batch_ to nullptr, and the processing threads finds this nullptr in a later invocation of HashesFor.

The other case (also explained here) is when the processing thread is executing HashesFor concurrently with the input-receiving thread executing Invalidate. In this case, KeyHasher::batch_ may be set to nullptr first by the invalidation and then set to the batch passed to HashesFor, near the end of this function.

So, AdvanceAndMemoize may find either a nullptr or some batch in KeyHasher::batch_.

What do you mean by " finds hashes that are incorrect for the above batch" here?

I meant a key hasher cache miss, i.e., that the condition in this if-statement evaluates to false.

@icexelloss
Copy link
Contributor

icexelloss commented Jul 7, 2023

This case doesn't always occur though it is indeed the common one. It occurs when the input-receiving thread invalidates the key hasher while the processing thread is not executing HashesFor concurrently. In this case, the invalidation sets KeyHasher::batch_ to nullptr, and the processing threads finds this nullptr in a later invocation of HashesFor.
The other case (also #36499 (comment)) is when the processing thread is executing HashesFor concurrently with the input-receiving thread executing Invalidate.

I am trying understand the issue pre-PR, but you seems to explaining the behavior after the code change here.

Going back to my original question. Regarding your original comments on pre-PR issue behavior:

Given this background, we can see that the problem is not about an invalid/pointer to a record batch but about invalid cached hashes. In the pre-PR code, the failure could happen with the following order of operations:

The input-receiving thread pushes a record batch to an empty queue, placing it at the front, thus making it the current record batch being processed.
The processing thread invokes AdvanceAndMemoize, which invokes GetLatestKey, which invokes GetKey, which invokes HashesFor, which finds hashes that are incorrect for the above batch. This causes the problem, which is a concurrency one because it is driven by a non-deterministic order of operations.

Can you explain your comment above w.r.t to my question below?

In the case here, since the input thread doesn't not set batch_ variable in side KeyHasher. I would expect when the processing thread invokes AdvanceAndMemoize, it would find the batch_ is NULLPTR and then compute the hash for the first batch. What do you mean by " finds hashes that are incorrect for the above batch" here? (Similar to what @westonpace is asking above, IIUC, the input thread doesn't not touch the KeyHasher at all so there is there an concurrency/racing issue?)

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 7, 2023

I am trying understand the issue pre-PR, but you seems to explaining the behavior after the code change here.

Correct, sorry.

Can you explain your comment above w.r.t to my question below?

Sure. See below.

I would expect when the processing thread invokes AdvanceAndMemoize, it would find the batch_ is NULLPTR and then compute the hash for the first batch.

In the pre-PR code, this description is correct for the first batch. For the second batch, AdvanceAndMemoize invokes GetLatestKey before it invokes Advance, which means HashesFor (invoked under GetLatestKey) finds KeyHasher::batch_ referring to the first batch instead of to the second. This is what I meant by "finds hashes that are incorrect for the above batch".

Now, taking this a step further, if the two batches have a different address, then things seem OK because the hashes would then be recomputed. I suspect the bug is triggered when the first and second batches happen to have the same address, which we know is a race condition, leading to the use of incorrect hashes. Granted, I haven't yet nailed down the exact triggering condition in the pre-PR code. It would take some effort - let me know if you'd like this investigated. For the time being, in the current version of the PR, I preferred to find the minimal change that works and I can explain.

@icexelloss
Copy link
Contributor

icexelloss commented Jul 8, 2023

Now, taking this a step further, if the two batches have a different address, then things seem OK because the hashes would then be recomputed. I suspect the bug is triggered when the first and second batches happen to have the same address, which we know is a race condition, leading to the use of incorrect hashes. Granted, I haven't yet nailed down the exact triggering condition in the pre-PR code. It would take some effort - let me know if you'd like this investigated. For the time being, in the current version of the PR, I preferred to find the minimal change that works and I can explain.

If that is case, then I would argue this is not a concurrency issue, but rather, that the KeyHasher class is broken and failed to cache hash correctly - you can have the same wrong hash issue too if everything is on one thread, and therefore, I don't think moving things to another thread is the right fix. I remembered we had this issue before and I thought one of your PR fixed this issue, but I cannot seem to find the change anymore. (There was one about Allocator reusing the buffer address for the second batch on MacOS or sth)

Also, are you able to confirm that the failure happened pre-PR is because the second batch has the same address as the first batch?

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 8, 2023

If that is case, then I would argue this is not a concurrency issue, but rather, that the KeyHasher class is broken and failed to cache hash correctly - you can have the same wrong hash issue too if everything is on one thread

I understand that if you take just the key hasher alone then the caching by address can (be driven to) fail using one thread. However, this argument is ignoring invalidation. I could make a similar argument that the KeyHasher with proper invalidation is correct and can be so even when everything is on one thread. I agree that it would be cleaner to completely remove the address checking from the key hasher, since invalidation makes this not needed, and instead just check for nullptr (see if (batch_) here) as set by the invalidation. I did not do it in the current version of this PR because my goal was the simplest change that fixes. I would prefer to do it in a separate PR, but we could decide to do it in this one. In any case, when done it would eliminate the need for the above arguments.

therefore, I don't think moving things to another thread is the right fix.

Presumably, you're referring to the invalidation from the input-receiving thread. Indeed, there is an alternative where the key hasher operations are all done from one thread, which we discussed starting in this post. It may be considered better by some standard, certainly doable, yet also more complex. This is because, as discussed, AdvanceAndMemoize accesses the key hasher before calling Advance, which is where a new batch is handled and the key hasher can be invalidated, but for correctness we need to invalidate before accessing the key hasher. To deal with this in one thread, we'd need to detect a new batch at the front of the queue (in AdvanceAndMemoize before accessing the key hasher) as a trigger for invalidation. We can't do this by comparing batch addresses, so we'd probably need to change the queue to contain serially numbered batches (rather than just batches) and do the detection by comparing serial numbers.

Note that up until recently, I did not have access to a platform where I could reproduce the kind of failure discussed here. Now that I do, I'm more confident I could locally test changes relevant to this kind of failure.

I remembered we had this issue before and I thought one of your PR fixed this issue, but I cannot seem to find the change anymore. (There was one about Allocator reusing the buffer address for the second batch on MacOS or sth)

My understanding of the timeline of relevant changes is:

  1. GH-34391: [C++] Future as-of-join-node hangs on distant times #34392 introduced invalidation, which was originally done from the input-receiving thread. IIRC, this is also the PR where we first observed an infrequent failure on macOS and fixed it.
  2. GH-36092: [C++] Simplify concurrency in as-of-join node #36094 wished to simplify things by moving operations to one thread, but ended up introducing the bug we're dealing with here.
  3. GH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasicTest #36499 (the current PR) reverts the moving of the key hasher invalidation.

Also, are you able to confirm that the failure happened pre-PR is because the second batch has the same address as the first batch?

Right now I don't have debug-print confirmation of this or of the opposite. It would take some effort to get - let me know if you want this done.

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 9, 2023

I created #36580 as an alternative solution described here. It passed the internal testing and I'm waiting to see if it passes CI. After that, we could measure performance and decide whether to go with this or that PR.

@icexelloss
Copy link
Contributor

icexelloss commented Jul 10, 2023

@rtpsw #36580 is not exactly what I was asking for. There are still two thread reading/writing the batch_index variable which can lead to race conditions / complex threading model.

For the sake of moving forward, I am OK with what is implemented in this PR - however, I would requesting adding detailed explanation of why the invalidation needs to happen on the input received thread (basically because KeyHasher class cannot detect that the top of queue batch has changed if the address of the next batch happens to be the same as the top of queue one).

I would still like to simply the threading model there so that only one thread needs to interact with the KeyHasher class. But that can be left as future work.

@westonpace
Copy link
Member

I've been able to reproduce errors very reliably with this patch (2516354)

The patch accurately simulates what I think our real world constraints are on ordering and multi-threading:

  • Each input may deliver its next batch at any time (e.g. maybe input 2 batch 0 comes first and maybe input 1 batch 0 comes first)
  • Within in an input we know that batches will be ordered. (e.g. input 2 batch 5 will always come after input 2 batch 4)
  • A single input will not call InputReceived reentrantly (e.g. a call to input 2 batch 4 InputReceived will finish before the call to input 2 batch 5 InputReceived begins)
  • There are no guarantees across inputs (e.g. there is no guarantee input 3 will deliver batch 4 before input 2 delivers batch 100)

This commit actually generates a lot of errors in many different asof join node tests. Also, unfortunately, the patch proposed here does not fix all of these errors.

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 11, 2023

@rtpsw #36580 is not exactly what I was asking for.

Right, this PR is the alternative I described. We could also try the alternative you described, where the hashes are paired with the batch, and check its performance, as discussed.

There are still two thread reading/writing the batch_index variable which can lead to race conditions / complex threading model.

There is no batch index variable that is shared between threads. There is a member variable InputState::batch_index_ that is accessed exclusively by the input-receiving thread and used to number the incoming record batches, and there is a separate value NumberedRecordBatch::index that (along with the record batch it numbers) is transferred via the queue to the processing thread, where it is used and set to a local variable batch_index.

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 11, 2023

've been able to reproduce errors very reliably with this patch (2516354)

I'm also seeing errors with this patch. This should be very helpful, thanks!

Copy link
Contributor

@icexelloss icexelloss left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the purpose of moving things forward I am going to approve this PR.

At minimum, this PR fixes the sporadic test failures. Although I am not a big fan of the approach here I think it is acceptable.

We can improve the code structure / simply concurrent models (e.g. using immutable <RecordBatch, hash> tuples) as follow up.

As for the potential issues @westonpace's test util reveals, we can look into it separately as well.

@github-actions github-actions bot added awaiting merge Awaiting merge and removed awaiting changes Awaiting changes labels Jul 12, 2023
@icexelloss
Copy link
Contributor

@westonpace Are you OK with merging this one as is?

@rtpsw
Copy link
Contributor Author

rtpsw commented Jul 12, 2023

As for the potential issues @westonpace's test util reveals, we can look into it separately as well.

#36651

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm fine with this as an incremental addition.

@westonpace westonpace merged commit 0339078 into apache:main Jul 12, 2023
@westonpace westonpace removed the awaiting merge Awaiting merge label Jul 12, 2023
@rtpsw rtpsw deleted the GH-36482 branch July 13, 2023 06:54
raulcd pushed a commit that referenced this pull request Jul 13, 2023
…36499)

### What changes are included in this PR?

The key hasher is invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happens within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called. The change adds synchronization between the input-receiving- and processing- threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**
* Closes: #36482

Authored-by: Yaron Gvili <rtpsw@hotmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
chelseajonesr pushed a commit to chelseajonesr/arrow that referenced this pull request Jul 20, 2023
…Test (apache#36499)

### What changes are included in this PR?

The key hasher is invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happens within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called. The change adds synchronization between the input-receiving- and processing- threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**
* Closes: apache#36482

Authored-by: Yaron Gvili <rtpsw@hotmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
@conbench-apache-arrow
Copy link

After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 0339078.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about possible false positives for unstable benchmarks that are known to sometimes produce them.

R-JunmingChen pushed a commit to R-JunmingChen/arrow that referenced this pull request Aug 20, 2023
…Test (apache#36499)

### What changes are included in this PR?

The key hasher is invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happens within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called. The change adds synchronization between the input-receiving- and processing- threads, because avoiding that would require a more complicated and brittle change, e.g., one that involves detecting in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**
* Closes: apache#36482

Authored-by: Yaron Gvili <rtpsw@hotmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[C++][CI] Sporadic test failures in AsofJoinBasicTest
5 participants