pubsub: Large number of duplicate messages suddenly #778

Closed
cristiangraz opened this issue Oct 3, 2017 · 72 comments
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. 🚨 This issue needs some love. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@cristiangraz

I've been noticing an influx of duplicate messages. Previously I don't think I had ever come across one, but I suddenly started seeing large volumes of dupes. For example, out of 50 messages that were all ACKd just recently, I saw 48 duplicates. It sounds similar to this issue:
googleapis/google-cloud-java#2465

When I look in Google Cloud Console at the API requests, I'm seeing large numbers of 499 error codes. In the last 4 days I have 1,312 responses with a 200 status code, but 7,580 with a 499 status code.

MaxOutstandingMessages = 10
MaxExtension = 10 minutes

Versions:

  • cloud.google.com/go: aeeacb092ec71c83668e4828857289d89c34b610
  • github.com/googleapis/gax-go: 317e0006254c44a0ac427cc52a0e083ff0b9622f
  • google.golang.org/genproto: 1e559d0a00eef8a9a43151db4665280bd8dd5886
  • google.golang.org/grpc: f92cdcd7dcdc69e81b2d7b338479a19a8723cfa3
@jba jba added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. api: pubsub Issues related to the Pub/Sub API. labels Oct 3, 2017
@pongad
Contributor

pongad commented Oct 3, 2017

@cristiangraz We have a working reproduction in Java. I'll see if I can also repro it here. Could you let us know how long the deadline on your subscription is?

@cristiangraz
Author

@pongad Is the deadline different from MaxExtension in the ReceiveSettings struct? Just want to confirm I give you the right thing, MaxExtension is 10 minutes and MaxOutstandingMessages in this instance was set to 10.

@pongad
Contributor

pongad commented Oct 4, 2017

@cristiangraz Sorry for the confusion, I was referring to the deadline configured on the pubsub subscription. You can get it from the cloud console, or by running

cfg, err := subscription.Config(ctx)
_ = err // handle err
fmt.Println(cfg.AckDeadline)

I sent a PR to the Java repo which should fix the problem. I plan to fix all the design concerns there first, then I'll replicate it here.

@cristiangraz
Author

@pongad the deadline on the subscription is set to 10 seconds (from cloud console). Also in this instance all of the workers are completing in ~2-3 seconds once they receive the message.

@jfbus

jfbus commented Oct 5, 2017

Same here. We tried using MaxOutstandingMessages = 10 and got lots of duplicates. Switching back to the default was much better. AckDeadline was also set to 10sec.

@pongad
Contributor

pongad commented Oct 6, 2017

This symptom suggests that this is the same problem we're seeing in Java.

Our diagnosis is this: The perceived communication latency between the pubsub server and the client can be high, so the client thinks messages expire further into the future than they actually do. By the time the client sends a "please give me more time" request to the server, it's too late: the server has already considered the messages expired and resent them.

@cristiangraz
Author

@pongad With a pubsub deadline (in cloud console of 10s): Pubsub starts counting the 10s before client starts counting 10s (due to latency) -- so pubsub has already resent the message before client could request an extension? Am I understanding that correctly?

If a worker on our end is processing a message in < 2s, does that mean there could be up to 8s of latency between the pubsub server and our client? Is there something on the Pubsub server that has caused latency to increase recently? We're running on Google Container Engine, so trying to grasp what the latency issues might be inside the datacenter from GKE to Pubsub.

If we changed the subscription deadline to 30s for example, would that solve this issue?

We have MaxOutstandingMessages (of 10 in this instance) equal to the number of goroutines processing messages (in order to avoid pulling messages before they are ready to be processed).

Thanks for all your help so far.

@pongad
Contributor

pongad commented Oct 6, 2017

@cristiangraz There are a few details, so I need to answer out of order.

pubsub has already resent the message before client could request an extension? Am I understanding that correctly?

Yes, we believe that's the root cause.

If a worker on our end is processing a message in < 2s, does that mean there could be up to 8s of latency between the pubsub server and our client?

There are a few variables at play. According to this comment each response contains "a single batch as published". So, if you set MaxOutstandingMessages = 10, the server might send us 11 messages in the same response. The client will process up to 10 messages concurrently, but the 11th message needs to wait until one of the first 10 finishes. So the 11th message might appear to take 4 seconds to process, etc.
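To make the arithmetic in this explanation concrete, here is a minimal sketch (not the client library's code) that models handlers running in fixed-size waves. The name `apparentLatency` and the assumption that every message takes the same time to process are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// apparentLatency estimates how long the k-th message (1-indexed) of a single
// delivery appears to take when at most limit messages are processed
// concurrently and each one takes perMessage. It assumes messages are handled
// in full waves, which is a simplification of real scheduling.
func apparentLatency(k, limit int, perMessage time.Duration) time.Duration {
	wave := (k + limit - 1) / limit // ceiling of k/limit
	return time.Duration(wave) * perMessage
}

func main() {
	// MaxOutstandingMessages = 10, roughly 2s of work per message.
	for _, k := range []int{1, 10, 11} {
		fmt.Printf("message %d finishes after %v\n", k, apparentLatency(k, 10, 2*time.Second))
	}
}
```

Under these assumptions the 11th message waits for a free slot and finishes after about 4 seconds, even though its own processing only takes 2.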

Is there something on the Pubsub server that has caused latency to increase recently? We're running on Google Container Engine, so trying to grasp what the latency issues might be inside the datacenter from GKE to Pubsub.

I'm actually not 100% sure. After I reproduced the problem, the logs showed that there has to be some latency going on, but I cannot tell which component (gRPC? pubsub server? firewall? etc) is introducing the latency.

If we changed the subscription deadline to 30s for example, would that solve this issue?

That would certainly help. I should note that if you set it too high, it will take longer for the message to be redelivered if your machine crashes. If you decide to increase the deadline, could you let us know if it's helping?

I created a PR to the Java client (after merging, I can replicate in other langs) that will make the client send the "I need more time" message to the server earlier to account for the latency.

@zhenjl

zhenjl commented Oct 6, 2017

I am running into the same issue and it's incredibly frustrating. In my debugging, I set ReceiveSettings.NumGoroutines = 1, MaxOutstandingMessages = 1 and ReceiveSettings.MaxExtension = 10 * time.Minute.

With the default subscription.AckDeadline = 10seconds:

  1. If I ack the msg as soon as I receive it, I get 100% deletion rate.
  2. If I wait 100ms after (just sleep), I deleted 196 of 200 msgs.
  3. If I wait 200ms after, I deleted 102 of 200 msgs.
  4. If I wait 300ms after, I deleted only 68 of 200 msgs.

I also created a subscription w/ AckDeadline: 5 * time.Minute. the deletion/ack rate is slightly better. The last run I did w/ 300ms delay ack'ed 123 of 200 msgs.

Did Google Pubsub change something on their end? Seems like it's all happening in the last couple of days.

@cristiangraz
Author

@pongad I tried to change the subscription deadline in the cloud console, but I didn't see any option for it. Is there any way to get more info internally from the Pubsub team about possible latency issues? It's definitely worrisome to see latency > 10-20ms inside of GKE, let alone the possibility of latency in the several seconds (if it's not related to streaming pull).

w.r.t. MaxOutstandingMessages and streaming pulls -- So if I set a value of 10, I should receive 10 messages? The only exception being that messages published in batch are also fetched in batch, causing the streaming pull to possibly retrieve more than 10 messages and wait to process them? If that's correct, in our instance we were seeing duplicate messages mostly in our domain system, for messages originating from an order. The order items are batch published in a single call to the fulfillment system, but the fulfillment system handles each message individually and publishes one at a time to the domain system (no batch publishing). They are all published around the same time, but not in a batch call. The domain system is where I am seeing the largest number of duplicates. If that's the only exception to more than MaxOutstandingMessages being received, then in our case there shouldn't be any messages that have been pulled but are waiting to begin processing? Is it possible something else is causing the latency?

This is only anecdotal as I haven't set up a test as thorough as @zhenjl's, but from what I can observe duplicates are more likely when multiple messages are coming in around the same time (although not originating from a batch publish). When a worker only has a single message to process, I don't see any signs of duplicates so far -- even in the same workers.

@pongad
Contributor

pongad commented Oct 9, 2017

I made a CL that should help with this.

@zhenjl Thank you for the measurements. This is in line with our expectation. If you ack the message very quickly, the acks will reach the server before the server expires the message, so there'd be fewer duplicates.

@cristiangraz You're right, I'm sorry. The only way to "change" the deadline right now is to delete the subscription and recreate. Hopefully the CL would mean you don't have to do that.

The fact that you're seeing this problem in your domain system is indeed suspicious. Could you let us know how you're publishing? If you're publishing by topic.Publish in the pubsub package, the publisher tries to batch messages together to increase performance. Could that be the problem?

I'll continue to work with the pubsub team to fix this.

@cristiangraz
Author

@pongad Yeah we are using topic.Publish -- I suppose if several independent goroutines are calling Publish the client could be batching them.

@pongad
Contributor

pongad commented Oct 10, 2017

We investigated this more. We believe the messages are buffered on a machine between your VM and the pubsub server. This explains the latency: pubsub server can send messages into the buffer faster than we can read it out, so some messages spend a few seconds in there. We're working to fix this.

In the immediate term, you can try reverting to v0.11.0. That version pulls messages from a slower endpoint that shouldn't be affected by this problem. If you see a compile error like undefined: gax.MustCompilePathTemplate, you also need to revert gax-go to version v1.0.0. Please let me know if this helps.

@cristiangraz
Author

Thanks @pongad. We rolled out a fix for all of our workers on Friday so they are unique by (subscription name, message id) using SQL locks, so we're currently protected from duplicate concurrent messages. I had some issues with v0.11.0 so am going to stay pinned to the current version until a fix is ready.

I appreciate all of your help digging into this, it sounds like you may have found the cause behind the latency. If you need any additional info please let me know.

@cmoad

cmoad commented Oct 17, 2017

+1 on this issue. We've been trying to determine for days why we aren't processing through our queues and reverting to v0.11.0 seems to have resolved the issue. We were showing a ton of Google Cloud Pub/Sub API errors in our cloud console.

@ProfessorBeekums

I'm also seeing this issue. I had actually changed my ack deadline in the Google Cloud UI to 300 seconds, but noticed my messages being redelivered every 10. The UI showed me the deadline was 300 seconds though.

Recreating my subscription fixed that problem, but is there a reason that edit option isn't disabled if it isn't supported?

@jba
Contributor

jba commented Oct 17, 2017

@ProfessorBeekums Subscription.Receive gets the ack deadline from the service at the beginning, and then remembers it for the life of the call. You have to call Receive again to get the change.

@tecbot

tecbot commented Oct 27, 2017

Any updates on this issue? We are facing the same issue and this is a real pain.

@jba
Contributor

jba commented Oct 27, 2017

It's an active area of work internally. There should be fixes rolled out in the next couple of weeks.

@tmatsuo
Contributor

tmatsuo commented Nov 8, 2017

I'm getting the same problem with this version: 8c4ed1f

My theory of this problem is the following:

This line:
https://github.com/GoogleCloudPlatform/google-cloud-go/blob/8dff92c85f4225d90bdd100ea741d9903acc259e/pubsub/iterator.go#L58

It always uses a keep-alive period of 5 seconds before the deadline. Let's say we have a 10-second ack deadline on the subscription.

OK case:

| Time | Client | Server |
|------|--------|--------|
| 0 | Pull a message | Send a message |
| 5 | Ask for extension | Quickly accept the extension |

Bad case (assuming it retries on failure):

| Time | Client | Server |
|------|--------|--------|
| 0 | Pull a message | Send a message |
| 5 | Ask for extension | Somehow erroring, hanging almost 5 seconds |
| 10 | Retrying modifyAckDeadline | Got it, but it's too late |

Originally I started to experience this problem with this issue:
#382

As I recall, when I used the legacy client lib, I sent modifyAckDeadline manually, with a grace period of 15 seconds and an AckDeadline of 60 seconds. At that time, it worked much, much better.

That makes me think of something like:

keepAlivePeriod := Max(po.ackDeadline - 5*time.Second, po.ackDeadline * 1/4)

for an easy mitigation. WDYT?
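For anyone wanting to try the numbers, here is a small sketch that evaluates the expression exactly as written above. The function name `keepAlivePeriod` and the standalone form are assumptions for illustration; this is not the code in iterator.go.

```go
package main

import (
	"fmt"
	"time"
)

// keepAlivePeriod evaluates the proposed expression as written:
// max(ackDeadline - 5s, ackDeadline/4).
func keepAlivePeriod(ackDeadline time.Duration) time.Duration {
	fixed := ackDeadline - 5*time.Second
	quarter := ackDeadline / 4
	if fixed > quarter {
		return fixed
	}
	return quarter
}

func main() {
	for _, d := range []time.Duration{4 * time.Second, 10 * time.Second, 60 * time.Second} {
		fmt.Printf("ackDeadline=%v keepAlivePeriod=%v\n", d, keepAlivePeriod(d))
	}
}
```

Evaluated this way, the quarter-deadline term only takes over for deadlines under roughly 6.7 seconds; for a 10-second deadline the result is still 5 seconds.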

@pongad
Contributor

pongad commented Nov 8, 2017

@tmatsuo From our analysis, things look slightly different from what you just mentioned.

In your table, the server sends and the client receives at about the same time. Our analysis shows that this isn't really the case. There could be significant buffering between client and server. So things might look kind of like this instead:

| Time | Client | Server |
|------|--------|--------|
| 0 | Buffers (the client hasn't "seen" the message) | Send a message |
| 6 | Sees message | |
| 10 | | Message expires |
| 11 | Sends modack | Got it, but too late |

Your solution might help to a degree. However, it's always possible for enough messages to back up that changing keep alive period won't help.

We're fixing this partly server-side so things look like this:

| Time | Client | Server |
|------|--------|--------|
| 0 | Buffers (the client hasn't "seen" the message) | Send a message, but pause the "clock" |
| 6 | Sees message; notifies server [1] | Starts the clock, message expires at time=16 |
| 11 | Sends modack | Got it, with time to spare |

The change [1] is already made to the client here.
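The two timelines can be compared with a small model. This is only a sketch under simplifying assumptions: the modack itself is treated as arriving instantly, and `modackInTime` and its parameters are hypothetical names, not part of the client or the service.

```go
package main

import (
	"fmt"
	"time"
)

// modackInTime reports whether the first modack reaches the server before the
// message expires. buffered is how long the message sat in a buffer before the
// client saw it; sendAfter is how long after seeing it the client sends the
// modack. If clockStartsOnReceipt is true, the deadline is counted from the
// moment the client saw the message; otherwise from the moment the server
// sent it.
func modackInTime(buffered, sendAfter, ackDeadline time.Duration, clockStartsOnReceipt bool) bool {
	expiry := ackDeadline
	if clockStartsOnReceipt {
		expiry += buffered
	}
	return buffered+sendAfter < expiry
}

func main() {
	// Numbers from the tables: seen at t=6, modack at t=11, 10s deadline.
	buffered, sendAfter, deadline := 6*time.Second, 5*time.Second, 10*time.Second
	fmt.Println("clock starts at send:   ", modackInTime(buffered, sendAfter, deadline, false))
	fmt.Println("clock starts on receipt:", modackInTime(buffered, sendAfter, deadline, true))
}
```

With the table's numbers, the modack at t=11 misses an expiry at t=10 in the first case and beats an expiry at t=16 in the second.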

Last week, we estimated that the server-side fix should be rolling out this week. I'll keep this thread updated.

@tmatsuo
Contributor

tmatsuo commented Nov 8, 2017

@pongad Alright, it sounds promising :) Thanks!

@tmatsuo
Contributor

tmatsuo commented Nov 8, 2017

@pongad

FWIW, although your scenario looks promising, I think my suggestion or a similar approach will also be needed, because I often observe duplicated messages a certain time after the pull request was made.

@pongad
Contributor

pongad commented Nov 8, 2017

duplicated messages a certain time after the pull request was made

I'm not 100% sure I parse this right. Are you saying the duplicates arrive a certain time after you ack them?

@tmatsuo
Contributor

tmatsuo commented Nov 8, 2017

@pongad

No. I mean that it seems the dups are happening for me not only at the first iteration of modack, but also on one of the subsequent iterations of modack. I'm not 100% confident though.

@tmatsuo
Contributor

tmatsuo commented Nov 15, 2017

@pongad
I think some of the modack failures in our case might have been because of network congestion (and short grace period of course), sorry for the noise.

@tecbot

tecbot commented Mar 7, 2018

@jba We updated the pubsub vendor to the latest version in one service which was affected by this bug in the past, to test it again. The issue still exists. The unacked message counter increases linearly and we always get the same messages over and over again, even though we ack them immediately on the client side. How can we help you debug this problem?

@jba
Contributor

jba commented Mar 7, 2018

@tecbot:

  • Are you also running on GKE? If so, does the same problem occur if you run your docker container directly on GCE?

  • Please share as much of your Receive code as you're comfortable with. Especially useful are the ReceiveSettings you're using.

  • Could you enable the client instrumentation we recently added? Details are in my comment above.

@pongad
Contributor

pongad commented Mar 8, 2018

I'll unassign myself from this, but I'll keep an eye on it.

@tecbot

tecbot commented Mar 8, 2018

@jba:

Are you also running on GKE? If so, does the same problem occur if you run your docker container directly on GCE?

Yep, running on GKE but we have not tested it on GCE yet, but we had this service in the past in our own datacenter with the same error, so it shouldn't depend on it.

Please share as much of your Receive code as you're comfortable with. Especially useful are the ReceiveSettings you're using.

A simplified version of our Receive looks like this (the removed code is protobuf parsing, and we use a different fn to execute). Maybe one important point is that we delay the execution for 20s; the real fn can also take 2min to complete (depending on the data).

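import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
)

const delay = 20 * time.Second

// Do receives messages from sub and passes each payload to fn after a fixed delay.
func Do(ctx context.Context, sub *pubsub.Subscription, fn func([]byte) error) error {
	err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// Delay the execution, but give up early if the context is cancelled.
		select {
		case <-ctx.Done():
			m.Nack()
			return
		case <-time.After(delay):
		}
		// Nack on failure so the message is redelivered; ack on success.
		if err := fn(m.Data); err != nil {
			m.Nack()
			return
		}
		m.Ack()
	})
	// A cancelled context is the normal way to stop receiving.
	if err != context.Canceled {
		return err
	}
	return nil
}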

Creating the subscription, we set MaxOutstandingMessages to 30 to limit throughput:

	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		...
	}
	sub := client.Subscription(subID)
	ok, err := sub.Exists(ctx)
	if err != nil {
		...
	}
	if !ok {
		sub, err = client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
			Topic:       client.Topic(topicID),
			AckDeadline: 60 * time.Second,
		})
		if err != nil {
			...
		}
	}
	sub.ReceiveSettings.MaxOutstandingMessages = 30
	return sub

Could you enable the client instrumentation we recently added? Details are in my comment above.

We added the instrumentation; here are multiple graphs. What we can see so far is that there are intervals without any Acks, followed by a burst of Acks. That doesn't look right to me.

Graph for 12 hours:
[Image: pubsub_12hours]

Graph for 3 hours:
[Image: pubsub_3hours]

Graph for a time frame without peaks:
[Image: pubsub_extract]

Here is a graph from Stackdriver:
[Image: bildschirmfoto 2018-03-08 um 10 40 26]

@tecbot

tecbot commented Mar 8, 2018

@jba we tested a different service without any delay and it has the same issue. There is always only a short period of time where messages come in, and then there is a ~20 min break without any messages. This repeats every time. It may be worth mentioning that both topics have a low throughput on the publisher side, ~1 new msg/s. Do you bundle/buffer msgs on the server side before publishing them to stream subscriptions?

@tecbot

tecbot commented Mar 13, 2018

@jba any updates? We tested more scenarios and for us it's clear that it depends on the publish rate of the topic. We always have problems with subscriptions if the publish rate is slow. We can see that subscribers don't receive messages at all, or only with a really big delay of ~10-20 minutes, while at the same time the Stackdriver unacked message count increases. Subscriptions with a publish rate of ~1000/s have no problems.

Edit: We have now added a "ping" that publishes a message to the topic every 100ms, which "resolves" the stuck subscribers. If we stop the pinger, the subscribers get stuck instantly.

@jba
Contributor

jba commented Mar 13, 2018

Thanks for all the info. We don't have a theory yet about what you're seeing. The server doesn't buffer, and it doesn't rebundle—if the publisher sends 3 messages in one Publish RPC, then the subscriber will receive those three messages in one chunk.

The fact that your "ping" messages unstick the subscribers is very interesting; we haven't observed that before.

I'm trying to look at the black graphs but am having trouble interpreting the y-axis. The raw counters always increment, so I don't think you could be using those directly since your graphs go down. Are you showing a delta? For example, in the third graph (between peaks), the average value for Ack is about 88. Is that 88 acks over the last 10s sampling interval? (OpenCensus sends sample data every 10s, unless you call SetReportingPeriod.) Is it 88 acks per second? Neither makes sense with the numbers you provided: if you're handling 30 messages every 2 minutes or so, your ack rate should be .25/s.
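As a rough illustration of the delta question, the sketch below converts cumulative counter samples into per-second rates. The helper name `perSecondRates` and the sample values are made up for the example; only the 10s reporting interval and the 30-messages-per-2-minutes figure come from the discussion.

```go
package main

import (
	"fmt"
	"time"
)

// perSecondRates converts cumulative counter samples, taken once per interval,
// into per-second rates between consecutive samples.
func perSecondRates(samples []int64, interval time.Duration) []float64 {
	rates := make([]float64, 0, len(samples))
	for i := 1; i < len(samples); i++ {
		delta := samples[i] - samples[i-1]
		rates = append(rates, float64(delta)/interval.Seconds())
	}
	return rates
}

func main() {
	// Hypothetical cumulative ack counts sampled every 10s.
	fmt.Println("observed acks/s:", perSecondRates([]int64{0, 88, 176}, 10*time.Second))
	// 30 messages handled roughly every 2 minutes.
	fmt.Println("expected acks/s:", perSecondRates([]int64{0, 30, 60}, 2*time.Minute))
}
```

If the graphed value of 88 were a delta over one 10s sample, it would correspond to several acks per second, far above the 0.25/s implied by 30 messages every 2 minutes.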

@jeanbza
Member

jeanbza commented Mar 14, 2018

@tecbot Something is funky, or I'm misunderstanding.

For the top graphs: I think your graphs are saying that acks are happening steadily at around 90 per minute (or per second? I think it's saying per minute... but not sure), which seems good.

For the bottom graphs: Could you also provide a graph showing num_undelivered_messages, as well as instrumentation around the time it takes to ack messages? The two reasons for the spikiness I could think of: there actually aren't any undelivered messages, or the processing time is ~20-40m. I think it is really important to have instrumentation around process time for us to understand what's going on with your app.

Furthermore, I've taken your code and replicated it (see program here). I'm not sure how you're sending, so I used the gcloud console to send a message each second (actually, initially 2s, then 1s - you'll see a slight uptick early on). Here are the charts:

[Image: screen shot 2018-03-14 at 10 27 02 am]

So unfortunately I'm unable to repro. Could you describe the way you're publishing? I'm interested in the following:

  • Are you using a program to publish? If so which language? If not which tool?
  • Could you describe any publish settings you've configured?
  • Could you provide a repro, if publishing via code?

edit: If you need any help instrumenting your app to show time to process the message, please let me know - happy to provide code snippet!
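For readers following along, one possible shape for such processing-time instrumentation is sketched below. It is not the snippet offered above: the `timings` type and its `wrap` method are hypothetical, and a real setup would more likely export the durations to a metrics system than keep them in memory.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timings collects how long each handler invocation took. The mutex is needed
// because message handlers typically run concurrently.
type timings struct {
	mu      sync.Mutex
	samples []time.Duration
}

// wrap returns a handler that runs the given handler and records its
// wall-clock duration, passing the handler's error through unchanged.
func (t *timings) wrap(handler func(data []byte) error) func(data []byte) error {
	return func(data []byte) error {
		start := time.Now()
		err := handler(data)
		elapsed := time.Since(start)
		t.mu.Lock()
		t.samples = append(t.samples, elapsed)
		t.mu.Unlock()
		return err
	}
}

func main() {
	var ts timings
	handler := ts.wrap(func(data []byte) error {
		time.Sleep(10 * time.Millisecond) // stand-in for real work
		return nil
	})
	_ = handler([]byte("payload"))
	fmt.Println("samples:", len(ts.samples), "first:", ts.samples[0])
}
```

The wrapped function would be called from inside the Receive callback, just before deciding whether to Ack or Nack.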

@sfriquet

sfriquet commented Apr 7, 2018

FWIW we also encounter a similar experience using PubSub on GKE.

We reproduce this behavior when we suddenly stop consuming a subscription and then start to acknowledge messages again.
When we notice such behavior, the number of undelivered messages is on the order of tens of thousands at least.

When we restart consuming the subscription again by acking the messages, we see a surge in duplicated messages until the number of unack'ed messages drops to 0.
We also notice that the rate of duplicate messages decreases as the number of undelivered messages drops.
Once the messages no longer accumulate in the queue we see almost no duplicate messages.

@jba
Contributor

jba commented Apr 9, 2018

@sfriquet:

stop consuming a subscription and then start to acknowledge messages again

So you exit from the call to Receive, then wait a while, then call Receive again?

When we restart consuming the subscription again by acking the messages, we see a surge in duplicated messages...

The timer for a message's ack deadline begins when the message arrives on the client, or after 10 minutes, whichever comes first. Messages arrive on the client in bulk, and may be buffered (by proxies or the client's network layer) before they arrive. If you ack slowly, messages may time out, and then you will see duplicates.

For example, say there are 10,000 undelivered messages when you call Receive, and you process them one at a time (one subscriber process, NumGoroutines=1, MaxOutstandingMessages=1). You ack each message after 1 second. After 10 minutes you have seen 600 messages, but there may be many more that have left the server and are in a buffer somewhere. Ten seconds later (assuming the default ack deadline of 10s), these messages will time out. You will see and ack them as you continue processing, but the server will have already marked them as expired and will redeliver them.
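The numbers in this example can be worked through with a tiny sketch. It is a rough model only: `ackedBy` is a hypothetical helper, and the worst case assumes that every remaining message has already left the server and is sitting in a buffer, which the example above does not claim.

```go
package main

import (
	"fmt"
	"time"
)

// ackedBy returns how many messages a single sequential worker has acked once
// elapsed has passed, if each message takes perMessage to handle.
func ackedBy(elapsed, perMessage time.Duration) int {
	return int(elapsed / perMessage)
}

func main() {
	const backlog = 10000
	clockCap := 10 * time.Minute   // the deadline timer starts by then at the latest
	ackDeadline := 10 * time.Second // default ack deadline
	perMessage := time.Second

	seen := ackedBy(clockCap, perMessage)
	safe := ackedBy(clockCap+ackDeadline, perMessage)
	fmt.Println("acked after 10 minutes:", seen)
	fmt.Println("acked before buffered messages expire:", safe)
	fmt.Println("worst case still at risk of redelivery:", backlog-safe)
}
```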

The bottom line is that it can be hard to distinguish this behavior—which is "normal," though undesirable—from a bug in the PubSub client or service.

It would help if you could give us more data.

@sfriquet

@jba

So you exit from the call to Receive, then wait a while, then call Receive again?

Yes and no. We notice duplicates in 2 circumstances.

  • Indeed stopping the client then restarting it later
  • When we have to Nack messages. e.g validation issue on our end, we suddenly start Nack'ing messages to a point that the queue isn't being consumed at all.
    Note that we see duplicates when we start consuming the queue again. What I mean is we only see duplicates when the queue has accumulated and we're trying to dequeue it.

[Image: pubsub_dup_rate]

Also, slightly unrelated I guess, but it looks like Nack'ed messages are put back at the front of the queue, so that they are immediately fetched again by the clients. We saw a few times our pipeline completely stalling because of a relatively small number of "invalid" messages that kept being retried.

For example [...]

Thanks for the example, if I understand this right:

  • When the client pulls messages, an undefined? amount of messages are actually pulled from the server (more than MaxOutstandingMessages I presume)
  • after 10 minutes + subscription deadline, if they haven't been seen by the client, they will timeout and will eventually be sent again, even though they should eventually be processed by the client, which can lead to duplicates.

So that'd mean that if a queue has accumulated too many undelivered messages, such that they can't be processed within (10 minutes + subscription deadline), then duplicates are to be expected. Is that right?

That'd match what we see then. What could we possibly do to mitigate this?

It would help if you could give us more data.

What would help?

In terms of settings, we use:

  • Subscription has a 600s deadline
  • Receiver settings has NumGoroutines=1, MaxOutstandingMessages=8.
  • Message processing time: p99 is < 10s, p95 is < 1s

Thanks a lot for the clarification
In the meantime what we did is we added a cache layer to filter out messages based on their ID.

@jba
Contributor

jba commented Apr 10, 2018

When the client pulls messages, an undefined? amount of messages are actually pulled from the server

The number of messages pulled will usually be the same as the number published together. For very high publish rates, the server may additionally batch separate messages that arrive close in time.

We saw a few times our pipeline completely stalling because of a relatively small number of "invalid" messages that kept being retried.

Nacking a message will indeed cause it to be redelivered, perhaps promptly. One solution to this is to have a separate topic for invalid messages. If the messages are permanently invalid, then that topic should probably be subscribed to by something that alerts humans. If they are temporarily invalid, then the subscriber should ack the message, sleep on it a while, then republish it to the original topic. (There is no feature that publishes a message after a delay, or at a particular time in the future).

So that'd mean that if a queue has accumulated too many undelivered messages, such that they can't be processed within (10 minutes + subscription deadline), then duplicates are to be expected. Is that right?

Almost. They don't have to be processed by your code in that time, but they do have to make it onto the client. That may be a minor distinction or a large one, depending on a number of factors.

You should be able to mitigate the problem by

  • increasing the ack deadline
  • increasing throughput (adding processes, or increasing MaxOutstandingMessages).

Discarding the dups, as you're doing, is also fine. When you say you added a "cache layer," do you mean a separate process/server? Because you may be able to get away with having each client process get rid of just the duplicates it sees. That won't be perfect, but it may be enough, since your system has to tolerate duplicates anyway.
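A per-process filter of the kind discussed here could look roughly like the sketch below. The `seenCache` type, its `firstTime` method and the TTL-based eviction are all assumptions for illustration; the thread does not describe how the actual cache layer was built. The key is the message ID, though a (subscription name, message ID) pair would work the same way.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// seenCache remembers message IDs for a limited time so that duplicates
// arriving at the same process can be discarded.
type seenCache struct {
	mu   sync.Mutex
	ttl  time.Duration
	seen map[string]time.Time // message ID -> time it was first seen
}

func newSeenCache(ttl time.Duration) *seenCache {
	return &seenCache{ttl: ttl, seen: make(map[string]time.Time)}
}

// firstTime records id and reports whether it had not been seen within the
// last ttl. Expired entries are swept on every call, which keeps the sketch
// short but is linear in the cache size.
func (c *seenCache) firstTime(id string) bool {
	now := time.Now()
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, t := range c.seen {
		if now.Sub(t) > c.ttl {
			delete(c.seen, k)
		}
	}
	if _, dup := c.seen[id]; dup {
		return false
	}
	c.seen[id] = now
	return true
}

func main() {
	cache := newSeenCache(15 * time.Minute)
	for _, id := range []string{"m1", "m2", "m1"} {
		fmt.Printf("%s first time: %v\n", id, cache.firstTime(id))
	}
}
```

In a Receive callback, a message for which `firstTime` returns false would simply be acked and skipped. This only catches duplicates that land on the same process.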

@sfriquet

Thanks for the advice.

When you say you added a "cache layer," do you mean a separate process/server? Because you may be able to get away with having each client process get rid of just the duplicates it sees.

Reading your explanations again it indeed seems that the duplicates would be on a per client basis. In such case 'caching' at the client level should work too and be much simpler to implement/maintain then.

@jba
Contributor

jba commented Apr 12, 2018

duplicates would be on a per client basis

I don't think that's right. The service will load balance messages (including re-sent ones) across all streams in a subscription. However, if you only have a handful of streams, a significant fraction will end up on the same one. For instance, if you have two streams (processes), then each will see half the messages, and get half the dups. So a per-client solution will weed out a quarter of the dups. I guess that's not a great ratio, now that I do the math. In any case, my point was that a per-client solution is much simpler architecturally, and maybe it gives you enough de-duping to make your system perform adequately.
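To put a number on this, the sketch below enumerates every way an original and its duplicate can be assigned to streams. It assumes assignment is uniform and independent, which is an assumption for the sake of the arithmetic and not a documented property of the service; `sameStreamFraction` is a hypothetical helper.

```go
package main

import "fmt"

// sameStreamFraction enumerates every (original, duplicate) stream assignment
// for n streams and returns the fraction in which both land on the same
// stream, i.e. the share of duplicates a per-process filter could catch.
func sameStreamFraction(n int) float64 {
	if n <= 0 {
		return 0
	}
	same, total := 0, 0
	for orig := 0; orig < n; orig++ {
		for dup := 0; dup < n; dup++ {
			total++
			if orig == dup {
				same++
			}
		}
	}
	return float64(same) / float64(total)
}

func main() {
	for _, n := range []int{1, 2, 4, 10} {
		fmt.Printf("%d streams: %.2f of dups land with their original\n", n, sameStreamFraction(n))
	}
}
```

Under that assumption, with two streams each process catches a quarter of all duplicates and the two together catch half; the share shrinks as 1/n when more streams are added.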

@rayrutjes

rayrutjes commented May 22, 2018

It seems like this issue is haunting us as we have it in multiple projects now.
After having followed this whole discussion and also spent time studying the internals of this library, we came to the conclusion that the current library is designed for pipelines with a somewhat stable rate of incoming messages and a high processing throughput.

In our case, we receive spikes of messages every 5 minutes, the processing time of 1 message can vary and sometimes take up to a couple of seconds, and there is no expected correlation between the number of incoming messages and the speed at which we want to process them.

If our understanding is correct, the streaming pull strategy used in this library can eventually fetch more messages than the MaxOutstandingMessages, which from a developer experience point of view is a bit hard to understand. I do understand now that this allows for a very high throughput in some scenarios. However it also introduces all issues discussed in this thread.

On our side, we tried leveraging the non streaming pull approach and so far it seems to address the problems. However our solution required us to re-implement parts of this pubsub client in order to re-create some of the needed features.

Is there any chance you could introduce a parameter letting the user choose whether to use the experimental streaming subscription pulling or the Pull API endpoint? It seems like the latter respects MaxOutstandingMessages and would work well in our use case.

Otherwise, if you plan to somehow deprecate the Pull endpoint in favour of StreamingPull, is there any chance we could implement an option forcing the client to respect the max outstanding messages? Even a hack in the beginning could help us solve our issue, for example if the client were to directly Nack all messages received beyond the MaxOutstandingMessages amount.

I hope this all makes sense. We feel like our current implementation re-invents the wheel, and given that you mentioned earlier that you were working on this case, I wanted to share our experimentations and expectations. I hope this is somewhat useful.

@jba
Contributor

jba commented May 22, 2018

@rayrutjes, we'll pass your comments along to the PubSub team.

@JustinBeckwith JustinBeckwith added the 🚨 This issue needs some love. label Jun 8, 2018
@sfriquet

sfriquet commented Jul 13, 2018

@cristiangraz I'm curious why is this issue closed?

@johnreutersward

@jba Can we reopen this since people are still experiencing problems? I don't want this issue to lose visibility.

@cristiangraz
Author

@sfriquet @johnreutersward The initial issue I was having regarding excessive duplicate messages has been fixed. There were lots of other unrelated comments on this issue that went quiet, but looks like there are some additional cases (like this one #778 (comment)) related to duplicate messages that I missed. Apologies, reopening.

@jba Will leave this up to you or the team to close this whenever it's ready to be closed.

@cristiangraz cristiangraz reopened this Jul 25, 2018
@jba
Contributor

jba commented Oct 2, 2018

As of afb8009, we support synchronous mode for Receive. If you want more control over throughput, please try it. (It ensures that you will never pull more than MaxOutstandingMessages from the Pub/Sub service at a time.)

I'm going to close this now. Reopen if you are still experiencing many duplicates and synchronous mode isn't helping.

@jba jba closed this as completed Oct 2, 2018