diff --git a/packages/binding-mqtt/src/mqtt-broker-server.ts b/packages/binding-mqtt/src/mqtt-broker-server.ts index fa665b434..e87834d84 100644 --- a/packages/binding-mqtt/src/mqtt-broker-server.ts +++ b/packages/binding-mqtt/src/mqtt-broker-server.ts @@ -36,6 +36,7 @@ import { } from "@node-wot/core"; import { InteractionOptions } from "wot-typescript-definitions"; import { ActionElement, PropertyElement } from "wot-thing-description-types"; +import { Readable } from "stream"; const { info, debug, error, warn } = createLoggers("binding-mqtt", "mqtt-broker-server"); @@ -246,6 +247,7 @@ export default class MqttBrokerServer implements ProtocolServer { * For further discussion see https://github.com/eclipse/thingweb.node-wot/pull/253 */ let value; + if ("properties" in packet && "contentType" in packet.properties) { try { value = ContentSerdes.get().contentToValue( @@ -278,8 +280,11 @@ export default class MqttBrokerServer implements ProtocolServer { ), }; + const contentType = action.forms[options.formIndex].contentType ?? ContentSerdes.DEFAULT; + const inputContent = new Content(contentType, Readable.from(value)); + thing - .handleInvokeAction(segments[this.INTERACTION_NAME_SEGMENT_INDEX], value, options) + .handleInvokeAction(segments[this.INTERACTION_NAME_SEGMENT_INDEX], inputContent, options) .then((output: unknown) => { if (output) { warn( @@ -320,12 +325,11 @@ export default class MqttBrokerServer implements ProtocolServer { ), }; + const formContentType = property.forms[options.formIndex].contentType ?? ContentSerdes.DEFAULT; + const inputContent = new Content(formContentType, Readable.from(payload.toString())); + try { - thing.handleWriteProperty( - segments[this.INTERACTION_NAME_SEGMENT_INDEX], - JSON.parse(payload.toString()), - options - ); + thing.handleWriteProperty(segments[this.INTERACTION_NAME_SEGMENT_INDEX], inputContent, options); } catch (err) { error( `MqttBrokerServer at ${this.brokerURI} got error on writing to property '${