Skip to content

Commit

Permalink
Fix bug where hash wasn't added to pending acks in classic implementa…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
lhotari committed Oct 9, 2024
1 parent 09ab041 commit 2d2524c
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -245,7 +246,18 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
groupedEntries.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();

for (Entry entry : entries) {
for (int i = 0; i < entriesCount; i++) {
Entry inputEntry = entries.get(i);
EntryAndMetadata entry;
if (inputEntry instanceof EntryAndMetadata entryAndMetadataInstance) {
entry = entryAndMetadataInstance;
} else {
// replace the input entry with EntryAndMetadata instance. In addition to the entry and metadata,
// it will also carry the pre-calculated sticky key hash
entry = EntryAndMetadata.create(inputEntry,
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1));
entries.set(i, entry);
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c = selector.select(stickyKeyHash);
if (c != null) {
Expand Down

0 comments on commit 2d2524c

Please sign in to comment.