
PubSub Subscriber will not stop when messages have not been acked or nacked #3752

Closed
Xeli opened this issue Sep 30, 2018 · 3 comments
Labels: api: pubsub, triage me



Xeli commented Sep 30, 2018

Issue
When using the Subscriber from the PubSub client, if any message is neither acked nor nacked, the subscriber never shuts down.

Why would I want this?
In my case, I want to read PubSub messages using Apache Flink. Flink keeps its state consistent using a technique called checkpointing, so I should only ack messages when a checkpoint is taken. When part of Flink crashes, exits, or shuts down, we want to discard all messages received since the last checkpoint and have PubSub redeliver them.

This means messages are only acknowledged on a checkpoint (maybe every 500 ms), and when shutting down, the last few messages received won't be acknowledged.
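To make the checkpoint-driven acking concrete, here is a minimal sketch that defers acks until a checkpoint completes. CheckpointAcker, defer and onCheckpoint are hypothetical names, not part of the PubSub client or Flink, and the AckReplyConsumer interface declared here is a local stand-in with the same ack()/nack() methods as com.google.cloud.pubsub.v1.AckReplyConsumer, so the sketch compiles on its own.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Local stand-in mirroring com.google.cloud.pubsub.v1.AckReplyConsumer.
// In real code, delete this and import the library interface instead.
interface AckReplyConsumer {
    void ack();
    void nack();
}

// Hypothetical helper: holds consumers until the next checkpoint completes.
final class CheckpointAcker {
    private final ConcurrentLinkedQueue<AckReplyConsumer> sinceLastCheckpoint =
            new ConcurrentLinkedQueue<>();

    // Call from MessageReceiver.receiveMessage instead of consumer.ack().
    void defer(AckReplyConsumer consumer) {
        sinceLastCheckpoint.add(consumer);
    }

    // Call when a checkpoint completes: acks every deferred consumer
    // and returns how many were acked.
    int onCheckpoint() {
        int acked = 0;
        AckReplyConsumer consumer;
        while ((consumer = sinceLastCheckpoint.poll()) != null) {
            consumer.ack();
            acked++;
        }
        return acked;
    }
}
```

Acking everything queued when a checkpoint completes is a simplification; a real integration would need to ack only the messages covered by that checkpoint. Anything still queued at shutdown is neither acked nor nacked, which is the situation that leaves awaitTerminated() blocked in the reproduction.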

How to reproduce
I've edited the pubsub example in this repo to show what I mean:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class SubscriberExample {
    public static void main(String... args) throws Exception {
        ProjectSubscriptionName subscription =
                ProjectSubscriptionName.of("bolcom-stg-jey-streaming-886", "streaming-test");
        MessageReceiver receiver =
                new MessageReceiver() {
                    @Override
                    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                        System.out.println("Received message: " + message.getData().toStringUtf8());
                        // DO NOT ack() or nack(), and the subscriber will never stop
                        // consumer.ack();
                    }
                };
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.addListener(
                    new Subscriber.Listener() {
                        @Override
                        public void failed(Subscriber.State from, Throwable failure) {
                            // Called when the Subscriber hits a fatal error and is shutting down.
                            System.err.println(failure);
                        }
                    },
                    MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();

            // In this example, we pull messages for 5 seconds (5,000 ms) and then stop.
            // In a real application, this sleep-then-stop is not necessary.
            // Simply call stopAsync().awaitTerminated() when the server is shutting down, etc.
            Thread.sleep(5000);
        } finally {
            if (subscriber != null) {
                System.out.println("Stopping");
                // Never returns here: the received messages were neither acked nor nacked.
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }
}

The only change is that the receiver never calls ack() or nack().

Question:
The example above never returns from the awaitTerminated() call, so the program never exits. Is this behavior by design, or should I shut down the subscriber differently?

JustinBeckwith added the "triage me" label Oct 1, 2018
chingor13 added the "api: pubsub" label Oct 4, 2018
@chingor13 (Contributor)

@pongad Is this also related to the fixes you've been working on?

pongad (Contributor) commented Oct 5, 2018

This is intentional. awaitTerminated assumes you still care about the messages you have received, so it won't return until every outstanding message has been acked or nacked. There are a few ways to get what you want:

  1. Just don't wait. IIRC, all threads that Subscriber creates internally are daemon threads: once all other threads exit, these threads won't stop the JVM from exiting. If you just want to stop your application, this might be the easiest approach.

  2. After pubsub: reject expired and duplicate messages #3743 is merged, we'll correctly give up on messages that take too long to process. If your processing always finishes within 1 minute, you can set setMaxAckExtensionPeriod to 1 minute so that Subscriber forgets about your messages after about 1 minute. After it forgets them, awaitTerminated should return. Note that once we forget the messages, we won't extend their deadlines either, so this option might be a little dangerous.

  3. You can use a concurrent map to hold the AckReplyConsumers. When you ack or nack a message, remove its consumer from the map. When you want to shut down, nack everything still in the map. This might be a little expensive on the CPU.
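Option 3 could look roughly like the following minimal sketch, assuming consumers are keyed by PubSub message ID (in receiveMessage that would be message.getMessageId()). PendingAcks, track, ack and nackAll are hypothetical names, and AckReplyConsumer is again a local stand-in with the same ack()/nack() methods as the library interface so the sketch compiles without the PubSub dependency.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in mirroring com.google.cloud.pubsub.v1.AckReplyConsumer.
// In real code, delete this and import the library interface instead.
interface AckReplyConsumer {
    void ack();
    void nack();
}

// Hypothetical helper: remembers every consumer that has not been acked or nacked yet.
final class PendingAcks {
    private final Map<String, AckReplyConsumer> pending = new ConcurrentHashMap<>();

    // Call from MessageReceiver.receiveMessage with message.getMessageId().
    void track(String messageId, AckReplyConsumer consumer) {
        pending.put(messageId, consumer);
    }

    // Ack one message (e.g. on checkpoint). remove() returning non-null
    // guarantees each consumer is answered at most once, even under races.
    void ack(String messageId) {
        AckReplyConsumer consumer = pending.remove(messageId);
        if (consumer != null) {
            consumer.ack();
        }
    }

    // On shutdown: nack everything still outstanding and return how many were nacked.
    // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
    int nackAll() {
        int nacked = 0;
        for (String messageId : pending.keySet()) {
            AckReplyConsumer consumer = pending.remove(messageId);
            if (consumer != null) {
                consumer.nack();
                nacked++;
            }
        }
        return nacked;
    }

    int size() {
        return pending.size();
    }
}
```

One plausible shutdown order is subscriber.stopAsync(), then pending.nackAll(), then subscriber.awaitTerminated(); that ordering is an assumption rather than something confirmed in this thread, and any message delivered while nackAll() is running may need a second pass.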

Xeli (Author) commented Oct 5, 2018

@pongad
Thank you for your comment.

As long as this is intentional, it is fine for us to work around it; it just threw me off that awaitTerminated() never actually returned in our case.

Option 1 unfortunately does not work for us: Flink unloads classes from the JVM, and those daemon threads then start throwing lots of NoClassDefFoundErrors. We really want to be sure everything has shut down before we tell Flink it's okay to unload everything.

Option 3 seems like the way to go, which is fine; we should not be buffering many messages at any given time anyway.


4 participants