-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub: reject expired and duplicate messages #3743
Conversation
if (ackHandler.totalExpiration.isBefore(now())) { | ||
// Message expired while waiting. We don't extend these messages anymore, | ||
// so it was probably sent to someone else. Don't work on it. | ||
// Don't nack it either, because we'd be nacking someone else's message. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
// forget removes from pendingMessages; this is OK, concurrent maps can | ||
// handle concurrent iterations and modifications. | ||
entry.getValue().forget(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Generally like it, nice clean approach 👍 Won't handle a final Subscriber subscriber = Subscriber.newBuilder(subscriptionName, (msg, ack) -> {
// do something
})
.setMaxAckExtensionPeriod(Duration.ZERO)
.build(); |
That's right. This won't handle the case where max ack extension is zero. I'm a little concerned by this; this essentially changes the behavior from "we won't extend these" to "we'll never work on these". I'll make a small fix for this. |
👍 I think this will fix #3383 and #2465 |
Don't merge this yet. Someone from pubsub should take a look. |
@@ -398,6 +423,14 @@ public void nack() { | |||
@Override | |||
public void run() { | |||
try { | |||
if (ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get()).isBefore(now())) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
CC @csainty
This should let us clear through backlogs more quickly.