subscription.on 'message' isn't receiving messages after 10 mins or more. #9
Comments
From @callmehiphop on September 8, 2017 18:19 Thanks for reporting! We've had multiple reports of similar behavior and we're currently investigating this issue. In the meantime you might want to use pubsub v0.13.x as a temporary workaround.
From @QTGate on September 8, 2017 23:32 Thank you.
const pubsub = require('@google-cloud/pubsub')(connectOption)

const listenSub = () => {
    let subscription = pubsub.topic(topicName).subscription(subScription)
    subscription.on('message', message => {
        message.ack()
        ....
    })
    subscription.once('error', err => {
        ....
    })
    // restart every 10 mins; note the delay belongs to setTimeout, not .then()
    const tt = setTimeout(() => {
        return subscription.close().then(() => {
            listenPubSubMaster = null
            return listenSub()
        })
    }, 600000)
}
listenSub()
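A restart workaround like the one above can be factored into a small helper. This is only a sketch: `createSubscription` is a hypothetical factory you supply, and the helper assumes the subscription object exposes `on` and `close`.

```javascript
// Periodically tears down and recreates a subscription as a workaround
// for streams going silent. `createSubscription` is a hypothetical
// factory supplied by the caller; `onMessage` handles (and acks) messages.
function listenWithRestart(createSubscription, onMessage, intervalMs) {
  let subscription = null;
  let timer = null;

  const start = () => {
    subscription = createSubscription();
    subscription.on('message', onMessage);
    subscription.on('error', (err) => console.error(err));

    // Schedule a restart; the delay is passed to setTimeout,
    // not to .then() as in the snippet above.
    timer = setTimeout(() => {
      Promise.resolve(subscription.close()).then(start);
    }, intervalMs);
  };

  start();

  // Return a stopper so callers can shut down cleanly.
  return () => {
    clearTimeout(timer);
    return Promise.resolve(subscription.close());
  };
}
```

Callers would pass something like `() => pubsub.topic(topicName).subscription(subName)` as the factory and keep the returned stopper for graceful shutdown.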
From @caseyduquettesc on September 18, 2017 3:20 We see similar behavior, although it takes a few hours. Eventually the subscription stops processing new messages and we have to restart it. This only started after upgrading to 0.14.
From @mscottx88 on September 22, 2017 18:52 I too am seeing the same problem. We set up a forever-running process on GKE and it seems to go unresponsive after some time. I came to the same conclusion of putting in a restarter. Let's hope this gets resolved soon!
From @callmehiphop on September 23, 2017 0:4 This is definitely a result of moving towards the new streaming API. I'm working on resolving this issue, but in the meantime I suggest downgrading to 0.13. I'll update this issue once it's been resolved.
From @pongad on September 23, 2017 10:44 @callmehiphop FYI, we resolved this in Go by adding gRPC keepalive. I'm not sure what the code should look like in Node though. https://code-review.googlesource.com/#/c/gocloud/+/17070/
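For reference, gRPC exposes keepalive through channel arguments. The argument names below (`grpc.keepalive_time_ms`, etc.) are standard gRPC channel options; whether and how the v0.14 Pub/Sub client lets you pass them through is an assumption here, not a documented API.

```javascript
// Standard gRPC channel arguments for keepalive pings. The values are
// illustrative; forwarding them through the Pub/Sub client constructor
// is an assumption, not a documented v0.14 API.
const keepaliveChannelArgs = {
  // Send a keepalive ping every 30s on an otherwise idle connection.
  'grpc.keepalive_time_ms': 30000,
  // Consider the connection dead if no ping ack arrives within 10s.
  'grpc.keepalive_timeout_ms': 10000,
  // Allow keepalive pings even when there are no active calls.
  'grpc.keepalive_permit_without_calls': 1
};

// Hypothetical usage, assuming the client forwards gRPC options:
// const pubsub = require('@google-cloud/pubsub')({ grpc: keepaliveChannelArgs });
```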
From @callmehiphop on September 25, 2017 14:10 @pongad thanks for the update! I believe we just have to set the
From @callmehiphop on September 25, 2017 14:17 @QTGate @mscottx88 @caseyduquettesc I've opened a PR (#2627) which I believe will fix the issue you are seeing, would any of you mind testing it out? Thanks!
From @mscottx88 on September 25, 2017 14:18 I'm willing to give it a go, for sure!
From @mscottx88 on September 25, 2017 22:46 @callmehiphop I applied the two fixed files to my deployment but now I am seeing duplication of message delivery. That is, I am seeing the same message being pulled more than once. Update: Also, it appears that the connection has been lost or something, as it is sitting idle again even with messages being published to it.
From @callmehiphop on September 26, 2017 17:30 @jganetsk I'm noticing that messages that are not acked before a stream is closed will be redelivered when the next stream is opened, regardless of whether or not the ack deadline was hit. Is that the expected behavior? If so, I'm assuming we need to filter these messages out.
From @jganetsk on September 26, 2017 17:45 @callmehiphop I don't think you should be filtering any messages out. In general, it is possible to receive duplicates, and it is possible that they come before the ack deadline was hit. If this happens consistently, that's a problem that we should look at. How much time passes between delivery and redelivery? Does it correspond to the default ack deadline?
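One way to answer the timing question is to record when each message ID is first delivered and log the gap on redelivery. A minimal, Pub/Sub-independent tracker might look like this (the class and its name are illustrative, not part of the library):

```javascript
// Records when each message ID was first seen and reports the gap
// (in ms) whenever the same ID is delivered again.
class RedeliveryTracker {
  constructor() {
    this.firstSeen = new Map();
  }

  // Returns null on first delivery, or the ms elapsed since first
  // delivery when the message is a redelivery.
  record(messageId, now = Date.now()) {
    if (!this.firstSeen.has(messageId)) {
      this.firstSeen.set(messageId, now);
      return null;
    }
    return now - this.firstSeen.get(messageId);
  }
}
```

Wiring `tracker.record(message.id)` into the 'message' handler and comparing the reported gap with the subscription's ack deadline would show whether redeliveries arrive early.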
From @jganetsk on September 26, 2017 18:5 We will take a look at this, thanks for reporting it.
From @callmehiphop on September 26, 2017 18:7 @jganetsk as far as I can tell it only happens when closing and re-opening a stream. I'm putting together an elaborate test and I'll give you the details of my findings once I have some to share.
From @callmehiphop on September 26, 2017 21:6 There appear to be two different scenarios in which messages will be redelivered before they have been acked.
1. Streams being closed. In the event of a stream being closed, it appears that all un-acked messages will be redelivered to a different stream regardless of the current ack deadline.
2. Race condition. I believe we occasionally hit a race condition when attempting to modify an existing ack deadline. Currently we calculate the ack deadline using the 99th percentile of ack times (If our
@lukesneeringer have you seen anything similar to what I'm describing above in the Python client? If this is indeed the reason why a redelivery occurs, is it reasonable to calculate the sleep time at a lower percentile?
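The race described above can be illustrated with plain arithmetic: if deadline extensions are scheduled at the 99th percentile of observed ack times, any message slower than that percentile can outlive its extension window. The nearest-rank percentile below is an assumption about the approach, not the client's exact code:

```javascript
// Returns the value at the given percentile (0-100) of a sample,
// using nearest-rank on the sorted data.
function percentile(samples, p) {
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}

// Suppose most acks are fast but one is slow.
const ackTimesMs = [
  ...Array(99).fill(1000), // 99 fast acks at 1s
  30000                    // one slow ack at 30s
];

// Extensions scheduled at the 99th percentile fire every ~1s here.
const extensionInterval = percentile(ackTimesMs, 99);

// A message that takes longer than the extension schedule anticipates
// can hit its deadline and be redelivered before it is acked.
const slowMessageMs = 30000;
const raceLost = slowMessageMs > extensionInterval;
```

Lowering the percentile (or padding the interval) trades extra modifyAckDeadline calls for fewer premature redeliveries, which is the trade-off the question above is asking about.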
From @mscottx88 on September 30, 2017 0:22 Here is my solution to the race condition you describe. A simple
'use strict';

const config = require('@google-cloud/rcloadenv');
const cp = require('child_process');
const fs = require('fs-extra');
const logger = require('winston');
const pubsub = require('./providers/pubsub');
const uuid = require('uuid/v4');

const { name: configurationStorageName } = require('../package.json');

/**
 * Listen for and process incoming messages.
 *
 * @returns {Function}
 * An async function that can be called to stop listening for messages.
 */
const listen = () => {
  let activeMessages = 0;
  logger.info(`Subscription started at ${new Date().toISOString()}.`);

  const subscription = pubsub
    .get()
    .topic(process.env.SUBSCRIBE_TOPIC_NAME)
    .subscription(process.env.SUBSCRIBE_SUBSCRIPTION_NAME, {
      flowControl: { maxMessages: +process.env.MAX_CONCURRENT_MESSAGES }
    });

  subscription.on('error', (error) => {
    logger.error(error);
  });

  subscription.on('message', (message) => {
    const { email, pwd, uid } = message.attributes;
    logger.info(`Message received at ${new Date().toISOString()} for uuid ${uid}.`);

    const sessionId = uuid();
    const downloadPath = `${__dirname}/../downloads/${sessionId}`;
    const profilePath = `${__dirname}/../profiles/${sessionId}`;

    const args = [uid, email, pwd, sessionId, downloadPath, profilePath];
    const options = {
      cwd: process.cwd(),
      env: process.env,
      execArgv: [],
      silent: true
    };

    logger.info(`Starting session ${sessionId}`);
    const downloader = cp.fork(`${__dirname}/download.js`, args, options);
    activeMessages++;

    downloader.stdout.on('data', (data) => {
      const text = new Buffer(data).toString('utf8');
      logger.info(text);
    });

    downloader.on('exit', async (code) => {
      logger.info(`Session ${sessionId} completed with exit code ${code}.`);
      try {
        await Promise.all([fs.remove(downloadPath), fs.remove(profilePath)]);
      } catch (error) {
        logger.error(`Could not cleanup path(s) in session ${sessionId} because of ${error}.`);
      }
      try {
        message.ack();
      } finally {
        activeMessages--;
      }
      if (activeMessages === 0) {
        subscription.emit('importer-idle');
      }
    });
  });

  return () => {
    return new Promise((resolve, reject) => {
      // stop the flow of incoming messages
      subscription.removeAllListeners('message');

      const shutdown = async () => {
        try {
          await subscription.close();
          logger.info(`Subscription ended at ${new Date().toISOString()}.`);
          resolve();
        } catch (error) {
          reject(error);
        }
      };

      if (activeMessages !== 0) {
        subscription.on('importer-idle', shutdown);
      } else {
        shutdown();
      }
    });
  };
};

/**
 * Run the process.
 *
 * @returns {void}
 */
const run = async () => {
  logger.level = process.env.LOG_LEVEL;
  try {
    await config.getAndApply(configurationStorageName);
  } catch (error) {
    logger.error(error);
    throw error;
  }

  let unlisten = listen();
  setInterval(async () => {
    try {
      await unlisten();
    } catch (error) {
      logger.error(error);
    } finally {
      unlisten = listen();
    }
  }, +process.env.RESTART_INTERVAL);
};

module.exports = { run };
From @QTGate on October 16, 2017 1:4
Steps to reproduce
const pubsub = require('@google-cloud/pubsub')(connectOption)

const listenSub = () => {
    let subscription = pubsub.topic(topicName).subscription(subScription)
    subscription.on('message', message => {
        message.ack()
        ....
    })
    // restart every 10 mins; the delay belongs to setTimeout, not .then()
    const tt = setTimeout(() => {
        return subscription.close().then(() => {
            listenPubSubMaster = null
            return listenSub()
        })
    }, 600000)
}
listenSub()

I did make a new subscription every 10 mins.
From @ShahNewazKhan on October 18, 2017 19:23
I am also seeing memory increase and not be released after implementing the subscriber refresh.
@callmehiphop will this be addressed by #2716 or another issue/PR you've already worked on?
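For the memory reports above, sampling `process.memoryUsage()` around each subscriber refresh makes growth visible over time. A small sketch (the function names are illustrative):

```javascript
// Formats one memory sample; returns the logged line so it can be
// inspected. Uses only Node's built-in process.memoryUsage().
function sampleMemory(log = console.log) {
  const { rss, heapUsed } = process.memoryUsage();
  const line = `rss=${(rss / 1048576).toFixed(1)}MB heapUsed=${(heapUsed / 1048576).toFixed(1)}MB`;
  log(line);
  return line;
}

// Logs heap usage at a fixed interval so growth across subscription
// restarts shows up in the logs.
function startMemorySampler(intervalMs) {
  const timer = setInterval(() => sampleMemory(), intervalMs);
  // Don't keep the process alive just for sampling.
  timer.unref();
  return () => clearInterval(timer);
}
```

Running this alongside the 10-minute restart loop would show whether rss keeps climbing after each refresh, which would point at the client rather than the application code.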
From @QTGate on November 3, 2017 8:0 No, it is not. Even the new v0.14.6 also has the memory problem. I plan to implement subscription.on against the gCloud APIs myself.
From @QTGate on September 7, 2017 17:42
Steps to reproduce
When I restart it, I receive all of the previously undelivered messages.
I checked the connectionPool object while it was not receiving messages:
connectionPool.isPaused [false]
connectionPool.isOpen [true]
connectionPool.connections [5]
Thank you.
Copied from original issue: googleapis/google-cloud-node#2598