Convert the delaying node to a gated node to demonstrate my original idea #4
Conversation
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the openness of the Apache Arrow project. Then could you also rename the pull request title to follow the project's format? In the case of PARQUET issues on JIRA, the corresponding title format is also supported.
static constexpr auto kKindName = "BackpressureDelayingNode";
static constexpr const char* kFactoryName = "backpressure_delay";
Change these constants to "GatedNode" and "gated".
@@ -1490,6 +1578,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
  std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); }
};

Gate gate;
GatedNodeOptions gate_options(&gate);
I don't see gate_options used anywhere. @westonpace, are you sure the tester is doing what you intended?
Good catch. I must be missing something or getting lucky with timing.
Overall, the approach looks good. However, see my comment: something seems to be missing.
The conflict-resolved version I just committed includes a fix that uses the gated node in front of one of the source nodes, which I believe was the intention. This fix allowed the backpressure tests to pass.
A local debug-run of this version caught something:
@westonpace, do you have an idea about the cause of the failure? If not, we may be better off using the existing approach in apache#35874, at least for the time being.
@rtpsw Do we know why the test fails with the gated version? This might be uncovering some bug.
We do not know at the moment. It's not quick to determine, and so far I haven't attempted to debug more deeply.
I'll take a look at this now.
I'm hitting this deadlock reliably now too. It appears that the process thread is exiting partway even though there is still data to process.
Actually, I take that back. It doesn't seem to be process thread related. The two ungated inputs pause (as expected) and then the test immediately releases the gate. The gated node then sends a big flood of batches and gets paused. Then the gated node isn't being unpaused for some reason. |
Ah :) The gated node is not maintaining order. I will fix this. |
…at it delivers batches in order.
I believe I've solved the problem in the gated version. There were no issues with the asof join node. The only problems were:
Sweet. Thanks Weston!
bool callback_added = maybe_unlocked.TryAddCallback([this] {
  return [this](const Status& st) {
    DCHECK_OK(st);
    plan_->query_context()->ScheduleTask(
This reminds me of a (not so related) question I had. In the asof join node, should we call output_->InputReceived(this, std::move(out_b)) within ScheduleTask (since this is on the processing thread)?
https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#LL1356C21-L1356C67
And in general, when should we call InputReceived directly vs calling InputReceived within a ScheduleTask?
This reminds me of a (not so related) question I had. In asof join node, should we call output_->InputReceived(this, std::move(out_b)) within ScheduleTask (since this is on the processing thread)?
Yes, I noticed that as well. You probably should. That being said, it probably won't make too much difference since you're running everything single-threaded anyways. However, even that isn't really true. Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream. If you use ScheduleTask, then you'd just be transferring more work to the other thread, so it might actually hurt.
So I was going to wait and make this suggestion when / if you add multithreading support to asof-join.
And in general, when to call InputReceived directly vs calling InputReceived within a ScheduleTask?
You should call it directly if you are going to keep processing the data that is in the current thread's CPU cache. You should schedule a new task if you are going to start processing a batch of data that isn't in the current thread's CPU cache.
This case fits the second condition. However, this case is also a special case anyways, because the thread that executes that callback will be the unit test thread (as part of the call to ReleaseAllBatches). We definitely want to transfer to the scheduler anytime we are coming from "outside the exec plan".
Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream.
A bit more accurately, this "thread-splitting" occurs for each as-of-join node in the plan. We could create an issue to replace the internal-as-of-join-thread with some kind of execution facility.
Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream.
Yeah, that has been something I have a slight concern over. I don't love the fact that downstream work, e.g., projections, happens outside the serial execution thread. I was trying to get to a model where the asof join processing thread pulls data from the (buffered) asof join input queues, does the join, and sends the output downstream via the scheduler. From a downstream node's point of view, the asof join processing thread would then be transparent to any other node and purely something internal to the asof join node.
I don't think it really brings any performance benefit (like you mentioned, it might actually make it run slower because more work is shifted to the scheduler thread), but I do like the simpler execution model of it (the processing thread being transparent). And I think if we want to do that, we can do it by calling output_->InputReceived(this, std::move(out_b)) within ScheduleTask. (Yaron is working on some internal benchmark suite, so once that is ready we can play with this more.)
CI job failures seem unrelated.
As I mentioned in the PR, feel free to take this idea or leave it.