[CELEBORN-1059] Fix callback not update if push worker excluded during retry #2005

Closed
wants to merge 2 commits
@@ -272,6 +272,7 @@ private void submitRetryPushData(
partitionId,
batchId,
newLoc);
+pushDataRpcResponseCallback.updateLatestPartition(newLoc);
try {
if (!isPushTargetWorkerExcluded(newLoc, pushDataRpcResponseCallback)) {
if (!testRetryRevive || remainReviveTimes < 1) {
@@ -281,7 +282,6 @@ private void submitRetryPushData(
String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
PushData newPushData =
new PushData(PRIMARY_MODE, shuffleKey, newLoc.getUniqueId(), newBuffer);
-pushDataRpcResponseCallback.updateLatestPartition(newLoc);
client.pushData(newPushData, pushDataTimeout, pushDataRpcResponseCallback);
} else {
throw new RuntimeException(
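
The two hunks above move the updateLatestPartition(newLoc) call ahead of the isPushTargetWorkerExcluded check. When the retry target is on the exclude list, the exclusion check fails the callback immediately, so the callback needs to be pointing at the new location before that can happen. Below is a minimal, self-contained sketch of the ordering; every type and name in it is a simplified stand-in, not Celeborn's actual code.

// Hypothetical stand-ins; only the call ordering mirrors the diff above.
public class RetryOrderingSketch {

  interface PushCallback {
    void updateLatestPartition(String newLoc);
    void onFailure(Exception e);
  }

  // Stand-in for isPushTargetWorkerExcluded: an excluded target fails the
  // callback immediately and reports true.
  static boolean isExcluded(String loc, PushCallback callback) {
    boolean excluded = loc.startsWith("excluded-");
    if (excluded) {
      callback.onFailure(new Exception("worker excluded: " + loc));
    }
    return excluded;
  }

  static void submitRetry(String newLoc, PushCallback callback) {
    // The fix: update the callback's view of the partition before the
    // exclusion check, so the failure path above already sees newLoc.
    callback.updateLatestPartition(newLoc);
    if (!isExcluded(newLoc, callback)) {
      System.out.println("pushing data to " + newLoc);
    }
  }

  public static void main(String[] args) {
    PushCallback callback = new PushCallback() {
      String latest = "old-worker:9092";

      @Override
      public void updateLatestPartition(String newLoc) {
        latest = newLoc;
      }

      @Override
      public void onFailure(Exception e) {
        // With the old ordering this would still report old-worker:9092.
        System.out.println("cleanup against " + latest + ": " + e.getMessage());
      }
    };
    submitRetry("excluded-worker:9092", callback);
  }
}

Running the sketch prints a cleanup message against the excluded worker rather than the stale old one, which is what the reordering is meant to guarantee.
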
@@ -633,18 +633,17 @@ boolean newerPartitionLocationExists(

void excludeWorkerByCause(StatusCode cause, PartitionLocation oldLocation) {
if (pushExcludeWorkerOnFailureEnabled && oldLocation != null) {
-if (cause == StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY) {
-pushExcludedWorkers.add(oldLocation.hostAndPushPort());
-} else if (cause == StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY) {
-pushExcludedWorkers.add(oldLocation.hostAndPushPort());
-} else if (cause == StatusCode.PUSH_DATA_TIMEOUT_PRIMARY) {
-pushExcludedWorkers.add(oldLocation.hostAndPushPort());
-} else if (cause == StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA) {
-pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
-} else if (cause == StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_REPLICA) {
-pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
-} else if (cause == StatusCode.PUSH_DATA_TIMEOUT_REPLICA) {
-pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
+switch (cause) {
+case PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY:
+case PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY:
+case PUSH_DATA_TIMEOUT_PRIMARY:
+pushExcludedWorkers.add(oldLocation.hostAndPushPort());
+break;
+case PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA:
+case PUSH_DATA_CONNECTION_EXCEPTION_REPLICA:
+case PUSH_DATA_TIMEOUT_REPLICA:
+pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
+break;
}
}
}
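
The hunk above collapses six repeated if/else branches into one switch with grouped case labels: the three primary-side causes exclude the primary worker, and the three replica-side causes exclude its peer. Because the switch has no default branch, any other cause still excludes nothing, so the behavior is unchanged. A small, self-contained example of the same fall-through grouping pattern; the enum and the set below are simplified stand-ins for StatusCode and pushExcludedWorkers.

import java.util.HashSet;
import java.util.Set;

// Simplified stand-ins for StatusCode and the excluded-worker set.
public class GroupedSwitchSketch {
  enum Cause {
    CREATE_CONNECTION_FAIL_PRIMARY, CONNECTION_EXCEPTION_PRIMARY, TIMEOUT_PRIMARY,
    CREATE_CONNECTION_FAIL_REPLICA, CONNECTION_EXCEPTION_REPLICA, TIMEOUT_REPLICA,
    OTHER
  }

  static final Set<String> excludedWorkers = new HashSet<>();

  static void excludeByCause(Cause cause, String primary, String replica) {
    switch (cause) {
      // All primary-side failures share one action via fall-through.
      case CREATE_CONNECTION_FAIL_PRIMARY:
      case CONNECTION_EXCEPTION_PRIMARY:
      case TIMEOUT_PRIMARY:
        excludedWorkers.add(primary);
        break;
      // All replica-side failures exclude the peer instead.
      case CREATE_CONNECTION_FAIL_REPLICA:
      case CONNECTION_EXCEPTION_REPLICA:
      case TIMEOUT_REPLICA:
        excludedWorkers.add(replica);
        break;
      default:
        // Other causes do not exclude anyone, matching the original chain.
        break;
    }
  }

  public static void main(String[] args) {
    excludeByCause(Cause.TIMEOUT_REPLICA, "worker-a:9092", "worker-b:9092");
    System.out.println(excludedWorkers); // [worker-b:9092]
  }
}
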
@@ -905,10 +904,10 @@ public void onFailure(Throwable e) {
PartitionLocation latest = loc;

@Override
-public void updateLatestPartition(PartitionLocation latest) {
-pushState.addBatch(nextBatchId, latest.hostAndPushPort());
+public void updateLatestPartition(PartitionLocation newloc) {
+pushState.addBatch(nextBatchId, newloc.hostAndPushPort());
pushState.removeBatch(nextBatchId, this.latest.hostAndPushPort());
-this.latest = latest;
+this.latest = newloc;
}

@Override
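
In this callback, updateLatestPartition re-registers the in-flight batch: it is added under the new worker's host-and-push-port, removed from the previous one, and the new location is remembered. The parameter rename from latest to newloc simply stops it from shadowing the latest field. A rough, self-contained model of that bookkeeping, with a plain map standing in for Celeborn's PushState:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy stand-in for PushState's per-worker in-flight batch accounting.
public class BatchBookkeepingSketch {
  static final Map<String, Set<Integer>> inFlight = new HashMap<>();

  static void addBatch(int batchId, String hostAndPushPort) {
    inFlight.computeIfAbsent(hostAndPushPort, k -> new HashSet<>()).add(batchId);
  }

  static void removeBatch(int batchId, String hostAndPushPort) {
    Set<Integer> batches = inFlight.get(hostAndPushPort);
    if (batches != null) {
      batches.remove(batchId);
    }
  }

  // Mirrors updateLatestPartition: register under the new worker first,
  // then drop the entry held under the previous worker.
  static String moveBatch(int batchId, String previous, String next) {
    addBatch(batchId, next);
    removeBatch(batchId, previous);
    return next;
  }

  public static void main(String[] args) {
    addBatch(7, "worker-a:9092");                                    // initial push target
    String latest = moveBatch(7, "worker-a:9092", "worker-b:9092");  // revive to a new worker
    // e.g. {worker-a:9092=[], worker-b:9092=[7]}, latest=worker-b:9092
    System.out.println(inFlight + ", latest=" + latest);
  }
}
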
@@ -1003,12 +1002,10 @@ public void onSuccess(ByteBuffer response) {

@Override
public void onFailure(Throwable e) {
-StatusCode cause = getPushDataFailCause(e.getMessage());
-
if (pushState.exception.get() != null) {
return;
}
-
+StatusCode cause = getPushDataFailCause(e.getMessage());
if (remainReviveTimes <= 0) {
if (e instanceof CelebornIOException) {
callback.onFailure(e);
@@ -1383,11 +1380,10 @@ public void onSuccess(ByteBuffer response) {

@Override
public void onFailure(Throwable e) {
-StatusCode cause = getPushDataFailCause(e.getMessage());
-
if (pushState.exception.get() != null) {
return;
}
+StatusCode cause = getPushDataFailCause(e.getMessage());
if (remainReviveTimes <= 0) {
if (e instanceof CelebornIOException) {
callback.onFailure(e);
@@ -67,6 +67,8 @@ class ChangePartitionManager(

private var batchHandleChangePartition: Option[ScheduledFuture[_]] = _

+private val testRetryRevive = conf.testRetryRevive
+
def start(): Unit = {
batchHandleChangePartition = batchHandleChangePartitionSchedulerThread.map {
// noinspection ConvertExpressionToSAM
@@ -204,7 +206,7 @@ class ChangePartitionManager(
logWarning(s"Batch handle change partition for $changes")

// Exclude all failed workers
-if (changePartitions.exists(_.causes.isDefined)) {
+if (changePartitions.exists(_.causes.isDefined) && !testRetryRevive) {
changePartitions.filter(_.causes.isDefined).foreach { changePartition =>
lifecycleManager.workerStatusTracker.excludeWorkerFromPartition(
shuffleId,
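
The two ChangePartitionManager hunks wire a testRetryRevive flag into the batch change-partition handler: when the test flag is enabled, workers that reported failure causes are not excluded, so a retry/revive test can keep pushing to the same workers instead of having them filtered out after the first failure. A tiny, hypothetical illustration of the guard; plain booleans stand in for conf.testRetryRevive and the failure check, and the real change is in Scala:

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the guard: exclusion is skipped while the
// retry/revive test flag is set, so retried pushes can still target
// the "failed" workers.
public class ExcludeGuardSketch {
  public static void main(String[] args) {
    boolean testRetryRevive = true;   // stand-in for conf.testRetryRevive
    boolean hasFailureCauses = true;  // stand-in for changePartitions.exists(_.causes.isDefined)
    Set<String> excluded = new HashSet<>();

    if (hasFailureCauses && !testRetryRevive) {
      excluded.add("worker-a:9092");  // would normally be excluded here
    }
    System.out.println("excluded workers: " + excluded); // empty while testing revive
  }
}
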
@@ -51,7 +51,7 @@ class RetryReviveTest extends AnyFunSuite
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
.getOrCreate()
val result = ss.sparkContext.parallelize(1 to 1000, 2)
-.map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
+.map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(4).collect()
Contributor Author:
Changing the groupByKey parameter here just reduces the log output produced by the UT; it doesn't matter otherwise.

assert(result.size == 1000)
ss.stop()
}