
Convert the delaying node to a gated node to demonstrate my original idea #4

Merged
2 commits merged into rtpsw:GH-35838 on Jun 14, 2023

Conversation

@westonpace (Author)

As I mentioned in the PR, feel free to take this idea or leave it.

@github-actions (bot) commented Jun 7, 2023

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA, the title also supports:

PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}


Comment on lines 1471 to 1472
static constexpr auto kKindName = "BackpressureDelayingNode";
static constexpr const char* kFactoryName = "backpressure_delay";
@rtpsw (Owner)

Change these constants to "GatedNode" and "gated".
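In other words, the rename being asked for, applied to the two constants shown above:

static constexpr auto kKindName = "GatedNode";
static constexpr const char* kFactoryName = "gated";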

@@ -1490,6 +1578,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
std::string name() const { return name_prefix + ";" + (is_fast ? "fast" : "slow"); }
};

Gate gate;
GatedNodeOptions gate_options(&gate);
@rtpsw (Owner)

I don't see gate_options used anywhere. @westonpace, are you sure the tester is doing what you intended?

@westonpace (Author)

Good catch. I must be missing something or getting lucky with timing

@rtpsw (Owner) commented Jun 13, 2023

Overall, the approach looks good. However, see my comment - something seems to be missing.

@rtpsw (Owner) commented Jun 13, 2023

The conflict-resolved version I just committed includes a fix that uses the gated node in front of one of the source nodes, which I believe was the intention. This fix allowed the backpressure tests to pass.
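Roughly, the shape of that fix as a sketch (not the exact test code; the schema, generator, and join-option names below are placeholders): the "gated" factory is declared between one source and the asof join, so the test can hold that input's batches back while the other inputs run.

// Sketch only: assumes the arrow::acero namespace and the Gate/GatedNodeOptions
// test helpers from this PR; l_schema/l_gen/r_schema/r_gen/join_options are
// illustrative names, not the test's actual variables.
Gate gate;
GatedNodeOptions gate_options(&gate);

// One input goes source -> gated -> asof join; the other connects directly.
Declaration gated_source = Declaration::Sequence({
    {"source", SourceNodeOptions{l_schema, l_gen}},
    {"gated", gate_options},
});
Declaration other_source{"source", SourceNodeOptions{r_schema, r_gen}};
Declaration asof_join{"asofjoin", {gated_source, other_source}, join_options};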

@rtpsw (Owner) commented Jun 13, 2023

A local debug-run of this version caught something:

[ RUN      ] AsofJoinTest.BackpressureWithBatchesGen
/mnt/user1/tscontract/github/rtpsw/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:1636: Failure
Value of: _fut77.Wait(::arrow::kDefaultAssertFinishesWaitSeconds)
  Actual: false
Expected: true
[  FAILED  ] AsofJoinTest.BackpressureWithBatchesGen (64008 ms)

@rtpsw (Owner) commented Jun 13, 2023

@westonpace, do you have an idea about the cause of the failure? If not, we may be better off using the existing approach in apache#35874 at least for the time being.

@icexelloss (Collaborator)

@rtpsw Do we know why the test fails with the gated version? This might be uncovering some bug.

@rtpsw (Owner) commented Jun 13, 2023

> @rtpsw Do we know why the test fails with the gated version? This might be uncovering some bug.

We do not know at the moment. It's not quick to determine, and for now I haven't made an attempt to debug more deeply.

@westonpace (Author)

I'll take a look at this now

@westonpace (Author)

I'm hitting this deadlock reliably now too. It appears that the process thread is exiting partway even though there is still data to process.

@westonpace (Author)

Actually, I take that back. It doesn't seem to be process thread related. The two ungated inputs pause (as expected) and then the test immediately releases the gate. The gated node then sends a big flood of batches and gets paused. Then the gated node isn't being unpaused for some reason.

@westonpace (Author)

Ah :) The gated node is not maintaining order. I will fix this.
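For reference, the shape of that fix as a rough sketch (not the PR's exact code; aside from ReleaseAllBatches, which comes up later in this thread, the member names are illustrative): batches that arrive while the gate is closed get buffered in arrival order and are forwarded FIFO once the gate opens, so downstream ordering is preserved.

// Sketch of the gated node's ordering-preserving release path.
std::mutex mutex_;
std::vector<ExecBatch> queued_;  // batches held back while the gate is closed

Status ReleaseAllBatches() {
  std::vector<ExecBatch> to_deliver;
  {
    // Take everything that queued up, keeping arrival order.
    std::lock_guard<std::mutex> lock(mutex_);
    to_deliver.swap(queued_);
  }
  // Forward FIFO so batches reach the output in the order they arrived.
  for (auto& batch : to_deliver) {
    ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
  }
  return Status::OK();
}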

@westonpace (Author)

@rtpsw @icexelloss

I believe I've solved the problem in the gated version. There were no issues with the asof join node. The only problems were:

  • I wasn't applying the gated node at all (as Yaron noticed)
  • The BusyWait could incorrectly let the test pass because it wasn't re-verifying the counters afterwards
  • The gated node would allow batches to get sent out of order

@icexelloss (Collaborator)

> @rtpsw @icexelloss
>
> I believe I've solved the problem in the gated version. There were no issues with the asof join node. The only problems were:
>
> * I wasn't applying the gated node at all (as Yaron noticed)
> * The BusyWait could incorrectly let the test pass because it wasn't re-verifying the counters afterwards
> * The gated node would allow batches to get sent out of order

Sweet. Thanks Weston!

bool callback_added = maybe_unlocked.TryAddCallback([this] {
  return [this](const Status& st) {
    DCHECK_OK(st);
    plan_->query_context()->ScheduleTask(
@icexelloss (Collaborator)

This reminds me of a (not so related) question I had. In the asof join node, should we call

output_->InputReceived(this, std::move(out_b)) within ScheduleTask (since this is on the processing thread)?

https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#LL1356C21-L1356C67

And in general, when to call InputReceived directly vs calling InputReceived within a ScheduleTask?

@westonpace (Author)

> This reminds me of a (not so related) question I had. In the asof join node, should we call
>
> output_->InputReceived(this, std::move(out_b)) within ScheduleTask (since this is on the processing thread)?

Yes, I noticed that as well. You probably should. That being said, it probably won't make too much difference since you're running everything single-threaded anyways. However, even that isn't really true. Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream. If you use ScheduleTask then you'd just be transferring more work to the other thread so it might actually hurt.

So I was going to wait and make this suggestion when / if you add multithreading support to asof-join.

> And in general, when to call InputReceived directly vs calling InputReceived within a ScheduleTask?

You should call it directly if you are going to keep processing the data that is in the current thread's CPU cache. You should schedule a new task if you are going to start processing a batch of data that isn't in the current thread's CPU cache.

This case fits the second condition. However, this case is also a special case anyways. This is because the thread that executes that callback will be the unit test thread (as part of the call to ReleaseAllBatches). We definitely want to transfer to the scheduler anytime we are coming from "outside the exec plan".
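Concretely, the alternative being discussed would look roughly like this (a sketch only, not the current asof join code; it assumes a surrounding function that returns Status and the ScheduleTask overload that takes a task label, and the label string is just illustrative):

// Instead of calling output_->InputReceived(this, std::move(out_b)) directly
// from the processing thread, hand the downstream push to the scheduler:
ARROW_RETURN_NOT_OK(plan_->query_context()->ScheduleTask(
    [this, batch = std::move(out_b)]() mutable {
      return output_->InputReceived(this, std::move(batch));
    },
    "AsofJoinNode::ForwardOutput"));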

@rtpsw (Owner)

> Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream.

A bit more accurately, this "thread-splitting" occurs for each as-of-join node in the plan. We could create an issue to replace the internal as-of-join thread with some kind of execution facility.

@icexelloss (Collaborator)

> Any plan with an asof join node today effectively becomes a "two-threaded" program. There is one thread for everything leading up to the asof join node and one thread for the processing and everything downstream.

Yeah, that has been a slight concern of mine. I don't love the fact that downstream work, e.g., projections, happens outside the serial execution thread. I was trying to get to a model where the asof join processing thread pulls data from the (buffered) asof join input queues, does the join, and sends the output downstream via the scheduler. So from the downstream node's point of view, the asof join processing thread is transparent to any other node and purely something internal to the asof join node.

I don't think it really brings any performance benefit (like you mentioned, it might actually make it run slower because more work is shifted to the scheduler thread), but I do like the simpler execution model (the processing thread being transparent). And if we want to do that, I think we can do it by calling output_->InputReceived(this, std::move(out_b)) within ScheduleTask. (Yaron is working on an internal benchmark suite, so once that is ready we can play with this more.)

@rtpsw (Owner) commented Jun 14, 2023

CI job failures seem unrelated.

rtpsw merged commit 4ecd7ed into rtpsw:GH-35838 on Jun 14, 2023