From e9f3229dde5174be77e8b678c066b6eafc5d33be Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 5 Jan 2023 17:30:13 +0800 Subject: [PATCH 1/2] [Improve] [Checkpoint] Fix CheckpointIDCounter Thread Not Safe Problem --- .../engine/server/checkpoint/IMapCheckpointIDCounter.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java index a6861b210ba..e9b20dfbc17 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java @@ -55,8 +55,11 @@ public CompletableFuture shutdown(PipelineStatus pipelineStatus) { @Override public long getAndIncrement() throws Exception { - Long currentId = checkpointIdMap.get(pipelineId); - checkpointIdMap.put(pipelineId, currentId + 1); + Long currentId; + synchronized (this) { + currentId = checkpointIdMap.get(pipelineId); + checkpointIdMap.put(pipelineId, currentId + 1); + } return currentId; } From 33911c83b5d6dd714b5158c865dcbb3e1e1f489b Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 5 Jan 2023 17:34:22 +0800 Subject: [PATCH 2/2] [Improve] [Checkpoint] Fix CheckpointIDCounter Thread Not Safe Problem --- .../server/checkpoint/IMapCheckpointIDCounter.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java index e9b20dfbc17..3070ff9e744 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.server.checkpoint; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter; @@ -55,12 +57,9 @@ public CompletableFuture shutdown(PipelineStatus pipelineStatus) { @Override public long getAndIncrement() throws Exception { - Long currentId; - synchronized (this) { - currentId = checkpointIdMap.get(pipelineId); - checkpointIdMap.put(pipelineId, currentId + 1); - } - return currentId; + Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1); + checkNotNull(nextId); + return nextId - 1; } @Override