forked from Kong/kong-js-pdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.js
155 lines (131 loc) · 3.62 KB
/
listener.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
'use strict'
const net = require('net')
const fs = require('fs')
const {Encoder, Decoder} = require('@msgpack/msgpack')
const TYPE_EXP = /^\[object (.*)\]$/
const ERR_UNKNOWN = 'Unknown plugin listener error encountered'
const toString = Object.prototype.toString
function typeOf(value) {
if (!value) return ''
const parts = TYPE_EXP.exec(toString.call(value))
return parts[1].toLowerCase()
}
function thenable(obj) {
if (!obj) return false
return (typeof obj.then === 'function' && typeof obj.catch === 'function')
}
function write_response (client, msgid, response) {
client.write(client.encoder.encode([
1, // is response
msgid,
undefined,
response
]))
}
function write_error (client, msgid, error) {
client.write(client.encoder.encode([
1, // is response
msgid,
errToString(error),
undefined
]))
}
function errToString (err) {
if (typeof err === 'string') return err
if (has(err, 'message')) return err.message
if (typeof err.toString === 'function') return err.toString()
return ERR_UNKNOWN
}
function getStreamDecoder () {
const decoder = new Decoder()
let buffer
return function (chunk) {
let decoded
try {
let data = chunk
if (buffer !== undefined) {
buffer.push(chunk)
data = Buffer.concat(buffer)
}
decoded = decoder.decode(data)
buffer = undefined
} catch (ex) {
// TODO: less hacky way to detect insufficient data
if (ex.message === 'Insufficient data') {
if (buffer === undefined) {
buffer = [chunk]
}
return
}
throw ex
}
return decoded
}
}
class Listener {
get [Symbol.toStringTag]() {
return 'PluginListener'
}
constructor(pluginServer, prefix) {
this.ps = pluginServer
this.prefix = prefix
this.logger = pluginServer.getLogger()
}
serve() {
const listen_path = this.prefix
const logger = this.logger
try {
fs.unlinkSync(listen_path)
} catch (ex) {
if (ex.code !== 'ENOENT') throw ex
}
const server = net.createServer((client) => {
client.encoder = new Encoder()
const decodeStream = getStreamDecoder()
client.on('data', (chunk) => {
let decoded = decodeStream(chunk)
// partial data received, wait for next chunk
if (!decoded) return
let [_, msgid, method, args] = decoded
let [ns, cmd] = method.split('.')
if (ns != 'plugin') {
write_error(client, msgid, `RPC for ${ns} is not supported`)
return
}
logger.debug(`rpc: #${msgid} method: ${method} args: ${JSON.stringify(args)}`)
if (!this.ps[cmd]) {
const err = `method ${cmd} not implemented`
logger.error(`rpc: #${msgid} ${err}`)
write_error(client, msgid, err)
return
}
let promise
try {
promise = this.ps[cmd](...args)
} catch (ex) {
logger.error(ex.stack)
write_error(client, msgid, ex)
return
}
if (!thenable(promise)) {
const err = `${cmd} should return a Promise or thenable object, got ${typeOf(promise)}`
logger.error(`rpc: #${msgid} ${err}`)
write_error(client, msgid, err)
return
}
promise
.then((ret) => {
write_response(client, msgid, ret)
})
.catch((err) => {
logger.error(`rpc: # ${msgid} ${err}`)
write_error(client, msgid, err)
})
})
})
server.listen(listen_path)
logger.info('server started at', listen_path)
return server
}
}
module.exports = Listener