From 813954389f2d48fe6d61dca7abaa594f674676f5 Mon Sep 17 00:00:00 2001 From: Gus Class Date: Thu, 8 Feb 2018 11:15:45 -0800 Subject: [PATCH 1/2] Adds exponential backoff to MQTT --- .../cloudiot_mqtt_example_nodejs.js | 85 +++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) diff --git a/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js b/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js index bd6763fd1d..fe6155865b 100644 --- a/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js +++ b/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js @@ -21,6 +21,21 @@ const jwt = require('jsonwebtoken'); const mqtt = require('mqtt'); // [END iot_mqtt_include] +// The initial backoff time after a disconnection occurs, in seconds. +var MINIMUM_BACKOFF_TIME = 1; + +// The maximum backoff time before giving up, in seconds. +var MAXIMUM_BACKOFF_TIME = 32; + +// Whether to wait with exponential backoff before publishing. +var shouldBackoff = false; + +// The current backoff time. +var backoffTime = 1; + +// Whether an asynchronous publish chain is in progress. +var publishChainInProgress = false; + console.log('Google Cloud IoT Core MQTT example.'); var argv = require(`yargs`) .options({ @@ -122,18 +137,42 @@ function createJwt (projectId, privateKeyFile, algorithm) { // messageCount. // [START iot_mqtt_publish] function publishAsync (messageCount, numMessages) { - const payload = `${argv.registryId}/${argv.deviceId}-payload-${messageCount}`; - // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. - // Cloud IoT Core also supports qos=0 for at most once delivery. - console.log('Publishing message:', payload); - client.publish(mqttTopic, payload, { qos: 1 }); - - const delayMs = argv.messageType === 'events' ? 1000 : 2000; - if (messageCount < numMessages) { - // If we have published fewer than numMessage messages, publish payload - // messageCount + 1 in 1 second. - // [START iot_mqtt_jwt_refresh] + // If we have published enough messages or backed off too many times, stop. + if (messageCount > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) { + if (backoffTime >= MAXIMUM_BACKOFF_TIME) { + console.log('Backoff time is too high. Giving up.'); + } + console.log('Closing connection to MQTT. Goodbye!'); + client.end(); + publishChainInProgress = false; + return; + } + + // Publish and schedule the next publish. + publishChainInProgress = true; + var publishDelayMs = 0; + if (shouldBackoff) { + publishDelayMs = 1000 * (backoffTime + Math.random()); + backoffTime *= 2; + console.log(`Backing off for ${publishDelayMs}ms before publishing.`); + } + + setTimeout(function () { + const payload = `${argv.registryId}/${argv.deviceId}-payload-${messageCount}`; + + // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. + // Cloud IoT Core also supports qos=0 for at most once delivery. + console.log('Publishing message:', payload); + client.publish(mqttTopic, payload, { qos: 1 }, function (err) { + if (!err) { + shouldBackoff = false; + backoffTime = MINIMUM_BACKOFF_TIME; + } + }); + + var schedulePublishDelayMs = argv.messageType === 'events' ? 1000 : 2000; setTimeout(function () { + // [START iot_mqtt_jwt_refresh] let secsFromIssue = parseInt(Date.now() / 1000) - iatTime; if (secsFromIssue > argv.tokenExpMins * 60) { iatTime = parseInt(Date.now() / 1000); @@ -145,15 +184,16 @@ function publishAsync (messageCount, numMessages) { client.on('connect', (success) => { console.log('connect'); - if (success) { - publishAsync(1, argv.numMessages); - } else { + if (!success) { console.log('Client not connected...'); + } else if (!publishChainInProgress) { + publishAsync(1, argv.numMessages); } }); client.on('close', () => { console.log('close'); + shouldBackoff = true; }); client.on('error', (err) => { @@ -168,14 +208,10 @@ function publishAsync (messageCount, numMessages) { // Note: logging packet send is very verbose }); } + // [END iot_mqtt_jwt_refresh] publishAsync(messageCount + 1, numMessages); - }, delayMs); - // [END iot_mqtt_jwt_refresh] - } else { - // Otherwise, close the connection. - console.log('Closing connection to MQTT. Goodbye!'); - client.end(); - } + }, schedulePublishDelayMs); + }, publishDelayMs); } // [END iot_mqtt_publish] @@ -213,15 +249,16 @@ const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`; client.on('connect', (success) => { console.log('connect'); - if (success) { - publishAsync(1, argv.numMessages); - } else { + if (!success) { console.log('Client not connected...'); + } else if (!publishChainInProgress) { + publishAsync(1, argv.numMessages); } }); client.on('close', () => { console.log('close'); + shouldBackoff = true; }); client.on('error', (err) => { From 7a33989dc6b60b55bc1da5ffcf37137099ca920c Mon Sep 17 00:00:00 2001 From: Gus Class Date: Thu, 15 Feb 2018 12:50:37 -0800 Subject: [PATCH 2/2] refactors messageCount to better name --- iot/mqtt_example/cloudiot_mqtt_example_nodejs.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js b/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js index fe6155865b..516aa9c409 100644 --- a/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js +++ b/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js @@ -134,11 +134,11 @@ function createJwt (projectId, privateKeyFile, algorithm) { // [END iot_mqtt_jwt] // Publish numMessages messages asynchronously, starting from message -// messageCount. +// messagesSent. // [START iot_mqtt_publish] -function publishAsync (messageCount, numMessages) { +function publishAsync (messagesSent, numMessages) { // If we have published enough messages or backed off too many times, stop. - if (messageCount > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) { + if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) { if (backoffTime >= MAXIMUM_BACKOFF_TIME) { console.log('Backoff time is too high. Giving up.'); } @@ -158,7 +158,7 @@ function publishAsync (messageCount, numMessages) { } setTimeout(function () { - const payload = `${argv.registryId}/${argv.deviceId}-payload-${messageCount}`; + const payload = `${argv.registryId}/${argv.deviceId}-payload-${messagesSent}`; // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. // Cloud IoT Core also supports qos=0 for at most once delivery. @@ -209,7 +209,7 @@ function publishAsync (messageCount, numMessages) { }); } // [END iot_mqtt_jwt_refresh] - publishAsync(messageCount + 1, numMessages); + publishAsync(messagesSent + 1, numMessages); }, schedulePublishDelayMs); }, publishDelayMs); }