From 0d30a4c49ece84e822e80124c854eb215baa7b24 Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Fri, 8 Jul 2022 02:13:22 +0800 Subject: [PATCH] [FLINK-27399][Connector/Pulsar] Change the initial start cursor and stop cursor for better handle the consuming behaviors. --- .../docs/connectors/datastream/pulsar.md | 30 ++++++-- .../docs/connectors/datastream/pulsar.md | 37 ++++++++-- .../enumerator/cursor/CursorPosition.java | 30 ++++---- .../enumerator/cursor/MessageIdUtils.java | 71 +++++++++++++++++++ .../source/enumerator/cursor/StartCursor.java | 30 +++++--- .../source/enumerator/cursor/StopCursor.java | 36 +++++++--- .../cursor/start/MessageIdStartCursor.java | 24 +++---- .../cursor/stop/LatestMessageStopCursor.java | 21 +++++- .../cursor/stop/MessageIdStopCursor.java | 14 +++- .../enumerator/cursor/StopCursorTest.java | 2 +- 10 files changed, 231 insertions(+), 64 deletions(-) create mode 100644 flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md index b7ca75e3c52a5c..1614cc8d9144de 100644 --- a/docs/content.zh/docs/connectors/datastream/pulsar.md +++ b/docs/content.zh/docs/connectors/datastream/pulsar.md @@ -320,6 +320,7 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的 ``` {{< /tab >}} {{< /tabs >}} + - 与前者不同的是,给定的消息可以跳过,再进行消费。 {{< tabs "pulsar-starting-position-from-message-id-bool" >}} {{< tab "Java" >}} @@ -333,7 +334,8 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的 ``` {{< /tab >}} {{< /tabs >}} -- 从给定的消息时间开始消费。 + +- 从给定的消息发布时间开始消费,这个方法因为名称容易导致误解现在已经不建议使用。你可以使用方法 `StartCursor.fromPublishTime(long)`。 {{< tabs "pulsar-starting-position-message-time" >}} {{< tab "Java" >}} ```java @@ -347,6 +349,11 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的 {{< /tab >}} {{< /tabs >}} +- 从给定的消息发布时间开始消费。 + ```java + StartCursor.fromPublishTime(long); + ``` + {{< hint info >}} 每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。 @@ -402,6 +409,7 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败 ``` {{< /tab >}} {{< /tabs >}} + - 停止于某条消息之后,结果里包含此消息。 {{< tabs "pulsar-boundedness-after-message-id" >}} {{< tab "Java" >}} @@ -415,7 +423,18 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败 ``` {{< /tab >}} {{< /tabs >}} -- 停止于某个给定的消息发布时间戳,比如 `Message.getPublishTime()`。 + +- 停止于某个给定的消息事件时间戳,比如 `Message.getEventTime()`,消费结果里不包含此时间戳的消息。 + ```java + StopCursor.atEventTime(long); + ``` + +- 停止于某个给定的消息事件时间戳,比如 `Message.getEventTime()`,消费结果里包含此时间戳的消息。 + ```java + StopCursor.afterEventTime(long); + ``` + +- 停止于某个给定的消息发布时间戳,比如 `Message.getPublishTime()`,消费结果里不包含此时间戳的消息。 {{< tabs "pulsar-boundedness-publish-time" >}} {{< tab "Java" >}} ```java @@ -429,9 +448,10 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败 {{< /tab >}} {{< /tabs >}} - {{< hint warning >}} - StopCursor.atEventTime(long) 目前已经处于弃用状态。 - {{< /hint >}} +- 停止于某个给定的消息发布时间戳,比如 `Message.getPublishTime()`,消费结果里包含此时间戳的消息。 + ```java + StopCursor.afterPublishTime(long); + ``` ### Source 配置项 diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md index 568110193e8c8f..6ec2150c2c2b49 100644 --- a/docs/content/docs/connectors/datastream/pulsar.md +++ b/docs/content/docs/connectors/datastream/pulsar.md @@ -354,6 +354,7 @@ The Pulsar connector consumes from the latest available message if the message I ``` {{< /tab >}} {{< /tabs >}} + - Start from a specified message between the earliest and the latest. The Pulsar connector consumes from the latest available message if the message ID doesn't exist. @@ -371,7 +372,10 @@ The Pulsar connector consumes from the latest available message if the message I {{< /tab >}} {{< /tabs >}} -- Start from the specified message time by `Message.getPublishTime()`. +- Start from the specified message publish time by `Message.getPublishTime()`. +This method is deprecated because the name is totally wrong which may cause confuse. +You can use `StartCursor.fromPublishTime(long)` instead. + {{< tabs "pulsar-starting-position-message-time" >}} {{< tab "Java" >}} ```java @@ -385,6 +389,11 @@ The Pulsar connector consumes from the latest available message if the message I {{< /tab >}} {{< /tabs >}} +- Start from the specified message publish time by `Message.getPublishTime()`. + ```java + StartCursor.fromPublishTime(long); + ``` + {{< hint info >}} Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID (`MessageId`) of the message is ordered in that sequence. @@ -418,7 +427,7 @@ Built-in stop cursors include: {{< /tab >}} {{< /tabs >}} -- Stop at the latest available message when the Pulsar source starts consuming messages. +- Stop at the latest available message when the Pulsar source starts consuming messages. {{< tabs "pulsar-boundedness-latest" >}} {{< tab "Java" >}} ```java @@ -445,6 +454,7 @@ Built-in stop cursors include: ``` {{< /tab >}} {{< /tabs >}} + - Stop but include the given message in the consuming result. {{< tabs "pulsar-boundedness-after-message-id" >}} {{< tab "Java" >}} @@ -459,7 +469,20 @@ Built-in stop cursors include: {{< /tab >}} {{< /tabs >}} -- Stop at the specified message time by `Message.getPublishTime()`. +- Stop at the specified event time by `Message.getEventTime()`. The message with the +given event time won't be included in the consuming result. + ```java + StopCursor.atEventTime(long); + ``` + +- Stop after the specified event time by `Message.getEventTime()`. The message with the +given event time will be included in the consuming result. + ```java + StopCursor.afterEventTime(long); + ``` + +- Stop at the specified publish time by `Message.getPublishTime()`. The message with the +given publish time won't be included in the consuming result. {{< tabs "pulsar-boundedness-publish-time" >}} {{< tab "Java" >}} ```java @@ -473,9 +496,11 @@ Built-in stop cursors include: {{< /tab >}} {{< /tabs >}} - {{< hint warning >}} - StopCursor.atEventTime(long) is now deprecated. - {{< /hint >}} +- Stop after the specified publish time by `Message.getPublishTime()`. The message with the +given publish time will be included in the consuming result. + ```java + StopCursor.afterPublishTime(long); + ``` ### Source Configurable Options diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index a2aaff629066d2..e908a1eebda02f 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -18,18 +18,18 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; import javax.annotation.Nullable; import java.io.Serializable; -/** The class for defining the start or stop position. */ +/** + * The class for defining the start or stop position. We only expose the constructor for end user. + */ @PublicEvolving public final class CursorPosition implements Serializable { private static final long serialVersionUID = -802405183307684549L; @@ -52,22 +52,19 @@ public CursorPosition(@Nullable Long timestamp) { this.timestamp = timestamp; } - @VisibleForTesting + @Internal + public Type getType() { + return type; + } + + @Internal public MessageId getMessageId() { return messageId; } - /** Pulsar consumer could be subscribed by the position. */ - public void seekPosition(Consumer consumer) throws PulsarClientException { - if (type == Type.MESSAGE_ID) { - consumer.seek(messageId); - } else { - if (timestamp != null) { - consumer.seek(timestamp); - } else { - consumer.seek(System.currentTimeMillis()); - } - } + @Internal + public Long getTimestamp() { + return timestamp; } @Override @@ -82,6 +79,7 @@ public String toString() { /** * The position type for reader to choose whether timestamp or message id as the start position. */ + @Internal public enum Type { TIMESTAMP, diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java new file mode 100644 index 00000000000000..acb47db7c1ecde --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.cursor; + +import org.apache.flink.annotation.Internal; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The helper class for Pulsar's message id. */ +@Internal +public final class MessageIdUtils { + + private MessageIdUtils() { + // No public constructor. + } + + /** + * The implementation from the this + * code to get the next message id. + */ + public static MessageId nextMessageId(MessageId messageId) { + MessageIdImpl idImpl = unwrapMessageId(messageId); + + if (idImpl.getEntryId() < 0) { + return newMessageId(idImpl.getLedgerId(), 0, idImpl.getPartitionIndex()); + } else { + return newMessageId( + idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex()); + } + } + + /** + * Convert the message id interface to its backend implementation. And check if it's a batch + * message id. We don't support the batch message for its low performance now. + */ + public static MessageIdImpl unwrapMessageId(MessageId messageId) { + MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId); + if (idImpl instanceof BatchMessageIdImpl) { + int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize(); + checkArgument(batchSize == 1, "We only support normal message id currently."); + } + + return idImpl; + } + + /** Hide the message id implementation. */ + public static MessageId newMessageId(long ledgerId, long entryId, int partitionIndex) { + return new MessageIdImpl(ledgerId, entryId, partitionIndex); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java index af35319a5a71e7..9929d544f4ce68 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java @@ -22,9 +22,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.MessageIdStartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.TimestampStartCursor; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import java.io.Serializable; @@ -43,13 +41,6 @@ public interface StartCursor extends Serializable { CursorPosition position(String topic, int partitionId); - /** Helper method for seek the right position for given pulsar consumer. */ - default void seekPosition(String topic, int partitionId, Consumer consumer) - throws PulsarClientException { - CursorPosition position = position(topic, partitionId); - position.seekPosition(consumer); - } - // --------------------------- Static Factory Methods ----------------------------- static StartCursor defaultStartCursor() { @@ -64,19 +55,38 @@ static StartCursor latest() { return fromMessageId(MessageId.latest); } + /** + * Find the available message id and start consuming from it. The given message is included in + * the consuming result by default if you provide a specified message id instead of {@link + * MessageId#earliest} or {@link MessageId#latest}. + */ static StartCursor fromMessageId(MessageId messageId) { return fromMessageId(messageId, true); } /** * @param messageId Find the available message id and start consuming from it. - * @param inclusive {@code true} would include the given message id. + * @param inclusive {@code true} would include the given message id if it's not the {@link + * MessageId#earliest} or {@link MessageId#latest}. */ static StartCursor fromMessageId(MessageId messageId, boolean inclusive) { return new MessageIdStartCursor(messageId, inclusive); } + /** + * This method is designed for seeking message from event time. But Pulsar didn't support + * seeking from message time, instead, it would seek the position from publish time. We only + * keep this method for backward compatible. + * + * @deprecated Use {@link #fromPublishTime(long)} instead. + */ + @Deprecated static StartCursor fromMessageTime(long timestamp) { return new TimestampStartCursor(timestamp); } + + /** Seek the start position by using message publish time. */ + static StartCursor fromPublishTime(long timestamp) { + return new TimestampStartCursor(timestamp); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java index 0bf46ce1282748..3a55c4fbad73d8 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java @@ -65,28 +65,48 @@ static StopCursor latest() { } /** - * Stop when the messageId is equal or greater than the specified messageId. Message that is - * equal to the specified messageId will not be consumed. + * Stop consuming when the messageId is equal or greater than the specified messageId. Message + * that is equal to the specified messageId will not be consumed. */ static StopCursor atMessageId(MessageId messageId) { - return new MessageIdStopCursor(messageId); + if (MessageId.latest.equals(messageId)) { + return new LatestMessageStopCursor(true); + } else { + return new MessageIdStopCursor(messageId); + } } /** - * Stop when the messageId is greater than the specified messageId. Message that is equal to the - * specified messageId will be consumed. + * Stop consuming when the messageId is greater than the specified messageId. Message that is + * equal to the specified messageId will be consumed. */ static StopCursor afterMessageId(MessageId messageId) { - return new MessageIdStopCursor(messageId, false); + if (MessageId.latest.equals(messageId)) { + return new LatestMessageStopCursor(false); + } else { + return new MessageIdStopCursor(messageId, false); + } } - @Deprecated + /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */ static StopCursor atEventTime(long timestamp) { return new EventTimestampStopCursor(timestamp); } - /** Stop when message publishTime is greater than the specified timestamp. */ + /** Stop consuming when message eventTime is greater than the specified timestamp. */ + static StopCursor afterEventTime(long timestamp) { + return new EventTimestampStopCursor(timestamp + 1); + } + + /** + * Stop consuming when message publishTime is greater than or equals the specified timestamp. + */ static StopCursor atPublishTime(long timestamp) { return new PublishTimestampStopCursor(timestamp); } + + /** Stop consuming when message publishTime is greater than the specified timestamp. */ + static StopCursor afterPublishTime(long timestamp) { + return new PublishTimestampStopCursor(timestamp + 1); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java index 71a4eb6a02657a..03d5ba3e1339e0 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java @@ -25,7 +25,8 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; -import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId; +import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId; /** This cursor would leave pulsar start consuming from a specific message id. */ public class MessageIdStartCursor implements StartCursor { @@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor { * code for understanding pulsar internal logic. * * @param messageId The message id for start position. - * @param inclusive Should we include the start message id in consuming result. + * @param inclusive Whether we include the start message id in consuming result. This works only + * if we provide a specified message id instead of {@link MessageId#earliest} or {@link + * MessageId#latest}. */ public MessageIdStartCursor(MessageId messageId, boolean inclusive) { - if (inclusive) { - this.messageId = messageId; + MessageIdImpl idImpl = unwrapMessageId(messageId); + if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) { + this.messageId = idImpl; } else { - checkState( - messageId instanceof MessageIdImpl, - "We only support normal message id and batch message id."); - MessageIdImpl id = (MessageIdImpl) messageId; - if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) { - this.messageId = messageId; - } else { - this.messageId = - new MessageIdImpl( - id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex()); - } + this.messageId = nextMessageId(idImpl); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java index 257081f5c8938b..29dd68268c0dca 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java @@ -30,13 +30,26 @@ /** * A stop cursor that initialize the position to the latest message id. The offsets initialization * are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code - * PulsarSourceEnumerator}. + * PulsarSourceEnumerator}. We would include the latest message available in Pulsar by default. */ public class LatestMessageStopCursor implements StopCursor { private static final long serialVersionUID = 1702059838323965723L; private MessageId messageId; + /** + * Set this to false would include the latest available message when the flink pipeline start. + */ + private final boolean exclusive; + + public LatestMessageStopCursor() { + this.exclusive = false; + } + + public LatestMessageStopCursor(boolean exclusive) { + this.exclusive = exclusive; + } + @Override public void open(PulsarAdmin admin, TopicPartition partition) { if (messageId == null) { @@ -48,6 +61,10 @@ public void open(PulsarAdmin admin, TopicPartition partition) { @Override public boolean shouldStop(Message message) { MessageId id = message.getMessageId(); - return id.compareTo(messageId) >= 0; + if (exclusive) { + return id.compareTo(messageId) > 0; + } else { + return id.compareTo(messageId) >= 0; + } } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index 7af55a00cc09fd..2befc09ae94ed0 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -22,6 +22,12 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.MessageIdImpl; + +import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.pulsar.client.api.MessageId.earliest; +import static org.apache.pulsar.client.api.MessageId.latest; /** * Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for @@ -39,7 +45,13 @@ public MessageIdStopCursor(MessageId messageId) { } public MessageIdStopCursor(MessageId messageId, boolean exclusive) { - this.messageId = messageId; + MessageIdImpl idImpl = unwrapMessageId(messageId); + checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported."); + checkArgument( + !latest.equals(idImpl), + "MessageId.latest is not supported, use LatestMessageStopCursor instead."); + + this.messageId = idImpl; this.exclusive = exclusive; } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java index d003107793a847..d97120229883b8 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java @@ -48,7 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test different implementation of StopCursor. */ -public class StopCursorTest extends PulsarTestSuiteBase { +class StopCursorTest extends PulsarTestSuiteBase { @Test void publishTimeStopCursor() throws IOException {