From 22a87c67f07b55266e277f83f5ceb17d9f32f67e Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Fri, 9 Feb 2024 15:45:21 -0500 Subject: [PATCH] fix: Message ordering fix for #1889 (#1903) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: change assignees for issues and PRs to michaelpri10 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: Revert PR#1807 and use a LinkedHasMap in the MessageDispatcher * fix: Make processedReceivedMessages thread-safe * fix: Only synchronize on the outstandingReceipts object in the MessageDispatcher --------- Co-authored-by: Owl Bot --- .../cloud/pubsub/v1/MessageDispatcher.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 635bc92d5..1810badd2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -92,8 +93,8 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - private final ConcurrentMap outstandingReceipts = - new ConcurrentHashMap(); + private final LinkedHashMap outstandingReceipts = + new LinkedHashMap(); private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; @@ -397,7 +398,9 @@ void processReceivedMessages(List messages) { if (this.exactlyOnceDeliveryEnabled.get()) { // For exactly once deliveries we don't add to outstanding batch because we first // process the receipt modack. If that is successful then we process the message. - outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage)); + synchronized (outstandingReceipts) { + outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage)); + } } else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) { // putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the // previously-mapped element. @@ -417,33 +420,36 @@ void processReceivedMessages(List messages) { } void notifyAckSuccess(AckRequestData ackRequestData) { - - if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { - outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); - List outstandingBatch = new ArrayList<>(); - - for (Iterator> it = - outstandingReceipts.entrySet().iterator(); - it.hasNext(); ) { - Map.Entry receipt = it.next(); - // If receipt is complete then add to outstandingBatch to process the batch - if (receipt.getValue().isReceiptComplete()) { - it.remove(); - if (pendingMessages.putIfAbsent( - receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler) - == null) { - outstandingBatch.add(receipt.getValue().getOutstandingMessage()); + synchronized (outstandingReceipts) { + if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { + outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); + List outstandingBatch = new ArrayList<>(); + + for (Iterator> it = + outstandingReceipts.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry receipt = it.next(); + // If receipt is complete then add to outstandingBatch to process the batch + if (receipt.getValue().isReceiptComplete()) { + it.remove(); + if (pendingMessages.putIfAbsent( + receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler) + == null) { + outstandingBatch.add(receipt.getValue().getOutstandingMessage()); + } + } else { + break; } - } else { - break; } + processBatch(outstandingBatch); } - processBatch(outstandingBatch); } } void notifyAckFailed(AckRequestData ackRequestData) { - outstandingReceipts.remove(ackRequestData.getAckId()); + synchronized (outstandingReceipts) { + outstandingReceipts.remove(ackRequestData.getAckId()); + } } private void processBatch(List batch) {