Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Receipt modack #1540

Merged
merged 69 commits into from
Aug 24, 2023
Merged
Changes from 3 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
76539e8
receipt-modack for exactly once
maitrimangal Mar 27, 2023
a06a4b7
changing setup
maitrimangal Mar 27, 2023
d8a1779
changing the pendingReceipt List
maitrimangal Apr 3, 2023
680ac2d
using scheduled fixed rate
maitrimangal Apr 18, 2023
ec652a6
using blocked queues
maitrimangal Apr 18, 2023
4236363
using blocked queues
maitrimangal Apr 18, 2023
88dfc84
using blocked queues
maitrimangal Apr 18, 2023
1741342
adding null safety
maitrimangal Apr 18, 2023
b5d9706
adding null safety
maitrimangal Apr 18, 2023
baa470b
removing list
maitrimangal Apr 18, 2023
d7adf96
adding list back
maitrimangal Apr 18, 2023
349d0c4
if permanent failure, remove outstandingmsg from queue
maitrimangal Apr 18, 2023
3f877d5
adding snippet of test
maitrimangal May 19, 2023
1423c01
adding method to streaming subscriber
maitrimangal May 19, 2023
2f8a408
adding method to streaming subscriber
maitrimangal May 19, 2023
aa35b97
adding notifyAcks
maitrimangal May 19, 2023
71a90df
changing notifyAckFailed calls
maitrimangal May 24, 2023
4dfa1fd
addressing some comments
maitrimangal May 30, 2023
a893dd6
changed logic to use one datastructure
maitrimangal May 30, 2023
3193b81
fixing notifyFailed
maitrimangal Jun 5, 2023
bb0c2a9
fixing notifyFailed
maitrimangal Jun 5, 2023
60de4e1
changing Pair to custom class
maitrimangal Jun 15, 2023
0d9d41b
removing the not needed data structure
maitrimangal Jun 26, 2023
19c51db
Fixing test
maitrimangal Jun 26, 2023
ba5f30b
Fixing test
maitrimangal Jun 26, 2023
9d808eb
Fixing test
maitrimangal Jun 26, 2023
1a16d9a
Fixing test
maitrimangal Jun 26, 2023
e5718ab
fixing format
maitrimangal Jun 26, 2023
3a520ed
fixing test to call receiveMessage
maitrimangal Jun 26, 2023
6ca0337
testing test failure
maitrimangal Jun 26, 2023
5553ded
testing test failure
maitrimangal Jun 26, 2023
85bb43d
testing test failure
maitrimangal Jun 26, 2023
c8c4b63
increasing timestamp to test
maitrimangal Jun 26, 2023
aed52a1
increasing timestamp to test
maitrimangal Jun 26, 2023
d8fb0fa
adding log statement for testing
maitrimangal Jun 27, 2023
b90a6b6
Fixing lint
maitrimangal Jun 27, 2023
0681eca
Adding more logs
maitrimangal Jun 29, 2023
6ebd401
batch size log
maitrimangal Jul 17, 2023
10a43f3
changing method to syncronized
maitrimangal Jul 17, 2023
ead3c51
fixing for loop to not remove as we are iterating
maitrimangal Jul 20, 2023
bb35726
trying a concurrent map
maitrimangal Jul 20, 2023
f9abf69
fix: syncronizing notifyFailed
maitrimangal Aug 17, 2023
551528b
fix: removing unused import
maitrimangal Aug 17, 2023
04124fe
fix: reformat
maitrimangal Aug 17, 2023
7dcc8bc
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 18, 2023
f4ca8a5
fix: removing System.out.println statements
maitrimangal Aug 18, 2023
e1c251a
Merge branch 'main' of https://github.com/googleapis/java-pubsub into…
maitrimangal Aug 18, 2023
f156c4a
Merge branch 'ReceiptModack' of https://github.com/maitrimangal/java-…
maitrimangal Aug 18, 2023
fc9b8a8
fix: reviewign comments
maitrimangal Aug 18, 2023
3091823
fix: lint
maitrimangal Aug 18, 2023
1cc0365
adding another ordering key test example
maitrimangal Aug 18, 2023
d906e11
fix: trying to run this test again
maitrimangal Aug 18, 2023
46c2511
fix: trying to run this test again
maitrimangal Aug 19, 2023
b8edb54
fix: removing commented code
maitrimangal Aug 21, 2023
14e7292
fix: removing commented code
maitrimangal Aug 21, 2023
eb780ca
resolving the comments from review
maitrimangal Aug 22, 2023
aa5848b
adding custom matcher
maitrimangal Aug 22, 2023
ac3840e
adding custom matcher
maitrimangal Aug 22, 2023
e201ae4
adding custom matcher
maitrimangal Aug 22, 2023
1c94e57
adding custom matcher
maitrimangal Aug 22, 2023
3e9c83f
adding custom matcher correcting the matching statement
maitrimangal Aug 22, 2023
3aef16b
lint
maitrimangal Aug 22, 2023
f8db409
removing comments
maitrimangal Aug 22, 2023
11af2a0
removing comments
maitrimangal Aug 22, 2023
3200173
removing comments
maitrimangal Aug 22, 2023
e06b968
changing messageMatcher to messageDataMatcher, and fixing other nit t…
maitrimangal Aug 23, 2023
1846eb6
lint
maitrimangal Aug 23, 2023
3e45ca5
addressing review comments
maitrimangal Aug 24, 2023
a2336bd
addressing review comments
maitrimangal Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class MessageDispatcher {
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> exactlyOncePendingReceipts = new LinkedBlockingQueue<>();

private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
Expand Down Expand Up @@ -372,13 +373,46 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
// totally expire so that pubsub service sends us the message again.
continue;
}
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
pendingReceipts.add(ackRequestData);
if (this.exactlyOnceDeliveryEnabled.get()) {
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
exactlyOncePendingReceipts.add(ackRequestData);
} else {
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
pendingReceipts.add(ackRequestData);
}
}
if (this.exactlyOnceDeliveryEnabled.get()) {
List<ModackRequestData> modackRequestData = new ArrayList<ModackRequestData>();
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
exactlyOncePendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
}
ackProcessor.sendModackOperations(modackRequestData);
ApiFuture.addCallback(ackReqData.messageFuture, new ApiFutureCallback<string>() {
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onFailure(Throwable throwable) {
System.out.println("Error with receipt modack");
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
}
@Override
public void onSuccess() {
// outstandingBatch.add(new OutstandingMessage(message, ackHandler));
// processBatch(outstandingBatch);
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
try {
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
}
}, MoreExecutors.directExecutor());
} else {
processBatch(outstandingBatch);
}

processBatch(outstandingBatch);
}


private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingCount(batch.size());
for (OutstandingMessage message : batch) {
Expand Down