-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest-proxy.js
138 lines (117 loc) · 2.89 KB
/
test-proxy.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
var aedes = require('aedes')
var mqttPacket = require('mqtt-packet')
var net = require('net')
var proxyProtocol = require('proxy-protocol-js')
var protocolDecoder = require('./protocol-decoder');
// Port the aedes broker listens on.
const brokerPort = 4883
// Port the local PROXY-protocol forwarder listens on.
const proxyPort = 4884
// Address used as the source peer when the PROXY header is built.
const clientIp = '192.168.0.140'

// MQTT CONNECT fixture (protocolVersion 4, clean session, keepalive 0).
const packet = {
  cmd: 'connect',
  protocolId: 'MQTT',
  protocolVersion: 4,
  clean: true,
  clientId: 'my-client-proxyV1',
  keepalive: 0
}

// MQTT DISCONNECT fixture.
const packet2 = {
  cmd: 'disconnect'
}

// MQTT PUBLISH fixture (QoS 0) on topic 'test'.
const packet3 = {
  cmd: 'publish',
  messageId: 42,
  qos: 0,
  dup: false,
  topic: 'test',
  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  payload: Buffer.from('test'),
  retain: false
}
// Encode the CONNECT, DISCONNECT and PUBLISH fixtures with mqtt-packet, in
// that order. The arrow wrapper keeps generate() to a single argument.
const [buf, buf2, buf3] = [packet, packet2, packet3].map(
  (fixture) => mqttPacket.generate(fixture)
)

// Peers handed to the PROXY v1 header builder in the proxy connection handler:
// `src` uses the simulated client address, `dst` the local proxy port.
const dst = new proxyProtocol.Peer('127.0.0.1', proxyPort)
const src = new proxyProtocol.Peer(clientIp, 12345)
/**
 * Protocol-decoding hook handed to the broker. Delegates to `protocolDecoder`
 * and, when the result carries a `data` payload, logs that payload next to
 * the encoded CONNECT packet (`buf`) together with whether their string forms
 * are equal.
 *
 * @param {*} client - forwarded untouched to `protocolDecoder`.
 * @param {*} buff - forwarded untouched to `protocolDecoder`
 *   (presumably the raw TCP chunk; confirm against the decoder).
 * @returns {*} whatever `protocolDecoder` returned, unmodified.
 */
const decodeProtocol = (client, buff) => {
  const decoded = protocolDecoder(client, buff)

  // Nothing to report when the decoder extracted no payload.
  if (!decoded.data) {
    return decoded
  }

  const extractedText = decoded.data.toString()
  const expectedText = buf.toString()
  console.log('decodeProtocol:proto data', {
    protocolData: extractedText,
    mqttPacket: expectedText,
    areEquals: extractedText === expectedText
  })

  return decoded
}
// Broker under test, wired to the custom decodeProtocol hook.
// NOTE(review): trustProxy presumably makes aedes honour the proxy-supplied
// client address; confirm against the aedes documentation.
const broker = aedes({
  trustProxy: true,
  decodeProtocol: decodeProtocol
})

// Log each client as it connects.
broker.on('client', function (connectedClient) {
  console.log('onClientConnect', connectedClient.id)
})

// Log each client as it disconnects.
broker.on('clientDisconnect', function (departedClient) {
  console.log('onClientDisconnect', departedClient.id)
})

// Log published messages, skipping topics that start with '$SYS'.
broker.on('publish', function (message, client) {
  const isSysTopic = message.topic.startsWith('$SYS')
  if (isSysTopic) {
    return
  }
  console.log('onPublish', message)
})
// TCP server that hands every incoming connection to the aedes broker.
const server = net.createServer(broker.handle)
// Bind failures (e.g. EADDRINUSE) arrive through the 'error' event. The
// listen() callback is only a 'listening' listener and never receives an
// error, so failures are handled here.
server.on('error', function (err) {
  console.error('broker server error', err)
  process.exit(1)
})
server.listen(brokerPort, function () {
  console.log('broker server listening on port', brokerPort)
})

// Plain TCP server acting as the PROXY-protocol forwarder; its 'connection'
// handler is attached separately.
const proxyServer = net.createServer()
proxyServer.on('error', function (err) {
  console.error('proxy server error', err)
  process.exit(1)
})
proxyServer.listen(proxyPort, function () {
  console.log('proxy server listening on port', proxyPort)
})
// Most recently opened upstream (proxy -> broker) connection. Kept at module
// level for backward compatibility only; every downstream socket owns its own
// upstream connection inside the handler below.
let proxyClient

// For each client connecting to the proxy: open a dedicated connection to the
// broker, send the first chunk wrapped by the PROXY protocol v1 builder, then
// forward later chunks verbatim. Traffic is relayed client -> broker only;
// broker responses are read and discarded.
proxyServer.on('connection', function (socket) {
  // Upstream connection for this downstream socket, created on first data.
  let upstream = null

  socket.on('error', function (err) {
    console.error('proxy downstream socket error', err)
    if (upstream) {
      upstream.destroy()
    }
  })

  socket.on('end', function () {
    // 'end' carries no payload and may fire before any data was received,
    // in which case there is no upstream connection to close.
    if (upstream) {
      upstream.end()
    }
  })

  socket.on('data', function (data) {
    if (upstream) {
      // net.Socket queues writes issued while it is still connecting, so
      // ordering is preserved without tracking a separate "connected" flag.
      upstream.write(data)
      return
    }

    // The first chunk is passed to the builder as the header's payload.
    const header = new proxyProtocol.V1BinaryProxyProtocol(
      proxyProtocol.INETProtocol.TCP4,
      src,
      dst,
      data
    ).build()

    upstream = net.connect({
      port: brokerPort,
      timeout: 0
    })
    proxyClient = upstream

    upstream.on('error', function (err) {
      console.error('proxy upstream socket error', err)
      socket.destroy()
    })
    // Consume and drop broker responses so the upstream socket can observe
    // EOF and close instead of lingering with unread data.
    upstream.resume()
    upstream.write(header)
  })
})
// Most recently opened test client connection. Kept at module level for
// backward compatibility; the timers below use a per-iteration handle.
let tcpClient

// Once per second, simulate one MQTT client session through the proxy:
// CONNECT as soon as the socket connects, PUBLISH after 200 ms, then
// DISCONNECT and close after 500 ms.
setInterval(function () {
  // Per-iteration handle, so the delayed timers always act on the socket they
  // were scheduled for rather than on whatever `tcpClient` points to later.
  const client = net.connect({
    port: proxyPort,
    timeout: 0
  }, function () {
    client.write(buf)
  })
  tcpClient = client

  // Without a listener, a refused or reset connection would surface as an
  // uncaught 'error' event and terminate the script.
  client.on('error', function (err) {
    console.error('tcpClient error', err)
  })

  setTimeout(function () {
    if (!client.destroyed) {
      client.write(buf3)
    }
  }, 200)

  setTimeout(function () {
    if (!client.destroyed) {
      client.end(buf2)
    }
  }, 500)
}, 1000)