Skip to content

Commit

Permalink
[FLINK-27399][Connector/Pulsar] Create the initial subscription inste…
Browse files Browse the repository at this point in the history
…ad seek every time. This should fix the wrong position setting.
  • Loading branch information
syhily committed Jul 8, 2022
1 parent 0d30a4c commit 1ae927b
Show file tree
Hide file tree
Showing 25 changed files with 841 additions and 502 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,6 @@
<td>Boolean</td>
<td>If enabled, the consumer will automatically retry messages.</td>
</tr>
<tr>
<td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
<td style="word-wrap: break-word;">Latest</td>
<td><p>Enum</p></td>
<td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
</tr>
<tr>
<td><h5>pulsar.consumer.subscriptionMode</h5></td>
<td style="word-wrap: break-word;">Durable</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
Expand Down Expand Up @@ -142,31 +143,30 @@ public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext
@Override
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
SplitsAssignmentState assignmentState =
new SplitsAssignmentState(stopCursor, sourceConfiguration);
SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
return new PulsarSourceEnumerator(
subscriber,
startCursor,
rangeGenerator,
sourceConfiguration,
enumContext,
assignmentState);
splitAssigner);
}

@Internal
@Override
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
PulsarSourceEnumState checkpoint) {
SplitsAssignmentState assignmentState =
new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
SplitAssigner splitAssigner =
SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
return new PulsarSourceEnumerator(
subscriber,
startCursor,
rangeGenerator,
sourceConfiguration,
enumContext,
assignmentState);
splitAssigner);
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
* <p>To stop the connector user has to disable the auto partition discovery. As auto partition
* discovery always expected new splits to come and not exiting. To disable auto partition
* discovery, use builder.setConfig({@link
* PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
* PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
*
* <pre>{@code
* PulsarSource<String> source = PulsarSource
Expand Down Expand Up @@ -266,7 +266,7 @@ public PulsarSourceBuilder<OUT> setTopicPattern(
}

/**
* The consumer name is informative and it can be used to identify a particular consumer
* The consumer name is informative, and it can be used to identify a particular consumer
* instance from the topic stats.
*/
public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
Expand Down Expand Up @@ -321,7 +321,7 @@ public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
* <p>To stop the connector user has to disable the auto partition discovery. As auto partition
* discovery always expected new splits to come and not exiting. To disable auto partition
* discovery, use builder.setConfig({@link
* PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
* PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
*
* @param stopCursor The {@link StopCursor} to specify the stopping offset.
* @return this PulsarSourceBuilder.
Expand All @@ -334,7 +334,7 @@ public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor) {
}

/**
* By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
* By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
* and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
* {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
* to specify the stopping offsets for each partition. When all the partitions have reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.configuration.description.Description;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
Expand Down Expand Up @@ -503,6 +504,12 @@ private PulsarSourceOptions() {
code("PulsarClientException"))
.build());

/**
* @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
* Pulsar didn't support this config option before 1.10.1, so we have to remove this config
* option.
*/
@Deprecated
public static final ConfigOption<SubscriptionInitialPosition>
PULSAR_SUBSCRIPTION_INITIAL_POSITION =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
Expand Down Expand Up @@ -113,8 +112,6 @@ public static <T> ConsumerBuilder<T> createConsumerBuilder(
builder::consumerName);
configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
configuration.useOption(
PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition);
createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
configuration.useOption(
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public int getMessageQueueCapacity() {
return messageQueueCapacity;
}

/**
* We would override the interval into a negative number when we set the connector with bounded
* stop cursor.
*/
public boolean isEnablePartitionDiscovery() {
return getPartitionDiscoveryIntervalMs() > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@

package org.apache.flink.connector.pulsar.source.enumerator;

import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* The state class for pulsar source enumerator, used for storing the split state. This class is
* managed and controlled by {@link SplitsAssignmentState}.
* managed and controlled by {@link SplitAssigner}.
*/
public class PulsarSourceEnumState {

Expand All @@ -46,11 +49,12 @@ public class PulsarSourceEnumState {
private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;

/**
* A {@link PulsarPartitionSplit} should be assigned for all flink readers. Using this map for
* recording assign status.
* It is used for Shared subscription. Every {@link PulsarPartitionSplit} should be assigned for
* all flink readers. Using this map for recording assign status.
*/
private final Map<Integer, Set<String>> readerAssignedSplits;

/** The pipeline has been triggered and topic partitions have been assigned to readers. */
private final boolean initialized;

public PulsarSourceEnumState(
Expand Down Expand Up @@ -85,4 +89,10 @@ public Map<Integer, Set<String>> getReaderAssignedSplits() {
public boolean isInitialized() {
return initialized;
}

/** The initial assignment state for Pulsar. */
public static PulsarSourceEnumState initialState() {
return new PulsarSourceEnumState(
new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false);
}
}
Loading

0 comments on commit 1ae927b

Please sign in to comment.