Not acking without ackDeadline override #314
Comments
Having the same issue after upgrading from 0.18.0 to 0.19.1. Back then I did not have much time to investigate, so I reverted and forgot about it for some time. Today I was trying to deploy a new version using 0.21.1, and I have exactly the same issue as before: some messages are not acknowledged. And indeed, it looks to be related to having a delay in message processing before acknowledging it… I have another service with an almost identical setup, consuming messages from the same topics but without a noticeable delay before acknowledgement, and everything works fine there.
We've been working on refactoring the Subscription class (#388). If anyone has some spare time and wouldn't mind testing whether it resolves this issue, that would be most excellent!
Just put out a new release (
Will try to reproduce it tomorrow.
@zxbodya thanks!
Tried to run my app with the new release… unfortunately it did not work as I expected, but now I see some errors in the log. I will try to debug it, and maybe build a smaller app to reproduce the issue.
Tried to write a small app to reproduce the issue, with no luck: in the small app it works… I feel I'm missing something, but I don't have much idea what exactly might be wrong. In my app I have something like this:

```ts
import { PubSub, Message } from '@google-cloud/pubsub';
import { merge, fromEvent } from 'rxjs';
import { tap, delay, mergeMap } from 'rxjs/operators';

// `logger`, `subName1` and `subName2` are defined elsewhere in the app.
const pubsub = new PubSub({
  projectId: process.env.GOOGLE_PUBSUB_PROJECT_ID,
});

const sub1 = pubsub.subscription(subName1);
sub1.on('error', (err: Error) => {
  logger.error({ err }, 'subscription error');
});

const sub2 = pubsub.subscription(subName2);
sub2.on('error', (err: Error) => {
  logger.error({ err }, 'subscription error');
});

merge(
  fromEvent<Message>(sub1, 'message'),
  fromEvent<Message>(sub2, 'message'),
  // ...more subscriptions from different topics here (around 30 topics)
)
  .pipe(
    // Log every message as soon as it arrives.
    tap(message => {
      logger.debug(
        { messageId: message.id, messageData: message.data.toString() },
        'incoming'
      );
    }),
    // Hold each message before acknowledging it.
    delay(3000), // or around 60000 for subscriptionExtraAdded
    mergeMap(message => {
      message.ack();
      logger.debug({ messageId: message.id }, 'ack');
      return [];
    })
  )
  .subscribe();
```

And in a small app it works fine… but in the real app it does not, and what I see in the logs looks weird to me: received 3 messages, one from
Tried to acknowledge
errors from
Tried to acknowledge
more errors:
10 minutes passed and messages were redelivered:
error in
Tried to acknowledge
then, errors like this for subscriptions where there were no messages:
No more log messages from this point… @callmehiphop, any suggestion where to look for a possible cause? By the way, looking at this log, it seems the subscription might end up in a "not working" state… Are there specific errors that need to be handled manually to prevent this from happening?
@zxbodya actually there isn't really a good way to detect if the subscription crashed. I'm going to put together a PR that will hopefully improve the quality of the errors emitted, as well as add a hook to tell when the subscription has closed.
@zxbodya I just cut a new release (
Additionally, Subscriptions now emit a close event: subscription.on('close', () => { /* */ });
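As a rough sketch of how an app might use the close event mentioned above, the TypeScript below tracks a subscription's state using only its 'error' and 'close' events, so a "not working" subscription no longer fails silently. The EventSource interface, watchSubscription and the onClosed callback are hypothetical names for illustration, not part of @google-cloud/pubsub; it only assumes the subscription is an EventEmitter with an on() method.

```typescript
// Minimal shape we rely on (assumption): anything with an EventEmitter-style on().
interface EventSource {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

type Health = 'open' | 'errored' | 'closed';

// Hypothetical helper: records the last error and reports when the subscription closes.
// What to do in onClosed (re-open the subscription, or exit so a process
// supervisor restarts the service) is left to the app.
function watchSubscription(
  name: string,
  sub: EventSource,
  onClosed: (name: string, lastError?: Error) => void,
): () => Health {
  let health: Health = 'open';
  let lastError: Error | undefined;

  sub.on('error', (err: Error) => {
    // An error alone does not necessarily mean the stream is gone.
    health = 'errored';
    lastError = err;
  });

  sub.on('close', () => {
    // After 'close' no more 'message' events are expected.
    health = 'closed';
    onClosed(name, lastError);
  });

  // Callers can poll this, e.g. from a health-check endpoint.
  return () => health;
}
```

Reporting the last error together with the close makes it easier to correlate log lines like the ones earlier in this thread with the moment a subscription actually stopped.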
I think the original issue here might be resolved; we're not really seeing any chatter about it. I'm going to close it, but if anyone starts to experience this again let me know and I'll re-open. @zxbodya if the secondary issue you mentioned persists, let me know and I'll open a new issue to investigate. Thanks all!
Tried to reproduce the issue I had using the latest version; indeed, it looks to be fixed in v0.24.1.
Unfortunately, it worked when we had ~80 messages in the queue, as they were processed with only a minimal number of "expired" acks, as reported through However, with 4k+ messages in the queue and all the same settings ( We've tried varying
@dinvlad interesting! Thanks for the graphs, those are super helpful!
Thanks @callmehiphop - the interesting point is that this service is running on-prem (to shuttle files between on-prem and GCS), instead of GKE. I guess it may be subject to similar network-related limitations.
It appears that only versions starting from 0.23.0 are affected by this behavior. With 0.22.2, streaming pulls don't stop, and while we still get occasional "expired" acks, they only happen in a fraction of cases (instead of 100%).
@dinvlad would you mind opening a new issue with the details of what you're seeing?
Apologies - #468
@dinvlad thanks! That should help us solve your issue more efficiently.
Environment details
@google-cloud/pubsub version: 0.20.1
Steps to reproduce
ack() is being called.
Here's my original subscription code that works (I simplified it). Using this code, messages are removed from the queue as expected after being ack'd:
Because I needed to slow down processing, I added a delay. After adding this delay, only 500 messages would be removed from the queue after being ack'd. Other messages (thousands of them) would also be ack'd but wouldn't be removed from the queue. Eventually, the same messages would come back around to be processed multiple times.
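One plausible way to picture why a processing delay can break acking is a deliberately simplified timing model: assume a batch of messages arrives at once, each message is processed one after another with a fixed delay, and the client never extends the lease. Under those assumptions, any ack sent after the ack deadline counts as "expired", and Pub/Sub may redeliver that message. The function name and parameters below are hypothetical, and the real client may extend leases on its own, so this illustrates the timing only and is not a description of the library's behaviour.

```typescript
interface AckTiming {
  index: number;   // position of the message in the batch
  ackAtMs: number; // time of the ack, measured from delivery of the batch
  expired: boolean; // true if the ack lands after the deadline
}

// Simplified model (assumptions): all messages delivered at t = 0,
// processed sequentially with a fixed per-message delay, no lease extension.
function simulateSequentialAcks(
  messageCount: number,
  perMessageDelayMs: number,
  ackDeadlineMs: number,
): AckTiming[] {
  const result: AckTiming[] = [];
  for (let i = 0; i < messageCount; i++) {
    // Message i waits for its own delay plus the delays of all earlier messages.
    const ackAtMs = (i + 1) * perMessageDelayMs;
    // An ack exactly at the deadline is treated as on time.
    result.push({ index: i, ackAtMs, expired: ackAtMs > ackDeadlineMs });
  }
  return result;
}
```

For example, with the 10000 ms default deadline discussed in this issue and 100 ms of delay per message, `simulateSequentialAcks(300, 100, 10000)` marks only the first 100 acks as on time; every later ack arrives after the deadline, which matches the pattern of a fixed number of messages being removed while the rest come back.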
After reading the source for this library, I noticed this line: https://github.com/googleapis/nodejs-pubsub/blob/master/src/subscriber.js#L281
I added this line to the code and things started working again:
It appears that the default 10000 ms ackDeadline is somehow causing a problem in my case.
Full working code here:
Here is a chart from Stackdriver showing the timeline of events: