Skip to content

Commit

Permalink
[FLINK-27917][Connector/Pulsar] Drop Consumer.seek() in the testing m…
Browse files Browse the repository at this point in the history
…ethod for fixing the race condition. (#20581)
  • Loading branch information
syhily authored Aug 23, 2022
1 parent 05bd634 commit 6ce74dd
Showing 1 changed file with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@
import org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension;
import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -61,7 +60,6 @@
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
Expand Down Expand Up @@ -136,14 +134,31 @@ private void seekStartPositionAndHandleSplit(
new PulsarPartitionSplit(partition, StopCursor.never(), null, null);
SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));

// create consumer and seek before split changes
try (Consumer<byte[]> consumer = reader.createPulsarConsumer(partition)) {
// inclusive messageId
consumer.seek(startPosition);
} catch (PulsarClientException e) {
sneakyThrow(e);
// Create the subscription and set the start position for this reader.
// Remember not to use Consumer.seek(startPosition)
SourceConfiguration sourceConfiguration = reader.sourceConfiguration;
PulsarAdmin pulsarAdmin = reader.pulsarAdmin;
String subscriptionName = sourceConfiguration.getSubscriptionName();
List<String> subscriptions =
sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
if (!subscriptions.contains(subscriptionName)) {
// If this subscription is not available. Just create it.
sneakyAdmin(
() ->
pulsarAdmin
.topics()
.createSubscription(
topicName, subscriptionName, startPosition));
} else {
// Reset the subscription if this is existed.
sneakyAdmin(
() ->
pulsarAdmin
.topics()
.resetCursor(topicName, subscriptionName, startPosition));
}

// Accept the split and start consuming.
reader.handleSplitsChanges(addition);
}

Expand Down Expand Up @@ -200,7 +215,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> splitReader)
String topicName = randomAlphabetic(10);

// Add a split
seekStartPositionAndHandleSplit(splitReader, topicName, 0);
handleSplit(splitReader, topicName, 0, MessageId.latest);

// Poll once with a null message
PulsarMessage<String> message1 = fetchedMessage(splitReader);
Expand All @@ -224,7 +239,7 @@ void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> splitReader)
void consumeMessageCreatedAfterHandleSplitChangesAndFetch(
PulsarPartitionSplitReaderBase<String> splitReader) {
String topicName = randomAlphabetic(10);
seekStartPositionAndHandleSplit(splitReader, topicName, 0);
handleSplit(splitReader, topicName, 0, MessageId.latest);
operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, randomAlphabetic(10));
fetchedMessages(splitReader, 1, true);
}
Expand Down

0 comments on commit 6ce74dd

Please sign in to comment.