Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add batching and connection reuse to InfluxDB client (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexGodbehere authored Jul 28, 2023
1 parent e2ed574 commit f0edab1
Showing 1 changed file with 62 additions and 6 deletions.
68 changes: 62 additions & 6 deletions lib/mqttclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ try {
} catch (e) {
}
import Long from "long";
import {InfluxDB, Point} from '@influxdata/influxdb-client'
import {InfluxDB, Point, DEFAULT_WriteOptions} from '@influxdata/influxdb-client'
import {Agent} from 'http'

dotenv?.config()

Expand All @@ -32,11 +33,45 @@ if (!influxOrganisation) {
throw new Error("INFLUX_ORG environment variable is not set");
}

const batchSize: number = Number.parseInt(process.env.BATCH_SIZE);
if (!batchSize) {
throw new Error("BATCH_SIZE environment variable is not set");
}

let i = 0;

// Node.js HTTP client OOTB does not reuse established TCP connections, a custom node HTTP agent
// can be used to reuse them and thus reduce the count of newly established networking sockets
const keepAliveAgent = new Agent({
keepAlive: true, // reuse existing connections
keepAliveMsecs: 20 * 1000, // 20 seconds keep alive
})

const influxDB = new InfluxDB({
url: influxURL,
token: influxToken,
transportOptions: {
agent: keepAliveAgent,
}
})

/* points/lines are batched in order to minimize networking and increase performance */
const flushBatchSize = batchSize;

const writeApi = influxDB.getWriteApi(influxOrganisation, 'default', 'ns', {
/* the maximum points/lines to send in a single batch to InfluxDB server */
batchSize: flushBatchSize + 1, // don't let automatically flush data
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 0,
/* maximum size of the retry buffer - it contains items that could not be sent for the first time */
maxBufferLines: 30_000,
/* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 0, // do not retry writes
// ... there are more write options that can be customized, see
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
});

interface MQTTClientConstructorParams {
e: {
serviceClient: ServiceClient;
Expand All @@ -54,9 +89,22 @@ export default class MQTTClient {
}

async init() {

process.on('exit', () => {
this.flushBuffer();
keepAliveAgent.destroy();
})

return this;
}

private flushBuffer() {
let bufferSize = i;
writeApi.flush().then(() => {
logger.info(`🚀 Flushed ${bufferSize} points to InfluxDB`);
})
}

async run() {

const mqtt = await this.serviceClient.mqtt_client();
Expand All @@ -79,6 +127,9 @@ export default class MQTTClient {

on_close() {
logger.warn(`❌ Disconnected from Factory+ broker`);

// Flush any remaining data
this.flushBuffer();
}

on_reconnect() {
Expand All @@ -87,6 +138,8 @@ export default class MQTTClient {

on_error(error: any) {
logger.error("🚨 MQTT error: %o", error);
// Flush any remaining data
this.flushBuffer();
}

async on_message(topicString: string, message: Uint8Array | Reader) {
Expand Down Expand Up @@ -220,8 +273,6 @@ export default class MQTTClient {

if (value === null) return;

const writeApi = influxDB.getWriteApi(influxOrganisation, 'default');

writeApi.useDefaultTags({
instance: birth.instance,
schema: birth.schema,
Expand Down Expand Up @@ -281,9 +332,14 @@ export default class MQTTClient {

}

writeApi.close().then(() => {
logger.debug(`Written to InfluxDB: [${birth.type}] ${topic.address}/${birth.name} = ${value}`);
})
i++;

logger.debug(`Added to write buffer (${i}/${flushBatchSize}): [${birth.type}] ${topic.address}/${birth.name} = ${value}`);

if (i >= flushBatchSize) {
this.flushBuffer();
i = 0;
}
}

setNestedValue(obj, path, value) {
Expand Down

0 comments on commit f0edab1

Please sign in to comment.