Skip to content

Commit

Permalink
[FLINK-27399][Connector/Pulsar] Change initial consuming position set…
Browse files Browse the repository at this point in the history
…ting logic to better handle the checkpoint. (#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead of seeking every time. This should fix the wrong position setting.
* Fix the wrong stop cursor; make sure it stops at the correct position.
* Drop Consumer.seek() for apache/pulsar#16171
  • Loading branch information
syhily committed Aug 13, 2022
1 parent 0e19d8e commit 04dfdd4
Show file tree
Hide file tree
Showing 56 changed files with 2,929 additions and 1,267 deletions.
229 changes: 116 additions & 113 deletions docs/content.zh/docs/connectors/datastream/pulsar.md

Large diffs are not rendered by default.

270 changes: 146 additions & 124 deletions docs/content/docs/connectors/datastream/pulsar.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,6 @@
<td>Boolean</td>
<td>If enabled, the consumer will automatically retry messages.</td>
</tr>
<tr>
<td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
<td style="word-wrap: break-word;">Latest</td>
<td><p>Enum</p></td>
<td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
</tr>
<tr>
<td><h5>pulsar.consumer.subscriptionMode</h5></td>
<td style="word-wrap: break-word;">Durable</td>
Expand Down
75 changes: 67 additions & 8 deletions flink-connectors/flink-connector-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ under the License.
<packaging>jar</packaging>

<properties>
<pulsar.version>2.8.0</pulsar.version>
<pulsar.version>2.9.1</pulsar.version>

<!-- Test Libraries -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
<assertj-core.version>3.20.2</assertj-core.version>
<commons-lang3.version>3.11</commons-lang3.version>
<grpc.version>1.33.0</grpc.version>
<pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version>
<pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version>
<pulsar-netty.version>4.1.72.Final</pulsar-netty.version>
<pulsar-grpc.version>1.33.0</pulsar-grpc.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -153,12 +155,22 @@ under the License.
<version>${pulsar.version}</version>
<scope>test</scope>
</dependency>

<!-- Pulsar uses a newer commons-lang3 in the broker. -->
<!-- Bump the version only for testing. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
<version>${pulsar-commons-lang3.version}</version>
<scope>test</scope>
</dependency>

<!-- Pulsar uses a newer zookeeper in the broker. -->
<!-- Bump the version only for testing. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${pulsar-zookeeper.version}</version>
<scope>test</scope>
</dependency>

Expand All @@ -170,21 +182,63 @@ under the License.
<artifactId>pulsar-client-all</artifactId>
<version>${pulsar.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.activation</groupId>
<artifactId>javax.activation</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-package-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<!-- gRPC uses version ranges, which are not supported by the Flink CI. -->

<dependencyManagement>
<dependencies>
<!-- Pulsar uses a higher gRPC version. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<version>${pulsar-grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Pulsar uses a higher netty version. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${pulsar-netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -207,7 +261,9 @@ under the License.
<configuration>
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
<forkCount>1</forkCount>
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en</argLine>
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
-XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en
</argLine>
</configuration>
</plugin>
<plugin>
Expand All @@ -229,7 +285,8 @@ under the License.
<outputDirectory>
${project.build.directory}/generated-test-sources/protobuf/java
</outputDirectory>
<protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}
<protocArtifact>
com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
</configuration>
<executions>
Expand Down Expand Up @@ -275,6 +332,7 @@ under the License.
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
<include>containers/txnStandalone.conf</include>
</includes>
</configuration>
</execution>
Expand All @@ -298,6 +356,7 @@ under the License.
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
<include>containers/txnStandalone.conf</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -31,6 +32,8 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.Consumer;
Expand All @@ -40,6 +43,7 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
Expand Down Expand Up @@ -273,4 +277,32 @@ public static <T, V> void setOptionValue(
setter.accept(value);
}
}

/**
 * Collects the string properties configured for the given map option.
 *
 * <p>Entries stored directly under the option are copied first. After that, every
 * configuration key of the form {@code <option key>.<name>} is read as a string and stored
 * under {@code <name>}, replacing any entry with the same name. An empty map is returned when
 * neither the option nor any such sub key is present.
 *
 * @param configuration the configuration to read from.
 * @param option the map option; its key followed by a dot is the prefix of the sub keys.
 * @return a newly created map holding the collected properties.
 */
public static Map<String, String> getProperties(
        Configuration configuration, ConfigOption<Map<String, String>> option) {
    Map<String, String> result = new HashMap<>();
    if (configuration.contains(option)) {
        result.putAll(configuration.get(option));
    }

    // Sub keys may be supplied one by one instead of as a single map, e.g. through SQL.
    String keyPrefix = option.key() + ".";
    int prefixLength = keyPrefix.length();
    List<String> subKeys =
            configuration.keySet().stream()
                    .filter(name -> name.length() > prefixLength && name.startsWith(keyPrefix))
                    .collect(toList());

    // Strip the prefix from every sub key and copy its string value into the result.
    for (String subKey : subKeys) {
        String value =
                configuration.get(ConfigOptions.key(subKey).stringType().noDefaultValue());
        result.put(subKey.substring(prefixLength), value);
    }
    return result;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
Expand Down Expand Up @@ -144,32 +145,31 @@ public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext
/**
 * Builds a new {@link PulsarSourceEnumerator} for the given enumerator context.
 *
 * <p>NOTE(review): this span is a rendered commit diff in which removed and added lines are
 * shown together. The {@code SplitsAssignmentState}/{@code assignmentState} lines look like
 * the removed side and the {@code SplitAssigner}/{@code splitAssigner} lines like the added
 * side - confirm against the real file.
 *
 * @param enumContext the enumerator context handed to the created enumerator.
 * @return the newly created enumerator.
 */
@Override
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
SplitsAssignmentState assignmentState =
new SplitsAssignmentState(stopCursor, sourceConfiguration);
// The assigner is created from the stop cursor and the source configuration only.
SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
return new PulsarSourceEnumerator(
subscriber,
startCursor,
rangeGenerator,
configuration,
sourceConfiguration,
enumContext,
assignmentState);
splitAssigner);
}

/**
 * Builds a {@link PulsarSourceEnumerator} from a previously stored enumerator state.
 *
 * <p>NOTE(review): this span is a rendered commit diff in which removed and added lines are
 * shown together. The {@code SplitsAssignmentState}/{@code assignmentState} lines look like
 * the removed side and the {@code SplitAssigner}/{@code splitAssigner} lines like the added
 * side - confirm against the real file.
 *
 * @param enumContext the enumerator context handed to the created enumerator.
 * @param checkpoint the stored enumerator state, passed on when creating the assigner.
 * @return the newly created enumerator.
 */
@Override
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
PulsarSourceEnumState checkpoint) {
SplitsAssignmentState assignmentState =
new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
// Unlike createEnumerator, the checkpoint is handed to the factory as a third argument.
SplitAssigner splitAssigner =
SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
return new PulsarSourceEnumerator(
subscriber,
startCursor,
rangeGenerator,
configuration,
sourceConfiguration,
enumContext,
assignmentState);
splitAssigner);
}

@Override
Expand Down
Loading

0 comments on commit 04dfdd4

Please sign in to comment.