Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA #2989

Closed
wants to merge 7 commits into from

Conversation

zaynt4606
Copy link
Contributor

@zaynt4606 zaynt4606 commented Dec 10, 2024

What changes were proposed in this pull request?

1.Move the peerWorker available judgement out of ThreadPool.
2.Move retain after the available worker judgment Which means we don't have to release if peerWorker is unavailable.
2. Add fileWriter.decrementPendingWrites() if peerWorker is unavailable since it will return and won't decrementPendingWreites in writeLocalData.

Why are the changes needed?

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UT & cluster testing.

@zaynt4606 zaynt4606 changed the title [CELEBORN-1771] Fix async reference count error in PushDatahandler [CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA Dec 12, 2024
@@ -719,7 +720,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
logError(
s"PushMergedData replication failed during connecting peer for partitionLocation: $location",
e)
pushMergedDataCallback.onFailure(
wrappedCallback.onFailure(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should keep pushMergedDataCallback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done~

pushData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
val peer = location.getPeer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also check unavailablePeers when run replicate task

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done~

Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, except a nit

peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
for (fileWriterIndex <- 0 until totalFileWriters) {
if (fileWriters(fileWriterIndex) != null &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileWriters(fileWriterIndex) -> fileWriter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done~

if (fileWriter != null && !pushMergedDataCallback.isHardSplitPartition(fileWriterIndex)) {
fileWriter.decrementPendingWrites()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: I am trying to understand why this is different here vs within the Runnable below ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce the number of jobs submitted to threadpool when peerWorker is unavailable.

Copy link
Contributor Author

@zaynt4606 zaynt4606 Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileWriter.decrementPendingWrites() here is to make up the operation in writeLocalData after the Runnable which fileWriter.incrementPendingWrites() before.

Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, merge to main(v0.6.0)

@RexXiong RexXiong closed this in 2eb4c23 Dec 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants