Skip to content

Commit

Permalink
[Hotfix][Zeta][Checkpoint] Fix CheckpointIDCounter Thread Not Safe Pr…
Browse files Browse the repository at this point in the history
…oblem (#3875)

* [Improve] [Checkpoint] Fix CheckpointIDCounter Thread Not Safe Problem

* [Improve] [Checkpoint] Fix CheckpointIDCounter Thread Not Safe Problem
  • Loading branch information
Hisoka-X authored Jan 6, 2023
1 parent 73fc0c1 commit 41751d8
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.server.checkpoint;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
Expand Down Expand Up @@ -55,9 +57,9 @@ public CompletableFuture<Void> shutdown(PipelineStatus pipelineStatus) {

@Override
public long getAndIncrement() throws Exception {
Long currentId = checkpointIdMap.get(pipelineId);
checkpointIdMap.put(pipelineId, currentId + 1);
return currentId;
Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1);
checkNotNull(nextId);
return nextId - 1;
}

@Override
Expand Down

0 comments on commit 41751d8

Please sign in to comment.