Pub/Sub: messages stuck in buffer, preventing proper load balancing #17

Closed
kir-titievsky opened this issue Apr 13, 2018 · 23 comments
Labels: api: pubsub (Issues related to the googleapis/java-pubsub API) · type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design)

Comments

@kir-titievsky

Repro:

  1. Publish 60 messages, with the numbers 1 through 60 as message content. Observe the total backlog for the subscription reach 60 messages.
  2. Start an instance of a subscriber client with a single-threaded executor and flow control set to allow at most 1 outstanding message (see code below). The subscriber takes 10 seconds to process each message.
  3. Observe that the subscriber processes messages one at a time, every 10 seconds (see log output below).
  4. The bug: start two new instances of the same subscriber client roughly a minute later. Observed: they process no messages. Expected behavior: the two subscribers immediately start processing messages.
  5. Stop the first subscriber client. Observed: the other two subscriber clients start processing messages.

The hypothesis is that the entire backlog is stuck in gRPC and other buffers between the server and the client. The server considers the messages outstanding and being processed, while the client application code never sees them, so when new clients connect, the server has nothing to send them. Killing the original client effectively "nacks" the buffered messages by sending a stream-close signal to the server, which allows the server to start sending messages to the other clients.

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import org.threeten.bp.ZonedDateTime;
import org.threeten.bp.format.DateTimeFormatter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Sub{

    private static AtomicInteger messageCounter = new AtomicInteger(0);
    private static String appInstanceId = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HHmmss"));

    static class MessageHandler implements MessageReceiver {

        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            try {
                TimeUnit.SECONDS.sleep(10);
                System.out.println(
                        ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + ",\t"
                        + "App instance id" + appInstanceId
                        + ",\tProcessing id: " + messageCounter.incrementAndGet()
                        + ",\tmessage content:" + message.getData().toStringUtf8()
                );
                consumer.ack();
            }
            catch (InterruptedException e){
                consumer.nack();
            }
        }
    }

    /** Receive messages over a subscription. */
    public static void main(String... args) throws Exception {
        // project ID and subscription ID, e.g. my-project my-sub
        String projectId = args[0];
        String subscriptionId = args[1];
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
        Subscriber subscriber = null;
        try {
            // we create a single threaded subscriber with the most restrictive flow control setting:
            subscriber =
                    Subscriber.newBuilder(subscriptionName, new MessageHandler())
                            .setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1L).build())
                            .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build())
                            .build();
            subscriber.startAsync().awaitTerminated();
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync();
            }
        }
    }
}
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-pubsub</artifactId>
        <version>0.42.1-beta</version>
    </dependency>
Logs
# start first client at 17:00:34
17:00:47,	App instance id170034,	Processing id: 1,	message content:1
17:00:57,	App instance id170034,	Processing id: 2,	message content:5
17:01:07,	App instance id170034,	Processing id: 3,	message content:9
17:01:17,	App instance id170034,	Processing id: 4,	message content:4
17:01:27,	App instance id170034,	Processing id: 5,	message content:8
17:01:37,	App instance id170034,	Processing id: 6,	message content:3
17:01:47,	App instance id170034,	Processing id: 7,	message content:7
17:01:57,	App instance id170034,	Processing id: 8,	message content:2
17:02:07,	App instance id170034,	Processing id: 9,	message content:6
17:02:17,	App instance id170034,	Processing id: 10,	message content:13
17:02:27,	App instance id170034,	Processing id: 11,	message content:17
17:02:37,	App instance id170034,	Processing id: 12,	message content:12
17:02:47,	App instance id170034,	Processing id: 13,	message content:16
17:02:57,	App instance id170034,	Processing id: 14,	message content:11
17:03:07,	App instance id170034,	Processing id: 15,	message content:15
17:03:17,	App instance id170034,	Processing id: 16,	message content:10
17:03:27,	App instance id170034,	Processing id: 17,	message content:14
17:03:37,	App instance id170034,	Processing id: 18,	message content:22

# start second and third clients around here: they generate no logs

17:03:47,	App instance id170034,	Processing id: 19,	message content:26
17:03:57,	App instance id170034,	Processing id: 20,	message content:21
17:04:07,	App instance id170034,	Processing id: 21,	message content:25
17:04:17,	App instance id170034,	Processing id: 22,	message content:19
# kill first client

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
# second client logs (note: they start about 10 seconds after the first client is killed)
17:04:35,	App instance id170328,	Processing id: 1,	message content:39
17:04:45,	App instance id170328,	Processing id: 2,	message content:47
17:04:55,	App instance id170328,	Processing id: 3,	message content:20
17:05:05,	App instance id170328,	Processing id: 4,	message content:40
17:05:15,	App instance id170328,	Processing id: 5,	message content:38
17:05:25,	App instance id170328,	Processing id: 6,	message content:46
17:05:35,	App instance id170328,	Processing id: 7,	message content:27
17:05:45,	App instance id170328,	Processing id: 8,	message content:42
17:05:55,	App instance id170328,	Processing id: 9,	message content:55
17:06:05,	App instance id170328,	Processing id: 10,	message content:48
17:06:15,	App instance id170328,	Processing id: 11,	message content:56
17:06:25,	App instance id170328,	Processing id: 12,	message content:54
17:06:35,	App instance id170328,	Processing id: 13,	message content:49
17:06:45,	App instance id170328,	Processing id: 14,	message content:57
17:07:39,	App instance id170328,	Processing id: 15,	message content:18
17:07:49,	App instance id170328,	Processing id: 16,	message content:23
17:08:19,	App instance id170328,	Processing id: 17,	message content:30
17:08:29,	App instance id170328,	Processing id: 18,	message content:34
17:09:39,	App instance id170328,	Processing id: 19,	message content:32

@kir-titievsky
Author

Max D has suggested a simple fix for this: close the stream whenever the buffer is full. This would not have worked when we used the same StreamingPull connection for acks as for pulls, but using a separate connection for acks makes it feasible.

The cost is additional bandwidth, but at least the "stuckness" goes away and the load-balancing behavior is sane.
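
As a rough illustration of the idea (a purely hypothetical sketch, not the actual client internals): if a message arrives while the flow-control budget is already exhausted, the client would close the StreamingPull stream instead of parking the message in a local buffer, so the server can redeliver it to other subscribers.

    import java.util.concurrent.Semaphore;

    // Hypothetical sketch of the proposed behavior; the real client internals differ.
    final class StreamingPullFlowSketch {
        private final Semaphore permits = new Semaphore(1); // maxOutstandingElementCount = 1

        // Returns true if the message fits within the flow-control budget.
        // Returning false is the proposed signal to close the StreamingPull stream,
        // which lets the server redeliver the un-acked messages to other subscribers.
        boolean tryAdmit(Runnable deliverToUserCode) {
            if (!permits.tryAcquire()) {
                return false; // budget exhausted: close the stream instead of buffering
            }
            try {
                deliverToUserCode.run();
                return true;
            } finally {
                permits.release();
            }
        }
    }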

@kir-titievsky
Author

@saturnism FYI -- and thanks for discovering this.

@pongad
Contributor

pongad commented Apr 18, 2018

This should have been alleviated by googleapis/google-cloud-java#3147, since we now open only 1 connection by default, so fewer messages should get stuck in the buffer.

@kir-titievsky
Author

kir-titievsky commented Apr 18, 2018 via email

@willy2706

willy2706 commented Apr 19, 2018

@pongad I agree with @kir-titievsky. This issue occurs in my production environment when many messages are published to a topic.

In the end, I downgraded to v0.39.0-beta and used PollingSubscriberConnection.

@justbaum30

I'm also running into this issue on my own project, with almost exactly the criteria @kir-titievsky laid out above: we only want each instance of our application to pick up 1 message at a time, because message processing is relatively long yet variable. We're seeing a single subscriber hoard messages while another is left with nothing to do, even though the max element count is set to 1 with a single executor.

Are there any updates on this issue? It's looking like we'll have to switch to synchronous polling instead. Thanks for your time.

@meltsufin
Member

@kir-titievsky This sounds like a serious bug, not a feature request. Or, am I missing something?

@kir-titievsky
Author

kir-titievsky commented Sep 20, 2018 via email

@roku6185

roku6185 commented Dec 12, 2018

Are there any code snippets for avoiding "hoarding" with synchronous pull? I'm seeing something similar to this issue on some jobs. Do I have to take care of extending the ack deadline myself when using synchronous pull?

@meltsufin
Member

@roku6185 The discussion about "hoarding" was in regard to streaming (async) pull. Can you explain your issue with sync pull? In sync pull you can specify a limit on how many messages to retrieve at a time, so hoarding should not be an issue, in theory.
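
For reference, a minimal synchronous pull sketch (assuming the google-cloud-pubsub SubscriberStub API; my-project and my-sub are placeholders) that caps each request at one message:

    import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    import com.google.pubsub.v1.AcknowledgeRequest;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PullRequest;
    import com.google.pubsub.v1.PullResponse;
    import com.google.pubsub.v1.ReceivedMessage;

    public class SyncPullSketch {
        public static void main(String[] args) throws Exception {
            String subscription = ProjectSubscriptionName.format("my-project", "my-sub"); // placeholders
            try (SubscriberStub stub = GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder().build())) {
                // Ask the server for at most one message, so nothing extra piles up in client buffers.
                PullRequest pullRequest =
                        PullRequest.newBuilder().setSubscription(subscription).setMaxMessages(1).build();
                PullResponse response = stub.pullCallable().call(pullRequest);
                for (ReceivedMessage received : response.getReceivedMessagesList()) {
                    // ... slow processing of received.getMessage() goes here, before the ack ...
                    stub.acknowledgeCallable().call(
                            AcknowledgeRequest.newBuilder()
                                    .setSubscription(subscription)
                                    .addAckIds(received.getAckId())
                                    .build());
                }
            }
        }
    }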

@roku6185

My bad, let me rephrase. I'm using async pull today and I'm having some kind of problem with retries; I think it has to do with the ack deadlines not being extended properly. So I'm looking for a code snippet showing how to implement sync pull to avoid this problem. I don't know whether my problem is a "hoarding" problem, but my setup is similar to what @justbaum30 described above.

@meltsufin
Member

Problems with streaming pull do exist, as laid out in the discussion above. We have a task to move away from streaming pull entirely and use synchronous pull instead. You can use synchronous pull through the Pub/Sub template. If you think there is a specific issue with streaming pull that we don't know about, we will probably need an example that reproduces it to debug further.
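
For example, a sketch assuming Spring Cloud GCP's PubSubTemplate (package names vary by version; "my-sub" is a placeholder):

    import java.util.List;
    import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
    import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;

    public class TemplatePullSketch {
        private final PubSubTemplate pubSubTemplate;

        public TemplatePullSketch(PubSubTemplate pubSubTemplate) {
            this.pubSubTemplate = pubSubTemplate;
        }

        public void pullOne() {
            // Synchronously pull at most one message; return immediately if none are available.
            List<AcknowledgeablePubsubMessage> messages = pubSubTemplate.pull("my-sub", 1, true);
            for (AcknowledgeablePubsubMessage message : messages) {
                // ... process message.getPubsubMessage() ...
                message.ack();
            }
        }
    }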

@ajaaym

ajaaym commented Mar 8, 2019

@meltsufin @roku6185 @kir-titievsky You can use the settings below to avoid having a lot of messages buffered in the client.

    Subscriber subscriber = Subscriber.newBuilder("", new MessageReceiver() {
          @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            consumer.ack();
          }
        })
        .setChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
            .setChannelConfigurator(new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
              @Override public ManagedChannelBuilder apply(ManagedChannelBuilder input) {
                ((NettyChannelBuilder) input).flowControlWindow(4096); // try to keep this around 10 * avg message size
                return input;
              }
            })
            .build())
        .setParallelPullCount(1) // keep this low if you don't want to buffer more messages; 1 here buffers ~10 messages when flowControlWindow is configured properly
        .setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(10L).build()) // make this numPullers * 10
        .build();

@meltsufin
Member

@ajaaym This is pretty ugly, but it may be a valuable workaround for some people. I'd prefer to see this exposed a bit more conveniently in the Pub/Sub client library.
@kir-titievsky Is this an approach you recommend?

@ajaaym

ajaaym commented Mar 8, 2019

@meltsufin Yes, it is, but it can be used until a proper solution is out.

@sduskis
Contributor

sduskis commented May 3, 2019

@ajaaym and I discussed the possibility of adding a new maxMessageSize variable that would allow the client to properly size the flowControlWindow.

@chingor13 chingor13 transferred this issue from googleapis/google-cloud-java Dec 4, 2019
@chingor13 chingor13 added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Dec 4, 2019
@google-cloud-label-sync google-cloud-label-sync bot added the api: pubsub Issues related to the googleapis/java-pubsub API. label Jan 29, 2020
@meredithslota

@kir-titievsky and @kamalaboulhosn — is this feature request still relevant? Feels like it might be made obsolete by the recent discussions re: flow control.

@kir-titievsky
Author

Yes, this should now be solved by the flow control API changes.

@meredithslota

Ok, great. I'm just going to close this FR then - let me know if I've misunderstood. :)

@meredithslota meredithslota self-assigned this Feb 5, 2021
@dgrudenic-qumu

So, does anyone know whether this feature is now available, so that the example above works as expected?

@meredithslota

https://cloud.google.com/pubsub/docs/pull with the included sample https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-flow-settings#pubsub_subscriber_flow_settings-java shows how to use flow control with a Pub/Sub subscriber. I'm not sure which portion of the thread you mean by "works as expected," but hopefully that will get you started!
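
For convenience, the gist of that sample (a sketch assuming the current google-cloud-pubsub API; the project and subscription IDs are placeholders):

    import com.google.api.gax.batching.FlowControlSettings;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.pubsub.v1.ProjectSubscriptionName;

    public class FlowControlSketch {
        public static void main(String[] args) {
            ProjectSubscriptionName subscription = ProjectSubscriptionName.of("my-project", "my-sub"); // placeholders

            // Allow at most 1 outstanding message (and cap outstanding bytes) per subscriber client.
            FlowControlSettings flowControlSettings =
                    FlowControlSettings.newBuilder()
                            .setMaxOutstandingElementCount(1L)
                            .setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // 10 MiB
                            .build();

            Subscriber subscriber =
                    Subscriber.newBuilder(subscription, (message, consumer) -> consumer.ack())
                            .setFlowControlSettings(flowControlSettings)
                            .build();
            subscriber.startAsync().awaitRunning();
        }
    }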

@dgrudenic-qumu

We actually experienced the same behavior mentioned in the example above.

If you have 3 subscribers on the same subscription/topic and want each subscriber to process 1 message at a time, which is a fairly standard point-to-point workflow, it is not really possible with FlowControlSettings and setMaxOutstandingElementCount=1, because as soon as the first subscriber gets a message it buffers more than 1 message.

Let me know if this is not the case and the example above actually works.

@meredithslota

I opened a new issue to debug the issue you are having, since the original issue was a feature request.
