Skip to content

Commit

Permalink
[improve][broker] PIP-315: Configurable max delay limit for delayed d…
Browse files Browse the repository at this point in the history
…elivery (#21798)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
Co-authored-by: JiangHaiting <jianghaiting@apache.org>
  • Loading branch information
3 people authored Jan 26, 2024
1 parent e81a20d commit d37d31f
Show file tree
Hide file tree
Showing 18 changed files with 246 additions and 10 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ delayedDeliveryMaxNumBuckets=-1
# fixed delays in messages in a different way.
delayedDeliveryFixedDelayDetectionLookahead=50000

# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which exceeds this
# max delay, then it will return an error to the producer.
# The default value is 0 which means there is no limit on the max delivery delay.
delayedDeliveryMaxDelayInMillis=0

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ "logic to handle fixed delays in messages in a different way.")
private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \
exceeds this max delay, then it will return an error to the producer. \
The default value is 0 which means there is no limit on the max delivery delay.""")
private long delayedDeliveryMaxDelayInMillis = 0;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeli
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
topicPolicies.setDelayedDeliveryTickTimeMillis(
deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
topicPolicies.setDelayedDeliveryMaxDelayInMillis(
deliveryPolicies == null ? null : deliveryPolicies.getMaxDeliveryDelayInMillis());
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
Expand Down Expand Up @@ -903,6 +905,7 @@ protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryP
delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(policies.getDelayedDeliveryTickTimeMillis())
.active(policies.getDelayedDeliveryEnabled())
.maxDeliveryDelayInMillis(policies.getDelayedDeliveryMaxDelayInMillis())
.build();
}
if (delayedDeliveryPolicies == null && applied) {
Expand All @@ -911,6 +914,8 @@ protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryP
delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis())
.active(pulsar().getConfiguration().isDelayedDeliveryEnabled())
.maxDeliveryDelayInMillis(
pulsar().getConfiguration().getDelayedDeliveryMaxDelayInMillis())
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getReplicatorDispatchRate().updateTopicValue(
DispatchRateImpl.normalize(data.getReplicatorDispatchRate()));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getDelayedDeliveryMaxDelayInMillis().updateTopicValue(data.getDelayedDeliveryMaxDelayInMillis());
topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
topicPolicies.getSubscriptionDispatchRate().updateTopicValue(
DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
Expand Down Expand Up @@ -287,6 +288,9 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getDelayedDeliveryTickTimeMillis().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getDelayedDeliveryMaxDelayInMillis().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::getMaxDeliveryDelayInMillis).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
updateNamespaceReplicatorDispatchRate(namespacePolicies,
Expand Down Expand Up @@ -387,6 +391,8 @@ private void updateTopicPolicyByBrokerConfig() {
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
topicPolicies.getDelayedDeliveryMaxDelayInMillis()
.updateBrokerValue(config.getDelayedDeliveryMaxDelayInMillis());
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
topicPolicies.getReplicationClusters().updateBrokerValue(Collections.emptyList());
SchemaCompatibilityStrategy schemaCompatibilityStrategy = config.getSchemaCompatibilityStrategy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,14 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
getDelayedDeliveryMaxDelayInMillis())), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}

MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
Expand Down Expand Up @@ -3876,6 +3884,14 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
getDelayedDeliveryMaxDelayInMillis())), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}

MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
Expand Down Expand Up @@ -3942,6 +3958,10 @@ public boolean isDelayedDeliveryEnabled() {
return topicPolicies.getDelayedDeliveryEnabled().get();
}

public long getDelayedDeliveryMaxDelayInMillis() {
return topicPolicies.getDelayedDeliveryMaxDelayInMillis().get();
}

public int getMaxUnackedMessagesOnSubscription() {
return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
}
Expand Down Expand Up @@ -4093,4 +4113,18 @@ private CompletableFuture<Void> transactionBufferCleanupAndClose() {
public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}

protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
if (maxDeliveryDelayInMs > 0) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
return msgMetadata.hasDeliverAtTime()
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void testDisableDelayedDelivery() throws Exception {
DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(2000)
.active(false)
.maxDeliveryDelayInMillis(10_000)
.build();
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
//zk update takes time
Expand Down Expand Up @@ -124,6 +125,7 @@ public void testNamespaceDelayedDeliveryPolicyApi() throws Exception {
DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(3)
.active(true)
.maxDeliveryDelayInMillis(5000)
.build();
admin.namespaces().setDelayedDeliveryMessages(namespace, delayedDeliveryPolicies);
Awaitility.await().untilAsserted(()
Expand Down Expand Up @@ -151,30 +153,35 @@ public void testDelayedDeliveryApplied() throws Exception {
DelayedDeliveryPolicies.builder()
.tickTime(conf.getDelayedDeliveryTickTimeMillis())
.active(conf.isDelayedDeliveryEnabled())
.maxDeliveryDelayInMillis(conf.getDelayedDeliveryMaxDelayInMillis())
.build();
assertEquals(admin.topics().getDelayedDeliveryPolicy(topic, true), brokerLevelPolicy);
//set namespace-level policy
DelayedDeliveryPolicies namespaceLevelPolicy = DelayedDeliveryPolicies.builder()
.tickTime(100)
.active(true)
.maxDeliveryDelayInMillis(4000)
.build();
admin.namespaces().setDelayedDeliveryMessages(namespace, namespaceLevelPolicy);
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.namespaces().getDelayedDelivery(namespace)));
DelayedDeliveryPolicies policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
assertEquals(policyFromBroker.getTickTime(), 100);
assertTrue(policyFromBroker.isActive());
assertEquals(policyFromBroker.getMaxDeliveryDelayInMillis(), 4000);
// set topic-level policy
DelayedDeliveryPolicies topicLevelPolicy = DelayedDeliveryPolicies.builder()
.tickTime(200)
.active(true)
.maxDeliveryDelayInMillis(5000)
.build();
admin.topics().setDelayedDeliveryPolicy(topic, topicLevelPolicy);
Awaitility.await().untilAsserted(()
-> assertNotNull(admin.topics().getDelayedDeliveryPolicy(topic)));
policyFromBroker = admin.topics().getDelayedDeliveryPolicy(topic, true);
assertEquals(policyFromBroker.getTickTime(), 200);
assertTrue(policyFromBroker.isActive());
assertEquals(policyFromBroker.getMaxDeliveryDelayInMillis(), 5000);
//remove topic-level policy
admin.topics().removeDelayedDeliveryPolicy(topic);
Awaitility.await().untilAsserted(()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
Expand Down Expand Up @@ -196,4 +197,43 @@ public void testOperationDispatchRate() throws Exception {
this.testTenant, this.testNamespace));
assertTrue(Objects.isNull(dispatchRate));
}

@Test
public void testOperationDelayedDelivery() throws Exception {
boolean isActive = true;
long tickTime = 1000;
long maxDeliveryDelayInMillis = 5000;
// 1. set delayed delivery policy
namespaces.setDelayedDeliveryPolicies(this.testTenant, this.testNamespace,
DelayedDeliveryPolicies.builder()
.active(isActive)
.tickTime(tickTime)
.maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
.build());

// 2. query delayed delivery policy & check
DelayedDeliveryPolicies policy =
(DelayedDeliveryPolicies) asyncRequests(response -> namespaces.getDelayedDeliveryPolicies(response,
this.testTenant, this.testNamespace));
assertEquals(policy.isActive(), isActive);
assertEquals(policy.getTickTime(), tickTime);
assertEquals(policy.getMaxDeliveryDelayInMillis(), maxDeliveryDelayInMillis);

// 3. remove & check
namespaces.removeDelayedDeliveryPolicies(this.testTenant, this.testNamespace);
policy =
(DelayedDeliveryPolicies) asyncRequests(response -> namespaces.getDelayedDeliveryPolicies(response,
this.testTenant, this.testNamespace));
assertTrue(Objects.isNull(policy));

// 4. invalid namespace check
String invalidNamespace = this.testNamespace + "/";
try {
namespaces.setDelayedDeliveryPolicies(this.testTenant, invalidNamespace,
DelayedDeliveryPolicies.builder().build());
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
Expand Down Expand Up @@ -3163,6 +3164,31 @@ public void testProduceChangesWithEncryptionRequired() throws Exception {
});
}

@Test
public void testDelayedDeliveryPolicy() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);

boolean isActive = true;
long tickTime = 1000;
long maxDeliveryDelayInMillis = 5000;
DelayedDeliveryPolicies policy = DelayedDeliveryPolicies.builder()
.active(isActive)
.tickTime(tickTime)
.maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
.build();

admin.topicPolicies().setDelayedDeliveryPolicy(topic, policy);
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getDelayedDeliveryPolicy(topic), policy));

admin.topicPolicies().removeDelayedDeliveryPolicy(topic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getDelayedDeliveryPolicy(topic)));

admin.topics().delete(topic, true);
}

@Test
public void testUpdateRetentionWithPartialFailure() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
Expand Down Expand Up @@ -3207,5 +3233,4 @@ public void testUpdateRetentionWithPartialFailure() throws Exception {
admin.namespaces().removeRetention(myNamespace);
admin.topics().delete(tpName, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -337,6 +338,7 @@ public void testEnableAndDisableTopicDelayedDelivery() throws Exception {
DelayedDeliveryPolicies delayedDeliveryPolicies = DelayedDeliveryPolicies.builder()
.tickTime(2000)
.active(false)
.maxDeliveryDelayInMillis(5000)
.build();
admin.topics().setDelayedDeliveryPolicy(topicName, delayedDeliveryPolicies);
//wait for update
Expand All @@ -349,6 +351,7 @@ public void testEnableAndDisableTopicDelayedDelivery() throws Exception {

assertFalse(admin.topics().getDelayedDeliveryPolicy(topicName).isActive());
assertEquals(2000, admin.topics().getDelayedDeliveryPolicy(topicName).getTickTime());
assertEquals(5000, admin.topics().getDelayedDeliveryPolicy(topicName).getMaxDeliveryDelayInMillis());

admin.topics().removeDelayedDeliveryPolicy(topicName);
//wait for update
Expand Down Expand Up @@ -622,4 +625,42 @@ public void testDispatcherReadFailure() throws Exception {
}
}

@Test
public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
long maxDeliveryDelayInMillis = 5000;
String topic = BrokerTestUtil.newUniqueName("testDelayedDeliveryExceedsMaxDelay");

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

admin.topicPolicies().setDelayedDeliveryPolicy(topic,
DelayedDeliveryPolicies.builder()
.active(true)
.tickTime(100L)
.maxDeliveryDelayInMillis(maxDeliveryDelayInMillis)
.build());

//wait for update
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (admin.topics().getDelayedDeliveryPolicy(topic) != null) {
break;
}
}

try {
producer.newMessage()
.value("msg")
.deliverAfter(6, TimeUnit.SECONDS)
.send();

producer.flush();
fail("Should have thrown NotAllowedException due to exceeding maxDeliveryDelayInMillis");
} catch (PulsarClientException.NotAllowedException ex) {
assertEquals(ex.getMessage(), "Exceeds max allowed delivery delay of "
+ maxDeliveryDelayInMillis + " milliseconds");
}
}
}
Loading

0 comments on commit d37d31f

Please sign in to comment.