From 5e68b5a1d7108c4ffc8b5ed6c05a25d164861c8a Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 22 Jul 2023 19:10:39 +0800 Subject: [PATCH] [Improve] Improve CheckpointCoordinator notify complete when restore --- .../checkpoint/CheckpointCloseReason.java | 3 ++- .../checkpoint/CheckpointCoordinator.java | 26 ++++++++++++++++--- .../server/checkpoint/CheckpointManager.java | 4 +-- .../engine/server/master/JobMaster.java | 5 +++- 4 files changed, 30 insertions(+), 8 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java index ae1af4d41fa1..9f35f62fd608 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java @@ -26,7 +26,8 @@ public enum CheckpointCloseReason { CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."), CHECKPOINT_INSIDE_ERROR("CheckpointCoordinator inside have error."), AGGREGATE_COMMIT_ERROR("Aggregate commit error."), - TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error"); + TASK_NOT_ALL_READY_WHEN_SAVEPOINT("Task not all ready, savepoint error"), + CHECKPOINT_NOTIFY_COMPLETE_FAILED("Checkpoint notify complete failed"); private final String message; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 9e0ef2a53ab1..44d6f07df244 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -272,7 +272,8 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) { checkpointCoordinatorFuture.complete( new CheckpointCoordinatorState( CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get())); - checkpointManager.handleCheckpointError(pipelineId); + checkpointManager.handleCheckpointError( + pipelineId, reason.equals(CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED)); } private void restoreTaskState(TaskLocation taskLocation) { @@ -316,9 +317,26 @@ private void allTaskReady() { isAllTaskReady = true; InvocationFuture[] futures = notifyTaskStart(); CompletableFuture.allOf(futures).join(); + notifyCompleted(latestCompletedCheckpoint); scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval()); } + private void notifyCompleted(CompletedCheckpoint completedCheckpoint) { + if (completedCheckpoint != null) { + try { + LOG.info("start notify checkpoint completed, checkpoint:{}", completedCheckpoint); + InvocationFuture[] invocationFutures = + notifyCheckpointCompleted(completedCheckpoint.getCheckpointId()); + CompletableFuture.allOf(invocationFutures).join(); + } catch (Throwable e) { + handleCoordinatorError( + "notify checkpoint completed failed", + e, + CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED); + } + } + } + public InvocationFuture[] notifyTaskStart() { return plan.getPipelineSubtasks().stream() .map(NotifyTaskStartOperation::new) @@ -358,6 +376,7 @@ protected void restoreCoordinator(boolean alreadyStarted) { shutdown = false; if (alreadyStarted) { isAllTaskReady = true; + notifyCompleted(latestCompletedCheckpoint); tryTriggerPendingCheckpoint(CHECKPOINT_TYPE); } else { isAllTaskReady = false; @@ -719,10 +738,9 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId()); - InvocationFuture[] invocationFutures = notifyCheckpointCompleted(checkpointId); - CompletableFuture.allOf(invocationFutures).join(); - // TODO: notifyCheckpointCompleted fail + latestCompletedCheckpoint = completedCheckpoint; + notifyCompleted(completedCheckpoint); if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index f34ae2f6a0ad..0c5a91698e7b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -174,8 +174,8 @@ public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) { getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted); } - protected void handleCheckpointError(int pipelineId) { - jobMaster.handleCheckpointError(pipelineId); + protected void handleCheckpointError(int pipelineId, boolean neverRestore) { + jobMaster.handleCheckpointError(pipelineId, neverRestore); } private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 13b89a69dd94..11cc5f21b0b5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -325,7 +325,10 @@ public void run() { } } - public void handleCheckpointError(long pipelineId) { + public void handleCheckpointError(long pipelineId, boolean neverRestore) { + if (neverRestore) { + this.neverNeedRestore(); + } this.physicalPlan .getPipelineList() .forEach(