Skip to content

Commit

Permalink
Added metric timestamp to influx writePoint
Browse files Browse the repository at this point in the history
  • Loading branch information
djnewbould committed Jun 28, 2024
1 parent a362e1c commit a7f645d
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions influxdb-sparkplug-ingester/lib/mqttclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,20 @@ export default class MQTTClient {
if (!birth) {
logger.error(`Metric ${metric.alias} is unknown for ${topic.address.group}/${topic.address.node}/${topic.address.device}`);
}

const metricTimestamp = new Date(metric.timestamp);
// Send each metric to InfluxDB
this.writeToInfluxDB(birth, topic, metric.value)

this.writeToInfluxDB(birth, topic, metric.value, metricTimestamp)
});
}

writeToInfluxDB(birth, topic: Topic, value) {

/**
* Writes metric values to InfluxDB using the metric timestamp.
* @param birth Birth certificate for device.
* @param topic Topic the metric was published on.
* @param value Metric value to write to InfluxDB.
* @param timestamp Timestamp from the metric to write to influx.
*/
writeToInfluxDB(birth, topic: Topic, value, timestamp: Date) {
if (value === null) return;
if (birth.transient) {
logger.debug(`Metric ${birth.name} is transient, not writing to InfluxDB`);
Expand Down Expand Up @@ -444,7 +449,11 @@ export default class MQTTClient {
logger.warn(`${topic.address}/${path}/${metricName} should be a ${birth.type} but received ${numVal}. Not recording.`);
return;
}
writeApi.writePoint(new Point(`${metricName}:i`).intField('value', numVal));
writeApi.writePoint(
new Point(`${metricName}:i`)
.intField('value', numVal)
.timestamp(timestamp)
);
break;
case "UInt8":
case "UInt16":
Expand All @@ -456,7 +465,11 @@ export default class MQTTClient {
logger.warn(`${topic.address}/${path}/${metricName} should be a ${birth.type} but received ${numVal}. Not recording.`);
return;
}
writeApi.writePoint(new Point(`${metricName}:u`).uintField('value', numVal));
writeApi.writePoint(
new Point(`${metricName}:u`)
.uintField('value', numVal)
.timestamp(timestamp)
);
break;
case "Float":
case "Double":
Expand All @@ -466,17 +479,27 @@ export default class MQTTClient {
logger.warn(`${topic.address}/${path}/${metricName} should be a ${birth.type} but received ${numVal}. Not recording.`);
return;
}
writeApi.writePoint(new Point(`${metricName}:d`).floatField('value', numVal));
writeApi.writePoint(
new Point(`${metricName}:d`)
.floatField('value', numVal)
.timestamp(timestamp)
);
break;
case "Boolean":
if (typeof value != "boolean") {
logger.warn(`${topic.address}/${path}/${metricName} should be a ${birth.type} but received ${value}. Not recording.`);
return;
}
writeApi.writePoint(new Point(`${metricName}:b`).booleanField('value', value));
writeApi.writePoint(
new Point(`${metricName}:b`)
.booleanField('value', value)
.timestamp(timestamp));
break;
default:
writeApi.writePoint(new Point(`${metricName}:s`).stringField('value', value));
writeApi.writePoint(
new Point(`${metricName}:s`)
.stringField('value', value)
.timestamp(timestamp));
break;

}
Expand Down

0 comments on commit a7f645d

Please sign in to comment.