diff --git a/.eslintrc.json b/.eslintrc.json new file mode 100644 index 00000000..40a8234a --- /dev/null +++ b/.eslintrc.json @@ -0,0 +1,32 @@ +{ + "extends": [ + "eslint:recommended", + "plugin:@typescript-eslint/recommended-type-checked", + "plugin:@typescript-eslint/stylistic-type-checked" + ], + "plugins": [ + "@typescript-eslint" + ], + "parser": "@typescript-eslint/parser", + "parserOptions": { + "project": true + }, + "root": true, + "ignorePatterns": [ + "**/dist/**" + ], + "rules": { + "@typescript-eslint/no-unused-vars": "off", + "@typescript-eslint/restrict-template-expressions": "off", + "@typescript-eslint/no-explicit-any": "off", + "@typescript-eslint/no-unsafe-assignment": "off", + "@typescript-eslint/no-unsafe-argument": "off", + "@typescript-eslint/no-unsafe-member-access": "off", + "@typescript-eslint/no-unsafe-call": "off", + "@typescript-eslint/no-misused-promises": "off", + "@typescript-eslint/semi": [ + "error", + "always" + ] + } +} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 99ad57f1..5a6701e8 100644 --- a/.gitignore +++ b/.gitignore @@ -285,10 +285,6 @@ FakesAssemblies/ # GhostDoc plugin setting file *.GhostDoc.xml -# Node.js Tools for Visual Studio -.ntvs_analysis.dat -node_modules/ - # Visual Studio 6 build log *.plg @@ -368,3 +364,27 @@ MigrationBackup/ # Generated C protobuf files *.pb-c.* + +# Generated TS protobuf type files +**/generated +*-pb.js +*-pb.d.ts + +# TypeScript/Node/JS related files +node_modules/ +.node_modules/ +**/dist/ +scripts/eslint/built/ +scripts/debug.bat +scripts/run.bat +scripts/**/*.js +scripts/**/*.js.map +**/.DS_Store +.settings +yarn.lock +yarn-error.log +package-lock.json +.eslintcache +*v8.log +**/tsconfig.tsbuildinfo +notes.md \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 30e2962d..f9a2a5cb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,8 +7,10 @@ "request": "launch", "mode": "debug", "program": "${workspaceFolder}/scenarios/getting_started/go/getting_started.go", - "args": ["${workspaceFolder}/scenarios/getting_started/.env"], - "cwd" : "${workspaceFolder}/scenarios/getting_started/" + "args": [ + "${workspaceFolder}/scenarios/getting_started/.env" + ], + "cwd": "${workspaceFolder}/scenarios/getting_started/" }, { "name": "getting_started/dotnet", @@ -32,7 +34,10 @@ "preLaunchTask": "build_telemetry", "program": "${workspaceFolder}/scenarios/telemetry/dotnet/telemetry_producer/bin/Debug/net7.0/telemetry_producer.dll", "cwd": "${workspaceFolder}/scenarios/telemetry/", - "args": ["--envFile", "vehicle01.env"], + "args": [ + "--envFile", + "vehicle01.env" + ], "console": "integratedTerminal", "stopAtEntry": false, "presentation": { @@ -46,7 +51,10 @@ "preLaunchTask": "build_telemetry", "program": "${workspaceFolder}/scenarios/telemetry/dotnet/telemetry_consumer/bin/Debug/net7.0/telemetry_consumer.dll", "cwd": "${workspaceFolder}/scenarios/telemetry/", - "args": ["--envFile", "map-app.env"], + "args": [ + "--envFile", + "map-app.env" + ], "console": "integratedTerminal", "stopAtEntry": false, "presentation": { @@ -54,12 +62,15 @@ } }, { - "name": "command_consumer/dotnet", + "name": "command_client/dotnet", "type": "coreclr", "request": "launch", "preLaunchTask": "build_command", - "program": "${workspaceFolder}/scenarios/command/dotnet/command_consumer/bin/Debug/net7.0/command_consumer.dll", - "args": ["--envFile", "mobile-app.env"], + "program": "${workspaceFolder}/scenarios/command/dotnet/command_client/bin/Debug/net7.0/command_client.dll", + "args": [ + "--envFile", + "mobile-app.env" + ], "cwd": "${workspaceFolder}/scenarios/command/", "console": "integratedTerminal", "stopAtEntry": false, @@ -68,12 +79,15 @@ } }, { - "name": "command_producer/dotnet", + "name": "command_server/dotnet", "type": "coreclr", "request": "launch", "preLaunchTask": "build_command", - "program": "${workspaceFolder}/scenarios/command/dotnet/command_producer/bin/Debug/net7.0/command_producer.dll", - "args": ["--envFile", "vehicle03.env"], + "program": "${workspaceFolder}/scenarios/command/dotnet/command_server/bin/Debug/net7.0/command_server.dll", + "args": [ + "--envFile", + "vehicle03.env" + ], "cwd": "${workspaceFolder}/scenarios/command/", "console": "integratedTerminal", "stopAtEntry": false, @@ -108,7 +122,9 @@ "program": "${workspaceFolder}/scenarios/telemetry/c/build/telemetry_producer", "stopAtEntry": false, "cwd": "${workspaceFolder}/scenarios/telemetry", - "args": ["${workspaceFolder}/scenarios/telemetry/vehicle01.env"], + "args": [ + "${workspaceFolder}/scenarios/telemetry/vehicle01.env" + ], "setupCommands": [ { "description": "Enable pretty-printing for gdb", @@ -129,7 +145,9 @@ "program": "${workspaceFolder}/scenarios/telemetry/c/build/telemetry_consumer", "stopAtEntry": false, "cwd": "${workspaceFolder}/scenarios/telemetry", - "args": ["${workspaceFolder}/scenarios/telemetry/map-app.env"], + "args": [ + "${workspaceFolder}/scenarios/telemetry/map-app.env" + ], "setupCommands": [ { "description": "Enable pretty-printing for gdb", @@ -150,7 +168,9 @@ "program": "${workspaceFolder}/scenarios/command/c/build/command_client", "stopAtEntry": false, "cwd": "${workspaceFolder}/scenarios/command", - "args": ["${workspaceFolder}/scenarios/command/mobile-app.env"], + "args": [ + "${workspaceFolder}/scenarios/command/mobile-app.env" + ], "setupCommands": [ { "description": "Enable pretty-printing for gdb", @@ -171,7 +191,9 @@ "program": "${workspaceFolder}/scenarios/command/c/build/command_server", "stopAtEntry": false, "cwd": "${workspaceFolder}/scenarios/command", - "args": ["${workspaceFolder}/scenarios/command/vehicle03.env"], + "args": [ + "${workspaceFolder}/scenarios/command/vehicle03.env" + ], "setupCommands": [ { "description": "Enable pretty-printing for gdb", @@ -184,26 +206,188 @@ "order": 7, "group": "C" } + }, + { + "name": "TypeScript Getting Started", + "type": "node", + "request": "launch", + "preLaunchTask": "build_ts_getting_started", + "program": "${workspaceFolder}/scenarios/getting_started/ts/gettingStarted/src/index.ts", + "args": [ + "--env-file", + ".env" + ], + "cwd": "${workspaceFolder}/scenarios/getting_started", + "console": "integratedTerminal", + "smartStep": true, + "showAsyncStacks": true, + "sourceMaps": true, + "outFiles": [ + "${workspaceFolder}/mqttclients/ts/mqttjsClientExtensions/dist/**/*.js", + "${workspaceFolder}/scenarios/getting_started/ts/gettingStarted/dist/**/*.js" + ], + "skipFiles": [ + "/**" + ], + "presentation": { + "group": "ts" + } + }, + { + "name": "TypeScript Telemetry Producer", + "type": "node", + "request": "launch", + "preLaunchTask": "build_ts_telemetryproducer", + "program": "${workspaceFolder}/scenarios/telemetry/ts/telemetryProducer/src/index.ts", + "args": [ + "--env-file", + "vehicle01.env" + ], + "cwd": "${workspaceFolder}/scenarios/telemetry", + "console": "integratedTerminal", + "smartStep": true, + "showAsyncStacks": true, + "sourceMaps": true, + "outFiles": [ + "${workspaceFolder}/mqttclients/ts/mqttjsClientExtensions/dist/**/*.js", + "${workspaceFolder}/scenarios/telemetry/ts/telemetryProducer/dist/**/*.js" + ], + "skipFiles": [ + "/**" + ], + "presentation": { + "group": "ts" + } + }, + { + "name": "TypeScript Telemetry Consumer", + "type": "node", + "request": "launch", + "preLaunchTask": "build_ts_telemetryconsumer", + "program": "${workspaceFolder}/scenarios/telemetry/ts/telemetryConsumer/src/index.ts", + "args": [ + "--env-file", + "map-app.env" + ], + "cwd": "${workspaceFolder}/scenarios/telemetry", + "console": "integratedTerminal", + "smartStep": true, + "showAsyncStacks": true, + "sourceMaps": true, + "outFiles": [ + "${workspaceFolder}/mqttclients/ts/mqttjsClientExtensions/dist/**/*.js", + "${workspaceFolder}/scenarios/telemetry/ts/telemetryConsumer/dist/**/*.js" + ], + "skipFiles": [ + "/**" + ], + "presentation": { + "group": "ts" + } + }, + { + "name": "TypeScript Command Server", + "type": "node", + "request": "launch", + "preLaunchTask": "build_ts_commandserver", + "program": "${workspaceFolder}/scenarios/command/ts/commandServer/src/index.ts", + "args": [ + "--env-file", + "vehicle03.env" + ], + "cwd": "${workspaceFolder}/scenarios/command", + "console": "integratedTerminal", + "smartStep": true, + "showAsyncStacks": true, + "sourceMaps": true, + "outFiles": [ + "${workspaceFolder}/mqttclients/ts/mqttjsClientExtensions/dist/**/*.js", + "${workspaceFolder}/scenarios/command/ts/commandServer/dist/**/*.js" + ], + "skipFiles": [ + "/**" + ], + "presentation": { + "group": "ts" + } + }, + { + "name": "TypeScript Command Client", + "type": "node", + "request": "launch", + "preLaunchTask": "build_ts_commandclient", + "program": "${workspaceFolder}/scenarios/command/ts/commandClient/src/index.ts", + "args": [ + "--env-file", + "mobile-app.env" + ], + "cwd": "${workspaceFolder}/scenarios/command", + "console": "integratedTerminal", + "smartStep": true, + "showAsyncStacks": true, + "sourceMaps": true, + "outFiles": [ + "${workspaceFolder}/mqttclients/ts/mqttjsClientExtensions/dist/**/*.js", + "${workspaceFolder}/scenarios/command/ts/commandClient/dist/**/*.js" + ], + "skipFiles": [ + "/**" + ], + "presentation": { + "group": "ts" + } + }, + { + "name": "TypeScript Alert", + "type": "node", + "request": "launch", + "preLaunchTask": "build_ts_alert", + "program": "${workspaceFolder}/scenarios/alert/ts/alert/src/index.ts", + "args": [], + "cwd": "${workspaceFolder}/scenarios/alert", + "console": "integratedTerminal", + "smartStep": true, + "showAsyncStacks": true, + "sourceMaps": true, + "outFiles": [ + "${workspaceFolder}/mqttclients/ts/mqttjsClientExtensions/dist/**/*.js", + "${workspaceFolder}/scenarios/alert/ts/alert/dist/**/*.js" + ], + "skipFiles": [ + "/**" + ], + "presentation": { + "group": "ts" + } } ], "compounds": [ { "name": "telemetry/dotnet", - "configurations": ["telemetry_producer/dotnet", "telemetry_consumer/dotnet" ], + "configurations": [ + "telemetry_producer/dotnet", + "telemetry_consumer/dotnet" + ], "presentation": { "group": "dotnet" } }, { "name": "command/dotnet", - "configurations": ["command_producer/dotnet", "command_consumer/dotnet" ], + "configurations": [ + "command_server/dotnet", + "command_client/dotnet" + ], "presentation": { "group": "dotnet" } }, { "name": "C Telemetry", - "configurations": ["C Telemetry Consumer", "C Telemetry Producer" ], + "configurations": [ + "C Telemetry Consumer", + "C Telemetry Producer" + ], "stopAll": true, "presentation": { "order": 2, @@ -212,7 +396,10 @@ }, { "name": "C Command", - "configurations": ["C Command Server", "C Command Client" ], + "configurations": [ + "C Command Server", + "C Command Client" + ], "stopAll": true, "presentation": { "order": 3, @@ -220,4 +407,4 @@ } } ] -} +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..92c9a505 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "dotnet.defaultSolution": "disable", + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + }, + "python.formatting.provider": "none", + "cmake.configureOnOpen": false +} \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json index d7872728..262a575d 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -59,6 +59,78 @@ "label": "Build C Command", "type": "shell", "command": "cmake --preset=command;cmake --build --preset=command" + }, + { + "label": "build_ts_getting_started", + "type": "shell", + "options": { + "cwd": "${workspaceFolder}/scenarios/getting_started/ts/gettingStarted" + }, + "command": "node ${workspaceFolder}/node_modules/.bin/tsc -p .", + "problemMatcher": [ + "$tsc" + ], + "group": "build" + }, + { + "label": "build_ts_telemetryproducer", + "type": "shell", + "options": { + "cwd": "${workspaceFolder}/scenarios/telemetry/ts/telemetryProducer" + }, + "command": "node ${workspaceFolder}/node_modules/.bin/tsc -p .", + "problemMatcher": [ + "$tsc" + ], + "group": "build" + }, + { + "label": "build_ts_telemetryconsumer", + "type": "shell", + "options": { + "cwd": "${workspaceFolder}/scenarios/telemetry/ts/telemetryConsumer" + }, + "command": "node ${workspaceFolder}/node_modules/.bin/tsc -p .", + "problemMatcher": [ + "$tsc" + ], + "group": "build" + }, + { + "label": "build_ts_commandserver", + "type": "shell", + "options": { + "cwd": "${workspaceFolder}/scenarios/command/ts/commandServer" + }, + "command": "node ${workspaceFolder}/node_modules/.bin/tsc -p .", + "problemMatcher": [ + "$tsc" + ], + "group": "build" + }, + { + "label": "build_ts_commandclient", + "type": "shell", + "options": { + "cwd": "${workspaceFolder}/scenarios/command/ts/commandClient" + }, + "command": "node ${workspaceFolder}/node_modules/.bin/tsc -p .", + "problemMatcher": [ + "$tsc" + ], + "group": "build" + }, + { + "label": "build_ts_alert", + "type": "shell", + "options": { + "cwd": "${workspaceFolder}/scenarios/alert/ts/alert" + }, + "command": "node ${workspaceFolder}/node_modules/.bin/tsc -p .", + "problemMatcher": [ + "$tsc" + ], + "group": "build" } ] -} +} \ No newline at end of file diff --git a/README.md b/README.md index 71483398..349b5982 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ languages: - csharp - c - python +- TypeScript - go name: "MQTT Application Samples" description: "Guidance to build Pub/Sub applications targeting MQTT Brokers." @@ -15,12 +16,13 @@ products: | [Setup](./Setup.md) | [Getting Started](./scenarios/getting_started/) | [Telemetry](./scenarios/telemetry/) | [Command](./scenarios/command/) | -These samples provide guidance to build Pub/Sub applications targeting MQTT Brokers in different programming languages. The samples are provided in different programming languages: +These samples provide guidance to build Pub/Sub applications targeting MQTT Brokers in different programming languages. The samples are provided in these programming languages: - C# - C - Go - Python +- TypeScript The instructions are provided for the following MQTT Brokers: - **Azure Event Grid Namespaces** @@ -51,6 +53,7 @@ Each language requires developer tools, such as compilers and SDKs to build and - [C](./mqttclients/c/README.md) - [Go](./mqttclients/go/README.md) - Python (TBD) +- [TypeScript](./mqttclients/ts/README.md) # Scenarios @@ -65,13 +68,13 @@ Each scenario requires the following configurations: Follow the instructions in the [Prerequisites](#magic_wand-prerequisites) to configure these scenarios. -| Scenario | Description | dotnet | C | python | go | -| -------- | ------------|--------|---|------- | -- | -| [Getting Started](./scenarios/getting_started/) | This quick start scenario simulates basic MQTT tasks.| ✓| ✓| ✓| ✓ | -| [JWT Authentication](./scenarios/jwt_authentication/) | This is a quick start scenario that authenticates to Azure Event Grid using Json Web Tokens (JWT) | ✓| soon| soon| soon | -| [Telemetry](./scenarios/telemetry/) | This scenario simulates multiple clients (the producers) sending data to a different set of topics to be consumed by a single application (the consumer). | ✓| ✓| ✓| soon | -| [Command](./scenarios/command/) | This scenario simulates the request-response messaging pattern using MQTT v5. | ✓| ✓ | soon | soon | -| [Alert](./scenarios/alert/) | This scenario simulates a fan-out use case where multiple clients receive a singlemessage from the same topic. | ✓| soon| soon| soon | +| Scenario | Description | dotnet | C | python | go | TypeScript | +| :------- | :---------- | :----: |:-:| :-----:|:-: | :--------: | +| [Getting Started](./scenarios/getting_started/) | This quick start scenario simulates basic MQTT tasks.| ✓| ✓| ✓| ✓ | ✓ | +| [JWT Authentication](./scenarios/jwt_authentication/) | This is a quick start scenario that authenticates to Azure Event Grid using Json Web Tokens (JWT) | ✓| soon| soon| soon | soon | +| [Telemetry](./scenarios/telemetry/) | This scenario simulates multiple clients (the producers) sending data to a different set of topics to be consumed by a single application (the consumer). | ✓| ✓| ✓| soon | ✓ | +| [Command](./scenarios/command/) | This scenario simulates the request-response messaging pattern using MQTT v5. | ✓| ✓ | soon | soon | ✓ | +| [Alert](./scenarios/alert/) | This scenario simulates a fan-out use case where multiple clients receive a singlemessage from the same topic. | ✓| soon| soon| soon | soon | >note: soon: in progress and will be added soon diff --git a/Setup.md b/Setup.md index 98baa8ae..547c1b12 100644 --- a/Setup.md +++ b/Setup.md @@ -181,3 +181,17 @@ See [c extensions](./mqttclients/c/README.md) for more details. ### Python Python samples have been tested with python 3.10.4, to install follow the instructions from https://www.python.org/downloads/ + +### TypeScript + +TypeScript samples have been tested with NodeJS version 18.16.0 and NPM version 9.5.1. Version 18 or higher of NodeJS and version 8 or higher of NPM is required. See https://nodejs.org. The samples are written using the [MQTT.js library](https://www.npmjs.com/package/mqtt). + +The TypeScript samples are built using [TypeScript ESLint](https://typescript-eslint.io/blog/announcing-typescript-eslint-v6/) for Visual Studio Code and [TypeScript project references](https://www.typescriptlang.org/docs/handbook/project-references.html). This allows abstracted client and utility classes to be separate dependent projects of the main example scenario projects. + +To setup the initial project references and build the dependencies run the following commands from the main repository root directory: +```bash +npm i +npm run build +``` + +Each of the samples can be run and debugged either from [Visual Studio Code](https://code.visualstudio.com/), or from the command line. See the README file in each scenario for specific instructions. diff --git a/mqttclients/ts/mqttjsClientExtensions/package.json b/mqttclients/ts/mqttjsClientExtensions/package.json new file mode 100644 index 00000000..96fbab31 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/package.json @@ -0,0 +1,28 @@ +{ + "name": "@mqttapplicationsamples/mqttjsclientextensions", + "version": "1.0.0", + "description": "Extensions and utilities to use with MQTTjs", + "main": "dist/index.js", + "scripts": { + "build": "tsc --build", + "clean": "tsc --build --clean", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "mqtt-match": "^3.0.0", + "mqtt-pattern": "^2.1.0", + "pino": "^8.16.2", + "pino-pretty": "^10.2.3", + "uuid": "^9.0.1" + }, + "devDependencies": { + "@types/uuid": "^9.0.7" + } +} \ No newline at end of file diff --git a/mqttclients/ts/mqttjsClientExtensions/src/commandClient.ts b/mqttclients/ts/mqttjsClientExtensions/src/commandClient.ts new file mode 100644 index 00000000..36ca1cfc --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/commandClient.ts @@ -0,0 +1,94 @@ +import { + IPublishPacket, + MqttClient +} from 'mqtt'; +import { v4 as uuidV4 } from 'uuid'; +import { + logger, + DeferredPromise, + IMessageSerializer +} from './'; + +const ModuleName = 'CommandClient'; + +export class CommandClient +{ + private mqttClient: MqttClient; + private commandName: string; + private requestTopicPattern: string; + private requestTopic: string; + private responseTopicPattern: string; + private responseTopic: string; + private requestSerializer: IMessageSerializer; + private responseSerializer: IMessageSerializer; + private correlationId: string; + private deferredPromise: DeferredPromise; + + constructor(mqttClient: MqttClient, requestTopicPattern: string, responseTopicPattern: string, commandName: string, requestSerializer: IMessageSerializer, responseSerializer: IMessageSerializer) { + this.mqttClient = mqttClient; + this.commandName = commandName; + this.requestTopicPattern = requestTopicPattern; + this.responseTopicPattern = responseTopicPattern; + this.requestSerializer = requestSerializer; + this.responseSerializer = responseSerializer; + + this.mqttClient.on('message', (topic: string, payload: Buffer, packet: IPublishPacket) => { + if (topic === this.responseTopic) { + if (packet.properties?.contentType !== responseSerializer.contentType) { + logger.error({ tags: [ModuleName] }, `Message received on topic ${topic} but with invalid content type. Expected ${this.responseSerializer.contentType} - received ${packet.properties?.contentType}`); + } + + const responseCorrelationId = (packet.properties?.correlationData ?? '').toString(); + + if (responseCorrelationId !== this.correlationId) { + logger.error({ tags: [ModuleName] }, `Message received on topic ${topic} but correlationId does not match. Expected ${this.correlationId} - received ${responseCorrelationId}`); + } + + logger.info({ tags: [ModuleName] }, `Message received on topic: ${topic} with correlationId: ${responseCorrelationId}`); + + const response = this.responseSerializer.fromBytes(payload); + + return this.deferredPromise.resolve(response); + } + }); + } + + public async invokeAsync(clientId: string, request: T, timeoutInMilliSeconds = 5000): Promise { + this.requestTopic = this.requestTopicPattern.replace('{clientId}', clientId).replace('{commandName}', this.commandName); + this.responseTopic = this.responseTopicPattern.replace('{clientId}', clientId).replace('{commandName}', this.commandName); + + this.correlationId = uuidV4(); + + await this.mqttClient.subscribeAsync(this.responseTopic, { + qos: 1, + }); + + const requestBytes = this.requestSerializer.toBytes(request); + const pubAck = await this.mqttClient.publishAsync(this.requestTopic, requestBytes, { + qos: 1, + properties: { + contentType: this.requestSerializer.contentType, + correlationData: Buffer.from(this.correlationId), + responseTopic: this.responseTopic, + userProperties: { + status: '200' + } + } + }); + + logger.info({ tags: [ModuleName] }, `Published command request on topic: ${this.requestTopic}, with mid: ${pubAck?.messageId}, correlationId: ${this.correlationId}`); + + this.deferredPromise = new DeferredPromise(); + return this.promiseWithTimeout(this.deferredPromise.promise, timeoutInMilliSeconds); + } + + private async promiseWithTimeout(promise: Promise, timeoutInMilliSeconds: number): Promise { + const timeout = new Promise((_, reject) => { + setTimeout(() => { + reject(new Error(`Command response timed out after ${timeoutInMilliSeconds} milliseconds`)); + }, timeoutInMilliSeconds); + }); + + return Promise.race([promise, timeout]); + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/commandServer.ts b/mqttclients/ts/mqttjsClientExtensions/src/commandServer.ts new file mode 100644 index 00000000..a8665752 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/commandServer.ts @@ -0,0 +1,80 @@ +import { + IPublishPacket, + MqttClient +} from 'mqtt'; +import { + logger, + IMessageSerializer +} from './'; + +const ModuleName = 'CommandServer'; + +export class CommandServer +{ + private mqttClient: MqttClient; + private requestTopic = ''; + private commandName: string; + private requestSerializer: IMessageSerializer; + private responseSerializer: IMessageSerializer; + public onCommandReceived: (request: T) => TResp; + + constructor(mqttClient: MqttClient, requestTopicPattern: string, commandName: string, requestSerializer: IMessageSerializer, responseSerializer: IMessageSerializer) { + this.mqttClient = mqttClient; + this.commandName = commandName; + this.requestSerializer = requestSerializer; + this.responseSerializer = responseSerializer; + + this.requestTopic = requestTopicPattern.replace('{clientId}', mqttClient.options.clientId ?? '').replace('{commandName}', this.commandName); + + this.mqttClient.on('message', async (topic: string, payload: Buffer, packet: IPublishPacket) => { + if (topic === this.requestTopic) { + if (packet.properties?.contentType !== requestSerializer.contentType) { + throw new Error(`Invalid content type. Expected :${this.requestSerializer.contentType} Actual :${packet.properties?.contentType}`); + } + + const request = this.requestSerializer.fromBytes(payload); + const responseTopic = packet.properties.responseTopic ?? ''; + const requestCorrelationId = packet.properties?.correlationData; + + try { + logger.info({ tags: [ModuleName] }, `Received command request on topic: ${this.requestTopic}, with correlationId: ${requestCorrelationId?.toString()}`); + + const response = this.onCommandReceived(request); + const respBytes = this.responseSerializer.toBytes(response); + + const pubAck = await this.mqttClient.publishAsync(responseTopic, respBytes, { + qos: 1, + properties: { + contentType: this.responseSerializer.contentType, + correlationData: packet.properties?.correlationData, + userProperties: { + status: '200' + } + } + }); + + logger.info({ tags: [ModuleName] }, `Published success response with mid: ${pubAck?.messageId} on topic: ${responseTopic}, with correlationId: ${requestCorrelationId?.toString()}`); + } + catch (ex) { + const pubAck = await this.mqttClient.publishAsync(responseTopic, Buffer.from(ex.message), { + qos: 1, + properties: { + correlationData: packet.properties?.correlationData, + userProperties: { + status: '500' + } + } + }); + + logger.warn({ tags: [ModuleName] }, `Published error response with mid: ${pubAck?.messageId} on topic: ${responseTopic}`); + } + } + }); + } + + public async startAsync(): Promise { + await this.mqttClient.subscribeAsync(this.requestTopic, { + qos: 1 + }); + } +} \ No newline at end of file diff --git a/mqttclients/ts/mqttjsClientExtensions/src/deferredPromise.ts b/mqttclients/ts/mqttjsClientExtensions/src/deferredPromise.ts new file mode 100644 index 00000000..7909cb76 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/deferredPromise.ts @@ -0,0 +1,20 @@ +export class DeferredPromise { + public then: T; + public catch: T; + public resolve: (value: T | PromiseLike) => void; + public reject: (value: T | PromiseLike) => void; + private promiseInternal: Promise; + + constructor() { + this.promiseInternal = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + this.then = this.promiseInternal.then.bind(this.promiseInternal); + this.catch = this.promiseInternal.catch.bind(this.promiseInternal); + } + + public get promise(): Promise { + return this.promiseInternal; + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/geoJsonPoint.ts b/mqttclients/ts/mqttjsClientExtensions/src/geoJsonPoint.ts new file mode 100644 index 00000000..b8f592df --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/geoJsonPoint.ts @@ -0,0 +1,9 @@ +export class GeoJsonPoint { + constructor(x: number, y: number) { + this.coordinates[0] = x; + this.coordinates[1] = y; + } + + public type = 'Point'; + public coordinates: number[] = [0, 0]; +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/index.ts b/mqttclients/ts/mqttjsClientExtensions/src/index.ts new file mode 100644 index 00000000..79eff261 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/index.ts @@ -0,0 +1,27 @@ +import { logger } from './logger'; +import { DeferredPromise } from './deferredPromise'; +import { GeoJsonPoint } from './geoJsonPoint'; +import { MqttConnectionSettings } from './mqttConnectionSettings'; +import { SampleMqttClient } from './sampleMqttClient'; +import { IMessageSerializer } from './messageSerializer'; +import { Utf8JsonSerializer } from './utf8JsonSerializer'; +import { TelemetryMessage } from './telemetryMessage'; +import { TelemetryProducer } from './telemetryProducer'; +import { TelemetryConsumer } from './telemetryConsumer'; +import { CommandServer } from './commandServer'; +import { CommandClient } from './commandClient'; + +export { + logger, + DeferredPromise, + GeoJsonPoint, + MqttConnectionSettings, + SampleMqttClient, + IMessageSerializer, + Utf8JsonSerializer, + TelemetryMessage, + TelemetryProducer, + TelemetryConsumer, + CommandServer, + CommandClient +}; \ No newline at end of file diff --git a/mqttclients/ts/mqttjsClientExtensions/src/logger.ts b/mqttclients/ts/mqttjsClientExtensions/src/logger.ts new file mode 100644 index 00000000..ff39b802 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/logger.ts @@ -0,0 +1,18 @@ +import Pino from 'pino'; + +export const logger = Pino({ + transport: { + target: 'pino-pretty', + options: { + colorize: true, + messageFormat: '[{tags}] {msg}', + translateTime: 'SYS:yyyy-mm-dd"T"HH:MM:sso', + ignore: 'pid,hostname,tags,msg' + } + }, + serializers: { + tags: (tags: string[]) => { + return `${tags}`; + } + } +}); diff --git a/mqttclients/ts/mqttjsClientExtensions/src/messageSerializer.ts b/mqttclients/ts/mqttjsClientExtensions/src/messageSerializer.ts new file mode 100644 index 00000000..f6fefc8e --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/messageSerializer.ts @@ -0,0 +1,5 @@ +export interface IMessageSerializer { + contentType: string; + toBytes(payload: T): Buffer; + fromBytes(bytes: Buffer): T; +} \ No newline at end of file diff --git a/mqttclients/ts/mqttjsClientExtensions/src/mqttConnectionSettings.ts b/mqttclients/ts/mqttjsClientExtensions/src/mqttConnectionSettings.ts new file mode 100644 index 00000000..f651570b --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/mqttConnectionSettings.ts @@ -0,0 +1,151 @@ +import { config } from 'dotenv'; +import { IClientOptions } from 'mqtt'; +import * as fs from 'fs'; + +enum AuthType { + X509 = 'X509', + Basic = 'Basic' +} + +// const ModuleName = 'MqttConnectionSettings'; +const ConnectTimeoutInSeconds = 10; + +const defaultHostname = 'localhost'; +const defaultKeepAliveInSeconds = 30; +const defaultCleanSession = true; +const defaultTcpPort = 8883; +const defaultUseTls = true; +const defaultDisableCrl = false; + +export class MqttConnectionSettings { + private _hostname = defaultHostname; + public clientId = ''; + public certFile = ''; + public keyFile = ''; + public keyFilePassword = ''; + public username = ''; + public password = ''; + public keepAliveInSeconds: number = defaultKeepAliveInSeconds; + public cleanSession: boolean = defaultCleanSession; + public tcpPort: number = defaultTcpPort; + public useTls: boolean = defaultUseTls; + public caFile = ''; + public disableCrl: boolean = defaultDisableCrl; + + constructor(hostname: string) { + this._hostname = hostname; + } + + public get hostname(): string { + return this._hostname; + } + + public get auth(): AuthType { + return this.certFile ? AuthType.X509 : AuthType.Basic; + } + + public static fromConnectionString(connectionString: string): MqttConnectionSettings { + let cs: MqttConnectionSettings; + + try { + const csElements = connectionString.split(';'); + const csMap = new Map(); + for (const element of csElements) { + const [key, value] = element.split('='); + csMap.set(key, value); + } + const csObj = Object.fromEntries(csMap); + + cs = new MqttConnectionSettings(csObj.HostName); + cs.clientId = csObj?.ClientId ?? ''; + cs.keyFile = csObj?.KeyFile ?? ''; + cs.certFile = csObj?.CertFile ?? ''; + cs.username = csObj?.Username ?? ''; + cs.password = csObj?.Password ?? ''; + cs.keepAliveInSeconds = Number(csObj?.KeepAliveInSeconds ?? defaultKeepAliveInSeconds); + cs.cleanSession = Boolean(csObj?.CleanSession === undefined ? defaultCleanSession : csObj?.CleanSession); + cs.tcpPort = Number(csObj?.TcpPort ?? defaultTcpPort); + cs.useTls = Boolean(csObj?.UseTls === undefined ? defaultUseTls : csObj?.UseTls); + cs.caFile = csObj?.CaFile ?? ''; + cs.disableCrl = Boolean(csObj?.DisableCrl === undefined ? defaultDisableCrl : csObj?.DisableCrl); + } + catch (ex) { + throw new Error(`Error while parsing connection string: ${ex.message}`); + } + + return cs; + } + + public static createFromEnvVars(envFilePath = '.env'): MqttConnectionSettings { + // Load environment variables from .env file using the dotenv package + const envConfig = config({ path: envFilePath }); + + if (envConfig.error) { + throw new Error(envConfig.error.message); + } + + const hostname = envConfig.parsed?.MQTT_HOST_NAME; + + if (!hostname) { + throw new Error('MQTT_HOST_NAME environment variable is not set'); + } + + if (envConfig.parsed?.MQTT_PASSWORD && !envConfig.parsed?.MQTT_USERNAME) { + throw new Error('MQTT_USERNAME environment variable is required if MQTT_PASSWORD is set'); + } + + const cs = new MqttConnectionSettings(hostname); + cs.tcpPort = Number(envConfig.parsed?.MQTT_TCP_PORT ?? defaultTcpPort); + cs.useTls = Boolean(envConfig.parsed?.MQTT_USE_TLS === undefined ? defaultUseTls : envConfig.parsed?.MQTT_USE_TLS); + cs.cleanSession = Boolean(envConfig.parsed?.MQTT_CLEAN_SESSION === undefined ? defaultCleanSession : envConfig.parsed?.MQTT_CLEAN_SESSION); + cs.keepAliveInSeconds = Number(envConfig.parsed?.MQTT_KEEP_ALIVE_IN_SECONDS ?? defaultKeepAliveInSeconds); + cs.clientId = envConfig.parsed?.MQTT_CLIENT_ID ?? ''; + cs.username = envConfig.parsed?.MQTT_USERNAME ?? ''; + cs.password = envConfig.parsed?.MQTT_PASSWORD ?? ''; + cs.certFile = envConfig.parsed?.MQTT_CERT_FILE ?? ''; + cs.keyFile = envConfig.parsed?.MQTT_KEY_FILE ?? ''; + cs.caFile = envConfig.parsed?.MQTT_CA_FILE ?? ''; + + return cs; + } + + public static createMqttClientOptions(cs: MqttConnectionSettings): IClientOptions { + const mqttClientOptions: IClientOptions = { + clientId: cs.clientId, + protocol: 'mqtt', + host: cs.hostname, + port: cs.tcpPort, + keepalive: cs.keepAliveInSeconds, + connectTimeout: ConnectTimeoutInSeconds * 1000, + rejectUnauthorized: true, + manualConnect: true, + clean: cs.cleanSession, + protocolVersion: 5 + }; + + try { + if (cs.username) { + mqttClientOptions.username = cs.username; + mqttClientOptions.password = cs.password; + } + + if (cs.useTls) { + mqttClientOptions.protocol = 'mqtts'; + } + + if (cs.certFile) { + mqttClientOptions.cert = fs.readFileSync(cs.certFile); + mqttClientOptions.key = fs.readFileSync(cs.keyFile); + } + + if (cs.caFile) { + mqttClientOptions.ca = fs.readFileSync(cs.caFile); + } + } + catch (ex) { + throw new Error(`Error while creating client options: ${ex.message}`); + } + + return mqttClientOptions; + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/sampleMqttClient.ts b/mqttclients/ts/mqttjsClientExtensions/src/sampleMqttClient.ts new file mode 100644 index 00000000..3890370b --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/sampleMqttClient.ts @@ -0,0 +1,110 @@ +import { logger } from './logger'; +import { MqttConnectionSettings } from './mqttConnectionSettings'; +import { + ErrorWithReasonCode, + MqttClient, + connect as mqttConnect +} from 'mqtt'; + +const ModuleName = 'SampleMqttClient'; + +export class SampleMqttClient { + public static createFromConnectionSettings(cs: MqttConnectionSettings): SampleMqttClient { + let mqttSampleClient: SampleMqttClient = null as any; + + try { + logger.info({ tags: [ModuleName] }, `Creating instance of SampleMqttClient for clientId: ${cs.clientId}`); + + const mqttClient = mqttConnect(MqttConnectionSettings.createMqttClientOptions(cs)); + + mqttSampleClient = new SampleMqttClient(mqttClient); + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `Error while creating instance of SampleMqttClient: ${ex.message}`); + } + + return mqttSampleClient; + } + + public mqttClient: MqttClient; + + constructor(mqttClient: MqttClient) { + this.mqttClient = mqttClient; + } + + public async connectAsync(): Promise { + try { + this.mqttClient.on('close', this.onClose.bind(this)); + this.mqttClient.on('end', this.onEnd.bind(this)); + this.mqttClient.on('reconnect', this.onReconnect.bind(this)); + this.mqttClient.on('offline', this.onOffline.bind(this)); + this.mqttClient.on('error', this.onError.bind(this)); + + // Connect to MQTT broker + logger.info({ tags: [ModuleName] }, `Starting connection for clientId: ${this.mqttClient.options.clientId}`); + + this.mqttClient.connect(); + + await new Promise((resolve) => { + const interval = setInterval(() => { + if (this.mqttClient.connected) { + clearInterval(interval); + + return resolve(); + } + }, 1000); + }); + + if (!this.mqttClient.connected) { + await this.mqttClient.endAsync(true); + + throw new Error('Unable to connect to MQTT broker'); + } + + logger.info({ tags: [ModuleName] }, `MQTT client connected - clientId: ${this.mqttClient.options.clientId}`); + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `MQTT client connect error: ${ex.message}`); + } + } + + private onClose(): void { + logger.info({ tags: [ModuleName] }, 'MQTT broker connection closed'); + } + + private onEnd(): void { + logger.info({ tags: [ModuleName] }, 'MQTT broker connection ended'); + } + + private onReconnect(): void { + logger.info({ tags: [ModuleName] }, 'MQTT broker session re-connected'); + } + + private onOffline(): void { + logger.info({ tags: [ModuleName] }, 'MQTT broker connection is offline'); + } + + private onError(error: Error | ErrorWithReasonCode): void { + logger.error({ tags: [ModuleName] }, `MQTT client error:`); + + if ((error as ErrorWithReasonCode)?.code) { + logger.error({ tags: [ModuleName] }, ` - reason code: ${(error as ErrorWithReasonCode).code}`); + + if ((error as ErrorWithReasonCode)?.message) { + logger.error({ tags: [ModuleName] }, ` - message: ${(error as ErrorWithReasonCode).message}`); + } + else { + const errors = (error as any).errors; + if (errors && Array.isArray(errors)) { + for (const subError of errors) { + logger.error({ tags: [ModuleName] }, ` - message: ${subError.message}`); + } + } + } + } + + logger.error({ tags: [ModuleName] }, `Terminating the sample...`); + + process.exit(1); + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/telemetryConsumer.ts b/mqttclients/ts/mqttjsClientExtensions/src/telemetryConsumer.ts new file mode 100644 index 00000000..2b81ec2f --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/telemetryConsumer.ts @@ -0,0 +1,35 @@ +import { + IPublishPacket, + MqttClient +} from 'mqtt'; +import { matches as mqttMatches } from 'mqtt-pattern'; +import { + IMessageSerializer, + TelemetryMessage +} from './'; + +export class TelemetryConsumer { + private mqttClient: MqttClient; + private serializer: IMessageSerializer; + private topicPattern: string; + public onTelemetryReceived: (msg: TelemetryMessage) => Promise; + + public constructor(mqttClient: MqttClient, serializer: IMessageSerializer, topicPattern: string) { + this.mqttClient = mqttClient; + this.serializer = serializer; + this.topicPattern = topicPattern; + } + + public async startAsync(): Promise { + this.mqttClient.on('message', (topic: string, payload: Buffer, _packet: IPublishPacket) => { + if (mqttMatches(this.topicPattern, topic)) { + const segments = topic.split('/'); + const msg = new TelemetryMessage(segments[1], this.serializer.fromBytes(payload)); + + void this.onTelemetryReceived(msg); + } + }); + + await this.mqttClient.subscribeAsync(this.topicPattern, { qos: 1 }); + } +} \ No newline at end of file diff --git a/mqttclients/ts/mqttjsClientExtensions/src/telemetryMessage.ts b/mqttclients/ts/mqttjsClientExtensions/src/telemetryMessage.ts new file mode 100644 index 00000000..3ce161f6 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/telemetryMessage.ts @@ -0,0 +1,10 @@ +export class TelemetryMessage +{ + public clientIdFromTopic: string; + public payload: T; + + constructor(clientIdFromTopic: string, payload: T) { + this.clientIdFromTopic = clientIdFromTopic; + this.payload = payload; + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/telemetryProducer.ts b/mqttclients/ts/mqttjsClientExtensions/src/telemetryProducer.ts new file mode 100644 index 00000000..7265c44b --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/telemetryProducer.ts @@ -0,0 +1,30 @@ +import { + IPublishPacket, + MqttClient +} from 'mqtt'; +import { + QoS +} from 'mqtt-packet'; +import { IMessageSerializer } from './'; + +export class TelemetryProducer +{ + private mqttClient: MqttClient; + private telemetryTopic: string; + private serializer: IMessageSerializer; + + constructor(mqttClient: MqttClient, serializer: IMessageSerializer, topicPattern: string) { + this.mqttClient = mqttClient; + this.serializer = serializer; + this.telemetryTopic = topicPattern.replace('{clientId}', mqttClient.options.clientId!); + } + + public async SendTelemetryAsync(message: T, qos: QoS = 1, retain = false): Promise { + const pubAck = await this.mqttClient.publishAsync(this.telemetryTopic, this.serializer.toBytes(message), { + qos, + retain + }); + + return pubAck as IPublishPacket; + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/src/utf8JsonSerializer.ts b/mqttclients/ts/mqttjsClientExtensions/src/utf8JsonSerializer.ts new file mode 100644 index 00000000..8b2daf02 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/src/utf8JsonSerializer.ts @@ -0,0 +1,13 @@ +import { IMessageSerializer } from "."; + +export class Utf8JsonSerializer implements IMessageSerializer { + public contentType = 'application/json'; + + public fromBytes(payload: Buffer): T { + return JSON.parse(payload.toString('utf8')) as T; + } + + public toBytes(payload: T): Buffer { + return Buffer.from(JSON.stringify(payload), 'utf8'); + } +} diff --git a/mqttclients/ts/mqttjsClientExtensions/tsconfig.json b/mqttclients/ts/mqttjsClientExtensions/tsconfig.json new file mode 100644 index 00000000..7acd5733 --- /dev/null +++ b/mqttclients/ts/mqttjsClientExtensions/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist" + }, + "include": [ + "src/**/*.ts" + ] +} \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 00000000..60a92bd3 --- /dev/null +++ b/package.json @@ -0,0 +1,42 @@ +{ + "name": "mqttapplicationsamples", + "version": "1.0.0", + "description": "Guidance to build Pub/Sub applications targeting MQTT Brokers.", + "scripts": { + "build": "tsc --build --verbose", + "clean": "tsc --build --clean && npm run clean:dist", + "clean:dist": "rm -rf $(find . -path ./node_modules -prune -o -type d -name 'dist' -print)", + "eslint": "eslint .", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "private": true, + "workspaces": [ + "mqttclients/ts/mqttjsClientExtensions", + "scenarios/getting_started/ts/gettingStarted", + "scenarios/telemetry/ts/telemetryproducer", + "scenarios/telemetry/ts/telemetryconsumer", + "scenarios/command/ts/protoMessages", + "scenarios/command/ts/commandServer", + "scenarios/command/ts/commandClient" + ], + "dependencies": { + "commander": "^11.1.0", + "dotenv": "^16.3.1", + "mqtt": "^5.3.3", + "mqtt-packet": "^9.0.0" + }, + "devDependencies": { + "@types/node": "^20.10.4", + "@typescript-eslint/eslint-plugin": "^6.13.2", + "@typescript-eslint/parser": "^6.13.2", + "eslint": "^8.55.0", + "typescript": "^5.3.3" + } +} diff --git a/scenarios/command/README.md b/scenarios/command/README.md index 57fd307e..4b085199 100644 --- a/scenarios/command/README.md +++ b/scenarios/command/README.md @@ -273,3 +273,28 @@ python python/command_receiver.py --env-file=vehicle03.env ```bash python python/command_invoker.py --env-file=mobile-app.env ``` + +### TypeScript + +To build the TypeScript sample run: +>Note: The scenario should already be built from the initial `npm i` command at the root. + +```bash +# from folder scenarios/command +npm run build:proto --prefix ./ts/protoMessages && npm run build --prefix ./ts/protoMessages && npm run build --prefix ./ts/commandServer && npm run build --prefix ./ts/commandClient +``` + +To run the TypeScript sample execute each line below in a different shell/terminal: +```bash +# from folder scenarios/command +node ./ts/commandServer/dist/index.js --env-file vehicle03.env +``` +```bash +# from folder scenarios/command +node ./ts/commandClient/dist/index.js --env-file mobile-app.env +``` + +To see detailed MQTT.js debug logging configure the DEBUG environment variable before running the sample. +```bash +export DEBUG=mqttjs* && node ... +``` diff --git a/scenarios/command/ts/commandClient/package.json b/scenarios/command/ts/commandClient/package.json new file mode 100644 index 00000000..43a2c409 --- /dev/null +++ b/scenarios/command/ts/commandClient/package.json @@ -0,0 +1,22 @@ +{ + "name": "@mqttapplicationsamples/commandclient", + "version": "1.0.0", + "description": "Request-response messaging pattern - TypeScript", + "main": "dist/index.js", + "scripts": { + "build": "tsc --build", + "clean": "tsc --build --clean", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "@mqttapplicationsamples/mqttjsclientextensions": "^1.0.0", + "@mqttapplicationsamples/protomessages": "^1.0.0" + } +} \ No newline at end of file diff --git a/scenarios/command/ts/commandClient/src/index.ts b/scenarios/command/ts/commandClient/src/index.ts new file mode 100644 index 00000000..034b596a --- /dev/null +++ b/scenarios/command/ts/commandClient/src/index.ts @@ -0,0 +1,105 @@ +import { + IConnackPacket, + IDisconnectPacket +} from 'mqtt'; +import { Command } from 'commander'; +import { + logger, + MqttConnectionSettings, + SampleMqttClient +} from '@mqttapplicationsamples/mqttjsclientextensions'; +import { + Timestamp, + UnlockRequest +} from '@mqttapplicationsamples/protomessages'; +import { UnlockCommandClient } from './unlockCommandClient'; + +// Parse command line arguments to get the environment file path +const programCommands = new Command(); +programCommands + .requiredOption('-e, --env-file ', 'Environment filepath') + .parse(process.argv); + +// Load environment variables from .env file using the dotenv package +const programOptions = programCommands.opts(); + +const ModuleName = 'SampleApp'; + +let sampleApp: SampleApp; + +class SampleApp { + private sampleMqttClient: SampleMqttClient; + + public async stopSample(): Promise { + if (this.sampleMqttClient) { + await this.sampleMqttClient.mqttClient.endAsync(true); + } + } + + public async startSample(): Promise { + try { + logger.info({ tags: [ModuleName] }, `Starting MQTT command client`); + + const cs = MqttConnectionSettings.createFromEnvVars(programOptions.envFile); + + // Create the SampleMqttClient instance, this wraps the MQTT.js client + this.sampleMqttClient = SampleMqttClient.createFromConnectionSettings(cs); + + this.sampleMqttClient.mqttClient.on('connect', this.onConnect.bind(this)); + this.sampleMqttClient.mqttClient.on('disconnect', this.onDisconnect.bind(this)); + + // Connect to the MQTT broker using the connection settings from the .env file + await this.sampleMqttClient.connectAsync(); + + const commandClient = new UnlockCommandClient(this.sampleMqttClient.mqttClient); + + try { + logger.info({ tags: [ModuleName] }, `Invoking unlock command: ${new Date().toISOString()}`); + + const unlockRequest = UnlockRequest.create({ + when: Timestamp.now(), + requestedFrom: this.sampleMqttClient.mqttClient.options.clientId + }); + + const response = await commandClient.invokeAsync("vehicle03", unlockRequest, 30000); + + logger.info({ tags: [ModuleName] }, `Command response succeed=${response.succeed}`); + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `Command response error: ${ex.message}`); + } + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `MQTT client sample error: ${ex.message}`); + } + } + + private onConnect(connAck: IConnackPacket): void { + logger.info({ tags: [ModuleName] }, `Client Connected: ${this.sampleMqttClient.mqttClient.connected} with CONNACK: ${connAck.reasonCode}`); + } + + private onDisconnect(packet: IDisconnectPacket): void { + logger.info({ tags: [ModuleName] }, `Mqtt client disconnected with reason: ${packet.reasonCode}`); + } +} + +process.on('SIGINT', async () => { + logger.error({ tags: [ModuleName] }, `SIGINT received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +process.on('SIGTERM', async () => { + logger.error({ tags: [ModuleName] }, `SIGTERM received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +void (async () => { + sampleApp = new SampleApp(); + await sampleApp.startSample(); +})().catch(); diff --git a/scenarios/command/ts/commandClient/src/unlockCommandClient.ts b/scenarios/command/ts/commandClient/src/unlockCommandClient.ts new file mode 100644 index 00000000..48fb1136 --- /dev/null +++ b/scenarios/command/ts/commandClient/src/unlockCommandClient.ts @@ -0,0 +1,19 @@ +import { MqttClient } from 'mqtt'; +import { CommandClient } from '@mqttapplicationsamples/mqttjsclientextensions'; +import { + UnlockRequest, + UnlockResponse, + UnlockRequestSerializer, + UnlockResponseSerializer +} from '@mqttapplicationsamples/protomessages'; + +export const UnlockCommand = 'unlock'; +export const RequestTopicPattern = 'vehicles/{clientId}/command/{commandName}/request'; +export const ResponseTopicPattern = 'vehicles/{clientId}/command/{commandName}/response'; + +export class UnlockCommandClient extends CommandClient +{ + constructor(mqttClient: MqttClient) { + super(mqttClient, RequestTopicPattern, ResponseTopicPattern, UnlockCommand, new UnlockRequestSerializer(), new UnlockResponseSerializer()); + } +} diff --git a/scenarios/command/ts/commandClient/tsconfig.json b/scenarios/command/ts/commandClient/tsconfig.json new file mode 100644 index 00000000..8cc7af3a --- /dev/null +++ b/scenarios/command/ts/commandClient/tsconfig.json @@ -0,0 +1,19 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist" + }, + "include": [ + "src/**/*.ts" + ], + "references": [ + { + "path": "../../../../mqttclients/ts/mqttjsClientExtensions" + }, + { + "path": "../protoMessages" + } + ] +} \ No newline at end of file diff --git a/scenarios/command/ts/commandServer/package.json b/scenarios/command/ts/commandServer/package.json new file mode 100644 index 00000000..86e65496 --- /dev/null +++ b/scenarios/command/ts/commandServer/package.json @@ -0,0 +1,22 @@ +{ + "name": "@mqttapplicationsamples/commandserver", + "version": "1.0.0", + "description": "Request-response messaging pattern - TypeScript", + "main": "dist/index.js", + "scripts": { + "build": "tsc --build", + "clean": "tsc --build --clean", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "@mqttapplicationsamples/mqttjsclientextensions": "^1.0.0", + "@mqttapplicationsamples/protomessages": "^1.0.0" + } +} \ No newline at end of file diff --git a/scenarios/command/ts/commandServer/src/index.ts b/scenarios/command/ts/commandServer/src/index.ts new file mode 100644 index 00000000..75daedc3 --- /dev/null +++ b/scenarios/command/ts/commandServer/src/index.ts @@ -0,0 +1,98 @@ +import { + IConnackPacket, + IDisconnectPacket +} from 'mqtt'; +import { Command } from 'commander'; +import { + logger, + MqttConnectionSettings, + SampleMqttClient +} from '@mqttapplicationsamples/mqttjsclientextensions'; +import { + UnlockRequest, + UnlockResponse +} from '@mqttapplicationsamples/protomessages'; +import { UnlockCommandServer } from './unlockCommandServer'; + +// Parse command line arguments to get the environment file path +const programCommands = new Command(); +programCommands + .requiredOption('-e, --env-file ', 'Environment filepath') + .parse(process.argv); + +// Load environment variables from .env file using the dotenv package +const programOptions = programCommands.opts(); + +const ModuleName = 'SampleApp'; + +let sampleApp: SampleApp; + +class SampleApp { + private sampleMqttClient: SampleMqttClient; + + public async stopSample(): Promise { + if (this.sampleMqttClient) { + await this.sampleMqttClient.mqttClient.endAsync(true); + } + } + + public async startSample(): Promise { + try { + logger.info({ tags: [ModuleName] }, `Starting MQTT command server`); + + const cs = MqttConnectionSettings.createFromEnvVars(programOptions.envFile); + + // Create the SampleMqttClient instance, this wraps the MQTT.js client + this.sampleMqttClient = SampleMqttClient.createFromConnectionSettings(cs); + + this.sampleMqttClient.mqttClient.on('connect', this.onConnect.bind(this)); + this.sampleMqttClient.mqttClient.on('disconnect', this.onDisconnect.bind(this)); + + // Connect to the MQTT broker using the connection settings from the .env file + await this.sampleMqttClient.connectAsync(); + + const commandUnlock = new UnlockCommandServer(this.sampleMqttClient.mqttClient); + commandUnlock.onCommandReceived = this.unlock.bind(this); + + await commandUnlock.startAsync(); + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `MQTT client sample error: ${ex.message}`); + } + } + + private onConnect(connAck: IConnackPacket): void { + logger.info({ tags: [ModuleName] }, `Client Connected: ${this.sampleMqttClient.mqttClient.connected} with CONNACK: ${connAck.reasonCode}`); + } + + private onDisconnect(packet: IDisconnectPacket): void { + logger.info({ tags: [ModuleName] }, `Mqtt client disconnected with reason: ${packet.reasonCode}`); + } + + private unlock(unlockRequest: UnlockRequest): UnlockResponse { + logger.info({ tags: [ModuleName] }, `Handling unlock request from ${unlockRequest.requestedFrom}`); + + return UnlockResponse.create({ succeed: true }); + } +} + +process.on('SIGINT', async () => { + logger.error({ tags: [ModuleName] }, `SIGINT received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +process.on('SIGTERM', async () => { + logger.error({ tags: [ModuleName] }, `SIGTERM received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +void (async () => { + sampleApp = new SampleApp(); + await sampleApp.startSample(); +})().catch(); diff --git a/scenarios/command/ts/commandServer/src/unlockCommandServer.ts b/scenarios/command/ts/commandServer/src/unlockCommandServer.ts new file mode 100644 index 00000000..3f70ff09 --- /dev/null +++ b/scenarios/command/ts/commandServer/src/unlockCommandServer.ts @@ -0,0 +1,18 @@ +import { MqttClient } from 'mqtt'; +import { CommandServer } from '@mqttapplicationsamples/mqttjsclientextensions'; +import { + UnlockRequest, + UnlockResponse, + UnlockRequestSerializer, + UnlockResponseSerializer +} from '@mqttapplicationsamples/protomessages'; + +export const UnlockCommand = 'unlock'; +export const RequestTopicPattern = 'vehicles/{clientId}/command/{commandName}/request'; + +export class UnlockCommandServer extends CommandServer +{ + constructor(mqttClient: MqttClient) { + super(mqttClient, RequestTopicPattern, UnlockCommand, new UnlockRequestSerializer(), new UnlockResponseSerializer()); + } +} diff --git a/scenarios/command/ts/commandServer/tsconfig.json b/scenarios/command/ts/commandServer/tsconfig.json new file mode 100644 index 00000000..8cc7af3a --- /dev/null +++ b/scenarios/command/ts/commandServer/tsconfig.json @@ -0,0 +1,19 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist" + }, + "include": [ + "src/**/*.ts" + ], + "references": [ + { + "path": "../../../../mqttclients/ts/mqttjsClientExtensions" + }, + { + "path": "../protoMessages" + } + ] +} \ No newline at end of file diff --git a/scenarios/command/ts/protoMessages/package.json b/scenarios/command/ts/protoMessages/package.json new file mode 100644 index 00000000..ba0eb7b2 --- /dev/null +++ b/scenarios/command/ts/protoMessages/package.json @@ -0,0 +1,23 @@ +{ + "name": "@mqttapplicationsamples/protomessages", + "version": "1.0.0", + "description": "Protobuf support for mqtt request-response messaging pattern - TypeScript", + "main": "dist/index.js", + "scripts": { + "build": "npm run build:proto && tsc --build", + "clean": "tsc --build --clean", + "build:proto": "mkdir -p ./src/generated && ../../../../node_modules/.bin/protoc --plugin=../../../../node_modules/.bin/protoc-gen-ts --ts_out ./src/generated --proto_path ./src/proto ./src/proto/unlock_command.proto", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "@mqttapplicationsamples/mqttjsclientextensions": "^1.0.0", + "@protobuf-ts/plugin": "^2.9.3" + } +} \ No newline at end of file diff --git a/scenarios/command/ts/protoMessages/src/index.ts b/scenarios/command/ts/protoMessages/src/index.ts new file mode 100644 index 00000000..13e44db6 --- /dev/null +++ b/scenarios/command/ts/protoMessages/src/index.ts @@ -0,0 +1,15 @@ +import { Timestamp } from './generated/google/protobuf/timestamp'; +import { + UnlockRequest, + UnlockResponse +} from './generated/unlock_command'; +import { UnlockRequestSerializer } from './unlockRequestSerializer'; +import { UnlockResponseSerializer } from './unlockResponseSerializer'; + +export { + Timestamp, + UnlockRequest, + UnlockResponse, + UnlockRequestSerializer, + UnlockResponseSerializer +}; diff --git a/scenarios/command/ts/protoMessages/src/proto/unlock_command.proto b/scenarios/command/ts/protoMessages/src/proto/unlock_command.proto new file mode 100644 index 00000000..ec20e6d9 --- /dev/null +++ b/scenarios/command/ts/protoMessages/src/proto/unlock_command.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message UnlockRequest { + google.protobuf.Timestamp when = 1; + string requestedFrom = 2; +} + +message UnlockResponse { + bool succeed = 1; + string errorDetail = 2; +} + +service Commands { + rpc Unlock(UnlockRequest) returns (UnlockResponse); +} diff --git a/scenarios/command/ts/protoMessages/src/unlockRequestSerializer.ts b/scenarios/command/ts/protoMessages/src/unlockRequestSerializer.ts new file mode 100644 index 00000000..021e1d04 --- /dev/null +++ b/scenarios/command/ts/protoMessages/src/unlockRequestSerializer.ts @@ -0,0 +1,16 @@ +import { IMessageSerializer } from '@mqttapplicationsamples/mqttjsclientextensions'; +import { + UnlockRequest +} from './generated/unlock_command'; + +export class UnlockRequestSerializer implements IMessageSerializer { + public contentType = "application/protobuf"; + + public fromBytes(payload: Buffer): T { + return UnlockRequest.fromBinary(payload) as T; + } + + public toBytes(payload: T): Buffer { + return Buffer.from(UnlockRequest.toBinary(payload as UnlockRequest)); + } +} \ No newline at end of file diff --git a/scenarios/command/ts/protoMessages/src/unlockResponseSerializer.ts b/scenarios/command/ts/protoMessages/src/unlockResponseSerializer.ts new file mode 100644 index 00000000..8b572cca --- /dev/null +++ b/scenarios/command/ts/protoMessages/src/unlockResponseSerializer.ts @@ -0,0 +1,16 @@ +import { IMessageSerializer } from '@mqttapplicationsamples/mqttjsclientextensions'; +import { + UnlockResponse +} from './generated/unlock_command'; + +export class UnlockResponseSerializer implements IMessageSerializer { + public contentType = "application/protobuf"; + + public fromBytes(payload: Buffer): T { + return UnlockResponse.fromBinary(payload) as T; + } + + public toBytes(payload: T): Buffer { + return Buffer.from(UnlockResponse.toBinary(payload as UnlockResponse)); + } +} \ No newline at end of file diff --git a/scenarios/command/ts/protoMessages/tsconfig.json b/scenarios/command/ts/protoMessages/tsconfig.json new file mode 100644 index 00000000..e7d96ade --- /dev/null +++ b/scenarios/command/ts/protoMessages/tsconfig.json @@ -0,0 +1,17 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist", + "noUnusedParameters": false + }, + "include": [ + "src/**/*.ts" + ], + "references": [ + { + "path": "../../../../mqttclients/ts/mqttjsClientExtensions" + } + ] +} \ No newline at end of file diff --git a/scenarios/getting_started/README.md b/scenarios/getting_started/README.md index 2f550a55..e1132305 100644 --- a/scenarios/getting_started/README.md +++ b/scenarios/getting_started/README.md @@ -224,4 +224,34 @@ Run the sample using settings from an envfile ```bash # from folder scenarios/getting_started go/bin/getting_started .env -``` \ No newline at end of file +``` +### TypeScript +To build the TypeScript sample run: +>Note: The scenario should already be built from the initial `npm i` command at the root. + +```bash +# from folder scenarios/getting_started +npm run build --prefix ./ts/gettingStarted +``` + +The sample can be run and debugged either in the [Visual Studio Code IDE](https://code.visualstudio.com/), or from the command line. This will use the `.env` file created in the steps above. + +To run the sample from Visual Studio Code, select the Run and Debug option from the left pane then select the "TypeScript Getting Started" configuration from the Run and Debug dropdown menu. Then just use F5 or click on the green play button. + +To run the sample from the command line: +```bash +# from folder scenarios/getting_started +node ./ts/gettingStarted/dist/index.js --env-file .env +``` + +To see detailed MQTT.js debug logging configure the DEBUG environment variable before running the sample. + +Using Visual Studio Code, simply add another entry to the .env file: +```bash +DEBUG=mqttjs* +``` + +Using the command line: +```bash +export DEBUG=mqttjs* && node ./ts/gettingStarted/dist/index.js --env-file .env +``` diff --git a/scenarios/getting_started/ts/gettingStarted/package.json b/scenarios/getting_started/ts/gettingStarted/package.json new file mode 100644 index 00000000..7939e50c --- /dev/null +++ b/scenarios/getting_started/ts/gettingStarted/package.json @@ -0,0 +1,21 @@ +{ + "name": "@mqttapplicationsamples/gettingstarted", + "version": "1.0.0", + "description": "Publish and subscribe MQTT messages - TypeScript", + "main": "dist/index.js", + "scripts": { + "build": "tsc --build", + "clean": "tsc --build --clean", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "@mqttapplicationsamples/mqttjsclientextensions": "^1.0.0" + } +} \ No newline at end of file diff --git a/scenarios/getting_started/ts/gettingStarted/src/index.ts b/scenarios/getting_started/ts/gettingStarted/src/index.ts new file mode 100644 index 00000000..e353856f --- /dev/null +++ b/scenarios/getting_started/ts/gettingStarted/src/index.ts @@ -0,0 +1,94 @@ +import { + IConnackPacket, + IPublishPacket +} from 'mqtt'; +import { + logger, + MqttConnectionSettings, + SampleMqttClient +} from '@mqttapplicationsamples/mqttjsclientextensions'; +import { Command } from 'commander'; + +// Parse command line arguments to get the environment file path +const programCommands = new Command(); +programCommands + .requiredOption('-e, --env-file ', 'Environment filepath') + .parse(process.argv); +const programOptions = programCommands.opts(); + +const ModuleName = 'SampleApp'; + +let sampleApp: SampleApp; + +class SampleApp { + private sampleMqttClient: SampleMqttClient; + + public async stopSample(): Promise { + if (this.sampleMqttClient) { + await this.sampleMqttClient.mqttClient.endAsync(true); + } + } + + public async startSample(): Promise { + try { + logger.info({ tags: [ModuleName] }, `Starting MQTT client sample`); + + const cs = MqttConnectionSettings.createFromEnvVars(programOptions.envFile); + + // Create the SampleMqttClient instance, this wraps the MQTT.js client + this.sampleMqttClient = SampleMqttClient.createFromConnectionSettings(cs); + + this.sampleMqttClient.mqttClient.on('connect', this.onConnect.bind(this)); + this.sampleMqttClient.mqttClient.on('message', this.onMessage.bind(this)); + + // Connect to the MQTT broker using the connection settings from the .env file + await this.sampleMqttClient.connectAsync(); + + const subscribeTopic = 'sample/+'; + + logger.info({ tags: [ModuleName] }, `Subscribing to MQTT topic: ${subscribeTopic}`); + + await this.sampleMqttClient.mqttClient.subscribeAsync(subscribeTopic, { + qos: 1 + }); + + const publishTopic = 'sample/topic1'; + + logger.info({ tags: [ModuleName] }, `Publishing to MQTT topic: ${publishTopic}`); + + await this.sampleMqttClient.mqttClient.publishAsync('sample/topic1', 'Hello World!'); + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `MQTT client sample error: ${ex.message}`); + } + } + + private onConnect(connAck: IConnackPacket): void { + logger.info({ tags: [ModuleName] }, `Client Connected: ${this.sampleMqttClient.mqttClient.connected} with CONNACK: ${connAck.reasonCode}`); + } + + private onMessage(topic: string, payload: Buffer, _packet: IPublishPacket): void { + logger.info({ tags: [ModuleName] }, `Received message on topic: '${topic}' with content: '${payload.toString('utf8')}'`); + } +} + +process.on('SIGINT', async () => { + logger.error({ tags: [ModuleName] }, `SIGINT received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +process.on('SIGTERM', async () => { + logger.error({ tags: [ModuleName] }, `SIGTERM received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +void (async () => { + sampleApp = new SampleApp(); + await sampleApp.startSample(); +})().catch(); diff --git a/scenarios/getting_started/ts/gettingStarted/tsconfig.json b/scenarios/getting_started/ts/gettingStarted/tsconfig.json new file mode 100644 index 00000000..3ae5b201 --- /dev/null +++ b/scenarios/getting_started/ts/gettingStarted/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist" + }, + "include": [ + "src/**/*.ts" + ], + "references": [ + { + "path": "../../../../mqttclients/ts/mqttjsClientExtensions" + } + ] +} \ No newline at end of file diff --git a/scenarios/telemetry/README.md b/scenarios/telemetry/README.md index 8aea8473..fd820cf5 100644 --- a/scenarios/telemetry/README.md +++ b/scenarios/telemetry/README.md @@ -282,4 +282,31 @@ python python/telemetry_producer.py --env-file="vehicle02.env" ```bash # from folder scenarios/telemetry python python/telemetry_consumer.py --env-file=map-app.env -``` \ No newline at end of file +``` +### TypeScript +To build the TypeScript sample run: +>Note: The scenario should already be built from the initial `npm i` command at the root. + +```bash +# from folder scenarios/telemetry +npm run build --prefix ./ts/telemetryProducer && npm run build --prefix ./ts/telemetryConsumer +``` + +To run the dotnet sample execute each line below in a different shell/terminal: +```bash +# from folder scenarios/telemetry +node ./ts/telemetryProducer/dist/index.js --env-file vehicle01.env +``` +```bash +# from folder scenarios/telemetry +node ./ts/telemetryProducer/dist/index.js --env-file vehicle02.env +``` +```bash +# from folder scenarios/telemetry +node ./ts/telemetryConsumer/dist/index.js --env-file map-app.env +``` + +To see detailed MQTT.js debug logging configure the DEBUG environment variable before running the sample. +```bash +export DEBUG=mqttjs* && node ... +``` diff --git a/scenarios/telemetry/ts/telemetryConsumer/package.json b/scenarios/telemetry/ts/telemetryConsumer/package.json new file mode 100644 index 00000000..bdbf40ff --- /dev/null +++ b/scenarios/telemetry/ts/telemetryConsumer/package.json @@ -0,0 +1,21 @@ +{ + "name": "@mqttapplicationsamples/telemetryconsumer", + "version": "1.0.0", + "description": "Consumer topic for multiple producers - TypeScript", + "main": "dist/index.js", + "scripts": { + "build": "tsc --build", + "clean": "tsc --build --clean", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "@mqttapplicationsamples/mqttjsclientextensions": "^1.0.0" + } +} \ No newline at end of file diff --git a/scenarios/telemetry/ts/telemetryConsumer/src/index.ts b/scenarios/telemetry/ts/telemetryConsumer/src/index.ts new file mode 100644 index 00000000..896f6414 --- /dev/null +++ b/scenarios/telemetry/ts/telemetryConsumer/src/index.ts @@ -0,0 +1,96 @@ +import { + IConnackPacket, + IDisconnectPacket +} from 'mqtt'; +import { + logger, + GeoJsonPoint, + MqttConnectionSettings, + SampleMqttClient, + TelemetryMessage +} from '@mqttapplicationsamples/mqttjsclientextensions'; +import { Command } from 'commander'; +import { PositionTelemetryConsumer } from './positionTelemetryConsumer'; + +// Parse command line arguments to get the environment file path +const programCommands = new Command(); +programCommands + .requiredOption('-e, --env-file ', 'Environment filepath') + .parse(process.argv); +const programOptions = programCommands.opts(); + +const ModuleName = 'TelemetryConsumerApp'; + +let sampleApp: SampleApp; + +class SampleApp { + private sampleMqttClient: SampleMqttClient = null as any; + + public async stopSample(): Promise { + if (this.sampleMqttClient) { + await this.sampleMqttClient.mqttClient.endAsync(true); + + this.sampleMqttClient = null as any; + } + } + + public async startSample(): Promise { + try { + logger.info({ tags: [ModuleName] }, `Starting MQTT client telemetry consumer`); + + const cs = MqttConnectionSettings.createFromEnvVars(programOptions.envFile); + + // Create the SampleMqttClient instance, this wraps the MQTT.js client + this.sampleMqttClient = SampleMqttClient.createFromConnectionSettings(cs); + + this.sampleMqttClient.mqttClient.on('connect', this.onConnect.bind(this)); + this.sampleMqttClient.mqttClient.on('disconnect', this.onDisconnect.bind(this)); + + // Connect to the MQTT broker using the connection settings from the .env file + await this.sampleMqttClient.connectAsync(); + + const telemetryConsumer = new PositionTelemetryConsumer(this.sampleMqttClient.mqttClient); + telemetryConsumer.onTelemetryReceived = async (msg: TelemetryMessage): Promise => { + await new Promise((resolve, reject) => { + process.nextTick(resolve, reject); + }); + + logger.info({ tags: [ModuleName] }, `Received msg from ${msg.clientIdFromTopic}. Coordinates lat: ${msg.payload.coordinates[0]}, lon: ${msg.payload.coordinates[1]}`); + }; + + await telemetryConsumer.startAsync(); + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `MQTT client sample error: ${ex.message}`); + } + } + + private onConnect(connAck: IConnackPacket): void { + logger.info({ tags: [ModuleName] }, `Client Connected: ${this.sampleMqttClient.mqttClient.connected} with CONNACK: ${connAck.reasonCode}`); + } + + private onDisconnect(packet: IDisconnectPacket): void { + logger.info({ tags: [ModuleName] }, `Mqtt client disconnected with reason: ${packet.reasonCode}`); + } +} + +process.on('SIGINT', async () => { + logger.error({ tags: [ModuleName] }, `SIGINT received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +process.on('SIGTERM', async () => { + logger.error({ tags: [ModuleName] }, `SIGTERM received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +void (async () => { + sampleApp = new SampleApp(); + await sampleApp.startSample(); +})().catch(); diff --git a/scenarios/telemetry/ts/telemetryConsumer/src/positionTelemetryConsumer.ts b/scenarios/telemetry/ts/telemetryConsumer/src/positionTelemetryConsumer.ts new file mode 100644 index 00000000..8a18ed83 --- /dev/null +++ b/scenarios/telemetry/ts/telemetryConsumer/src/positionTelemetryConsumer.ts @@ -0,0 +1,14 @@ +import { + MqttClient +} from 'mqtt'; +import { + GeoJsonPoint, + Utf8JsonSerializer, + TelemetryConsumer +} from '@mqttapplicationsamples/mqttjsclientextensions'; + +export class PositionTelemetryConsumer extends TelemetryConsumer { + constructor(mqttClient: MqttClient) { + super(mqttClient, new Utf8JsonSerializer(), "vehicles/+/position"); + } +} diff --git a/scenarios/telemetry/ts/telemetryConsumer/tsconfig.json b/scenarios/telemetry/ts/telemetryConsumer/tsconfig.json new file mode 100644 index 00000000..3ae5b201 --- /dev/null +++ b/scenarios/telemetry/ts/telemetryConsumer/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist" + }, + "include": [ + "src/**/*.ts" + ], + "references": [ + { + "path": "../../../../mqttclients/ts/mqttjsClientExtensions" + } + ] +} \ No newline at end of file diff --git a/scenarios/telemetry/ts/telemetryProducer/package.json b/scenarios/telemetry/ts/telemetryProducer/package.json new file mode 100644 index 00000000..be8a15fb --- /dev/null +++ b/scenarios/telemetry/ts/telemetryProducer/package.json @@ -0,0 +1,22 @@ +{ + "name": "@mqttapplicationsamples/telemetryproducer", + "version": "1.0.0", + "description": "Multiple producer topic - TypeScript", + "main": "dist/index.js", + "scripts": { + "build": "tsc --build", + "clean": "tsc --build --clean", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "mqtt", + "iot", + "azure" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "dependencies": { + "@mqttapplicationsamples/mqttjsclientextensions": "^1.0.0", + "mqtt-packet": "^9.0.0" + } +} \ No newline at end of file diff --git a/scenarios/telemetry/ts/telemetryProducer/src/index.ts b/scenarios/telemetry/ts/telemetryProducer/src/index.ts new file mode 100644 index 00000000..e311b902 --- /dev/null +++ b/scenarios/telemetry/ts/telemetryProducer/src/index.ts @@ -0,0 +1,104 @@ +import { + IConnackPacket, + IDisconnectPacket +} from 'mqtt'; +import { + logger, + GeoJsonPoint, + MqttConnectionSettings, + SampleMqttClient +} from '@mqttapplicationsamples/mqttjsclientextensions'; +import { Command } from 'commander'; +import { PositionTelemetryProducer } from './positionTelemetryProducer'; + +// Parse command line arguments to get the environment file path +const programCommands = new Command(); +programCommands + .requiredOption('-e, --env-file ', 'Environment filepath') + .parse(process.argv); +const programOptions = programCommands.opts(); + +const ModuleName = 'TelemetryProducerApp'; +const VehicleTelemetryPublishIntervalInSeconds = 3; + +let sampleApp: SampleApp; + +class SampleApp { + private sampleMqttClient: SampleMqttClient = null as any; + + public async stopSample(): Promise { + if (this.sampleMqttClient) { + await this.sampleMqttClient.mqttClient.endAsync(true); + + this.sampleMqttClient = null as any; + } + } + + public async startSample(): Promise { + try { + logger.info({ tags: [ModuleName] }, `Starting MQTT client telemetry producer`); + + const cs = MqttConnectionSettings.createFromEnvVars(programOptions.envFile); + + // Create the SampleMqttClient instance, this wraps the MQTT.js client + this.sampleMqttClient = SampleMqttClient.createFromConnectionSettings(cs); + + this.sampleMqttClient.mqttClient.on('connect', this.onConnect.bind(this)); + this.sampleMqttClient.mqttClient.on('disconnect', this.onDisconnect.bind(this)); + + // Connect to the MQTT broker using the connection settings from the .env file + await this.sampleMqttClient.connectAsync(); + + const telemetryProducer = new PositionTelemetryProducer(this.sampleMqttClient.mqttClient); + + // Start sending vehicle telemetry data to the 'vehicles//position' topic + while (this.sampleMqttClient) { + const latMin = -90; + const latMax = 90; + const lonMin = -180; + const lonMax = 180; + + const lat = Math.floor(Math.random() * (latMax - latMin + 1) + latMin); + const lon = Math.floor(Math.random() * (lonMax - lonMin + 1) + lonMin); + + const pubAck = await telemetryProducer.SendTelemetryAsync(new GeoJsonPoint(lat, lon), 1); + + logger.info({ tags: [ModuleName] }, `Message published on topic '${pubAck.topic}' and mid ${pubAck?.messageId ?? -1}`); + + await new Promise((resolve) => setTimeout(resolve, 1000 * VehicleTelemetryPublishIntervalInSeconds)); + } + } + catch (ex) { + logger.error({ tags: [ModuleName] }, `MQTT client sample error: ${ex.message}`); + } + } + + private onConnect(connAck: IConnackPacket): void { + logger.info({ tags: [ModuleName] }, `Client Connected: ${this.sampleMqttClient.mqttClient.connected} with CONNACK: ${connAck.reasonCode}`); + } + + private onDisconnect(packet: IDisconnectPacket): void { + logger.info({ tags: [ModuleName] }, `Mqtt client disconnected with reason: ${packet.reasonCode}`); + } +} + +process.on('SIGINT', async () => { + logger.error({ tags: [ModuleName] }, `SIGINT received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +process.on('SIGTERM', async () => { + logger.error({ tags: [ModuleName] }, `SIGTERM received: ending the session and exiting the sample...`); + + if (sampleApp) { + await sampleApp.stopSample(); + } +}); + +void (async () => { + sampleApp = new SampleApp(); + await sampleApp.startSample(); +})().catch(); diff --git a/scenarios/telemetry/ts/telemetryProducer/src/positionTelemetryProducer.ts b/scenarios/telemetry/ts/telemetryProducer/src/positionTelemetryProducer.ts new file mode 100644 index 00000000..f83fa8ee --- /dev/null +++ b/scenarios/telemetry/ts/telemetryProducer/src/positionTelemetryProducer.ts @@ -0,0 +1,14 @@ +import { + MqttClient +} from 'mqtt'; +import { + GeoJsonPoint, + Utf8JsonSerializer, + TelemetryProducer, +} from '@mqttapplicationsamples/mqttjsclientextensions'; + +export class PositionTelemetryProducer extends TelemetryProducer { + constructor(mqttClient: MqttClient) { + super(mqttClient, new Utf8JsonSerializer(), "vehicles/{clientId}/position"); + } +} diff --git a/scenarios/telemetry/ts/telemetryProducer/tsconfig.json b/scenarios/telemetry/ts/telemetryProducer/tsconfig.json new file mode 100644 index 00000000..3ae5b201 --- /dev/null +++ b/scenarios/telemetry/ts/telemetryProducer/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "composite": true, + "rootDir": "./src", + "outDir": "./dist" + }, + "include": [ + "src/**/*.ts" + ], + "references": [ + { + "path": "../../../../mqttclients/ts/mqttjsClientExtensions" + } + ] +} \ No newline at end of file diff --git a/tsconfig.base.json b/tsconfig.base.json new file mode 100644 index 00000000..db5e66ed --- /dev/null +++ b/tsconfig.base.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "CommonJS", + "moduleResolution": "node", + "declaration": true, + "declarationMap": true, + "incremental": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "sourceMap": true, + "strict": false, + "strictNullChecks": true, + "noImplicitAny": false, + "noUnusedLocals": true, + "noUnusedParameters": true + }, + "compileOnSave": true, +} \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 00000000..ee0e4802 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,28 @@ +{ + "extends": "./tsconfig.base.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./dist" + }, + "files": [], + "references": [ + { + "path": "./scenarios/getting_started/ts/gettingStarted" + }, + { + "path": "./scenarios/telemetry/ts/telemetryProducer" + }, + { + "path": "./scenarios/telemetry/ts/telemetryConsumer" + }, + { + "path": "./scenarios/command/ts/protoMessages" + }, + { + "path": "./scenarios/command/ts/commandServer" + }, + { + "path": "./scenarios/command/ts/commandClient" + } + ] +} \ No newline at end of file