[FLINK-27399][Connector/Pulsar] Change the initial start cursor and stop cursor for better handle the consuming behaviors.
syhily committed Jul 8, 2022
1 parent 1351303 commit 0d30a4c
Showing 10 changed files with 231 additions and 64 deletions.
30 changes: 25 additions & 5 deletions docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -320,6 +320,7 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
```
{{< /tab >}}
{{< /tabs >}}

- 与前者不同的是,给定的消息可以跳过,再进行消费。
{{< tabs "pulsar-starting-position-from-message-id-bool" >}}
{{< tab "Java" >}}
@@ -333,7 +334,8 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
```
{{< /tab >}}
{{< /tabs >}}
- 从给定的消息时间开始消费。

- 从给定的消息发布时间开始消费,这个方法因为名称容易导致误解现在已经不建议使用。你可以使用方法 `StartCursor.fromPublishTime(long)`
{{< tabs "pulsar-starting-position-message-time" >}}
{{< tab "Java" >}}
```java
@@ -347,6 +349,11 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
{{< /tab >}}
{{< /tabs >}}

- 从给定的消息发布时间开始消费。
```java
StartCursor.fromPublishTime(long);
```

{{< hint info >}}
每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。

@@ -402,6 +409,7 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
```
{{< /tab >}}
{{< /tabs >}}

- 停止于某条消息之后,结果里包含此消息。
{{< tabs "pulsar-boundedness-after-message-id" >}}
{{< tab "Java" >}}
@@ -415,7 +423,18 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
```
{{< /tab >}}
{{< /tabs >}}
- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`

- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里不包含此时间戳的消息。
```java
StopCursor.atEventTime(long);
```

- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里包含此时间戳的消息。
```java
StopCursor.afterEventTime(long);
```

- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里不包含此时间戳的消息。
{{< tabs "pulsar-boundedness-publish-time" >}}
{{< tab "Java" >}}
```java
@@ -429,9 +448,10 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
{{< /tab >}}
{{< /tabs >}}

{{< hint warning >}}
StopCursor.atEventTime(long) 目前已经处于弃用状态。
{{< /hint >}}
- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里包含此时间戳的消息。
```java
StopCursor.afterPublishTime(long);
```

### Source 配置项

37 changes: 31 additions & 6 deletions docs/content/docs/connectors/datastream/pulsar.md
@@ -354,6 +354,7 @@ The Pulsar connector consumes from the latest available message if the message I
```
{{< /tab >}}
{{< /tabs >}}

- Start from a specified message between the earliest and the latest.
The Pulsar connector consumes from the latest available message if the message ID doesn't exist.

@@ -371,7 +372,10 @@ The Pulsar connector consumes from the latest available message if the message I
{{< /tab >}}
{{< /tabs >}}

- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
This method is deprecated because its name is misleading and may cause confusion.
Use `StartCursor.fromPublishTime(long)` instead.

{{< tabs "pulsar-starting-position-message-time" >}}
{{< tab "Java" >}}
```java
@@ -385,6 +389,11 @@ The Pulsar connector consumes from the latest available message if the message I
{{< /tab >}}
{{< /tabs >}}

- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
```java
StartCursor.fromPublishTime(long);
```

{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID (`MessageId`) of the message is ordered in that sequence.
@@ -418,7 +427,7 @@ Built-in stop cursors include:
{{< /tab >}}
{{< /tabs >}}

- Stop at the latest available message when the Pulsar source starts consuming messages.
- Stop at the latest available message when the Pulsar source starts consuming messages.
{{< tabs "pulsar-boundedness-latest" >}}
{{< tab "Java" >}}
```java
@@ -445,6 +454,7 @@ Built-in stop cursors include:
```
{{< /tab >}}
{{< /tabs >}}

- Stop but include the given message in the consuming result.
{{< tabs "pulsar-boundedness-after-message-id" >}}
{{< tab "Java" >}}
@@ -459,7 +469,20 @@ Built-in stop cursors include:
{{< /tab >}}
{{< /tabs >}}

- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
- Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
given event time won't be included in the consuming result.
```java
StopCursor.atEventTime(long);
```

- Stop after the specified event time by `Message<byte[]>.getEventTime()`. The message with the
given event time will be included in the consuming result.
```java
StopCursor.afterEventTime(long);
```

- Stop at the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
given publish time won't be included in the consuming result.
{{< tabs "pulsar-boundedness-publish-time" >}}
{{< tab "Java" >}}
```java
@@ -473,9 +496,11 @@ Built-in stop cursors include:
{{< /tab >}}
{{< /tabs >}}

{{< hint warning >}}
StopCursor.atEventTime(long) is now deprecated.
{{< /hint >}}
- Stop after the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
given publish time will be included in the consuming result.
```java
StopCursor.afterPublishTime(long);
```
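
The `at` and `after` stop cursors listed above differ only in whether the message carrying the boundary timestamp is consumed. Below is a minimal sketch of that rule, modelled with plain `long` timestamps rather than the connector's classes. The names `StopBoundarySketch`, `consumedByAt` and `consumedByAfter` are hypothetical, and this is an illustration of the documented behaviour, not the connector's implementation.

```java
// Hypothetical model of the documented at/after semantics; not the connector's code.
final class StopBoundarySketch {

    // "at": only messages stamped strictly before stopTime are consumed,
    // so a message stamped exactly stopTime is excluded.
    static boolean consumedByAt(long messageTime, long stopTime) {
        return messageTime < stopTime;
    }

    // "after": a message stamped exactly stopTime is still consumed.
    static boolean consumedByAfter(long messageTime, long stopTime) {
        return messageTime <= stopTime;
    }

    public static void main(String[] args) {
        long stopTime = 1_000L;
        long[] publishTimes = {998L, 999L, 1_000L, 1_001L};
        for (long t : publishTimes) {
            System.out.println(t + " at=" + consumedByAt(t, stopTime)
                    + " after=" + consumedByAfter(t, stopTime));
        }
    }
}
```

The same comparison applies whether the timestamp is an event time or a publish time; only the field read from the message changes.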

### Source Configurable Options

@@ -18,18 +18,18 @@

package org.apache.flink.connector.pulsar.source.enumerator.cursor;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;

import javax.annotation.Nullable;

import java.io.Serializable;

/** The class for defining the start or stop position. */
/**
* The class for defining the start or stop position. Only the constructor is exposed to end users.
*/
@PublicEvolving
public final class CursorPosition implements Serializable {
private static final long serialVersionUID = -802405183307684549L;
@@ -52,22 +52,19 @@ public CursorPosition(@Nullable Long timestamp) {
this.timestamp = timestamp;
}

@VisibleForTesting
@Internal
public Type getType() {
return type;
}

@Internal
public MessageId getMessageId() {
return messageId;
}

/** Pulsar consumer could be subscribed by the position. */
public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
if (type == Type.MESSAGE_ID) {
consumer.seek(messageId);
} else {
if (timestamp != null) {
consumer.seek(timestamp);
} else {
consumer.seek(System.currentTimeMillis());
}
}
@Internal
public Long getTimestamp() {
return timestamp;
}

@Override
@@ -82,6 +79,7 @@ public String toString() {
/**
* The position type that tells the reader whether to use a timestamp or a message id as the start position.
*/
@Internal
public enum Type {
TIMESTAMP,

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.pulsar.source.enumerator.cursor;

import org.apache.flink.annotation.Internal;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;

import static org.apache.flink.util.Preconditions.checkArgument;

/** The helper class for Pulsar's message id. */
@Internal
public final class MessageIdUtils {

private MessageIdUtils() {
// No public constructor.
}

/**
* Return the next message id. The logic follows <a
* href="https://github.com/apache/pulsar/blob/7c8dc3201baad7d02d886dbc26db5c03abce77d6/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java#L85">this
* code</a> in Pulsar.
*/
public static MessageId nextMessageId(MessageId messageId) {
MessageIdImpl idImpl = unwrapMessageId(messageId);

if (idImpl.getEntryId() < 0) {
return newMessageId(idImpl.getLedgerId(), 0, idImpl.getPartitionIndex());
} else {
return newMessageId(
idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex());
}
}

/**
* Convert the message id interface to its backend implementation and check whether it's a batch
* message id. Batch messages are not supported for now because of their low performance.
*/
public static MessageIdImpl unwrapMessageId(MessageId messageId) {
MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
if (idImpl instanceof BatchMessageIdImpl) {
int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
checkArgument(batchSize == 1, "We only support normal message id currently.");
}

return idImpl;
}

/** Hide the message id implementation. */
public static MessageId newMessageId(long ledgerId, long entryId, int partitionIndex) {
return new MessageIdImpl(ledgerId, entryId, partitionIndex);
}
}
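
The branch in `nextMessageId` above can be tried without a Pulsar dependency. Below is a minimal sketch, assuming a simplified stand-in for `MessageIdImpl` that carries only the three fields the method reads. The class `SketchMessageId` and its `next` method are hypothetical names, not part of Pulsar or Flink.

```java
// Simplified stand-in for Pulsar's MessageIdImpl; hypothetical, not the real class.
final class SketchMessageId {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;

    SketchMessageId(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    // Mirrors the branch in MessageIdUtils.nextMessageId from the diff:
    // a negative entry id maps to entry 0, otherwise the entry id is incremented.
    static SketchMessageId next(SketchMessageId id) {
        if (id.entryId < 0) {
            return new SketchMessageId(id.ledgerId, 0, id.partitionIndex);
        }
        return new SketchMessageId(id.ledgerId, id.entryId + 1, id.partitionIndex);
    }

    public static void main(String[] args) {
        SketchMessageId n = next(new SketchMessageId(7L, 41L, 0));
        // prints "7:42:0"
        System.out.println(n.ledgerId + ":" + n.entryId + ":" + n.partitionIndex);
    }
}
```

The ledger id and partition index are carried over unchanged; only the entry id moves.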
@@ -22,9 +22,7 @@
import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.MessageIdStartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.TimestampStartCursor;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

import java.io.Serializable;
@@ -43,13 +41,6 @@ public interface StartCursor extends Serializable {

CursorPosition position(String topic, int partitionId);

/** Helper method for seek the right position for given pulsar consumer. */
default void seekPosition(String topic, int partitionId, Consumer<?> consumer)
throws PulsarClientException {
CursorPosition position = position(topic, partitionId);
position.seekPosition(consumer);
}

// --------------------------- Static Factory Methods -----------------------------

static StartCursor defaultStartCursor() {
@@ -64,19 +55,38 @@ static StartCursor latest() {
return fromMessageId(MessageId.latest);
}

/**
* Find the available message id and start consuming from it. The given message is included in
* the consuming result by default if you provide a specified message id instead of {@link
* MessageId#earliest} or {@link MessageId#latest}.
*/
static StartCursor fromMessageId(MessageId messageId) {
return fromMessageId(messageId, true);
}

/**
* @param messageId Find the available message id and start consuming from it.
* @param inclusive {@code true} would include the given message id.
* @param inclusive {@code true} would include the given message id if it's not the {@link
* MessageId#earliest} or {@link MessageId#latest}.
*/
static StartCursor fromMessageId(MessageId messageId, boolean inclusive) {
return new MessageIdStartCursor(messageId, inclusive);
}

/**
* This method was designed for seeking messages by event time, but Pulsar doesn't support
* seeking by event time; it seeks the position by publish time instead. We only keep this
* method for backward compatibility.
*
* @deprecated Use {@link #fromPublishTime(long)} instead.
*/
@Deprecated
static StartCursor fromMessageTime(long timestamp) {
return new TimestampStartCursor(timestamp);
}

/** Seek the start position by using message publish time. */
static StartCursor fromPublishTime(long timestamp) {
return new TimestampStartCursor(timestamp);
}
}
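
The effect of the `inclusive` flag on `fromMessageId(MessageId, boolean)` can be pictured with a toy model. Below is a minimal sketch, assuming a partition is an ascending array of entry ids and that an unknown id behaves like "latest", as the connector docs describe. The class `InclusiveStartSketch` and the method `firstIndex` are hypothetical; the real connector resolves the position through Pulsar rather than by scanning.

```java
// Hypothetical model of inclusive vs. exclusive start; not the connector's code.
final class InclusiveStartSketch {

    // Returns the index of the first entry that would be consumed.
    // Assumes entryIds is sorted ascending.
    static int firstIndex(long[] entryIds, long startEntryId, boolean inclusive) {
        for (int i = 0; i < entryIds.length; i++) {
            if (entryIds[i] == startEntryId) {
                // inclusive: replay the given entry; exclusive: skip past it.
                return inclusive ? i : i + 1;
            }
        }
        // Unknown id: behave like "latest" in this model, i.e. start past the stored entries.
        return entryIds.length;
    }

    public static void main(String[] args) {
        long[] entries = {10L, 11L, 12L};
        System.out.println(firstIndex(entries, 11L, true));   // prints 1
        System.out.println(firstIndex(entries, 11L, false));  // prints 2
        System.out.println(firstIndex(entries, 99L, true));   // prints 3
    }
}
```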