Skip to content

Commit

Permalink
[CELEBORN-1059] Fix callback not update if push worker excluded during retry
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?
When retrying push data after a successful revive in ShuffleClientImpl#submitRetryPushData, if the new location is excluded, the callback's `latest` location has not been updated by the time wrappedCallback.onFailure is called in ShuffleClientImpl#isPushTargetWorkerExcluded. Therefore there may be problems with subsequent revives.

### Why are the changes needed?
Ditto

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual test.

Closes #2005 from onebox-li/improve-push-exclude.

Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
onebox-li authored and waitinfuture committed Nov 1, 2023
1 parent 320714b commit cd8acf8
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ private void submitRetryPushData(
partitionId,
batchId,
newLoc);
pushDataRpcResponseCallback.updateLatestPartition(newLoc);
try {
if (!isPushTargetWorkerExcluded(newLoc, pushDataRpcResponseCallback)) {
if (!testRetryRevive || remainReviveTimes < 1) {
Expand All @@ -281,7 +282,6 @@ private void submitRetryPushData(
String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
PushData newPushData =
new PushData(PRIMARY_MODE, shuffleKey, newLoc.getUniqueId(), newBuffer);
pushDataRpcResponseCallback.updateLatestPartition(newLoc);
client.pushData(newPushData, pushDataTimeout, pushDataRpcResponseCallback);
} else {
throw new RuntimeException(
Expand Down Expand Up @@ -633,18 +633,17 @@ boolean newerPartitionLocationExists(

void excludeWorkerByCause(StatusCode cause, PartitionLocation oldLocation) {
if (pushExcludeWorkerOnFailureEnabled && oldLocation != null) {
if (cause == StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY) {
pushExcludedWorkers.add(oldLocation.hostAndPushPort());
} else if (cause == StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY) {
pushExcludedWorkers.add(oldLocation.hostAndPushPort());
} else if (cause == StatusCode.PUSH_DATA_TIMEOUT_PRIMARY) {
pushExcludedWorkers.add(oldLocation.hostAndPushPort());
} else if (cause == StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA) {
pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
} else if (cause == StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA) {
pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
} else if (cause == StatusCode.PUSH_DATA_TIMEOUT_REPLICA) {
pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
switch (cause) {
case PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY:
case PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY:
case PUSH_DATA_TIMEOUT_PRIMARY:
pushExcludedWorkers.add(oldLocation.hostAndPushPort());
break;
case PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA:
case PUSH_DATA_CONNECTION_EXCEPTION_REPLICA:
case PUSH_DATA_TIMEOUT_REPLICA:
pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
break;
}
}
}
Expand Down Expand Up @@ -905,10 +904,10 @@ public void onFailure(Throwable e) {
PartitionLocation latest = loc;

@Override
public void updateLatestPartition(PartitionLocation latest) {
pushState.addBatch(nextBatchId, latest.hostAndPushPort());
public void updateLatestPartition(PartitionLocation newloc) {
pushState.addBatch(nextBatchId, newloc.hostAndPushPort());
pushState.removeBatch(nextBatchId, this.latest.hostAndPushPort());
this.latest = latest;
this.latest = newloc;
}

@Override
Expand Down Expand Up @@ -1003,12 +1002,10 @@ public void onSuccess(ByteBuffer response) {

@Override
public void onFailure(Throwable e) {
StatusCode cause = getPushDataFailCause(e.getMessage());

if (pushState.exception.get() != null) {
return;
}

StatusCode cause = getPushDataFailCause(e.getMessage());
if (remainReviveTimes <= 0) {
if (e instanceof CelebornIOException) {
callback.onFailure(e);
Expand Down Expand Up @@ -1383,11 +1380,10 @@ public void onSuccess(ByteBuffer response) {

@Override
public void onFailure(Throwable e) {
StatusCode cause = getPushDataFailCause(e.getMessage());

if (pushState.exception.get() != null) {
return;
}
StatusCode cause = getPushDataFailCause(e.getMessage());
if (remainReviveTimes <= 0) {
if (e instanceof CelebornIOException) {
callback.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class ChangePartitionManager(

private var batchHandleChangePartition: Option[ScheduledFuture[_]] = _

private val testRetryRevive = conf.testRetryRevive

def start(): Unit = {
batchHandleChangePartition = batchHandleChangePartitionSchedulerThread.map {
// noinspection ConvertExpressionToSAM
Expand Down Expand Up @@ -204,7 +206,7 @@ class ChangePartitionManager(
logWarning(s"Batch handle change partition for $changes")

// Exclude all failed workers
if (changePartitions.exists(_.causes.isDefined)) {
if (changePartitions.exists(_.causes.isDefined) && !testRetryRevive) {
changePartitions.filter(_.causes.isDefined).foreach { changePartition =>
lifecycleManager.workerStatusTracker.excludeWorkerFromPartition(
shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RetryReviveTest extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.getOrCreate()
val result = ss.sparkContext.parallelize(1 to 1000, 2)
.map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
.map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(4).collect()
assert(result.size == 1000)
ss.stop()
}
Expand Down

0 comments on commit cd8acf8

Please sign in to comment.