Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Optimize the performance of individual acknowledgments #23072

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -531,14 +532,16 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {

//this method is for individual ack not carry the transaction
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
List<Pair<Consumer, Position>> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
Position position;
long ackedCount = 0;
long batchSize = getBatchSize(msgId);
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
Pair<Consumer, Long> ackOwnerConsumerAndBatchSize =
getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId());
Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.getLeft();
long ackedCount;
long batchSize = ackOwnerConsumerAndBatchSize.getRight();
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
Expand All @@ -557,28 +560,32 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
} else {
position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
}

positionsAcked.add(position);
positionsAcked.add(Pair.of(ackOwnerConsumer, position));

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
subscription.acknowledgeMessage(positionsAcked.stream()
.map(Pair::getRight)
.collect(Collectors.toList()), AckType.Individual, properties);
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
completableFuture.complete(totalAckCount);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> {
Consumer ackOwnerConsumer = positionPair.getLeft();
Position position = positionPair.getRight();
//check if the position can remove from the consumer pending acks.
// the bit set is empty in pending ack handle.
if (AckSetStateUtil.hasAckSet(position)) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(position)) {
removePendingAcks(position);
removePendingAcks(ackOwnerConsumer, position);
}
}
}));
Expand All @@ -590,7 +597,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
//this method is for individual ack carry the transaction
private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
// Individual ack
List<MutablePair<Position, Integer>> positionsAcked = new ArrayList<>();
List<Pair<Consumer, MutablePair<Position, Integer>>> positionsAcked = new ArrayList<>();
if (!isTransactionEnabled()) {
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
Expand All @@ -600,20 +607,23 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
Position position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null);
Consumer ackOwnerConsumer = getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
msgId.getEntryId()).getLeft();
// acked count at least one
long ackedCount = 0;
long batchSize = 0;
long ackedCount;
long batchSize;
if (msgId.hasBatchSize()) {
batchSize = msgId.getBatchSize();
// ack batch messages set ackeCount = batchSize
ackedCount = msgId.getBatchSize();
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, msgId.getBatchSize())));
} else {
// ack no batch message set ackedCount = 1
batchSize = 0;
ackedCount = 1;
positionsAcked.add(new MutablePair<>(position, (int) batchSize));
positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, (int) batchSize)));
}
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());

if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
Expand All @@ -625,47 +635,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

checkCanRemovePendingAcksAndHandle(position, msgId);
checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId);

checkAckValidationError(ack, position);

totalAckCount.add(ackedCount);
}

CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked);
ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()));
if (Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) ->
positionsAcked.forEach(positionLongMutablePair -> {
positionsAcked.forEach(positionPair -> {
Consumer ackOwnerConsumer = positionPair.getLeft();
MutablePair<Position, Integer> positionLongMutablePair = positionPair.getRight();
if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
removePendingAcks(positionLongMutablePair.left);
removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left);
}
}
}));
}
return completableFuture.thenApply(__ -> totalAckCount.sum());
}

private long getBatchSize(MessageIdData msgId) {
long batchSize = 1;
if (Subscription.isIndividualAckMode(subType)) {
LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
// Consumer may ack the msg that not belongs to it.
if (longPair == null) {
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
if (longPair != null) {
batchSize = longPair.first;
}
} else {
batchSize = longPair.first;
}
}
return batchSize;
}

private long getAckedCountForMsgIdNoAckSets(long batchSize, Position position, Consumer consumer) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
Expand Down Expand Up @@ -725,26 +719,39 @@ private void checkAckValidationError(CommandAck ack, Position position) {
}
}

private boolean checkCanRemovePendingAcksAndHandle(Position position, MessageIdData msgId) {
private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer,
Position position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
return removePendingAcks(position);
return removePendingAcks(ackOwnedConsumer, position);
}
return false;
}

private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
Consumer ackOwnerConsumer = this;
/**
* Retrieves the acknowledgment owner consumer and batch size for the specified ledgerId and entryId.
*
* @param ledgerId The ID of the ledger.
* @param entryId The ID of the entry.
* @return Pair<Consumer, BatchSize>
*/
private Pair<Consumer, Long> getAckOwnerConsumerAndBatchSize(long ledgerId, long entryId) {
if (Subscription.isIndividualAckMode(subType)) {
if (!getPendingAcks().containsKey(ledgerId, entryId)) {
LongPair longPair = getPendingAcks().get(ledgerId, entryId);
if (longPair != null) {
return Pair.of(this, longPair.first);
} else {
// If there are more consumers, this step will consume more CPU, and it should be optimized later.
for (Consumer consumer : subscription.getConsumers()) {
if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
ackOwnerConsumer = consumer;
break;
if (consumer != this) {
longPair = consumer.getPendingAcks().get(ledgerId, entryId);
if (longPair != null) {
return Pair.of(consumer, longPair.first);
}
}
}
}
}
return ackOwnerConsumer;
return Pair.of(this, 1L);
}

private long[] getCursorAckSet(Position position) {
Expand Down Expand Up @@ -1019,44 +1026,24 @@ public int hashCode() {
*
* @param position
*/
private boolean removePendingAcks(Position position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(position.getLedgerId(),
position.getEntryId())) {
ackOwnedConsumer = consumer;
break;
}
}
} else {
ackOwnedConsumer = this;
private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
}

// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
LongPair ackedPosition = ackOwnedConsumer != null
? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId())
: null;
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
return false;
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down
Loading