From 1ae927becbd547f003ff41322660ff7154e2db9f Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Fri, 8 Jul 2022 02:13:35 +0800 Subject: [PATCH] [FLINK-27399][Connector/Pulsar] Create the initial subscription instead seek every time. This should fix the wrong position setting. --- .../pulsar_consumer_configuration.html | 6 - .../connector/pulsar/source/PulsarSource.java | 14 +- .../pulsar/source/PulsarSourceBuilder.java | 8 +- .../pulsar/source/PulsarSourceOptions.java | 7 + .../config/PulsarSourceConfigUtils.java | 3 - .../source/config/SourceConfiguration.java | 4 + .../enumerator/PulsarSourceEnumState.java | 16 +- .../enumerator/PulsarSourceEnumerator.java | 148 +++++------ .../enumerator/SplitsAssignmentState.java | 239 ------------------ .../assigner/NormalSplitAssigner.java | 126 +++++++++ .../assigner/SharedSplitAssigner.java | 148 +++++++++++ .../enumerator/assigner/SplitAssigner.java | 64 +++++ .../assigner/SplitAssignerFactory.java | 65 +++++ .../subscriber/PulsarSubscriber.java | 2 +- .../reader/PulsarSourceReaderFactory.java | 2 +- .../PulsarOrderedPartitionSplitReader.java | 20 +- .../source/split/PulsarPartitionSplit.java | 4 +- .../pulsar/source/PulsarSourceITCase.java | 20 -- .../PulsarSourceEnumStateSerializerTest.java | 5 +- .../PulsarSourceEnumeratorTest.java | 10 +- .../enumerator/SplitsAssignmentStateTest.java | 119 --------- .../assigner/NormalSplitAssignerTest.java | 95 +++++++ .../assigner/SharedSplitAssignerTest.java | 98 +++++++ .../assigner/SplitAssignerTestBase.java | 112 ++++++++ .../PulsarPartitionSplitReaderTestBase.java | 8 +- 25 files changed, 841 insertions(+), 502 deletions(-) delete mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java delete mode 100644 flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java create mode 100644 flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssignerTest.java create mode 100644 flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java create mode 100644 flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html index bc8b6df40608b8..4e05b270de4b0c 100644 --- a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html +++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html @@ -140,12 +140,6 @@ Boolean If enabled, the consumer will automatically retry messages. - -
pulsar.consumer.subscriptionInitialPosition
- Latest -

Enum

- Initial position at which to set cursor when subscribing to a topic at first time.

Possible values: -
pulsar.consumer.subscriptionMode
Durable diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java index a6c48d14bc87e7..6f4775df366ac8 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java @@ -32,7 +32,8 @@ import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; -import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState; +import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; +import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; @@ -142,15 +143,14 @@ public SourceReader createReader(SourceReaderContext @Override public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { - SplitsAssignmentState assignmentState = - new SplitsAssignmentState(stopCursor, sourceConfiguration); + SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration); return new PulsarSourceEnumerator( subscriber, startCursor, rangeGenerator, sourceConfiguration, enumContext, - assignmentState); + splitAssigner); } @Internal @@ -158,15 +158,15 @@ public SplitEnumerator createEnumer public SplitEnumerator restoreEnumerator( SplitEnumeratorContext enumContext, PulsarSourceEnumState checkpoint) { - SplitsAssignmentState assignmentState = - new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint); + SplitAssigner splitAssigner = + SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint); return new PulsarSourceEnumerator( subscriber, startCursor, rangeGenerator, sourceConfiguration, enumContext, - assignmentState); + splitAssigner); } @Internal diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index ed68215819f7bb..1a5d6ea975877e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -97,7 +97,7 @@ *

To stop the connector user has to disable the auto partition discovery. As auto partition * discovery always expected new splits to come and not exiting. To disable auto partition * discovery, use builder.setConfig({@link - * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1). + * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1). * *

{@code
  * PulsarSource source = PulsarSource
@@ -266,7 +266,7 @@ public PulsarSourceBuilder setTopicPattern(
     }
 
     /**
-     * The consumer name is informative and it can be used to identify a particular consumer
+     * The consumer name is informative, and it can be used to identify a particular consumer
      * instance from the topic stats.
      */
     public PulsarSourceBuilder setConsumerName(String consumerName) {
@@ -321,7 +321,7 @@ public PulsarSourceBuilder setStartCursor(StartCursor startCursor) {
      * 

To stop the connector user has to disable the auto partition discovery. As auto partition * discovery always expected new splits to come and not exiting. To disable auto partition * discovery, use builder.setConfig({@link - * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1). + * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1). * * @param stopCursor The {@link StopCursor} to specify the stopping offset. * @return this PulsarSourceBuilder. @@ -334,7 +334,7 @@ public PulsarSourceBuilder setUnboundedStopCursor(StopCursor stopCursor) { } /** - * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor} * to specify the stopping offsets for each partition. When all the partitions have reached diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java index 39a73974f5c0a6..c80ddd3b1fadd6 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.description.Description; import org.apache.flink.connector.pulsar.common.config.PulsarOptions; import org.apache.flink.connector.pulsar.source.config.CursorVerification; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -503,6 +504,12 @@ private PulsarSourceOptions() { code("PulsarClientException")) .build()); + /** + * @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore. + * Pulsar didn't support this config option before 1.10.1, so we have to remove this config + * option. + */ + @Deprecated public static final ConfigOption PULSAR_SUBSCRIPTION_INITIAL_POSITION = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition") diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java index 602a1577938e10..0a4dc31e8d320b 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java @@ -59,7 +59,6 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC; -import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; @@ -113,8 +112,6 @@ public static ConsumerBuilder createConsumerBuilder( builder::consumerName); configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted); configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel); - configuration.useOption( - PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition); createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy); configuration.useOption( PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS, diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java index 24e7ec0c9c0cf3..f957e1a57750e7 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java @@ -83,6 +83,10 @@ public int getMessageQueueCapacity() { return messageQueueCapacity; } + /** + * We would override the interval into a negative number when we set the connector with bounded + * stop cursor. + */ public boolean isEnablePartitionDiscovery() { return getPartitionDiscoveryIntervalMs() > 0; } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java index dbab9e21781232..56bbbd20a32a24 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java @@ -18,15 +18,18 @@ package org.apache.flink.connector.pulsar.source.enumerator; +import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; /** * The state class for pulsar source enumerator, used for storing the split state. This class is - * managed and controlled by {@link SplitsAssignmentState}. + * managed and controlled by {@link SplitAssigner}. */ public class PulsarSourceEnumState { @@ -46,11 +49,12 @@ public class PulsarSourceEnumState { private final Map> sharedPendingPartitionSplits; /** - * A {@link PulsarPartitionSplit} should be assigned for all flink readers. Using this map for - * recording assign status. + * It is used for Shared subscription. Every {@link PulsarPartitionSplit} should be assigned for + * all flink readers. Using this map for recording assign status. */ private final Map> readerAssignedSplits; + /** The pipeline has been triggered and topic partitions have been assigned to readers. */ private final boolean initialized; public PulsarSourceEnumState( @@ -85,4 +89,10 @@ public Map> getReaderAssignedSplits() { public boolean isInitialized() { return initialized; } + + /** The initial assignment state for Pulsar. */ + public static PulsarSourceEnumState initialState() { + return new PulsarSourceEnumState( + new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index 7890dcf1847f77..a64e9272c708c3 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -22,40 +22,29 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.util.FlinkRuntimeException; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.KeySharedPolicy; -import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Range; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.MessageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; import static java.util.Collections.singletonList; import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin; -import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient; -import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; -import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH; -import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; /** The enumerator class for pulsar source. */ @Internal @@ -65,13 +54,12 @@ public class PulsarSourceEnumerator private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class); private final PulsarAdmin pulsarAdmin; - private final PulsarClient pulsarClient; private final PulsarSubscriber subscriber; private final StartCursor startCursor; private final RangeGenerator rangeGenerator; private final SourceConfiguration sourceConfiguration; private final SplitEnumeratorContext context; - private final SplitsAssignmentState assignmentState; + private final SplitAssigner splitAssigner; public PulsarSourceEnumerator( PulsarSubscriber subscriber, @@ -79,15 +67,14 @@ public PulsarSourceEnumerator( RangeGenerator rangeGenerator, SourceConfiguration sourceConfiguration, SplitEnumeratorContext context, - SplitsAssignmentState assignmentState) { + SplitAssigner splitAssigner) { this.pulsarAdmin = createAdmin(sourceConfiguration); - this.pulsarClient = createClient(sourceConfiguration); this.subscriber = subscriber; this.startCursor = startCursor; this.rangeGenerator = rangeGenerator; this.sourceConfiguration = sourceConfiguration; this.context = context; - this.assignmentState = assignmentState; + this.splitAssigner = splitAssigner; } @Override @@ -123,9 +110,9 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname @Override public void addSplitsBack(List splits, int subtaskId) { // Put the split back to current pending splits. - assignmentState.putSplitsBackToPendingList(splits, subtaskId); + splitAssigner.addSplitsBack(splits, subtaskId); - // If the failed subtask has already restarted, we need to assign pending splits to it + // If the failed subtask has already restarted, we need to assign pending splits to it. if (context.registeredReaders().containsKey(subtaskId)) { assignPendingPartitionSplits(singletonList(subtaskId)); } @@ -142,7 +129,7 @@ public void addReader(int subtaskId) { @Override public PulsarSourceEnumState snapshotState(long checkpointId) { - return assignmentState.snapshotState(); + return splitAssigner.snapshotState(); } @Override @@ -164,54 +151,7 @@ public void close() { */ private Set getSubscribedTopicPartitions() { int parallelism = context.currentParallelism(); - Set partitions = - subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism); - - // Seek start position for given partitions. - seekStartPosition(partitions); - - return partitions; - } - - private void seekStartPosition(Set partitions) { - ConsumerBuilder consumerBuilder = consumerBuilder(); - Set seekedTopics = new HashSet<>(); - - for (TopicPartition partition : partitions) { - String topicName = partition.getFullTopicName(); - if (!assignmentState.containsTopic(topicName) && seekedTopics.add(topicName)) { - try (Consumer consumer = - sneakyClient(() -> consumerBuilder.clone().topic(topicName).subscribe())) { - startCursor.seekPosition( - partition.getTopic(), partition.getPartitionId(), consumer); - } catch (PulsarClientException e) { - if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) { - throw new IllegalArgumentException(e); - } else { - // WARN_ON_MISMATCH would just print this warning message. - // No need to print the stacktrace. - LOG.warn( - "Failed to set initial consuming position for partition {}", - partition, - e); - } - } - } - } - } - - private ConsumerBuilder consumerBuilder() { - ConsumerBuilder builder = - createConsumerBuilder(pulsarClient, Schema.BYTES, sourceConfiguration); - if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) { - Range range = TopicRange.createFullRange().toPulsarRange(); - KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range); - // Force this consume use sticky hash range in Key_Shared subscription. - // Pulsar won't remove old message dispatcher before 2.8.2 release. - builder.keySharedPolicy(keySharedPolicy); - } - - return builder; + return subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism); } /** @@ -230,13 +170,55 @@ private void checkPartitionChanges(Set fetchedPartitions, Throwa } // Append the partitions into current assignment state. - assignmentState.appendTopicPartitions(fetchedPartitions); - List registeredReaders = new ArrayList<>(context.registeredReaders().keySet()); + List newPartitions = + splitAssigner.registerTopicPartitions(fetchedPartitions); + createSubscription(newPartitions); // Assign the new readers. + List registeredReaders = new ArrayList<>(context.registeredReaders().keySet()); assignPendingPartitionSplits(registeredReaders); } + /** Create subscription on topic partition if it doesn't exist. */ + private void createSubscription(List newPartitions) { + for (TopicPartition partition : newPartitions) { + String topicName = partition.getFullTopicName(); + String subscriptionName = sourceConfiguration.getSubscriptionName(); + + List subscriptions = + sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName)); + if (!subscriptions.contains(subscriptionName)) { + CursorPosition position = + startCursor.position(partition.getTopic(), partition.getPartitionId()); + MessageId initialPosition = queryInitialPosition(topicName, position); + + sneakyAdmin( + () -> + pulsarAdmin + .topics() + .createSubscription( + topicName, subscriptionName, initialPosition)); + } + } + } + + /** Query the available message id from Pulsar. */ + private MessageId queryInitialPosition(String topicName, CursorPosition position) { + CursorPosition.Type type = position.getType(); + if (type == CursorPosition.Type.TIMESTAMP) { + return sneakyAdmin( + () -> + pulsarAdmin + .topics() + .getMessageIdByTimestamp(topicName, position.getTimestamp())); + } else if (type == CursorPosition.Type.MESSAGE_ID) { + return position.getMessageId(); + } else { + throw new UnsupportedOperationException("We don't support this seek type " + type); + } + } + + /** Query the unassigned splits and assign them to the available readers. */ private void assignPendingPartitionSplits(List pendingReaders) { // Validate the reader. pendingReaders.forEach( @@ -248,17 +230,19 @@ private void assignPendingPartitionSplits(List pendingReaders) { }); // Assign splits to downstream readers. - assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits); + splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits); // If periodically partition discovery is disabled and the initializing discovery has done, - // signal NoMoreSplitsEvent to pending readers - if (assignmentState.noMoreNewPartitionSplits()) { - LOG.debug( - "No more PulsarPartitionSplits to assign." - + " Sending NoMoreSplitsEvent to reader {} in subscription {}.", - pendingReaders, - sourceConfiguration.getSubscriptionDesc()); - pendingReaders.forEach(this.context::signalNoMoreSplits); + // signal NoMoreSplitsEvent to pending readers. + for (Integer reader : pendingReaders) { + if (splitAssigner.noMoreSplits(reader)) { + LOG.debug( + "No more PulsarPartitionSplits to assign." + + " Sending NoMoreSplitsEvent to reader {} in subscription {}.", + reader, + sourceConfiguration.getSubscriptionDesc()); + context.signalNoMoreSplits(reader); + } } } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java deleted file mode 100644 index cbc4826583a53a..00000000000000 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.pulsar.source.enumerator; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; -import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; -import org.apache.flink.util.InstantiationUtil; - -import org.apache.pulsar.client.api.SubscriptionType; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - -/** The state class for recording the split assignment. */ -@Internal -public class SplitsAssignmentState { - - private final StopCursor stopCursor; - private final SourceConfiguration sourceConfiguration; - - // The dynamic states for checkpoint. - private final Set appendedPartitions; - // This pending splits is used for Key_Shared, Failover, Exclusive subscription. - private final Set pendingPartitionSplits; - // These two fields are used for Shared subscription. - private final Map> sharedPendingPartitionSplits; - private final Map> readerAssignedSplits; - private boolean initialized; - - public SplitsAssignmentState(StopCursor stopCursor, SourceConfiguration sourceConfiguration) { - this.stopCursor = stopCursor; - this.sourceConfiguration = sourceConfiguration; - this.appendedPartitions = new HashSet<>(); - this.pendingPartitionSplits = new HashSet<>(); - this.sharedPendingPartitionSplits = new HashMap<>(); - this.readerAssignedSplits = new HashMap<>(); - this.initialized = false; - } - - public SplitsAssignmentState( - StopCursor stopCursor, - SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState) { - this.stopCursor = stopCursor; - this.sourceConfiguration = sourceConfiguration; - this.appendedPartitions = sourceEnumState.getAppendedPartitions(); - this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits(); - this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits(); - this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits(); - this.initialized = sourceEnumState.isInitialized(); - } - - public PulsarSourceEnumState snapshotState() { - return new PulsarSourceEnumState( - appendedPartitions, - pendingPartitionSplits, - sharedPendingPartitionSplits, - readerAssignedSplits, - initialized); - } - - /** - * Append the new fetched partitions to current state. We would generate pending source split - * for downstream pulsar readers. Since the {@link SplitEnumeratorContext} don't support put the - * split back to enumerator, we don't support partition deletion. - * - * @param fetchedPartitions The partitions from the {@link PulsarSubscriber}. - */ - public void appendTopicPartitions(Set fetchedPartitions) { - for (TopicPartition partition : fetchedPartitions) { - // If this partition is a new partition. - if (!appendedPartitions.contains(partition)) { - if (!sharePartition()) { - // Create a split and add it to pending list. - pendingPartitionSplits.add(createSplit(partition)); - } - - // Shared subscription don't create splits, we just register partitions. - appendedPartitions.add(partition); - } - } - - // Update this initialize flag. - if (!initialized) { - this.initialized = true; - } - } - - public boolean containsTopic(String topicName) { - return appendedPartitions.stream() - .anyMatch(partition -> Objects.equals(partition.getFullTopicName(), topicName)); - } - - /** Put these splits back to pending list. */ - public void putSplitsBackToPendingList(List splits, int readerId) { - if (!sharePartition()) { - // Put these splits back to normal pending list. - pendingPartitionSplits.addAll(splits); - } else { - // Put the splits back to shared pending list. - Set pending = - sharedPendingPartitionSplits.computeIfAbsent(readerId, id -> new HashSet<>()); - pending.addAll(splits); - } - } - - public Optional> assignSplits( - List pendingReaders) { - // Avoid empty readers assign. - if (pendingReaders.isEmpty()) { - return Optional.empty(); - } - - Map> assignMap; - - // We extract the assign logic into two method for better readability. - if (!sharePartition()) { - assignMap = assignNormalSplits(pendingReaders); - } else { - assignMap = assignSharedSplits(pendingReaders); - } - - if (assignMap.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(new SplitsAssignment<>(assignMap)); - } - } - - /** - * @return It would return true only if periodically partition discovery is disabled, the - * initializing partition discovery has finished AND there is no pending splits for - * assignment. - */ - public boolean noMoreNewPartitionSplits() { - return !sourceConfiguration.isEnablePartitionDiscovery() - && initialized - && pendingPartitionSplits.isEmpty(); - } - - // ----------------- private methods ------------------- - - /** The splits don't shared for all the readers. */ - private Map> assignNormalSplits( - List pendingReaders) { - Map> assignMap = new HashMap<>(); - - // Drain a list of splits. - List pendingSplits = drainPendingPartitionsSplits(); - for (int i = 0; i < pendingSplits.size(); i++) { - PulsarPartitionSplit split = pendingSplits.get(i); - int readerId = pendingReaders.get(i % pendingReaders.size()); - assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split); - } - - return assignMap; - } - - /** Every split would be shared among available readers. */ - private Map> assignSharedSplits( - List pendingReaders) { - Map> assignMap = new HashMap<>(); - - // Drain the splits from share pending list. - for (Integer reader : pendingReaders) { - Set pendingSplits = sharedPendingPartitionSplits.remove(reader); - if (pendingSplits == null) { - pendingSplits = new HashSet<>(); - } - - Set assignedSplits = - readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>()); - - for (TopicPartition partition : appendedPartitions) { - String partitionName = partition.toString(); - if (!assignedSplits.contains(partitionName)) { - pendingSplits.add(createSplit(partition)); - assignedSplits.add(partitionName); - } - } - - if (!pendingSplits.isEmpty()) { - assignMap.put(reader, new ArrayList<>(pendingSplits)); - } - } - - return assignMap; - } - - private PulsarPartitionSplit createSplit(TopicPartition partition) { - try { - StopCursor stop = InstantiationUtil.clone(stopCursor); - return new PulsarPartitionSplit(partition, stop); - } catch (IOException | ClassNotFoundException e) { - throw new IllegalStateException(e); - } - } - - private List drainPendingPartitionsSplits() { - List splits = new ArrayList<>(pendingPartitionSplits); - pendingPartitionSplits.clear(); - - return splits; - } - - /** {@link SubscriptionType#Shared} mode should share a same split for all the readers. */ - private boolean sharePartition() { - return sourceConfiguration.getSubscriptionType() == SubscriptionType.Shared; - } -} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java new file mode 100644 index 00000000000000..b9afd1c286338d --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssigner.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * This assigner is used for {@link SubscriptionType#Failover} and {@link + * SubscriptionType#Exclusive} subscriptions. + */ +@Internal +public class NormalSplitAssigner implements SplitAssigner { + private static final long serialVersionUID = 8412586087991597092L; + + private final StopCursor stopCursor; + private final boolean enablePartitionDiscovery; + + // These fields would be saved into checkpoint. + + private final Set appendedPartitions; + private final Set pendingPartitionSplits; + private boolean initialized; + + public NormalSplitAssigner( + StopCursor stopCursor, + SourceConfiguration sourceConfiguration, + PulsarSourceEnumState sourceEnumState) { + this.stopCursor = stopCursor; + this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery(); + this.appendedPartitions = sourceEnumState.getAppendedPartitions(); + this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits(); + this.initialized = sourceEnumState.isInitialized(); + } + + @Override + public List registerTopicPartitions(Set fetchedPartitions) { + List newPartitions = new ArrayList<>(); + + for (TopicPartition partition : fetchedPartitions) { + if (!appendedPartitions.contains(partition)) { + pendingPartitionSplits.add(new PulsarPartitionSplit(partition, stopCursor)); + appendedPartitions.add(partition); + newPartitions.add(partition); + } + } + + if (!initialized) { + initialized = true; + } + + return newPartitions; + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + pendingPartitionSplits.addAll(splits); + } + + @Override + public Optional> createAssignment( + List readers) { + if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) { + return Optional.empty(); + } + + Map> assignMap = new HashMap<>(); + + List partitionSplits = new ArrayList<>(pendingPartitionSplits); + int readerCount = readers.size(); + for (int i = 0; i < partitionSplits.size(); i++) { + int index = i % readerCount; + Integer readerId = readers.get(index); + PulsarPartitionSplit split = partitionSplits.get(i); + assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split); + } + pendingPartitionSplits.clear(); + + return Optional.of(new SplitsAssignment<>(assignMap)); + } + + @Override + public boolean noMoreSplits(Integer reader) { + return !enablePartitionDiscovery && initialized && pendingPartitionSplits.isEmpty(); + } + + @Override + public PulsarSourceEnumState snapshotState() { + return new PulsarSourceEnumState( + appendedPartitions, + pendingPartitionSplits, + new HashMap<>(), + new HashMap<>(), + initialized); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java new file mode 100644 index 00000000000000..48d75c8dee30d2 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */ +@Internal +public class SharedSplitAssigner implements SplitAssigner { + private static final long serialVersionUID = 8468503133499402491L; + + private final StopCursor stopCursor; + private final boolean enablePartitionDiscovery; + + // These fields would be saved into checkpoint. + + private final Set appendedPartitions; + private final Map> sharedPendingPartitionSplits; + private final Map> readerAssignedSplits; + private boolean initialized; + + public SharedSplitAssigner( + StopCursor stopCursor, + SourceConfiguration sourceConfiguration, + PulsarSourceEnumState sourceEnumState) { + this.stopCursor = stopCursor; + this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery(); + this.appendedPartitions = sourceEnumState.getAppendedPartitions(); + this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits(); + this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits(); + this.initialized = sourceEnumState.isInitialized(); + } + + @Override + public List registerTopicPartitions(Set fetchedPartitions) { + List newPartitions = new ArrayList<>(); + + for (TopicPartition partition : fetchedPartitions) { + if (!appendedPartitions.contains(partition)) { + appendedPartitions.add(partition); + newPartitions.add(partition); + } + } + + if (!initialized) { + initialized = true; + } + + return newPartitions; + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + Set pendingPartitionSplits = + sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>()); + pendingPartitionSplits.addAll(splits); + } + + @Override + public Optional> createAssignment( + List readers) { + if (readers.isEmpty()) { + return Optional.empty(); + } + + Map> assignMap = new HashMap<>(); + for (Integer reader : readers) { + Set pendingSplits = sharedPendingPartitionSplits.remove(reader); + if (pendingSplits == null) { + pendingSplits = new HashSet<>(); + } + + Set assignedSplits = + readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>()); + + for (TopicPartition partition : appendedPartitions) { + String partitionName = partition.toString(); + if (!assignedSplits.contains(partitionName)) { + pendingSplits.add(new PulsarPartitionSplit(partition, stopCursor)); + assignedSplits.add(partitionName); + } + } + + if (!pendingSplits.isEmpty()) { + assignMap.put(reader, new ArrayList<>(pendingSplits)); + } + } + + if (assignMap.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(new SplitsAssignment<>(assignMap)); + } + } + + @Override + public boolean noMoreSplits(Integer reader) { + Set pendingSplits = sharedPendingPartitionSplits.get(reader); + Set assignedSplits = readerAssignedSplits.get(reader); + + return !enablePartitionDiscovery + && initialized + && (pendingSplits == null || pendingSplits.isEmpty()) + && (assignedSplits != null && assignedSplits.size() == appendedPartitions.size()); + } + + @Override + public PulsarSourceEnumState snapshotState() { + return new PulsarSourceEnumState( + appendedPartitions, + new HashSet<>(), + sharedPendingPartitionSplits, + readerAssignedSplits, + initialized); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java new file mode 100644 index 00000000000000..bc03f5103fd885 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * The split assigner for different subscription. We would spread all the splits to different + * readers and store all the state into checkpoint. + */ +@Internal +public interface SplitAssigner extends Serializable { + + /** + * Add the current available partitions into assigner. + * + * @param fetchedPartitions The available partitions queried from Pulsar broker. + * @return New topic partitions compare to previous registered partitions. + */ + List registerTopicPartitions(Set fetchedPartitions); + + /** + * Add a split back to the split assigner if the reader fails. We would try to reassign the + * split or add it to the pending list. + */ + void addSplitsBack(List splits, int subtaskId); + + /** Create a split assignment from the current readers. */ + Optional> createAssignment(List readers); + + /** + * It would return true only if periodically partition discovery is disabled, the initializing + * partition discovery has finished AND there is no pending splits for assignment. + */ + boolean noMoreSplits(Integer reader); + + /** Snapshot the current assign state into checkpoint. */ + PulsarSourceEnumState snapshotState(); +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java new file mode 100644 index 00000000000000..09047c7a0a7bce --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; + +import org.apache.pulsar.client.api.SubscriptionType; + +import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState; +import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; +import static org.apache.pulsar.client.api.SubscriptionType.Failover; +import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared; +import static org.apache.pulsar.client.api.SubscriptionType.Shared; + +/** The factory for creating split assigner. */ +@Internal +public final class SplitAssignerFactory { + + private SplitAssignerFactory() { + // No public constructor. + } + + /** Create blank assigner. */ + public static SplitAssigner create( + StopCursor stopCursor, SourceConfiguration sourceConfiguration) { + return create(stopCursor, sourceConfiguration, initialState()); + } + + /** Create assigner from checkpoint state. */ + public static SplitAssigner create( + StopCursor stopCursor, + SourceConfiguration sourceConfiguration, + PulsarSourceEnumState sourceEnumState) { + SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType(); + if (subscriptionType == Exclusive + || subscriptionType == Failover + || subscriptionType == Key_Shared) { + return new NormalSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); + } else if (subscriptionType == Shared) { + return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); + } else { + throw new IllegalArgumentException( + "We don't support this subscription type: " + subscriptionType); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java index 08ba1faa44214e..b8a55bf8a34b86 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java @@ -48,7 +48,7 @@ public interface PulsarSubscriber extends Serializable { /** * Get a set of subscribed {@link TopicPartition}s. The method could throw {@link - * IllegalStateException}, a extra try catch is required. + * IllegalStateException}, an extra try catch is required. * * @param pulsarAdmin The admin interface used to retrieve subscribed topic partitions. * @param rangeGenerator The range for different partitions. diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java index 6a5d51522314e9..820888a7bf37f1 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java @@ -77,7 +77,7 @@ public static SourceReader create( SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType(); if (subscriptionType == SubscriptionType.Failover || subscriptionType == SubscriptionType.Exclusive) { - // Create a ordered split reader supplier. + // Create an ordered split reader supplier. Supplier> splitReaderSupplier = () -> new PulsarOrderedPartitionSplitReader<>( diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java index bb6d79641f503c..6807c976d0cf46 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader; @@ -40,6 +39,7 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH; +import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId; /** * The split reader a given {@link PulsarPartitionSplit}, it would be closed once the {@link @@ -80,12 +80,18 @@ protected void startConsumer(PulsarPartitionSplit split, Consumer consum // Reset the start position for ordered pulsar consumer. if (latestConsumedId != null) { - StartCursor startCursor = StartCursor.fromMessageId(latestConsumedId, false); - TopicPartition partition = split.getPartition(); - + LOG.debug("Start seeking from the checkpoint {}", latestConsumedId); try { - startCursor.seekPosition( - partition.getTopic(), partition.getPartitionId(), consumer); + MessageId initialPosition; + if (latestConsumedId == MessageId.latest + || latestConsumedId == MessageId.earliest) { + // This logic is added only the compatible. + initialPosition = latestConsumedId; + } else { + initialPosition = nextMessageId(latestConsumedId); + } + + consumer.seek(initialPosition); } catch (PulsarClientException e) { if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) { throw new IllegalArgumentException(e); @@ -95,7 +101,7 @@ protected void startConsumer(PulsarPartitionSplit split, Consumer consum LOG.warn( "Failed to reset cursor to {} on partition {}", latestConsumedId, - partition, + split.getPartition(), e); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java index 90e29ca47120f7..458be403fbf77e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java @@ -30,13 +30,15 @@ import javax.annotation.Nullable; +import java.io.Serializable; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; /** A {@link SourceSplit} implementation for a Pulsar's partition. */ @Internal -public class PulsarPartitionSplit implements SourceSplit { +public class PulsarPartitionSplit implements SourceSplit, Serializable { + private static final long serialVersionUID = -6857317360756062625L; private final TopicPartition partition; diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java index 2f1faa536723a8..fa46f9a4cdcd1a 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java @@ -24,8 +24,6 @@ import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; -import org.apache.flink.connector.testframe.environment.TestEnvironment; -import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -58,22 +56,4 @@ class PulsarSourceITCase extends SourceTestSuiteBase { @TestContext PulsarTestContextFactory multipleTopic = new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new); - - @Override - public void testScaleUp( - TestEnvironment testEnv, - DataStreamSourceExternalContext externalContext, - CheckpointingMode semantic) - throws Exception { - super.testScaleUp(testEnv, externalContext, semantic); - } - - @Override - public void testScaleDown( - TestEnvironment testEnv, - DataStreamSourceExternalContext externalContext, - CheckpointingMode semantic) - throws Exception { - super.testScaleDown(testEnv, externalContext, semantic); - } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java index 38c40ed88ef7b8..5f18e8f51316cb 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java @@ -34,6 +34,7 @@ import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer.INSTANCE; +import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.newMessageId; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; @@ -51,7 +52,9 @@ void serializeAndDeserializePulsarSourceEnumState() throws Exception { Collections.singleton( new PulsarPartitionSplit( new TopicPartition(randomAlphabetic(10), 10, createFullRange()), - StopCursor.defaultStopCursor())); + StopCursor.defaultStopCursor(), + newMessageId(100L, 23L, 44), + null)); Map> shared = Collections.singletonMap(5, splits); Map> mapping = ImmutableMap.of( diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java index 1dcbe84ba61eea..aebb76119dfdc5 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; +import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; @@ -51,6 +53,7 @@ import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest; import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber; @@ -368,6 +371,7 @@ private PulsarSourceEnumerator createEnumerator( Configuration configuration = operator().config(); configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType); + configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); if (enablePeriodicPartitionDiscovery) { configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L); } else { @@ -375,15 +379,15 @@ private PulsarSourceEnumerator createEnumerator( } SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration); - SplitsAssignmentState assignmentState = - new SplitsAssignmentState(latest(), sourceConfiguration, sourceEnumState); + SplitAssigner assigner = + SplitAssignerFactory.create(latest(), sourceConfiguration, sourceEnumState); return new PulsarSourceEnumerator( subscriber, StartCursor.earliest(), new FullRangeGenerator(), sourceConfiguration, enumContext, - assignmentState); + assigner); } private void registerReader( diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java deleted file mode 100644 index ac811c3dddbfea..00000000000000 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.pulsar.source.enumerator; - -import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; -import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; - -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; -import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; - -import org.apache.pulsar.client.api.SubscriptionType; -import org.junit.jupiter.api.Test; - -import java.util.List; -import java.util.Optional; -import java.util.Set; - -import static java.util.Collections.singletonList; -import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; -import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.InstanceOfAssertFactories.map; - -/** Unit tests for {@link SplitsAssignmentState}. */ -class SplitsAssignmentStateTest { - - private final Set partitions = - Sets.newHashSet( - new TopicPartition("some-topic", 1, new TopicRange(1, 30)), - new TopicPartition("some-topic", 2, new TopicRange(31, 60)), - new TopicPartition("some-topic", 3, new TopicRange(61, MAX_RANGE)), - new TopicPartition(randomAlphabetic(10), -1, createFullRange())); - - @Test - void assignSplitsForSharedSubscription() { - SplitsAssignmentState state1 = - new SplitsAssignmentState( - StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Shared)); - state1.appendTopicPartitions(partitions); - Optional> assignment1 = - state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4)); - - assertThat(assignment1) - .isPresent() - .get() - .extracting(SplitsAssignment::assignment) - .asInstanceOf(map(Integer.class, List.class)) - .hasSize(5) - .allSatisfy((idx, list) -> assertThat(list).hasSize(4)); - - Optional> assignment2 = - state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4)); - assertThat(assignment2).isNotPresent(); - - // Reassign reader 3. - state1.putSplitsBackToPendingList(assignment1.get().assignment().get(3), 3); - Optional> assignment3 = - state1.assignSplits(Lists.newArrayList(0, 1, 2, 4)); - assertThat(assignment3).isNotPresent(); - - Optional> assignment4 = - state1.assignSplits(singletonList(3)); - assertThat(assignment4) - .isPresent() - .get() - .extracting(SplitsAssignment::assignment) - .asInstanceOf(map(Integer.class, List.class)) - .hasSize(1); - } - - @Test - void assignSplitsForExclusiveSubscription() { - SplitsAssignmentState state1 = - new SplitsAssignmentState( - StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Exclusive)); - state1.appendTopicPartitions(partitions); - Optional> assignment1 = - state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4)); - - assertThat(assignment1).isPresent(); - assertThat(assignment1.get().assignment()) - .hasSize(4) - .allSatisfy((idx, list) -> assertThat(list).hasSize(1)); - - Optional> assignment2 = - state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4)); - assertThat(assignment2).isNotPresent(); - } - - private SourceConfiguration createConfig(SubscriptionType type) { - Configuration configuration = new Configuration(); - configuration.set(PULSAR_SUBSCRIPTION_TYPE, type); - - return new SourceConfiguration(configuration); - } -} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssignerTest.java new file mode 100644 index 00000000000000..73ea706a2f675e --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NormalSplitAssignerTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for {@link NormalSplitAssigner}. */ +class NormalSplitAssignerTest extends SplitAssignerTestBase { + + @Test + void noMoreSplits() { + NormalSplitAssigner assigner = splitAssigner(true); + assertFalse(assigner.noMoreSplits(3)); + + assigner = splitAssigner(false); + assertFalse(assigner.noMoreSplits(3)); + + assigner.registerTopicPartitions(createPartitions("f", 8)); + assertFalse(assigner.noMoreSplits(3)); + + assigner.createAssignment(singletonList(1)); + assertTrue(assigner.noMoreSplits(1)); + assertTrue(assigner.noMoreSplits(3)); + } + + @Test + void partitionsAssignment() { + NormalSplitAssigner assigner = splitAssigner(true); + assigner.registerTopicPartitions(createPartitions("d", 4)); + List readers = Arrays.asList(1, 3, 5, 7); + + // Assignment with initial states. + Optional> assignment = + assigner.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()).hasSize(1); + + // Reassignment with same readers. + assignment = assigner.createAssignment(readers); + assertThat(assignment).isNotPresent(); + + // Register new partition and assign. + assigner.registerTopicPartitions(createPartitions("e", 5)); + assigner.registerTopicPartitions(createPartitions("f", 1)); + assigner.registerTopicPartitions(createPartitions("g", 3)); + assigner.registerTopicPartitions(createPartitions("h", 4)); + assignment = assigner.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()).hasSize(4); + + // Assign to new readers. + readers = Arrays.asList(2, 4, 6, 8); + assignment = assigner.createAssignment(readers); + assertThat(assignment).isNotPresent(); + } + + @Override + protected NormalSplitAssigner createAssigner( + StopCursor stopCursor, + SourceConfiguration sourceConfiguration, + PulsarSourceEnumState sourceEnumState) { + return new NormalSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java new file mode 100644 index 00000000000000..91584b8768821d --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for {@link SharedSplitAssigner}. */ +class SharedSplitAssignerTest extends SplitAssignerTestBase { + + @Test + void noMoreSplits() { + SharedSplitAssigner assigner = splitAssigner(true); + assertFalse(assigner.noMoreSplits(3)); + + assigner = splitAssigner(false); + assertFalse(assigner.noMoreSplits(3)); + + assigner.registerTopicPartitions(createPartitions("f", 8)); + assertFalse(assigner.noMoreSplits(3)); + + assigner.createAssignment(singletonList(1)); + assertTrue(assigner.noMoreSplits(1)); + assertFalse(assigner.noMoreSplits(3)); + + assigner.createAssignment(singletonList(3)); + assertTrue(assigner.noMoreSplits(3)); + } + + @Test + void partitionsAssignment() { + SharedSplitAssigner assigner = splitAssigner(true); + assigner.registerTopicPartitions(createPartitions("d", 4)); + List readers = Arrays.asList(1, 3, 5, 7); + + // Assignment with initial states. + Optional> assignment = + assigner.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()).hasSize(4); + + // Reassignment with same readers. + assignment = assigner.createAssignment(readers); + assertThat(assignment).isNotPresent(); + + // Register new partition and assign. + assigner.registerTopicPartitions(createPartitions("e", 5)); + assignment = assigner.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()).hasSize(4); + + // Assign to new readers. + readers = Arrays.asList(2, 4, 6, 8); + assignment = assigner.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()) + .hasSize(4) + .allSatisfy((k, v) -> assertThat(v).hasSize(2)); + } + + @Override + protected SharedSplitAssigner createAssigner( + StopCursor stopCursor, + SourceConfiguration sourceConfiguration, + PulsarSourceEnumState sourceEnumState) { + return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java new file mode 100644 index 00000000000000..1256566b483544 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState; +import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test utils for split assigners. */ +abstract class SplitAssignerTestBase { + + @Test + void registerTopicPartitionsWillOnlyReturnNewPartitions() { + T assigner = splitAssigner(true); + + Set partitions = createPartitions("persistent://public/default/a", 1); + List newPartitions = assigner.registerTopicPartitions(partitions); + assertThat(newPartitions) + .hasSize(1) + .first() + .hasFieldOrPropertyWithValue("topic", "persistent://public/default/a") + .hasFieldOrPropertyWithValue("partitionId", 1); + + newPartitions = assigner.registerTopicPartitions(partitions); + assertThat(newPartitions).isEmpty(); + + partitions = createPartitions("persistent://public/default/b", 2); + newPartitions = assigner.registerTopicPartitions(partitions); + assertThat(newPartitions) + .hasSize(1) + .hasSize(1) + .first() + .hasFieldOrPropertyWithValue("topic", "persistent://public/default/b") + .hasFieldOrPropertyWithValue("partitionId", 2); + } + + @Test + void noReadersProvideForAssignment() { + T assigner = splitAssigner(false); + assigner.registerTopicPartitions(createPartitions("c", 5)); + + Optional> assignment = + assigner.createAssignment(emptyList()); + assertThat(assignment).isNotPresent(); + } + + @Test + void noPartitionsProvideForAssignment() { + T assigner = splitAssigner(true); + Optional> assignment = + assigner.createAssignment(singletonList(4)); + assertThat(assignment).isNotPresent(); + } + + protected Set createPartitions(String topic, int partitionId) { + TopicPartition p1 = new TopicPartition(topic, partitionId, createFullRange()); + return singleton(p1); + } + + protected T splitAssigner(boolean discovery) { + Configuration configuration = new Configuration(); + + if (discovery) { + configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 1000L); + } else { + configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L); + } + + SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration); + return createAssigner(defaultStopCursor(), sourceConfiguration, initialState()); + } + + protected abstract T createAssigner( + StopCursor stopCursor, + SourceConfiguration sourceConfiguration, + PulsarSourceEnumState sourceEnumState); +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java index 538e45826d7fc5..9ffbda74260d90 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; @@ -86,7 +85,7 @@ TestOrderlinessExtension.class, TestLoggerExtension.class, }) -public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase { +abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase { @RegisterExtension PulsarSplitReaderInvocationContextProvider provider = @@ -138,8 +137,7 @@ private void seekStartPositionAndHandleSplit( // create consumer and seek before split changes try (Consumer consumer = reader.createPulsarConsumer(partition)) { // inclusive messageId - StartCursor startCursor = StartCursor.fromMessageId(startPosition); - startCursor.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer); + consumer.seek(startPosition); } catch (PulsarClientException e) { sneakyThrow(e); } @@ -185,7 +183,7 @@ private List> fetchedMessages( if (verify) { assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount); if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) { - assertThat(finishedSplits).as("Split should not be marked as finished").hasSize(0); + assertThat(finishedSplits).as("Split should not be marked as finished").isEmpty(); } else { assertThat(finishedSplits).as("Split should be marked as finished").hasSize(1); }