diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java index a6861b210ba..3070ff9e744 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.server.checkpoint; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter; @@ -55,9 +57,9 @@ public CompletableFuture shutdown(PipelineStatus pipelineStatus) { @Override public long getAndIncrement() throws Exception { - Long currentId = checkpointIdMap.get(pipelineId); - checkpointIdMap.put(pipelineId, currentId + 1); - return currentId; + Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1); + checkNotNull(nextId); + return nextId - 1; } @Override