diff --git a/lib/mqttclient.ts b/lib/mqttclient.ts index 0df753e..4361681 100644 --- a/lib/mqttclient.ts +++ b/lib/mqttclient.ts @@ -13,7 +13,8 @@ try { } catch (e) { } import Long from "long"; -import {InfluxDB, Point} from '@influxdata/influxdb-client' +import {InfluxDB, Point, DEFAULT_WriteOptions} from '@influxdata/influxdb-client' +import {Agent} from 'http' dotenv?.config() @@ -32,11 +33,45 @@ if (!influxOrganisation) { throw new Error("INFLUX_ORG environment variable is not set"); } +const batchSize: number = Number.parseInt(process.env.BATCH_SIZE); +if (!batchSize) { + throw new Error("BATCH_SIZE environment variable is not set"); +} + +let i = 0; + +// Node.js HTTP client OOTB does not reuse established TCP connections, a custom node HTTP agent +// can be used to reuse them and thus reduce the count of newly established networking sockets +const keepAliveAgent = new Agent({ + keepAlive: true, // reuse existing connections + keepAliveMsecs: 20 * 1000, // 20 seconds keep alive +}) + const influxDB = new InfluxDB({ url: influxURL, token: influxToken, + transportOptions: { + agent: keepAliveAgent, + } }) +/* points/lines are batched in order to minimize networking and increase performance */ +const flushBatchSize = batchSize; + +const writeApi = influxDB.getWriteApi(influxOrganisation, 'default', 'ns', { + /* the maximum points/lines to send in a single batch to InfluxDB server */ + batchSize: flushBatchSize + 1, // don't let automatically flush data + /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */ + flushInterval: 0, + /* maximum size of the retry buffer - it contains items that could not be sent for the first time */ + maxBufferLines: 30_000, + /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */ + maxRetries: 0, // do not retry writes + // ... there are more write options that can be customized, see + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html +}); + interface MQTTClientConstructorParams { e: { serviceClient: ServiceClient; @@ -54,9 +89,22 @@ export default class MQTTClient { } async init() { + + process.on('exit', () => { + this.flushBuffer(); + keepAliveAgent.destroy(); + }) + return this; } + private flushBuffer() { + let bufferSize = i; + writeApi.flush().then(() => { + logger.info(`🚀 Flushed ${bufferSize} points to InfluxDB`); + }) + } + async run() { const mqtt = await this.serviceClient.mqtt_client(); @@ -79,6 +127,9 @@ export default class MQTTClient { on_close() { logger.warn(`❌ Disconnected from Factory+ broker`); + + // Flush any remaining data + this.flushBuffer(); } on_reconnect() { @@ -87,6 +138,8 @@ export default class MQTTClient { on_error(error: any) { logger.error("🚨 MQTT error: %o", error); + // Flush any remaining data + this.flushBuffer(); } async on_message(topicString: string, message: Uint8Array | Reader) { @@ -220,8 +273,6 @@ export default class MQTTClient { if (value === null) return; - const writeApi = influxDB.getWriteApi(influxOrganisation, 'default'); - writeApi.useDefaultTags({ instance: birth.instance, schema: birth.schema, @@ -281,9 +332,14 @@ export default class MQTTClient { } - writeApi.close().then(() => { - logger.debug(`Written to InfluxDB: [${birth.type}] ${topic.address}/${birth.name} = ${value}`); - }) + i++; + + logger.debug(`Added to write buffer (${i}/${flushBatchSize}): [${birth.type}] ${topic.address}/${birth.name} = ${value}`); + + if (i >= flushBatchSize) { + this.flushBuffer(); + i = 0; + } } setNestedValue(obj, path, value) {