[CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA #2989
@@ -719,7 +720,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
 logError(
   s"PushMergedData replication failed during connecting peer for partitionLocation: $location",
   e)
-pushMergedDataCallback.onFailure(
+wrappedCallback.onFailure(
should keep pushMergedDataCallback
Done~
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
pushData.body().retain()
replicateThreadPool.submit(new Runnable {
  override def run(): Unit = {
    val peer = location.getPeer
Should also check unavailablePeers when running the replicate task.
Done~
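For readers following the thread, a minimal sketch of why the replicate task re-checks availability even when the caller has already checked: the peer can be marked unavailable between submission and execution. The names `PeerWorker`, `replicateTask`, `replicated`, and `skipped` are hypothetical stand-ins for illustration, not Celeborn's actual types or fields.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicInteger

object RecheckInsideTask {
  // Hypothetical stand-in for a peer worker identity (assumption, not Celeborn's class).
  final case class PeerWorker(host: String, replicatePort: Int)

  // Peers currently considered unreachable, keyed by worker, valued by the time they were marked.
  val unavailablePeers = new ConcurrentHashMap[PeerWorker, java.lang.Long]()
  val replicated = new AtomicInteger(0)
  val skipped = new AtomicInteger(0)

  // The task checks again at run time, because the map may have changed
  // after the task was submitted.
  def replicateTask(peer: PeerWorker): Runnable = new Runnable {
    override def run(): Unit = {
      if (unavailablePeers.containsKey(peer)) {
        skipped.incrementAndGet() // fail fast instead of trying to connect
      } else {
        replicated.incrementAndGet() // the real task would push data to the peer here
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    val peer = PeerWorker("worker-b", 9097)
    pool.submit(replicateTask(peer)).get() // peer is available: replicated
    unavailablePeers.put(peer, java.lang.Long.valueOf(System.currentTimeMillis()))
    pool.submit(replicateTask(peer)).get() // peer now marked unavailable: skipped
    pool.shutdown()
    println(s"replicated=${replicated.get()} skipped=${skipped.get()}")
  }
}
```

The check before submission saves a thread-pool job in the common case; the check inside the task covers the window between submission and execution.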
LGTM, except a nit
  peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
  for (fileWriterIndex <- 0 until totalFileWriters) {
    if (fileWriters(fileWriterIndex) != null &&
fileWriters(fileWriterIndex) -> fileWriter
Done~
    if (fileWriter != null && !pushMergedDataCallback.isHardSplitPartition(fileWriterIndex)) {
      fileWriter.decrementPendingWrites()
    }
  }
QQ: I am trying to understand why this is different here vs. within the Runnable below?
To reduce the number of jobs submitted to threadpool when peerWorker is unavailable.
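The fileWriter.decrementPendingWrites() here makes up for the decrement normally done in writeLocalData, which runs after the Runnable is submitted and balances the earlier fileWriter.incrementPendingWrites(). Because we return early when the peer is unavailable, writeLocalData never runs.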
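A rough sketch of the bookkeeping described above, assuming the pending-writes counter is incremented before the replication decision and normally decremented by the local-write step. `Writer`, `Body`, and `handlePush` are hypothetical stand-ins, not the classes in PushDataHandler.scala; the real handler works on file writers and Netty buffers.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object EarlyReturnSketch {
  // Hypothetical writer exposing only the pending-writes counter.
  final class Writer {
    val pendingWrites = new AtomicInteger(0)
    def incrementPendingWrites(): Unit = pendingWrites.incrementAndGet()
    def decrementPendingWrites(): Unit = pendingWrites.decrementAndGet()
  }

  // Hypothetical reference-counted buffer standing in for the message body.
  final class Body {
    val refCnt = new AtomicInteger(1)
    def retain(): Unit = refCnt.incrementAndGet()
    def release(): Unit = refCnt.decrementAndGet()
  }

  val unavailablePeers = new ConcurrentHashMap[String, java.lang.Long]()

  /** Returns true when the push went through replication and the local write. */
  def handlePush(peer: String, writer: Writer, body: Body): Boolean = {
    writer.incrementPendingWrites() // assumed to happen before the replication decision
    if (unavailablePeers.containsKey(peer)) {
      // Early return: the local-write step is skipped, so undo the increment here.
      // retain() has not been called yet, so there is nothing to release.
      writer.decrementPendingWrites()
      false
    } else {
      body.retain() // retained only once a replicate task will really run
      body.release() // the replicate task would release after sending
      writer.decrementPendingWrites() // the local-write step balances the increment
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val writer = new Writer
    val body = new Body
    unavailablePeers.put("worker-b", java.lang.Long.valueOf(0L))
    val okPath = handlePush("worker-a", writer, body)
    val failPath = handlePush("worker-b", writer, body)
    println(s"ok=$okPath fail=$failPath pending=${writer.pendingWrites.get()} refCnt=${body.refCnt.get()}")
  }
}
```

In both paths the counter returns to zero and the reference count returns to its starting value, which is the invariant the early-return branch has to preserve.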
LGTM, merge to main(v0.6.0)
What changes were proposed in this pull request?
1. Move the peerWorker availability check out of the thread pool.
2. Move the retain call after the peerWorker availability check, which means we don't have to release if peerWorker is unavailable.
3. Add fileWriter.decrementPendingWrites() if peerWorker is unavailable, since the handler returns early and won't call decrementPendingWrites in writeLocalData.
Why are the changes needed?
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing UT & cluster testing.