From 83995655deef05f13e48e427cc12e38fbdd393d1 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 14 Sep 2023 19:51:06 +0800 Subject: [PATCH] fix some checkpoint err --- .../server/checkpoint/CheckpointCoordinator.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 222f60a5cb50..cf487a8484ac 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -406,24 +406,25 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) { shutdown)); return; } - if (checkpointType.isFinalCheckpoint() || checkpointType.isSchemaChangeCheckpoint()) { - if (pendingCounter.get() > 0) { - scheduleTriggerPendingCheckpoint(checkpointType, 500L); - return; - } - } if (schemaChanging.get() && checkpointType.isGeneralCheckpoint()) { LOG.info("skip trigger generic-checkpoint because schema change in progress"); return; } + if (pendingCounter.get() > 0) { + scheduleTriggerPendingCheckpoint(checkpointType, 500L); + LOG.info( + "skip trigger checkpoint because there is already a pending checkpoint."); + return; + } + CompletableFuture pendingCheckpoint = createPendingCheckpoint(currentTimestamp, checkpointType); startTriggerPendingCheckpoint(pendingCheckpoint); pendingCounter.incrementAndGet(); // if checkpoint type are final type, we don't need to trigger next checkpoint - if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) { + if (checkpointType.notFinalCheckpoint()) { scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval()); } }