From 2d2524cc699b4ebd667c1028579d51e4ee93a176 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 9 Oct 2024 04:53:33 +0300 Subject: [PATCH] Fix bug where hash wasn't added to pending acks in classic implementation --- ...tickyKeyDispatcherMultipleConsumersClassic.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index d639f282a8ecf..64748ad779d1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -245,7 +246,18 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis groupedEntries.clear(); final Map> consumerStickyKeyHashesMap = new HashMap<>(); - for (Entry entry : entries) { + for (int i = 0; i < entriesCount; i++) { + Entry inputEntry = entries.get(i); + EntryAndMetadata entry; + if (inputEntry instanceof EntryAndMetadata entryAndMetadataInstance) { + entry = entryAndMetadataInstance; + } else { + // replace the input entry with EntryAndMetadata instance. In addition to the entry and metadata, + // it will also carry the pre-calculated sticky key hash + entry = EntryAndMetadata.create(inputEntry, + Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1)); + entries.set(i, entry); + } int stickyKeyHash = getStickyKeyHash(entry); Consumer c = selector.select(stickyKeyHash); if (c != null) {