Skip to content

Commit

Permalink
Add logs to find job restore from master active switch error
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed Jul 15, 2023
1 parent a16cd57 commit c49a449
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -167,6 +168,9 @@ public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int pipeli
}

public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) {
log.info(
"reported pipeline running stack: "
+ Arrays.toString(Thread.currentThread().getStackTrace()));
getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ private boolean turnToEndState(@NonNull ExecutionState endState) {
public boolean updateTaskState(
@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
synchronized (this) {
LOGGER.fine(
LOGGER.info(
String.format(
"Try to update the task %s state from %s to %s",
taskFullName, current, targetState));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SubPlan {
private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);

/** The max num pipeline can restore. */
public static final int PIPELINE_MAX_RESTORE_NUM = 2; // TODO should set by config
public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set by config

private final List<PhysicalVertex> physicalVertexList;

Expand Down Expand Up @@ -332,6 +332,9 @@ private void turnToEndState(@NonNull PipelineStatus endState) throws Exception {
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
this.currPipelineStatus = endState;
LOGGER.info(
String.format(
"%s turn to end state %s.", pipelineFullName, currPipelineStatus));
}
}

Expand Down Expand Up @@ -511,11 +514,17 @@ private void resetPipelineState() throws Exception {
LOGGER.severe(message);
throw new IllegalStateException(message);
}

LOGGER.info(
String.format(
"Reset pipeline %s state to %s",
getPipelineFullName(), PipelineStatus.CREATED));
updateStateTimestamps(PipelineStatus.CREATED);
runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED);
this.currPipelineStatus = PipelineStatus.CREATED;
;
LOGGER.info(
String.format(
"Reset pipeline %s state to %s complete",
getPipelineFullName(), PipelineStatus.CREATED));
return null;
},
new RetryUtils.RetryMaterial(
Expand Down

0 comments on commit c49a449

Please sign in to comment.