Skip to content

Commit

Permalink
Adds mqtt protocol to protofy agents
Browse files Browse the repository at this point in the history
  • Loading branch information
jcarlosn committed Nov 22, 2024
1 parent dd679b5 commit 6d77d8d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 28 deletions.
3 changes: 2 additions & 1 deletion packages/protofy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
},
"devDependencies": {
"@types/jest": "~29.5.5",
"express": "^4.18.2",
"aedes": "^0.50.0",
"express": "^4.18.2",
"jest": "~29.7.0",
"mqtt": "^5.2.1",
"ts-jest": "~29.1.1",
"typescript": "~5.3.3",
"zod": "^3.22.2",
Expand Down
8 changes: 5 additions & 3 deletions packages/protofy/src/AgentProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import { Agent } from "./Agent";
export class AgentProtocol {
agent: Agent;
listeners: Function[];
constructor(agent: Agent) {
options: any
constructor(agent: Agent, options?: any) {
this.agent = agent;
this.listeners = [];
this.options = options;
}

onMessage(cb: Function) {
Expand All @@ -20,7 +22,7 @@ export class AgentProtocol {
throw new Error('Not implemented');
}

static create(agent: Agent) {
return new AgentProtocol(agent);
static create(agent: Agent, options?: any) {
return new AgentProtocol(agent, options);
}
}
39 changes: 22 additions & 17 deletions packages/protofy/src/protocols/mqtt.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
import { Agent } from "../Agent";

//options is for auth params and similar protocol-specific options
export default async (agent: Agent, params: any, options = {}) => {
const protocol = agent.getProtocol();
if (protocol.type !== 'mqtt') {
throw new Error('Error: Invalid protocol type, expected http');
import { AgentProtocol } from "../AgentProtocol";
export class MQTTProtocol extends AgentProtocol {
mqttClient: any;
constructor(agent: Agent, mqttClient: any) {
super(agent);
this.mqttClient = mqttClient;
}

if (!protocol.config || !protocol.config.url || !protocol.config.topic) {
throw new Error('Error: Missing URL or topic in protocol config');
}
async send(params, options?) {
const agent = this.agent
const protocol = agent.getProtocol();
if (protocol.type !== 'mqtt') {
throw new Error('Error: Invalid protocol type, expected http');
}

const {
topic,
} = protocol.config;

const {
url,
topic,
encoder = 'body',
serializer = 'json',
} = protocol.config;
this.mqttClient.publish(topic, JSON.stringify(params));
}


};
static create(agent: Agent, mqttClient: any) {
return new MQTTProtocol(agent, mqttClient);
}
}
19 changes: 12 additions & 7 deletions packages/protofy/tests/mqtt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import aedes from 'aedes';
import { Agent } from '../src/Agent';
import { z } from 'zod';
import { zodToJsonSchema } from 'zod-to-json-schema';
import mqttRunner from '../src/protocols/mqtt';
import {MQTTProtocol} from '../src/protocols/mqtt';
import * as mqtt from 'mqtt';

const aedesInstance = new aedes();
aedesInstance.authenticate = function (client, username, password, callback) { callback(null, true); };
Expand All @@ -12,15 +13,19 @@ let paramsSchema;
let returnSchema;
let agent;
let server;
let mqttClient;

describe('MQTT Agents', () => {
beforeEach(() => {
beforeAll(async () => {
server = net.createServer((socket) => {
aedesInstance.handle(socket);
});

server.listen(12346);

await new Promise((resolve) => setTimeout(resolve, 1000));
mqttClient = mqtt.connect('mqtt://localhost:12346');
await new Promise((resolve) => setTimeout(resolve, 1000));

paramsSchema = z.object({
id: z.string().uuid(),
name: z.string().min(1),
Expand Down Expand Up @@ -52,12 +57,12 @@ describe('MQTT Agents', () => {
});
});

afterEach(() => {
afterAll(() => {
server.close();
aedesInstance.close();
});

it.skip('Should be able to run the agent through mqtt', async () => {
it('Should be able to run the agent through mqtt', async () => {
// Variables para almacenar el estado y el mensaje publicado
let messagePublished = false;
let publishedPayload = null;
Expand All @@ -76,8 +81,8 @@ describe('MQTT Agents', () => {
age: 30,
email: 'a@a.com',
};

await mqttRunner(agent, payload);
const protocol = MQTTProtocol.create(agent, mqttClient);
await protocol.send(payload);
await new Promise((resolve) => setTimeout(resolve, 100));
expect(messagePublished).toBe(true);
expect(() => paramsSchema.parse(JSON.parse(publishedPayload))).not.toThrow();
Expand Down
2 changes: 2 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -27873,10 +27873,12 @@ __metadata:
resolution: "protofy@workspace:packages/protofy"
dependencies:
"@types/jest": "npm:~29.5.5"
aedes: "npm:^0.50.0"
ajv: "npm:8.17.1"
axios: "npm:^1.7.7"
express: "npm:^4.18.2"
jest: "npm:~29.7.0"
mqtt: "npm:^5.2.1"
ts-jest: "npm:~29.1.1"
typescript: "npm:~5.3.3"
zod: "npm:^3.22.2"
Expand Down

0 comments on commit 6d77d8d

Please sign in to comment.