Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement modbus driver #314

Merged
merged 14 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions acs-edge/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repo?=acs-edge
include ${mk}/acs.js.mk

local.build:
npm install
npx tsc --project tsconfig.json

local.run: local.build
Expand Down
2 changes: 2 additions & 0 deletions acs-edge/docs/TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@
* Supply data topic names to a Device in return for addresses when
it subscribes to the connection manifold.
* Poll the manifold using connection/datatopic pairs.
* Possibly the Device should always assume we are using a 'smart'
driver, and polling should be handled by the manifold?
7 changes: 0 additions & 7 deletions acs-edge/lib/device.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,16 @@ export class Device {
}

_handleData(obj: { [p: string]: any }, parseVals: boolean) {
logf("_handleData %s (%s) %O", this._name, parseVals, obj);
// Array to keep track of values that changed
let changedMetrics: sparkplugMetric[] = [];
// Iterate through each key in obj
for (let addr in obj) {
// Get all payload paths registered for this address
const paths = this._metrics.getPathsForAddr(addr);
logf("paths for %s: %s", addr, paths);
// Iterate through each path
paths.forEach((path) => {
// Get the complete metric according to its address and path
const metric = this._metrics.getByAddrPath(addr, path);
logf("metric for %s:%s: %O", addr, path, metric);
// If the metric can be read i.e. GET method
if (typeof metric.properties !== "undefined" && (metric.properties.method.value as string).search(
/^GET/g) > -1) {
Expand All @@ -313,8 +310,6 @@ export class Device {
this._delimiter
) : obj[addr];

logf("parsed new val: %O", newVal);

// Test if the value is a bigint and convert it to a Long. This is a hack to ensure that the
// Tahu library works - it only accepts Longs, not bigints.
if (typeof newVal === "bigint") {
Expand All @@ -334,8 +329,6 @@ export class Device {
this._payloadFormat,
this._delimiter
);
logf("updating metric %s:%s ts %s val %O",
addr, path, timestamp, newVal);

// Update the metric value and push it to the array of changed metrics
changedMetrics.push({...(this._metrics.setValueByAddrPath(addr,
Expand Down
19 changes: 14 additions & 5 deletions acs-edge/lib/devices/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ export class DriverConnection extends DeviceConnection {
const { id, msg, data, payload } = message;
if (id != this.id) return;

log(util.format("DRIVER message: %s %s", id, msg));
//log(util.format("DRIVER message: %s %s", id, msg));
switch (msg) {
case "status": return this.#msg_status(payload.toString());
case "data": return this.#msg_data(data, payload);
Expand All @@ -153,8 +153,10 @@ export class DriverConnection extends DeviceConnection {
}

#msg_status (status: string) {
const ost = this.status;
this.status = status;
log(`DRIVER [${this.id}]: status ${status}`);
log(`DRIVER [${this.id}]: status ${ost} -> ${status}`);

switch (status) {
case "READY":
this.broker.publish({
Expand All @@ -165,16 +167,23 @@ export class DriverConnection extends DeviceConnection {
this.#send_addrs();
break;
case "UP":
this.emit("open");
if (ost != "UP")
this.emit("open");
break;
case "DOWN":
this.emit("close");
case "CONF":
case "CONN":
case "AUTH":
case "ERR":
if (ost == "UP")
this.emit("close");
break;
}
}

#send_addrs () {
const addrs = {
version: 1,
addrs: Object.fromEntries(this.addrs),
groups: Object.fromEntries(
[...this.groups.entries()]
Expand All @@ -192,7 +201,7 @@ export class DriverConnection extends DeviceConnection {

#msg_data (data: string, payload: Buffer) {
const addr = this.addrs.get(data);
log(`Driver [${this.id}]: data ${data} ${addr}`);
//log(`Driver [${this.id}]: data ${data} ${addr}`);
if (addr)
this.emit("data", { [addr]: payload });
}
Expand Down
6 changes: 3 additions & 3 deletions acs-edge/lib/driverBroker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export class DriverBroker extends EventEmitter {
const { id } = client;
const { topic } = packet;

log("PUBLISH: %s %s", id, topic);
//log("PUBLISH: %s %s", id, topic);
if (packet.retain)
return callback(new Error("Retained PUBLISH forbidden"));
if (!this.acl.get(id)!.publish.test(topic))
Expand All @@ -125,7 +125,7 @@ export class DriverBroker extends EventEmitter {
}

message (topic, payload) {
log("PACKET: %s %o", topic, payload);
//log("PACKET: %s %o", topic, payload);

const match = topic.match(topicrx);
if (!match) {
Expand All @@ -142,7 +142,7 @@ export class DriverBroker extends EventEmitter {

const topic = `${prefix}/${id}/${msg}`
+ (data ? `/${data}` : "");
log("Publishing %s: %O", topic, packet);
//log("Publishing %s: %O", topic, packet);
return new Promise((resolve, reject) =>
this.broker.publish({
cmd: "publish",
Expand Down
4 changes: 0 additions & 4 deletions acs-edge/lib/helpers/typeHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,6 @@ export class Metrics {
export function parseValueFromPayload(msg: any, metric: sparkplugMetric, payloadFormat: serialisationType | string, delimiter?: string) {
let payload: any;
const path = (typeof metric.properties !== "undefined" && typeof metric.properties.path !== "undefined" ? metric.properties.path.value as string : "");
logf("parseValueFP: path [%s], fmt [%s], type [%s], msg [%o]",
path, payloadFormat, metric.type, msg);
switch (payloadFormat) {
case serialisationType.delimited:
// Handle no delimiter
Expand Down Expand Up @@ -598,8 +596,6 @@ export function typeLens(type: string): number {
*/
export function parseValFromBuffer(type: sparkplugDataType, endianness: byteOrder, byteAddr: number, buf: Buffer, bit?: number): any {

logf("parseValFB: addr %s, bit %s, end %s, typ %s, buf: %O",
byteAddr, bit, endianness, type, buf);
switch (type) {
case sparkplugDataType.boolean:
if (bit != null) {
Expand Down
10 changes: 5 additions & 5 deletions acs-edge/lib/translator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ export class Translator extends EventEmitter {
log(`Created Sparkplug node "${ids.sparkplug!}".`);

log("Starting driver broker...");
this.broker.on("message", msg =>
log(util.format("Driver message: %O", msg)));
//this.broker.on("message", msg =>
// log(util.format("Driver message: %O", msg)));
await this.broker.start();

// Create a new device connection for each type listed in config file
Expand Down Expand Up @@ -212,8 +212,8 @@ export class Translator extends EventEmitter {

// What to do when the device connection has new data from a device
newConn.on('data', (obj: { [index: string]: any }, parseVals = true) => {
log(util.format("Received data for %s: (%s) %O",
connection.name, parseVals, obj));
//log(util.format("Received data for %s: (%s) %O",
// connection.name, parseVals, obj));
connection.devices?.forEach((devConf: deviceOptions) => {
this.devices[devConf.deviceId]?._handleData(obj, parseVals);
})
Expand Down Expand Up @@ -375,4 +375,4 @@ Trying again in ${interval} seconds...`);
await timers.setTimeout(interval * 1000);
}
}
}
}
2 changes: 2 additions & 0 deletions edge-modbus/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules
tmp
33 changes: 33 additions & 0 deletions edge-modbus/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# syntax=docker/dockerfile:1

FROM node:22-alpine AS build
ARG acs_npm=NO
ARG revision=unknown

USER root
RUN <<'SHELL'
install -d -o node -g node /home/node/app
SHELL
WORKDIR /home/node/app
USER node
COPY package*.json ./
RUN <<'SHELL'
touch /home/node/.npmrc
if [ "${acs_npm}" != NO ]
then
npm config set @amrc-factoryplus:registry "${acs_npm}"
fi

npm install --save=false
SHELL
COPY --chown=node . .
RUN <<'SHELL'
echo "export const GIT_VERSION=\"$revision\";" > ./lib/git-version.js
SHELL

FROM node:22-alpine AS run
# Copy across from the build container.
WORKDIR /home/node/app
COPY --from=build --chown=root:root /home/node/app ./
USER node
CMD node bin/driver.js
6 changes: 6 additions & 0 deletions edge-modbus/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
top=..
include ${top}/mk/acs.init.mk

repo?=edge-modbus

include ${mk}/acs.js.mk
14 changes: 14 additions & 0 deletions edge-modbus/bin/driver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* AMRC Connectivity Stack
* Modbus Edge Agent driver
* Copyright 2024 AMRC
*/

import { PolledDriver } from "@amrc-factoryplus/edge-driver";
import { modbusHandler } from "../lib/modbus.js";

const drv = new PolledDriver({
env: process.env,
handler: modbusHandler,
serial: true,
});
drv.run();
103 changes: 103 additions & 0 deletions edge-modbus/lib/modbus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/* AMRC Connectivity Stack
* Modbus Edge Agent driver
* Copyright 2024 AMRC
*/

import ModbusRTU from "modbus-serial";

const RECONNECT = 5000;

const funcs = {
input: {
read: (c, a, l) => c.readInputRegisters(a, l),
},
holding: {
read: (c, a, l) => c.readHoldingRegisters(a, l),
},
coil: {
read: (c, a, l) => c.readCoils(a, l),
},
discrete: {
read: (c, a, l) => c.readDiscreteInputs(a, l),
},
};

class ModbusHandler {
constructor (driver, conf) {
this.driver = driver;
this.conf = conf;

this.log = driver.debug.bound("modbus");

this.client = new ModbusRTU();
this.on_close = () => this.reconnect();
}

run () {
const { driver, client } = this;
const { host, port } = this.conf;

client.on("close", this.on_close);

client.connectTCP(host, { port })
.then(() => driver.setStatus("UP"))
.catch(this.on_close);

return this;
}

close () {
const { client } = this;

client.off("close", this.on_close);
return new Promise(r => client.close(r));
}

async reconnect () {
const { client, driver } = this;

this.log("Modbus connection closed");
setTimeout(() => {
this.log("Reconnecting to modbus");
client.open(e => {
driver.setStatus(e ? "CONN" : "UP");
if (e) {
this.log("Failed to connect to modbus: %s", e);
this.reconnect();
}
});
}, RECONNECT);
}

parseAddr (spec) {
const parts = spec.split(",");
if (parts.length != 4) return;

const id = Number.parseInt(parts[0]);
if (Number.isNaN(id) || id < 0) return;
const func = funcs[parts[1]];
if (!func) return;
const addr = Number.parseInt(parts[2]);
if (Number.isNaN(addr) || addr < 0) return;
const len = Number.parseInt(parts[3]);
if (Number.isNaN(len) || len < 1) return;

return { id, func, addr, len };
}

async poll (addr) {
const { client } = this;

if (!client.isOpen) return;

client.setID(addr.id);
const val = await addr.func.read(client, addr.addr, addr.len);
return val.buffer;
}
}

export function modbusHandler (driver, conf) {
if (conf.protocol != "tcp")
return;
return new ModbusHandler(driver, conf).run();
}
Loading