diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java index 7f5d88762e5..f1de236cfa0 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java @@ -36,8 +36,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -57,18 +59,19 @@ public class KafkaSourceSplitEnumerator private final ConsumerMetadata metadata; private final Context context; private long discoveryIntervalMillis; - private AdminClient adminClient; + private final AdminClient adminClient; - private Map pendingSplit; + private final Map pendingSplit; private final Map assignedSplit; private ScheduledExecutorService executor; - private ScheduledFuture scheduledFuture; + private ScheduledFuture scheduledFuture; KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context context) { this.metadata = metadata; this.context = context; this.assignedSplit = new HashMap<>(); this.pendingSplit = new HashMap<>(); + this.adminClient = initAdminClient(this.metadata.getProperties()); } KafkaSourceSplitEnumerator( @@ -97,7 +100,6 @@ public class KafkaSourceSplitEnumerator @Override public void open() { - this.adminClient = initAdminClient(this.metadata.getProperties()); if (discoveryIntervalMillis > 0) { this.executor = Executors.newScheduledThreadPool( @@ -180,7 +182,6 @@ public void close() throws IOException { public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { pendingSplit.putAll(convertToNextSplit(splits)); - assignSplit(); } } @@ -191,6 +192,7 @@ public void addSplitsBack(List splits, int subtaskId) { listOffsets( splits.stream() .map(KafkaSourceSplit::getTopicPartition) + .filter(Objects::nonNull) .collect(Collectors.toList()), OffsetSpec.latest()); splits.forEach( @@ -199,7 +201,7 @@ public void addSplitsBack(List splits, int subtaskId) { split.setEndOffset(listOffsets.get(split.getTopicPartition())); }); return splits.stream() - .collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split)); + .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split)); } catch (Exception e) { throw new KafkaConnectorException( KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e); @@ -225,7 +227,7 @@ public void registerReader(int subtaskId) { @Override public KafkaSourceState snapshotState(long checkpointId) throws Exception { - return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet())); + return new KafkaSourceState(new HashSet<>(assignedSplit.values())); } @Override @@ -291,18 +293,12 @@ private synchronized void assignSplit() { readySplit.computeIfAbsent(taskID, id -> new ArrayList<>()); } - pendingSplit - .entrySet() - .forEach( - s -> { - if (!assignedSplit.containsKey(s.getKey())) { - readySplit - .get( - getSplitOwner( - s.getKey(), context.currentParallelism())) - .add(s.getValue()); - } - }); + pendingSplit.forEach( + (key, value) -> { + if (!assignedSplit.containsKey(key)) { + readySplit.get(getSplitOwner(key, context.currentParallelism())).add(value); + } + }); readySplit.forEach( (id, split) -> { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java index abb58a4f148..c3cce03d3bd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java @@ -25,11 +25,14 @@ import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask; import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation; +import lombok.extern.slf4j.Slf4j; + import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +@Slf4j public class SeaTunnelSplitEnumeratorContext implements SourceSplitEnumerator.Context { @@ -60,6 +63,10 @@ public Set registeredReaders() { @Override public void assignSplit(int subtaskIndex, List splits) { + if (registeredReaders().isEmpty()) { + log.warn("No reader is obtained, skip this assign!"); + return; + } task.getExecutionContext() .sendToMember( new AssignSplitOperation<>( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index 16adec49e00..ddb95f1688e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -163,7 +163,7 @@ private void register() { enumeratorTaskAddress) .get(); } catch (InterruptedException | ExecutionException e) { - log.warn("source register failed {}", e); + log.warn("source register failed.", e); throw new RuntimeException(e); } } @@ -177,7 +177,7 @@ public void requestSplit() { enumeratorTaskAddress) .get(); } catch (InterruptedException | ExecutionException e) { - log.warn("source request split failed [{}]", e); + log.warn("source request split failed.", e); throw new RuntimeException(e); } } @@ -192,7 +192,7 @@ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) { enumeratorTaskAddress) .get(); } catch (InterruptedException | ExecutionException e) { - log.warn("source request split failed {}", e); + log.warn("source request split failed.", e); throw new RuntimeException(e); } } @@ -258,7 +258,7 @@ public void restoreState(List actionStateList) throws Except enumeratorTaskAddress) .get(); } catch (InterruptedException | ExecutionException e) { - log.warn("source request split failed {}", e); + log.warn("source request split failed.", e); throw new RuntimeException(e); } }