From 8cc3c0f9b1cd2b983ef9cc0427a3e9bdedd78905 Mon Sep 17 00:00:00 2001 From: wu-a-ge Date: Tue, 11 Jul 2023 22:24:53 +0800 Subject: [PATCH] [HotFix][Zeta] fix after the savepoint job is restored, the checkpoint file cannot be generated #4985 (#5051) * fix after the savepoint job is restored, the checkpoint file cannot be generated --- .../server/checkpoint/CheckpointCoordinator.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 3ae2658509e..9e0ef2a53ab 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -183,8 +183,19 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkpointIdCounter; this.readyToCloseStartingTask = new CopyOnWriteArraySet<>(); if (pipelineState != null) { - this.latestCompletedCheckpoint = + // fix after the savepoint job is restored, the checkpoint file cannot be generated + CompletedCheckpoint tmpCheckpoint = serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class); + this.latestCompletedCheckpoint = + new CompletedCheckpoint( + tmpCheckpoint.getJobId(), + tmpCheckpoint.getPipelineId(), + tmpCheckpoint.getCheckpointId(), + tmpCheckpoint.getCheckpointTimestamp(), + CheckpointType.CHECKPOINT_TYPE, + tmpCheckpoint.getCompletedTimestamp(), + tmpCheckpoint.getTaskStates(), + tmpCheckpoint.getTaskStatistics()); } this.checkpointCoordinatorFuture = new CompletableFuture();