diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java index 77632971652..9f07428ff4c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java @@ -81,9 +81,7 @@ public void init() throws Exception { } @NonNull @Override - public ProgressState call() throws Exception { - return progress.toState(); - } + public abstract ProgressState call() throws Exception; public TaskLocation getTaskLocation() { return this.taskLocation; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 5cab2dd0b24..2a77a49729f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -40,7 +40,7 @@ public class SeaTunnelSourceCollector implements Collector { private final Meter sourceReceivedQPS; - private volatile long rowCountThisPollNext; + private volatile boolean emptyThisPollNext; public SeaTunnelSourceCollector( Object checkpointLock, @@ -56,7 +56,7 @@ public SeaTunnelSourceCollector( public void collect(T row) { try { sendRecordToNext(new Record<>(row)); - rowCountThisPollNext++; + emptyThisPollNext = false; sourceReceivedCount.inc(); sourceReceivedQPS.markEvent(); } catch (IOException e) { @@ -69,12 +69,12 @@ public Object getCheckpointLock() { return checkpointLock; } - public long getRowCountThisPollNext() { - return this.rowCountThisPollNext; + public boolean isEmptyThisPollNext() { + return emptyThisPollNext; } - public void resetRowCountThisPollNext() { - this.rowCountThisPollNext = 0; + public void resetEmptyThisPollNext() { + this.emptyThisPollNext = true; } public void sendRecordToNext(Record record) throws IOException { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index 797033f8d2e..a83f4bfb1de 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -68,7 +68,7 @@ public class SinkAggregatedCommitterTask private static final long serialVersionUID = 5906594537520393503L; - private SeaTunnelTaskState currState; + private volatile SeaTunnelTaskState currState; private final SinkAction sink; private final int maxWriterSize; @@ -138,16 +138,22 @@ protected void stateProcess() throws Exception { if (restoreComplete.isDone()) { currState = READY_START; reportTaskStatus(READY_START); + } else { + Thread.sleep(100); } break; case READY_START: if (startCalled) { currState = STARTING; + } else { + Thread.sleep(100); } break; case STARTING: if (receivedSinkWriter) { currState = RUNNING; + } else { + Thread.sleep(100); } break; case RUNNING: diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index e4928343cf9..9ca01eba322 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -133,11 +133,13 @@ public void close() throws IOException { public void collect() throws Exception { if (!prepareClose) { reader.pollNext(collector); - if (collector.getRowCountThisPollNext() == 0) { + if (collector.isEmptyThisPollNext()) { Thread.sleep(100); } else { - collector.resetRowCountThisPollNext(); + collector.resetEmptyThisPollNext(); } + } else { + Thread.sleep(100); } }