Issue
When using the Subscriber of the PubSub client, if any message is neither ack'ed nor nack'ed, the subscriber never shuts down.
Why would I want this?
In my case, I want to read PubSub messages using Apache Flink. Flink keeps itself consistent through a technique called checkpointing, so I should only ack messages on each checkpoint. When a part of Flink crashes, exits, or shuts down, we want to discard all messages received since the last checkpoint and have PubSub redeliver them.
This means messages are only acknowledged on a checkpoint (maybe every 500 ms), so when shutting down, the last few messages received since that checkpoint won't have been acknowledged.
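The ack-on-checkpoint scheme described here could be sketched as a buffer that holds on to each message's consumer until the checkpoint covering it completes. This is a minimal sketch, not Flink or PubSub code: `Ackable` is a hypothetical stand-in with the same ack()/nack() shape as the client's AckReplyConsumer (so it runs without the library), and the `snapshot`/`checkpointComplete` hooks are assumptions that only loosely mirror Flink's split between taking a checkpoint and being notified that it completed (`notifyCheckpointComplete`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointAckBuffer {
  // Hypothetical stand-in for the client's AckReplyConsumer (same ack()/nack() shape),
  // so the sketch compiles and runs without the PubSub library.
  public interface Ackable {
    void ack();
    void nack();
  }

  // Consumers for messages received since the latest snapshot (not yet covered by any checkpoint).
  private List<Ackable> pending = new ArrayList<>();
  // Consumers frozen into a snapshot, keyed by checkpoint ID, waiting for that checkpoint to complete.
  private final TreeMap<Long, List<Ackable>> awaitingCompletion = new TreeMap<>();

  // Called from the message receiver instead of acking right away.
  public synchronized void onMessage(Ackable consumer) {
    pending.add(consumer);
  }

  // Called when a checkpoint is taken: everything received so far belongs to it.
  public synchronized void snapshot(long checkpointId) {
    awaitingCompletion.put(checkpointId, pending);
    pending = new ArrayList<>();
  }

  // Called when a checkpoint is confirmed: ack every message it (or an older checkpoint) covers.
  public int checkpointComplete(long checkpointId) {
    List<Ackable> toAck = new ArrayList<>();
    synchronized (this) {
      Map<Long, List<Ackable>> done = awaitingCompletion.headMap(checkpointId, true);
      for (List<Ackable> batch : done.values()) {
        toAck.addAll(batch);
      }
      done.clear(); // clearing the view removes these entries from awaitingCompletion
    }
    // Ack outside the lock so slow acks don't block incoming messages.
    for (Ackable consumer : toAck) {
      consumer.ack();
    }
    return toAck.size();
  }

  // Number of messages received since the latest snapshot.
  public synchronized int pendingCount() {
    return pending.size();
  }

  public static void main(String[] args) {
    CheckpointAckBuffer buffer = new CheckpointAckBuffer();
    List<String> log = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
      final int id = i;
      buffer.onMessage(new Ackable() {
        @Override public void ack() { log.add("ack " + id); }
        @Override public void nack() { log.add("nack " + id); }
      });
      if (i == 2) {
        buffer.snapshot(1L); // messages 1 and 2 are covered by checkpoint 1
      }
    }
    int acked = buffer.checkpointComplete(1L);
    System.out.println(acked + " " + log + " pending=" + buffer.pendingCount());
    // prints "2 [ack 1, ack 2] pending=1"
  }
}
```

Message 3 arrived after the snapshot, so it stays unacknowledged; those trailing messages are exactly the ones left outstanding when the job shuts down.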
How to reproduce
I've edited the pubsub example in this repo to show what I mean:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class SubscriberExample {
  public static void main(String... args) throws Exception {
    ProjectSubscriptionName subscription =
        ProjectSubscriptionName.of("bolcom-stg-jey-streaming-886", "streaming-test");
    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            System.out.println("Received message: " + message.getData().toStringUtf8());
            // Deliberately DO NOT ack() or nack(): with any message left outstanding,
            // the subscriber never finishes stopping.
            // consumer.ack();
          }
        };
    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscription, receiver).build();
      subscriber.addListener(
          new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
              // Handle failure. Called when the Subscriber hits a fatal error and is shutting down.
              System.err.println(failure);
            }
          },
          MoreExecutors.directExecutor());
      subscriber.startAsync().awaitRunning();
      // In this example, we pull messages for five seconds (5,000 ms) and then stop.
      // In a real application, this sleep-then-stop is not necessary:
      // simply call stopAsync().awaitTerminated() when the server is shutting down, etc.
      Thread.sleep(5000);
    } finally {
      if (subscriber != null) {
        System.out.println("Stopping");
        // Never returns here, because no message was ever acked or nacked.
        subscriber.stopAsync().awaitTerminated();
      }
    }
  }
}
Basically, the only change is that I never call ack() or nack().
Question:
The example above never returns from the awaitTerminated() call and therefore never exits. Is this behavior by design, or should I shut down in a different way?
This is actually intentional. awaitTerminated assumes that you still care about the messages you received, so it won't return until every outstanding message has been acked or nacked. There are a few ways to get what you want:
1. Just don't wait. IIRC, all threads the Subscriber creates internally are daemon threads: once all other threads exit, they won't keep the JVM from exiting. If you just want to stop your application, this might be an easy way to do it.
2. After "pubsub: reject expired and duplicate messages" #3743 is merged, we'll correctly give up on messages that take too long to process. If you finish processing messages within 1 minute, you can set setMaxAckExtensionPeriod to 1 minute so that the Subscriber forgets about your messages after about a minute. Once it forgets them, awaitTerminated should return. Note that after we forget the messages we also stop extending their deadlines, so this option can be a little dangerous.
3. You can use a concurrent map to hold the AckReplyConsumers. When you ack or nack a message, remove its consumer from the map. When you want to shut down, nack everything still in the map. This might be a little expensive on the CPU.
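Option 3 might look roughly like the following. It is a minimal sketch under assumptions: `OutstandingMessages` and `Ackable` are hypothetical names (`Ackable` stands in for AckReplyConsumer so the sketch runs without the client library), and the map is keyed by message ID. Using `ConcurrentHashMap.remove(key, value)` means each consumer is settled exactly once even if a normal ack races with shutdown, and a `closing` flag nacks anything delivered after shutdown has begun.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OutstandingMessages {
  // Hypothetical stand-in for the client's AckReplyConsumer (same ack()/nack() shape).
  public interface Ackable {
    void ack();
    void nack();
  }

  // Consumers for messages that have been delivered but not yet acked or nacked.
  private final ConcurrentHashMap<String, Ackable> outstanding = new ConcurrentHashMap<>();
  // Set once shutdown starts; later deliveries are nacked immediately.
  private volatile boolean closing = false;

  // Call from the MessageReceiver for every delivered message.
  public void track(String messageId, Ackable consumer) {
    if (closing) {
      consumer.nack();
      return;
    }
    outstanding.put(messageId, consumer);
    // nackAll() may have started between the check above and the put; if so, nack it here.
    // remove(key, value) succeeds for only one caller, so the consumer is nacked at most once.
    if (closing && outstanding.remove(messageId, consumer)) {
      consumer.nack();
    }
  }

  // Ack a processed message and stop tracking it; returns false if it was already settled.
  public boolean ack(String messageId) {
    Ackable consumer = outstanding.remove(messageId);
    if (consumer == null) {
      return false;
    }
    consumer.ack();
    return true;
  }

  // Nack a message (e.g. processing failed) and stop tracking it.
  public boolean nack(String messageId) {
    Ackable consumer = outstanding.remove(messageId);
    if (consumer == null) {
      return false;
    }
    consumer.nack();
    return true;
  }

  // On shutdown: nack everything still outstanding so PubSub redelivers it.
  public int nackAll() {
    closing = true;
    int count = 0;
    for (Map.Entry<String, Ackable> entry : outstanding.entrySet()) {
      if (outstanding.remove(entry.getKey(), entry.getValue())) {
        entry.getValue().nack();
        count++;
      }
    }
    return count;
  }

  public int size() {
    return outstanding.size();
  }

  public static void main(String[] args) {
    OutstandingMessages tracker = new OutstandingMessages();
    for (String id : new String[] {"m1", "m2", "m3"}) {
      tracker.track(id, new Ackable() {
        @Override public void ack() { System.out.println("ack " + id); }
        @Override public void nack() { System.out.println("nack " + id); }
      });
    }
    tracker.ack("m2"); // m2 was processed normally
    System.out.println("nacked on shutdown: " + tracker.nackAll()); // prints "nacked on shutdown: 2"
  }
}
```

With the real client, the receiver would call `tracker.track(message.getMessageId(), consumer)` and the application would later call `tracker.ack(id)` or `tracker.nack(id)` instead of using the consumer directly. For shutdown, one plausible order (an assumption, not confirmed in this thread) is `subscriber.stopAsync()`, then `tracker.nackAll()`, then `subscriber.awaitTerminated()`, so that nothing delivered while stopping is left outstanding.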
As long as this is intentional, it's fine for us to work around it; it just threw me off that awaitTerminated() never actually terminated in our case.
Option 1 unfortunately doesn't work for us: Flink actually removes/unloads classes from the JVM, and those daemon threads then start throwing lots of NoClassDefFoundError, so we really need to be sure everything has shut down before we tell Flink it's OK to unload everything.
Option 3 seems like the way to go, which is fine: we shouldn't be buffering many messages at any given time anyway.