diff --git a/.prettierignore b/.prettierignore new file mode 100644 index 000000000..8b1c374f5 --- /dev/null +++ b/.prettierignore @@ -0,0 +1,5 @@ +dist +coverage +.github +examples +node_modules diff --git a/.prettierrc.yaml b/.prettierrc.yaml new file mode 100644 index 000000000..45c45fea2 --- /dev/null +++ b/.prettierrc.yaml @@ -0,0 +1,4 @@ +trailingComma: "es5" +tabWidth: 2 +semi: false +singleQuote: true \ No newline at end of file diff --git a/src/adapters/cluster/redis/index.ts b/src/adapters/cluster/redis/index.ts index 98e911bc1..77996c702 100644 --- a/src/adapters/cluster/redis/index.ts +++ b/src/adapters/cluster/redis/index.ts @@ -25,11 +25,11 @@ class RedisClusterAdapter extends ClusterAdapter { this._channelName = `${this.serverName}-channel` this._publisher = createClient({ - url: this.serverUrlExpanded + url: this.serverUrlExpanded, }) const subscriber = this._publisher.duplicate() - this._publisher.on('error', err => { + this._publisher.on('error', (err) => { this.emit('error', err) }) @@ -41,7 +41,7 @@ class RedisClusterAdapter extends ClusterAdapter { this.emit('close', { name: this.name(), adapter: this }) }) - subscriber.on('error', err => { + subscriber.on('error', (err) => { this.emit('error', err) }) @@ -50,14 +50,14 @@ class RedisClusterAdapter extends ClusterAdapter { }) subscriber.on('end', () => { - this.emit('close', { name: this.name(), adapter: this }) + this.emit('close', { name: this.name(), adapter: this }) }) await Promise.all([this._publisher.connect(), subscriber.connect()]) - subscriber.subscribe(this._channelName, serialized => { + subscriber.subscribe(this._channelName, (serialized) => { const message = this.deserializeMessage(serialized) - if ( message ) this.emit('message', message) + if (message) this.emit('message', message) }) this.emit('connect', { name: this.name(), adapter: this }) diff --git a/src/adapters/http/client.ts b/src/adapters/http/client.ts index 1e5a09d4d..d8b35aefa 100644 --- a/src/adapters/http/client.ts +++ b/src/adapters/http/client.ts @@ -4,7 +4,6 @@ import got from 'got' import { HttpAuthConfig, HttpAdapterConfig } from '../../lib/index.js' import http from 'http' class HttpClientAdapter extends Adapter { - name(): string { return 'HTTP client' } @@ -28,14 +27,14 @@ class HttpClientAdapter extends Adapter { const channelInfo = this.parsedAsyncAPI.channel(channelName) const httpChannelBinding = channelInfo.binding('http') const channelServers = channelInfo.servers() - const isChannelServers = !channelServers.length || channelServers.includes(message.serverName) - if ( - httpChannelBinding && isChannelServers - ) { + const isChannelServers = + !channelServers.length || channelServers.includes(message.serverName) + if (httpChannelBinding && isChannelServers) { const method = httpChannelBinding.method const url = `${serverUrl}/${channelName}` const body: any = message.payload - const query: { [key: string]: string } | { [key: string]: string[] } = message.query + const query: { [key: string]: string } | { [key: string]: string[] } = + message.query got({ method, url, diff --git a/src/adapters/http/server.ts b/src/adapters/http/server.ts index ed0424698..d207d2dad 100644 --- a/src/adapters/http/server.ts +++ b/src/adapters/http/server.ts @@ -1,15 +1,15 @@ -import Adapter from "../../lib/adapter.js" -import GleeMessage from "../../lib/message.js" -import http from "http" -import { validateData } from "../../lib/util.js" -import GleeError from "../../errors/glee-error.js" -import * as url from "url" +import Adapter from '../../lib/adapter.js' +import GleeMessage from '../../lib/message.js' +import http from 'http' +import { validateData } from '../../lib/util.js' +import GleeError from '../../errors/glee-error.js' +import * as url from 'url' class HttpAdapter extends Adapter { private httpResponses = new Map() name(): string { - return "HTTP server" + return 'HTTP server' } async connect(): Promise { @@ -20,8 +20,9 @@ class HttpAdapter extends Adapter { return this._send(message) } - async _connect(): Promise { // NOSONAR - const config = await this.resolveProtocolConfig("http") + async _connect(): Promise { + // NOSONAR + const config = await this.resolveProtocolConfig('http') const httpOptions = config?.server const serverUrl = new URL(this.serverUrlExpanded) const httpServer = httpOptions?.httpServer || http.createServer() @@ -29,24 +30,24 @@ class HttpAdapter extends Adapter { const optionsPort = httpOptions?.port const port = optionsPort || asyncapiServerPort - httpServer.on("request", (req, res) => { - res.setHeader("Content-Type", "application/json") + httpServer.on('request', (req, res) => { + res.setHeader('Content-Type', 'application/json') const bodyBuffer = [] let body: object - req.on("data", (chunk) => { + req.on('data', (chunk) => { bodyBuffer.push(chunk) }) - req.on("end", () => { + req.on('end', () => { body = JSON.parse(Buffer.concat(bodyBuffer).toString()) this.httpResponses.set(this.serverName, res) let { pathname } = new URL(req.url, serverUrl) - pathname = pathname.startsWith("/") ? pathname.substring(1) : pathname + pathname = pathname.startsWith('/') ? pathname.substring(1) : pathname if (!this.parsedAsyncAPI.channel(pathname)) { - res.end("HTTP/1.1 404 Not Found1\r\n\r\n") + res.end('HTTP/1.1 404 Not Found1\r\n\r\n') const err = new Error( `A client attempted to connect to channel ${pathname} but this channel is not defined in your AsyncAPI file. here` ) - this.emit("error", err) + this.emit('error', err) return err } const { query } = url.parse(req.url, true) @@ -54,7 +55,7 @@ class HttpAdapter extends Adapter { const payload = body const httpChannelBinding = this.parsedAsyncAPI .channel(pathname) - .binding("http") + .binding('http') if (httpChannelBinding) { this._checkHttpBinding( req, @@ -65,26 +66,33 @@ class HttpAdapter extends Adapter { payload ) } - this.emit("connect", { + this.emit('connect', { name: this.name(), adapter: this, connection: http, channel: pathname, }) const msg = this._createMessage(pathname, payload, searchParams) - this.emit("message", msg, http) + this.emit('message', msg, http) }) }) httpServer.listen(port) - this.emit("server:ready", { name: this.name(), adapter: this }) + this.emit('server:ready', { name: this.name(), adapter: this }) return this } - _checkHttpBinding(req:any, res:any, pathname:any, httpChannelBinding:any, searchParams:any, payload:any) { + _checkHttpBinding( + req: any, + res: any, + pathname: any, + httpChannelBinding: any, + searchParams: any, + payload: any + ) { const { query, body, method } = httpChannelBinding if (method && req.method !== method) { const err = new Error(`Cannot ${req.method} ${pathname}`) - this.emit("error", err) + this.emit('error', err) res.end(err.message) return } @@ -95,7 +103,7 @@ class HttpAdapter extends Adapter { ) if (!isValid) { const err = new GleeError({ humanReadableError, errors }) - this.emit("error", err) + this.emit('error', err) res.end(JSON.stringify(err.errors)) return } @@ -107,7 +115,7 @@ class HttpAdapter extends Adapter { ) if (!isValid) { const err = new GleeError({ humanReadableError, errors }) - this.emit("error", err) + this.emit('error', err) res.end(JSON.stringify(err.errors)) return } @@ -123,7 +131,7 @@ class HttpAdapter extends Adapter { return new GleeMessage({ payload: JSON.parse(JSON.stringify(body)), channel: pathName, - query: JSON.parse(JSON.stringify( params.query)) + query: JSON.parse(JSON.stringify(params.query)), }) } } diff --git a/src/adapters/kafka/index.ts b/src/adapters/kafka/index.ts index 20a395a3e..c856a328a 100644 --- a/src/adapters/kafka/index.ts +++ b/src/adapters/kafka/index.ts @@ -1,7 +1,7 @@ import { Kafka, SASLOptions } from 'kafkajs' import Adapter from '../../lib/adapter.js' import GleeMessage from '../../lib/message.js' -import {KafkaAdapterConfig, KafkaAuthConfig} from '../../lib/index.js' +import { KafkaAdapterConfig, KafkaAuthConfig } from '../../lib/index.js' class KafkaAdapter extends Adapter { private kafka: Kafka @@ -11,7 +11,9 @@ class KafkaAdapter extends Adapter { } async connect() { - const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig('kafka') + const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig( + 'kafka' + ) const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions.auth) const securityRequirements = (this.AsyncAPIServer.security() || []).map( (sec) => { @@ -39,7 +41,10 @@ class KafkaAdapter extends Adapter { cert: auth?.cert, }, sasl: { - mechanism: (scramSha256SecurityReq ? 'scram-sha-256' : undefined) || (scramSha512SecurityReq ? 'scram-sha-512' : undefined) || 'plain', + mechanism: + (scramSha256SecurityReq ? 'scram-sha-256' : undefined) || + (scramSha512SecurityReq ? 'scram-sha-512' : undefined) || + 'plain', username: userAndPasswordSecurityReq ? auth?.username : undefined, password: userAndPasswordSecurityReq ? auth?.password : undefined, } as SASLOptions, @@ -53,13 +58,16 @@ class KafkaAdapter extends Adapter { name: this.name(), adapter: this, connection: consumer, - channels: this.getSubscribedChannels() + channels: this.getSubscribedChannels(), }) } }) await consumer.connect() const subscribedChannels = this.getSubscribedChannels() - await consumer.subscribe({ topics: subscribedChannels, fromBeginning: true }) + await consumer.subscribe({ + topics: subscribedChannels, + fromBeginning: true, + }) await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const msg = this._createMessage(topic, partition, message) @@ -73,11 +81,13 @@ class KafkaAdapter extends Adapter { await producer.connect() await producer.send({ topic: message.channel, - messages: [{ - key: message.headers.key, - value: message.payload, - timestamp: message.headers.timestamp, - }], + messages: [ + { + key: message.headers.key, + value: message.payload, + timestamp: message.headers.timestamp, + }, + ], }) await producer.disconnect() } diff --git a/src/adapters/mqtt/index.ts b/src/adapters/mqtt/index.ts index 438f751fc..188cf4b1f 100644 --- a/src/adapters/mqtt/index.ts +++ b/src/adapters/mqtt/index.ts @@ -2,22 +2,22 @@ import mqtt, { IPublishPacket, MqttClient, QoS } from 'mqtt' import Adapter from '../../lib/adapter.js' import GleeMessage from '../../lib/message.js' import { MqttAuthConfig, MqttAdapterConfig } from '../../lib/index.js' -import { SecurityScheme } from '@asyncapi/parser'; +import { SecurityScheme } from '@asyncapi/parser' interface IMQTTHeaders { - cmd?: string; - retain?: boolean; - qos: QoS; - dup: boolean; - length: number; + cmd?: string + retain?: boolean + qos: QoS + dup: boolean + length: number } interface ClientData { - url?: URL, - auth?: MqttAuthConfig, - serverBinding?: any, - protocolVersion?: number, - userAndPasswordSecurityReq?: SecurityScheme, + url?: URL + auth?: MqttAuthConfig + serverBinding?: any + protocolVersion?: number + userAndPasswordSecurityReq?: SecurityScheme X509SecurityReq?: SecurityScheme } @@ -41,10 +41,11 @@ class MqttAdapter extends Adapter { } private getSecurityReqs() { - const securityRequirements = (this.AsyncAPIServer.security() || []).map(sec => { - const secName = Object.keys(sec.json())[0] - return this.parsedAsyncAPI.components().securityScheme(secName) - } + const securityRequirements = (this.AsyncAPIServer.security() || []).map( + (sec) => { + const secName = Object.keys(sec.json())[0] + return this.parsedAsyncAPI.components().securityScheme(secName) + } ) const userAndPasswordSecurityReq = securityRequirements.find( (sec) => sec.type() === 'userPassword' @@ -55,23 +56,20 @@ class MqttAdapter extends Adapter { return { userAndPasswordSecurityReq, - X509SecurityReq + X509SecurityReq, } - } private async initializeClient(data: ClientData) { - const { url, auth, serverBinding, protocolVersion, userAndPasswordSecurityReq, - X509SecurityReq + X509SecurityReq, } = data - return mqtt.connect({ host: url.hostname, port: url.port || (url.protocol === 'mqtt:' ? 1883 : 8883), @@ -82,15 +80,11 @@ class MqttAdapter extends Adapter { topic: serverBinding?.lastWill?.topic, qos: serverBinding?.lastWill?.qos, payload: serverBinding?.lastWill?.message, - retain: serverBinding?.lastWill?.retain + retain: serverBinding?.lastWill?.retain, }, keepalive: serverBinding?.keepAlive, - username: userAndPasswordSecurityReq - ? auth?.username - : undefined, - password: userAndPasswordSecurityReq - ? auth?.password - : undefined, + username: userAndPasswordSecurityReq ? auth?.username : undefined, + password: userAndPasswordSecurityReq ? auth?.password : undefined, ca: X509SecurityReq ? auth?.cert : undefined, protocolVersion, customHandleAcks: this._customAckHandler.bind(this), @@ -98,7 +92,6 @@ class MqttAdapter extends Adapter { } private async listenToEvents(data: ClientData) { - const { protocolVersion } = data this.client.on('close', () => { @@ -114,7 +107,7 @@ class MqttAdapter extends Adapter { this.client.on('message', (channel, message, mqttPacket) => { const qos = mqttPacket.qos - if (protocolVersion === 5 && qos > 0) return // ignore higher qos messages. already processed + if (protocolVersion === 5 && qos > 0) return // ignore higher qos messages. already processed const msg = this._createMessage(mqttPacket as IPublishPacket) this.emit('message', msg, this.client) @@ -142,18 +135,24 @@ class MqttAdapter extends Adapter { } async _connect(): Promise { - const mqttOptions: MqttAdapterConfig = await this.resolveProtocolConfig('mqtt') + const mqttOptions: MqttAdapterConfig = await this.resolveProtocolConfig( + 'mqtt' + ) const auth: MqttAuthConfig = await this.getAuthConfig(mqttOptions.auth) const subscribedChannels = this.getSubscribedChannels() const mqttServerBinding = this.AsyncAPIServer.binding('mqtt') const mqtt5ServerBinding = this.AsyncAPIServer.binding('mqtt5') - const { userAndPasswordSecurityReq, X509SecurityReq } = this.getSecurityReqs() + const { userAndPasswordSecurityReq, X509SecurityReq } = + this.getSecurityReqs() const url = new URL(this.AsyncAPIServer.url()) - const protocolVersion = parseInt(this.AsyncAPIServer.protocolVersion() || '4') - const serverBinding = protocolVersion === 5 ? mqtt5ServerBinding : mqttServerBinding + const protocolVersion = parseInt( + this.AsyncAPIServer.protocolVersion() || '4' + ) + const serverBinding = + protocolVersion === 5 ? mqtt5ServerBinding : mqttServerBinding this.client = await this.initializeClient({ url, @@ -161,21 +160,22 @@ class MqttAdapter extends Adapter { serverBinding, protocolVersion, userAndPasswordSecurityReq, - X509SecurityReq + X509SecurityReq, }) await this.listenToEvents({ protocolVersion }) const connectClient = (): Promise => { return new Promise((resolve) => { - this.client.on('connect', connAckPacket => { + this.client.on('connect', (connAckPacket) => { const isSessionResume = connAckPacket.sessionPresent if (!this.firstConnect) { this.checkFirstConnect() } - const shouldSubscribe = !isSessionResume && Array.isArray(subscribedChannels) + const shouldSubscribe = + !isSessionResume && Array.isArray(subscribedChannels) if (shouldSubscribe) { this.subscribe(subscribedChannels) @@ -187,14 +187,11 @@ class MqttAdapter extends Adapter { } return connectClient() - } _send(message: GleeMessage): Promise { return new Promise((resolve, reject) => { - const operation = this.parsedAsyncAPI - .channel(message.channel) - .subscribe() + const operation = this.parsedAsyncAPI.channel(message.channel).subscribe() const binding = operation ? operation.binding('mqtt') : undefined this.client.publish( message.channel, diff --git a/src/adapters/socket.io/index.ts b/src/adapters/socket.io/index.ts index 8c26a1742..b1d105b14 100644 --- a/src/adapters/socket.io/index.ts +++ b/src/adapters/socket.io/index.ts @@ -37,7 +37,8 @@ class SocketIOAdapter extends Adapter { const server = websocketOptions.httpServer if (!optionsPort && String(server.address().port) !== String(port)) { console.error( - `Your custom HTTP server is listening on port ${server.address().port + `Your custom HTTP server is listening on port ${ + server.address().port } but your AsyncAPI file says it must listen on ${port}. Please fix the inconsistency.` ) process.exit(1) diff --git a/src/adapters/ws/client.ts b/src/adapters/ws/client.ts index 5aa9aed08..133927074 100644 --- a/src/adapters/ws/client.ts +++ b/src/adapters/ws/client.ts @@ -1,20 +1,20 @@ /* eslint-disable security/detect-object-injection */ -import Adapter from "../../lib/adapter.js" -import GleeMessage from "../../lib/message.js" -import ws from "ws" +import Adapter from '../../lib/adapter.js' +import GleeMessage from '../../lib/message.js' +import ws from 'ws' import { WsAuthConfig, WebsocketAdapterConfig } from '../../lib/index.js' interface Client { - channel: string; - client: ws; - binding?: any; + channel: string + client: ws + binding?: any } class WsClientAdapter extends Adapter { private clients: Array = [] name(): string { - return "WS adapter" + return 'WS adapter' } async connect(): Promise { @@ -30,24 +30,23 @@ class WsClientAdapter extends Adapter { for (const channel of channelsOnThisServer) { const headers = {} - const wsOptions: WebsocketAdapterConfig = await this.resolveProtocolConfig('ws') + const wsOptions: WebsocketAdapterConfig = + await this.resolveProtocolConfig('ws') const auth: WsAuthConfig = await this.getAuthConfig(wsOptions.client.auth) headers['Authentication'] = `bearer ${auth?.token}` - const url = new URL( - this.AsyncAPIServer.url() + channel - ) + const url = new URL(this.AsyncAPIServer.url() + channel) this.clients.push({ channel, client: new ws(url, { headers }), - binding: this.parsedAsyncAPI.channel(channel).binding("ws"), + binding: this.parsedAsyncAPI.channel(channel).binding('ws'), }) } for (const { client, channel } of this.clients) { - client.on("open", () => { - this.emit("connect", { + client.on('open', () => { + this.emit('connect', { name: this.name(), adapter: this, connection: client, @@ -55,14 +54,14 @@ class WsClientAdapter extends Adapter { }) }) - client.on("message", (data) => { + client.on('message', (data) => { const msg = this._createMessage(channel, data) - this.emit("message", msg, client) + this.emit('message', msg, client) }) - client.on("error", (err) => { + client.on('error', (err) => { console.log('GETING ERROR') - this.emit("error", err) + this.emit('error', err) }) } return this @@ -71,7 +70,7 @@ class WsClientAdapter extends Adapter { private getWsChannels() { const channels = [] for (const channel of this.channelNames) { - if (this.parsedAsyncAPI.channel(channel).hasBinding("ws")) { + if (this.parsedAsyncAPI.channel(channel).hasBinding('ws')) { if (this.parsedAsyncAPI.channel(channel).hasServers()) { if ( this.parsedAsyncAPI @@ -98,7 +97,7 @@ class WsClientAdapter extends Adapter { client.send(message.payload) } else { throw new Error( - "There is no WebSocker connection to send the message yet." + 'There is no WebSocker connection to send the message yet.' ) } } diff --git a/src/adapters/ws/server.ts b/src/adapters/ws/server.ts index 18f9a3a1e..4591b74f0 100644 --- a/src/adapters/ws/server.ts +++ b/src/adapters/ws/server.ts @@ -7,12 +7,11 @@ import GleeMessage from '../../lib/message.js' import GleeError from '../../errors/glee-error.js' type QueryData = { - searchParams: URLSearchParams, + searchParams: URLSearchParams query: any } class WebSocketsAdapter extends Adapter { - name(): string { return 'WebSockets adapter' } @@ -27,7 +26,9 @@ class WebSocketsAdapter extends Adapter { private emitPathnameError(socket, pathname: string) { socket.end('HTTP/1.1 404 Not Found\r\n\r\n') - const err = new Error(`A client attempted to connect to channel ${pathname} but this channel is not defined in your AsyncAPI file.`) + const err = new Error( + `A client attempted to connect to channel ${pathname} but this channel is not defined in your AsyncAPI file.` + ) this.emit('error', err) throw err } @@ -47,7 +48,6 @@ class WebSocketsAdapter extends Adapter { }) return validateData(Object.fromEntries(queryParams.entries()), query) - } private checkHeaders(requestDetails) { @@ -59,20 +59,28 @@ class WebSocketsAdapter extends Adapter { const { servers, ws, pathname, request } = serverData servers.get(pathname).emit('connect', ws, request) - + ws.on('message', (payload) => { const msg = this._createMessage(pathname, payload) this.emit('message', msg, ws) }) - this.emit('server:connection:open', { name: this.name(), adapter: this, connection: ws, channel: pathname, request }) + this.emit('server:connection:open', { + name: this.name(), + adapter: this, + connection: ws, + channel: pathname, + request, + }) } private pathnameChecks(socket, pathname: string, serverOptions) { - const { serverUrl, servers } = serverOptions - if (!pathname.startsWith(serverUrl.pathname) && !pathname.startsWith(`/${serverUrl.pathname}`)) { + if ( + !pathname.startsWith(serverUrl.pathname) && + !pathname.startsWith(`/${serverUrl.pathname}`) + ) { this.emitPathnameError(socket, pathname) } @@ -82,7 +90,11 @@ class WebSocketsAdapter extends Adapter { // If pathname is /something but AsyncAPI file says the channel name is "something" // then we convert pathname to "something". - if (pathname.startsWith('/') && !servers.has(pathname) && servers.has(pathname.substring(1))) { + if ( + pathname.startsWith('/') && + !servers.has(pathname) && + servers.has(pathname.substring(1)) + ) { pathname = pathname.substring(1) } @@ -94,13 +106,19 @@ class WebSocketsAdapter extends Adapter { } private portChecks(portOptions) { - const { port, config, optionsPort, wsHttpServer } = portOptions - const checkWrongPort = !optionsPort && config?.httpServer && String(wsHttpServer.address().port) !== String(port) + const checkWrongPort = + !optionsPort && + config?.httpServer && + String(wsHttpServer.address().port) !== String(port) if (checkWrongPort) { - console.error(`Your custom HTTP server is listening on port ${wsHttpServer.address().port} but your AsyncAPI file says it must listen on ${port}. Please fix the inconsistency.`) + console.error( + `Your custom HTTP server is listening on port ${ + wsHttpServer.address().port + } but your AsyncAPI file says it must listen on ${port}. Please fix the inconsistency.` + ) process.exit(1) } } @@ -119,17 +137,19 @@ class WebSocketsAdapter extends Adapter { wsHttpServer, asyncapiServerPort, optionsPort, - port + port, } } private checkBindings(socket, bindingOpts) { - const { wsChannelBinding, request, searchParams } = bindingOpts const { query, headers } = wsChannelBinding if (query) { - const { isValid, humanReadableError, errors } = this.checkQuery({searchParams, query}) + const { isValid, humanReadableError, errors } = this.checkQuery({ + searchParams, + query, + }) if (!isValid) { this.emitGleeError(socket, { humanReadableError, errors }) @@ -138,7 +158,10 @@ class WebSocketsAdapter extends Adapter { } if (headers) { - const { isValid, humanReadableError, errors } = this.checkHeaders({request, headers}) + const { isValid, humanReadableError, errors } = this.checkHeaders({ + request, + headers, + }) if (!isValid) { this.emitGleeError(socket, { humanReadableError, errors }) return false @@ -149,32 +172,36 @@ class WebSocketsAdapter extends Adapter { } async _connect(): Promise { - const { - config, - serverUrl, - wsHttpServer, - optionsPort, - port - } = await this.initializeConstants() + const { config, serverUrl, wsHttpServer, optionsPort, port } = + await this.initializeConstants() this.portChecks({ port, config, optionsPort, wsHttpServer }) const servers = new Map() - this.channelNames.forEach(channelName => { + this.channelNames.forEach((channelName) => { servers.set(channelName, new WebSocket.Server({ noServer: true })) }) wsHttpServer.on('upgrade', (request, socket, head) => { let { pathname } = new URL(request.url, `ws://${request.headers.host}`) - pathname = this.pathnameChecks(socket, pathname, {serverUrl, servers}) + pathname = this.pathnameChecks(socket, pathname, { serverUrl, servers }) - const { searchParams } = new URL(request.url, `ws://${request.headers.host}`) - const wsChannelBinding = this.parsedAsyncAPI.channel(pathname).binding('ws') + const { searchParams } = new URL( + request.url, + `ws://${request.headers.host}` + ) + const wsChannelBinding = this.parsedAsyncAPI + .channel(pathname) + .binding('ws') if (wsChannelBinding) { - const correctBindings = this.checkBindings(socket, { wsChannelBinding, request, searchParams}) - if(!correctBindings) return + const correctBindings = this.checkBindings(socket, { + wsChannelBinding, + request, + searchParams, + }) + if (!correctBindings) return } if (servers.has(pathname)) { @@ -199,15 +226,18 @@ class WebSocketsAdapter extends Adapter { if (message.broadcast) { this.glee.syncCluster(message) - this - .connections + this.connections .filter(({ channels }) => channels.includes(message.channel)) .forEach((connection) => { connection.getRaw().send(message.payload) }) } else { - if (!message.connection) throw new Error('There is no WebSocket connection to send the message yet.') - if (!(message.connection instanceof GleeConnection)) throw new Error('Connection object is not of GleeConnection type.') + if (!message.connection) + {throw new Error( + 'There is no WebSocket connection to send the message yet.' + )} + if (!(message.connection instanceof GleeConnection)) + {throw new Error('Connection object is not of GleeConnection type.')} message.connection.getRaw().send(message.payload) } } @@ -215,7 +245,7 @@ class WebSocketsAdapter extends Adapter { _createMessage(eventName: string, payload: any): GleeMessage { return new GleeMessage({ payload, - channel: eventName + channel: eventName, }) } } diff --git a/src/cli/index.ts b/src/cli/index.ts index 4a189e7bb..b6f227006 100755 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -3,7 +3,7 @@ import { compileAndWatch } from '../lib/compiler.js' import spawn from 'cross-spawn' import { logLineWithIcon, logTypeScriptMessage } from '../lib/logger.js' -import docs from "../docs.js" +import docs from '../docs.js' const args = process.argv.splice(2) const command = args[0] @@ -44,7 +44,7 @@ if (command === 'dev') { } else if (command === 'start') { import('./start.js') } else if (command === 'docs') { - docs().catch(e => logTypeScriptMessage(e)) + docs().catch((e) => logTypeScriptMessage(e)) } else { console.error(`Unknown command "${args[0]}"`) } diff --git a/src/errors/glee-error.ts b/src/errors/glee-error.ts index edf880621..1d9049fd7 100644 --- a/src/errors/glee-error.ts +++ b/src/errors/glee-error.ts @@ -1,7 +1,7 @@ export default class ValidationError extends Error { private _errors: Error[] private _details: string - + constructor({ humanReadableError, errors }) { super(humanReadableError) this._errors = errors diff --git a/src/index.ts b/src/index.ts index 01bacb914..aa5328887 100755 --- a/src/index.ts +++ b/src/index.ts @@ -5,8 +5,14 @@ import Glee from './lib/glee.js' import { logWelcome, logLineWithIcon } from './lib/logger.js' import experimentalFlags from './lib/experimentalFlags.js' import registerAdapters from './registerAdapters.js' -import { register as registerLifecycleEvents, run as runLifecycleEvents } from './lib/lifecycleEvents.js' -import { register as registerFunctions, trigger as triggerFunction } from './lib/functions.js' +import { + register as registerLifecycleEvents, + run as runLifecycleEvents, +} from './lib/lifecycleEvents.js' +import { + register as registerFunctions, + trigger as triggerFunction, +} from './lib/functions.js' import buffer2string from './middlewares/buffer2string.js' import string2json from './middlewares/string2json.js' import json2string from './middlewares/json2string.js' @@ -24,14 +30,10 @@ import { ClusterEvent } from './lib/cluster.js' dotenvExpand(dotenv.config()) -export default async function GleeAppInitializer () { +export default async function GleeAppInitializer() { const config = await initializeConfigs() - const { - GLEE_DIR, - GLEE_PROJECT_DIR, - GLEE_LIFECYCLE_DIR, - GLEE_FUNCTIONS_DIR, - } = config + const { GLEE_DIR, GLEE_PROJECT_DIR, GLEE_LIFECYCLE_DIR, GLEE_FUNCTIONS_DIR } = + config logWelcome({ dev: process.env.NODE_ENV === 'development', @@ -69,18 +71,30 @@ export default async function GleeAppInitializer () { if (channel.hasPublish()) { const operationId = channel.publish().json('operationId') if (operationId) { - const schema = {oneOf: channel.publish().messages().map(message => message.payload().json())} as any + const schema = { + oneOf: channel + .publish() + .messages() + .map((message) => message.payload().json()), + } as any app.use(channelName, validate(schema), (event, next) => { triggerFunction({ app, operationId, message: event, - }).then(next).catch(next) + }) + .then(next) + .catch(next) }) } } if (channel.hasSubscribe()) { - const schema = {oneOf: channel.subscribe().messages().map(message => message.payload().json())} as any + const schema = { + oneOf: channel + .subscribe() + .messages() + .map((message) => message.payload().json()), + } as any app.useOutbound(channelName, validate(schema), json2string) } }) @@ -95,7 +109,7 @@ export default async function GleeAppInitializer () { connection: e.connection, }) }) - + app.on('adapter:reconnect', async (e: EnrichedEvent) => { logLineWithIcon('↪', `Reconnected to server ${e.serverName}.`, { highlightedWords: [e.serverName], @@ -107,7 +121,7 @@ export default async function GleeAppInitializer () { connection: e.connection, }) }) - + app.on('adapter:close', async (e: EnrichedEvent) => { logLineWithIcon('x', `Closed connection with server ${e.serverName}.`, { highlightedWords: [e.serverName], @@ -122,9 +136,13 @@ export default async function GleeAppInitializer () { }) app.on('adapter:server:ready', async (e: EnrichedEvent) => { - logLineWithIcon(':zap:', `Server ${e.serverName} is ready to accept connections.`, { - highlightedWords: [e.serverName], - }) + logLineWithIcon( + ':zap:', + `Server ${e.serverName} is ready to accept connections.`, + { + highlightedWords: [e.serverName], + } + ) await runLifecycleEvents('onServerReady', { glee: app, serverName: e.serverName, @@ -138,7 +156,7 @@ export default async function GleeAppInitializer () { connection: e.connection, }) }) - + app.on('adapter:server:connection:close', async (e: EnrichedEvent) => { await runLifecycleEvents('onServerConnectionClose', { glee: app, @@ -168,7 +186,5 @@ export default async function GleeAppInitializer () { }) }) - app - .listen() - .catch(console.error) + app.listen().catch(console.error) } diff --git a/src/lib/adapter.ts b/src/lib/adapter.ts index d604c16fc..c8bac0482 100644 --- a/src/lib/adapter.ts +++ b/src/lib/adapter.ts @@ -8,9 +8,9 @@ import GleeMessage from './message.js' import { resolveFunctions } from './util.js' export type EnrichedEvent = { - connection?: GleeConnection, - serverName: string, - server: Server, + connection?: GleeConnection + serverName: string + server: Server } class GleeAdapter extends EventEmitter { @@ -30,7 +30,12 @@ class GleeAdapter extends EventEmitter { * @param {AsyncAPIServer} server The AsyncAPI server to use for the connection. * @param {AsyncAPIDocument} parsedAsyncAPI The AsyncAPI document. */ - constructor (glee: Glee, serverName: string, server: Server, parsedAsyncAPI: AsyncAPIDocument) { + constructor( + glee: Glee, + serverName: string, + server: Server, + parsedAsyncAPI: AsyncAPIDocument + ) { super() this._glee = glee @@ -42,17 +47,23 @@ class GleeAdapter extends EventEmitter { this._connections = [] const uriTemplateValues = new Map() - process.env.GLEE_SERVER_VARIABLES?.split(',').forEach(t => { + process.env.GLEE_SERVER_VARIABLES?.split(',').forEach((t) => { const [localServerName, variable, value] = t.split(':') - if (localServerName === this._serverName) uriTemplateValues.set(variable, value) + if (localServerName === this._serverName) + {uriTemplateValues.set(variable, value)} }) - this._serverUrlExpanded = uriTemplates(this._AsyncAPIServer.url()).fill(Object.fromEntries(uriTemplateValues.entries())) + this._serverUrlExpanded = uriTemplates(this._AsyncAPIServer.url()).fill( + Object.fromEntries(uriTemplateValues.entries()) + ) - this.on('error', err => { this._glee.injectError(err) }) + this.on('error', (err) => { + this._glee.injectError(err) + }) this.on('message', (message, connection) => { const conn = new GleeConnection({ connection, - channels: this._connections.find(c => c.rawConnection === connection).channels, + channels: this._connections.find((c) => c.rawConnection === connection) + .channels, serverName, server, parsedAsyncAPI, @@ -66,11 +77,15 @@ class GleeAdapter extends EventEmitter { ...{ serverName, server, - } + }, } } - function createConnection(ev: { channels?: string[], channel?: string, connection: any }): GleeConnection { + function createConnection(ev: { + channels?: string[] + channel?: string + connection: any + }): GleeConnection { let channels = ev.channels if (!channels && ev.channel) channels = [ev.channel] @@ -87,9 +102,12 @@ class GleeAdapter extends EventEmitter { const conn = createConnection(ev) this._connections.push(conn) - this._glee.emit('adapter:connect', enrichEvent({ - connection: conn, - })) + this._glee.emit( + 'adapter:connect', + enrichEvent({ + connection: conn, + }) + ) }) this.on('server:ready', (ev) => { @@ -100,25 +118,34 @@ class GleeAdapter extends EventEmitter { const conn = createConnection(ev) this._connections.push(conn) - this._glee.emit('adapter:server:connection:open', enrichEvent({ - connection: conn, - })) + this._glee.emit( + 'adapter:server:connection:open', + enrichEvent({ + connection: conn, + }) + ) }) this.on('reconnect', (ev) => { const conn = createConnection(ev) - this._glee.emit('adapter:reconnect', enrichEvent({ - connection: conn, - })) + this._glee.emit( + 'adapter:reconnect', + enrichEvent({ + connection: conn, + }) + ) }) this.on('close', (ev) => { const conn = createConnection(ev) - this._glee.emit('adapter:close', enrichEvent({ - connection: conn, - })) + this._glee.emit( + 'adapter:close', + enrichEvent({ + connection: conn, + }) + ) }) } @@ -151,8 +178,8 @@ class GleeAdapter extends EventEmitter { } async resolveProtocolConfig(protocol: string) { - if(!this.glee.options[protocol]) return undefined - const protocolConfig = {...this.glee.options[protocol]} + if (!this.glee.options[protocol]) return undefined + const protocolConfig = { ...this.glee.options[protocol] } if (!protocolConfig) return undefined await resolveFunctions(protocolConfig) @@ -166,21 +193,25 @@ class GleeAdapter extends EventEmitter { return auth } - return await auth({serverName: this._serverName, parsedAsyncAPI: this._parsedAsyncAPI}) + return await auth({ + serverName: this._serverName, + parsedAsyncAPI: this._parsedAsyncAPI, + }) } /** * Returns a list of the channels a given adapter has to subscribe to. */ getSubscribedChannels(): string[] { - return this._channelNames - .filter(channelName => { - const channel = this._parsedAsyncAPI.channel(channelName) - if (!channel.hasPublish()) return false - - const channelServers = channel.hasServers() ? channel.servers() : channel.ext('x-servers') || this._parsedAsyncAPI.serverNames() - return channelServers.includes(this._serverName) - }) + return this._channelNames.filter((channelName) => { + const channel = this._parsedAsyncAPI.channel(channelName) + if (!channel.hasPublish()) return false + + const channelServers = channel.hasServers() + ? channel.servers() + : channel.ext('x-servers') || this._parsedAsyncAPI.serverNames() + return channelServers.includes(this._serverName) + }) } /** diff --git a/src/lib/cluster.ts b/src/lib/cluster.ts index 44ba5761c..8f6096001 100644 --- a/src/lib/cluster.ts +++ b/src/lib/cluster.ts @@ -7,7 +7,7 @@ import { validateData } from './util.js' import GleeError from '../errors/glee-error.js' export type ClusterEvent = { - serverName: string, + serverName: string adapter: GleeClusterAdapter } @@ -19,17 +19,17 @@ const ClusterMessageSchema = { headers: { type: 'object', propertyNames: { type: 'string' }, - additionProperties: { type: 'string' } + additionProperties: { type: 'string' }, }, channel: { type: 'string' }, serverName: { type: 'string' }, broadcast: { type: 'boolean' }, cluster: { type: 'boolean' }, outbound: { type: 'boolean' }, - inbound: { type: 'boolean' } + inbound: { type: 'boolean' }, }, required: ['instanceId', 'payload', 'channel', 'serverName', 'broadcast'], - additionalProperties: false + additionalProperties: false, } class GleeClusterAdapter extends EventEmitter { @@ -43,7 +43,7 @@ class GleeClusterAdapter extends EventEmitter { * * @param {Glee} glee A reference to the Glee app. */ - constructor (glee: Glee) { + constructor(glee: Glee) { super() this._instanceId = uuidv4() @@ -52,40 +52,47 @@ class GleeClusterAdapter extends EventEmitter { this._serverName = serverName const url = this._glee.options?.cluster?.url - if ( !url ) { - console.log('Please provide a URL for your cluster adapter in glee.config.js') + if (!url) { + console.log( + 'Please provide a URL for your cluster adapter in glee.config.js' + ) process.exit(1) } const uriTemplateValues = new Map() - process.env.GLEE_SERVER_VARIABLES?.split(',').forEach(t => { + process.env.GLEE_SERVER_VARIABLES?.split(',').forEach((t) => { const [localServerName, variable, value] = t.split(':') - if (localServerName === this._serverName) uriTemplateValues.set(variable, value) + if (localServerName === this._serverName) + {uriTemplateValues.set(variable, value)} }) - this._serverUrlExpanded = uriTemplates(url).fill(Object.fromEntries(uriTemplateValues.entries())) + this._serverUrlExpanded = uriTemplates(url).fill( + Object.fromEntries(uriTemplateValues.entries()) + ) function genClusterEvent(ev): ClusterEvent { return { ...ev, - serverName + serverName, } } - this.on('error', err => { this._glee.injectError(err) }) - this.on('message', message => { + this.on('error', (err) => { + this._glee.injectError(err) + }) + this.on('message', (message) => { message.cluster = true this._glee.send(message) }) - this.on('connect', ev => { + this.on('connect', (ev) => { this._glee.emit('adapter:cluster:connect', genClusterEvent(ev)) }) - this.on('reconnect', ev => { + this.on('reconnect', (ev) => { this._glee.emit('adapter:cluster:reconnect', genClusterEvent(ev)) }) - - this.on('close', ev => { + + this.on('close', (ev) => { this._glee.emit('adapter:cluster:close', genClusterEvent(ev)) }) } @@ -138,7 +145,7 @@ class GleeClusterAdapter extends EventEmitter { broadcast: message.broadcast, cluster: message.cluster, inbound: message.isInbound(), - outbound: message.isOutbound() + outbound: message.isOutbound(), }) } @@ -152,11 +159,14 @@ class GleeClusterAdapter extends EventEmitter { let messageData try { messageData = JSON.parse(serialized) - const { errors, humanReadableError, isValid } = validateData(messageData, ClusterMessageSchema) - if ( !isValid ) { + const { errors, humanReadableError, isValid } = validateData( + messageData, + ClusterMessageSchema + ) + if (!isValid) { throw new GleeError({ humanReadableError, errors }) } - } catch ( e ) { + } catch (e) { this._glee.injectError(e) return } @@ -164,11 +174,11 @@ class GleeClusterAdapter extends EventEmitter { let payload = messageData.payload try { payload = JSON.parse(messageData.payload) - } catch ( e ) { + } catch (e) { // payload isn't JSON } - if ( messageData.instanceId === this._instanceId ) return + if (messageData.instanceId === this._instanceId) return const message = new GleeMessage({ payload: payload, @@ -176,7 +186,7 @@ class GleeClusterAdapter extends EventEmitter { channel: messageData.channel, serverName: messageData.serverName, broadcast: messageData.broadcast, - cluster: messageData.cluster + cluster: messageData.cluster, }) if (messageData.inbound && !messageData.outbound) { @@ -184,10 +194,9 @@ class GleeClusterAdapter extends EventEmitter { } else { message.setOutbound() } - + return message } - } export default GleeClusterAdapter diff --git a/src/lib/compiler.ts b/src/lib/compiler.ts index 832d329e0..1c9cbd6a2 100644 --- a/src/lib/compiler.ts +++ b/src/lib/compiler.ts @@ -3,20 +3,22 @@ import ts from 'typescript' import { logTypeScriptError } from './logger.js' const formatHost: ts.FormatDiagnosticsHost = { - getCanonicalFileName: path => path, + getCanonicalFileName: (path) => path, getCurrentDirectory: ts.sys.getCurrentDirectory, getNewLine: () => ts.sys.newLine, } interface ICompileAndWatch { - projectDir: string, - onStart?: () => void, - onFileChanged?: () => void, - onCompilationFailed?: (message:string, error:ts.Diagnostic) => void, - onCompilationDone?: () => void, + projectDir: string + onStart?: () => void + onFileChanged?: () => void + onCompilationFailed?: (message: string, error: ts.Diagnostic) => void + onCompilationDone?: () => void } -const noop = function() { /* Do nothing */ } +const noop = function () { + /* Do nothing */ +} export function compileAndWatch({ projectDir, @@ -24,22 +26,30 @@ export function compileAndWatch({ onFileChanged = noop, onCompilationFailed = noop, onCompilationDone = noop, -} : ICompileAndWatch) { +}: ICompileAndWatch) { const tsConfigPath = resolve(projectDir, 'tsconfig.json') if (!ts.sys.fileExists(tsConfigPath)) { - ts.sys.writeFile(tsConfigPath, JSON.stringify({ // eslint-disable-line security/detect-non-literal-fs-filename - compilerOptions: { - allowJs: true, - target: 'es6', - esModuleInterop: true, - moduleResolution: 'node', - module: 'commonjs', - } - }, undefined, 2)) + ts.sys.writeFile( + tsConfigPath, + JSON.stringify( + { + // eslint-disable-line security/detect-non-literal-fs-filename + compilerOptions: { + allowJs: true, + target: 'es6', + esModuleInterop: true, + moduleResolution: 'node', + module: 'commonjs', + }, + }, + undefined, + 2 + ) + ) } const createProgram = ts.createSemanticDiagnosticsBuilderProgram - + const host = ts.createWatchCompilerHost( tsConfigPath, { @@ -49,7 +59,7 @@ export function compileAndWatch({ target: ts.ScriptTarget.ES2016, module: ts.ModuleKind.ES2020, esModuleInterop: true, - } as {[key: string]: any}, + } as { [key: string]: any }, ts.sys, createProgram, reportDiagnostic, @@ -57,36 +67,64 @@ export function compileAndWatch({ ) const origCreateProgram = host.createProgram - host.createProgram = (rootNames: ReadonlyArray, options, host, oldProgram) => { + host.createProgram = ( + rootNames: ReadonlyArray, + options, + host, + oldProgram + ) => { return origCreateProgram(rootNames, options, host, oldProgram) } const origPostProgramCreate = host.afterProgramCreate - host.afterProgramCreate = program => { + host.afterProgramCreate = (program) => { origPostProgramCreate(program) } ts.createWatchProgram(host) function reportDiagnostic(diagnostic: ts.Diagnostic) { - const fileName = relative(process.cwd(), diagnostic.file.getSourceFile().fileName) + const fileName = relative( + process.cwd(), + diagnostic.file.getSourceFile().fileName + ) const { line, character } = diagnostic.file.getLineAndCharacterOfPosition( diagnostic.start ) - logTypeScriptError(diagnostic.code, ts.flattenDiagnosticMessageText(diagnostic.messageText, formatHost.getNewLine()), fileName, line, character) + logTypeScriptError( + diagnostic.code, + ts.flattenDiagnosticMessageText( + diagnostic.messageText, + formatHost.getNewLine() + ), + fileName, + line, + character + ) } - function onWatchStatusChanged(diagnostic: ts.Diagnostic, newLine: string, options: ts.CompilerOptions, errorCount?: number) { + function onWatchStatusChanged( + diagnostic: ts.Diagnostic, + newLine: string, + options: ts.CompilerOptions, + errorCount?: number + ) { switch (diagnostic.code) { - case 6031: // Starting compilation - return onStart() - case 6032: // File change detected - return onFileChanged() - case 6194: // Found x errors. Watching for file changes... - if (errorCount === 0) { - return onCompilationDone() - } - return onCompilationFailed(ts.flattenDiagnosticMessageText(diagnostic.messageText, ts.sys.newLine), diagnostic) + case 6031: // Starting compilation + return onStart() + case 6032: // File change detected + return onFileChanged() + case 6194: // Found x errors. Watching for file changes... + if (errorCount === 0) { + return onCompilationDone() + } + return onCompilationFailed( + ts.flattenDiagnosticMessageText( + diagnostic.messageText, + ts.sys.newLine + ), + diagnostic + ) } } } diff --git a/src/lib/configs.ts b/src/lib/configs.ts index e122e65ac..438582581 100644 --- a/src/lib/configs.ts +++ b/src/lib/configs.ts @@ -4,7 +4,7 @@ import { pathToFileURL } from 'url' import { logErrorLine, logWarningMessage } from './logger.js' interface Config { - functionsDir?: string, + functionsDir?: string } let GLEE_DIR: string @@ -16,45 +16,57 @@ let GLEE_CONFIG_FILE_PATH_JS: string let GLEE_CONFIG_FILE_PATH_TS: string let ASYNCAPI_FILE_PATH: string -let errorMessage: string -export async function initializeConfigs(config: Config = {}): Promise<{ [key: string]: string }> { +let errorMessage: string +export async function initializeConfigs( + config: Config = {} +): Promise<{ [key: string]: string }> { GLEE_PROJECT_DIR = process.cwd() GLEE_DIR = path.resolve(GLEE_PROJECT_DIR, '.glee') - GLEE_LIFECYCLE_DIR = path.resolve(GLEE_DIR, config.functionsDir || 'lifecycle') - GLEE_FUNCTIONS_DIR = path.resolve(GLEE_DIR, config.functionsDir || 'functions') + GLEE_LIFECYCLE_DIR = path.resolve( + GLEE_DIR, + config.functionsDir || 'lifecycle' + ) + GLEE_FUNCTIONS_DIR = path.resolve( + GLEE_DIR, + config.functionsDir || 'functions' + ) GLEE_CONFIG_FILE_PATH_TS = path.resolve(GLEE_DIR, 'glee.config.ts') GLEE_CONFIG_FILE_PATH_JS = path.resolve(GLEE_DIR, 'glee.config.js') const configJSExists = existsSync(GLEE_CONFIG_FILE_PATH_JS) const configTSExists = existsSync(GLEE_CONFIG_FILE_PATH_TS) - GLEE_CONFIG_FILE_PATH = configTSExists ? GLEE_CONFIG_FILE_PATH_TS : GLEE_CONFIG_FILE_PATH_JS + GLEE_CONFIG_FILE_PATH = configTSExists + ? GLEE_CONFIG_FILE_PATH_TS + : GLEE_CONFIG_FILE_PATH_JS - if(configTSExists && configJSExists) { + if (configTSExists && configJSExists) { logWarningMessage( `Both 'glee.config.js' and 'glee.config.ts' files were found at ${GLEE_DIR}. The 'glee.config.ts' file will be used and 'glee.config.js' will be ignored. - Consider migrating 'glee.config.js' to TypeScript or removing it.`, { - highlightedWords: ['glee.config.js', 'glee.config.ts'] - }) + Consider migrating 'glee.config.js' to TypeScript or removing it.`, + { + highlightedWords: ['glee.config.js', 'glee.config.ts'], + } + ) } ASYNCAPI_FILE_PATH = findSpecFile(GLEE_PROJECT_DIR) const configsFromFile = await loadConfigsFromFile() - if(!ASYNCAPI_FILE_PATH){ + if (!ASYNCAPI_FILE_PATH) { logErrorLine(errorMessage) process.exit(1) } return { ...configsFromFile, - ...getConfigs() + ...getConfigs(), } } -function isFileReadable(filePath: string){ +function isFileReadable(filePath: string) { try { - accessSync(filePath,constants.R_OK) + accessSync(filePath, constants.R_OK) return statSync(filePath).isFile() - } catch (err){ + } catch (err) { // No error logging is required since we expect accessSync to fail most of the time. return false } @@ -63,32 +75,40 @@ function isFileReadable(filePath: string){ * Loads the configuration from glee project. */ async function loadConfigsFromFile() { - if (!isFileReadable(GLEE_CONFIG_FILE_PATH)) return + if (!isFileReadable(GLEE_CONFIG_FILE_PATH)) return try { - let { default: projectConfigs } = await import(pathToFileURL(GLEE_CONFIG_FILE_PATH).href) - if (typeof projectConfigs === 'function') projectConfigs = await projectConfigs() + let { default: projectConfigs } = await import( + pathToFileURL(GLEE_CONFIG_FILE_PATH).href + ) + if (typeof projectConfigs === 'function') + {projectConfigs = await projectConfigs()} if (!projectConfigs) return - GLEE_DIR = projectConfigs.glee?.gleeDir || GLEE_DIR - GLEE_LIFECYCLE_DIR = projectConfigs.glee?.lifecycleDir ?? GLEE_LIFECYCLE_DIR - GLEE_FUNCTIONS_DIR = projectConfigs.glee?.functionsDir ?? GLEE_FUNCTIONS_DIR - ASYNCAPI_FILE_PATH = projectConfigs.glee?.asyncapiFilePath ?? ASYNCAPI_FILE_PATH + GLEE_DIR = projectConfigs.glee?.gleeDir || GLEE_DIR + GLEE_LIFECYCLE_DIR = projectConfigs.glee?.lifecycleDir ?? GLEE_LIFECYCLE_DIR + GLEE_FUNCTIONS_DIR = projectConfigs.glee?.functionsDir ?? GLEE_FUNCTIONS_DIR + ASYNCAPI_FILE_PATH = + projectConfigs.glee?.asyncapiFilePath ?? ASYNCAPI_FILE_PATH return projectConfigs } catch (e) { return console.error(e) } } -export function findSpecFile(baseDir: string): string{ +export function findSpecFile(baseDir: string): string { const files = ['asyncapi.yaml', 'asyncapi.json', 'asyncapi.yml'] - const foundFiles = files.filter(file => isFileReadable(path.resolve(baseDir, file))) - + const foundFiles = files.filter((file) => + isFileReadable(path.resolve(baseDir, file)) + ) + if (foundFiles.length === 1) { return path.resolve(baseDir, foundFiles[0]) - } else if(foundFiles.length > 1) { - errorMessage = "Multiple AsyncAPI files found. Please choose one in you config file (https://github.com/asyncapi/glee/blob/master/docs/config-file.md)." + } else if (foundFiles.length > 1) { + errorMessage = + 'Multiple AsyncAPI files found. Please choose one in you config file (https://github.com/asyncapi/glee/blob/master/docs/config-file.md).' } else { - errorMessage = "Unable fo find the AsyncAPI file. Please make sure it's in your project's directory or set its path in the config file (https://github.com/asyncapi/glee/blob/master/docs/config-file.md)." + errorMessage = + "Unable fo find the AsyncAPI file. Please make sure it's in your project's directory or set its path in the config file (https://github.com/asyncapi/glee/blob/master/docs/config-file.md)." } return undefined } @@ -100,6 +120,6 @@ export function getConfigs(): { [key: string]: string } { GLEE_LIFECYCLE_DIR, GLEE_FUNCTIONS_DIR, GLEE_CONFIG_FILE_PATH, - ASYNCAPI_FILE_PATH + ASYNCAPI_FILE_PATH, } -} \ No newline at end of file +} diff --git a/src/lib/connection.ts b/src/lib/connection.ts index 263bae690..9fafcc0e4 100644 --- a/src/lib/connection.ts +++ b/src/lib/connection.ts @@ -1,11 +1,11 @@ import { Server as AsyncAPIServer, AsyncAPIDocument } from '@asyncapi/parser' interface IGleeConnectionConstructor { - connection: any, - channels: string[], - serverName: string, - server: AsyncAPIServer, - parsedAsyncAPI: AsyncAPIDocument, + connection: any + channels: string[] + serverName: string + server: AsyncAPIServer + parsedAsyncAPI: AsyncAPIDocument } class GleeConnection { @@ -25,7 +25,13 @@ class GleeConnection { * @param {AsyncAPIServer} options.server The AsyncAPI server the connection is pointing to. * @param {AsyncAPIDocument} options.parsedAsyncAPI The AsyncAPI document. */ - constructor({ connection, channels, serverName, server, parsedAsyncAPI }: IGleeConnectionConstructor) { + constructor({ + connection, + channels, + serverName, + server, + parsedAsyncAPI, + }: IGleeConnectionConstructor) { this._rawConnection = connection this._channels = channels this._serverName = serverName @@ -59,16 +65,16 @@ class GleeConnection { * @param {String} channelName The name of the channel. * @return {Boolean} */ - hasChannel (channelName: string): boolean { + hasChannel(channelName: string): boolean { return this.channels.includes(channelName) } - + /** * Returns the real connection object. * * @return {Any} */ - getRaw (): any { + getRaw(): any { return this.rawConnection } } diff --git a/src/lib/docs.ts b/src/lib/docs.ts index 6814b1f7e..341d3b914 100644 --- a/src/lib/docs.ts +++ b/src/lib/docs.ts @@ -8,9 +8,7 @@ export default async (spec, config, resDir) => { logInfoMessage(`Generating docs for your parsed specification...`) const resolvedData = spec.json() const generator = new Generator( - configData?.template - ? configData.template - : '@asyncapi/markdown-template', + configData?.template ? configData.template : '@asyncapi/markdown-template', path.resolve( resDir ? resDir : './', configData?.folder ? configData.folder : 'docs' diff --git a/src/lib/experimentalFlags.ts b/src/lib/experimentalFlags.ts index 9241e20dd..baac84285 100644 --- a/src/lib/experimentalFlags.ts +++ b/src/lib/experimentalFlags.ts @@ -1,8 +1,10 @@ const environmentVariables = new Map(Object.entries(process.env)) export default new Map( - Object - .keys(process.env) - .filter(flag => flag.startsWith('GLEE_EXPERIMENTAL_')) - .map(flag => [flag.substring('GLEE_EXPERIMENTAL_'.length), environmentVariables.get(flag)]) + Object.keys(process.env) + .filter((flag) => flag.startsWith('GLEE_EXPERIMENTAL_')) + .map((flag) => [ + flag.substring('GLEE_EXPERIMENTAL_'.length), + environmentVariables.get(flag), + ]) ) diff --git a/src/lib/functions.ts b/src/lib/functions.ts index 2cb550317..16c02f0ff 100644 --- a/src/lib/functions.ts +++ b/src/lib/functions.ts @@ -6,13 +6,17 @@ import { logWarningMessage, logError } from './logger.js' import GleeMessage from './message.js' import { GleeFunction } from './index.js' import Glee from './glee.js' -import { gleeMessageToFunctionEvent, validateData, isRemoteServer } from './util.js' +import { + gleeMessageToFunctionEvent, + validateData, + isRemoteServer, +} from './util.js' import { pathToFileURL } from 'url' import GleeError from '../errors/glee-error.js' import { getParsedAsyncAPI } from './asyncapiFile.js' interface FunctionInfo { - run: GleeFunction, + run: GleeFunction } const OutboundMessageSchema = { @@ -22,37 +26,33 @@ const OutboundMessageSchema = { headers: { type: 'object', propertyNames: { type: 'string' }, - additionalProperties: { type: 'string' } + additionalProperties: { type: 'string' }, }, channel: { type: 'string' }, server: { type: 'string' }, - query: { type: 'object' } - } + query: { type: 'object' }, + }, } const FunctionReturnSchema = { type: ['object', 'null'], properties: { send: { type: 'array', - items: OutboundMessageSchema + items: OutboundMessageSchema, }, reply: { type: 'array', - items: OutboundMessageSchema - } + items: OutboundMessageSchema, + }, }, additionalProperties: false, - anyOf: [ - { required: ['send'] }, - { required: ['reply'] } - ] + anyOf: [{ required: ['send'] }, { required: ['reply'] }], } const { GLEE_DIR, GLEE_FUNCTIONS_DIR } = getConfigs() export const functions: Map = new Map() export async function register(dir: string) { - try { const statsDir = await stat(dir) if (!statsDir.isDirectory()) return @@ -63,17 +63,19 @@ export async function register(dir: string) { try { const files = await walkdir.async(dir, { return_object: true }) - return await Promise.all(Object.keys(files).map(async (filePath) => { - try { - const functionName = basename(filePath, extname(filePath)) - const { default: fn } = await import(pathToFileURL(filePath).href) - functions.set(functionName, { - run: fn, - }) - } catch (e) { - console.error(e) - } - })) + return await Promise.all( + Object.keys(files).map(async (filePath) => { + try { + const functionName = basename(filePath, extname(filePath)) + const { default: fn } = await import(pathToFileURL(filePath).href) + functions.set(functionName, { + run: fn, + }) + } catch (e) { + console.error(e) + } + }) + ) } catch (e) { console.error(e) } @@ -82,18 +84,23 @@ export async function register(dir: string) { export async function trigger({ app, operationId, - message + message, }: { - app: Glee, - operationId: string, - message: GleeMessage, + app: Glee + operationId: string + message: GleeMessage }) { try { const parsedAsyncAPI = await getParsedAsyncAPI() - let res = await functions.get(operationId).run(gleeMessageToFunctionEvent(message, app)) + let res = await functions + .get(operationId) + .run(gleeMessageToFunctionEvent(message, app)) if (res === undefined) res = null - const { humanReadableError, errors, isValid } = validateData(res, FunctionReturnSchema) + const { humanReadableError, errors, isValid } = validateData( + res, + FunctionReturnSchema + ) if (!isValid) { const err = new GleeError({ @@ -103,7 +110,7 @@ export async function trigger({ err.message = `Function ${operationId} returned invalid data.` logError(err, { - highlightedWords: [operationId] + highlightedWords: [operationId], }) return @@ -111,16 +118,23 @@ export async function trigger({ res?.send?.forEach((msg) => { const localServerProtocols = ['ws', 'wss', 'http', 'https'] - const serverProtocol = parsedAsyncAPI.server(msg.server || message.serverName).protocol().toLowerCase() - const isBroadcast = localServerProtocols.includes(serverProtocol) && !isRemoteServer(parsedAsyncAPI, msg.server) - app.send(new GleeMessage({ - payload: msg.payload, - query: msg.query, - headers: msg.headers, - channel: msg.channel || message.channel, - serverName: msg.server, - broadcast: isBroadcast - })) + const serverProtocol = parsedAsyncAPI + .server(msg.server || message.serverName) + .protocol() + .toLowerCase() + const isBroadcast = + localServerProtocols.includes(serverProtocol) && + !isRemoteServer(parsedAsyncAPI, msg.server) + app.send( + new GleeMessage({ + payload: msg.payload, + query: msg.query, + headers: msg.headers, + channel: msg.channel || message.channel, + serverName: msg.server, + broadcast: isBroadcast, + }) + ) }) res?.reply?.forEach((msg) => { diff --git a/src/lib/glee.ts b/src/lib/glee.ts index 5acb2408e..c144248e9 100644 --- a/src/lib/glee.ts +++ b/src/lib/glee.ts @@ -4,7 +4,11 @@ import Debug from 'debug' import { AsyncAPIDocument, Server } from '@asyncapi/parser' import GleeAdapter from './adapter.js' import GleeClusterAdapter from './cluster.js' -import GleeRouter, { ChannelErrorMiddlewareTuple, ChannelMiddlewareTuple, GenericMiddleware } from './router.js' +import GleeRouter, { + ChannelErrorMiddlewareTuple, + ChannelMiddlewareTuple, + GenericMiddleware, +} from './router.js' import GleeMessage from './message.js' import { matchChannel, duplicateMessage, getParams } from './util.js' import { GleeConfig } from './index.js' @@ -14,15 +18,15 @@ import { MiddlewareCallback } from '../middlewares/index.js' const debug = Debug('glee') type AdapterRecord = { - Adapter: typeof GleeAdapter, - instance?: GleeAdapter, - serverName: string, - server: Server, - parsedAsyncAPI: AsyncAPIDocument, + Adapter: typeof GleeAdapter + instance?: GleeAdapter + serverName: string + server: Server + parsedAsyncAPI: AsyncAPIDocument } type ClusterAdapterRecord = { - Adapter: typeof GleeClusterAdapter, + Adapter: typeof GleeClusterAdapter instance?: GleeClusterAdapter } @@ -59,8 +63,6 @@ export default class Glee extends EventEmitter { return this._clusterAdapter } - - /** * Adds a connection adapter. * @@ -69,7 +71,14 @@ export default class Glee extends EventEmitter { * @param {AsyncAPIServer} server AsyncAPI Server to use with the adapter. * @param {AsyncAPIDocument} parsedAsyncAPI The AsyncAPI document. */ - addAdapter(Adapter: typeof GleeAdapter, { serverName, server, parsedAsyncAPI }: { serverName: string, server: Server, parsedAsyncAPI: AsyncAPIDocument }) { + addAdapter( + Adapter: typeof GleeAdapter, + { + serverName, + server, + parsedAsyncAPI, + }: { serverName: string; server: Server; parsedAsyncAPI: AsyncAPIDocument } + ) { this._adapters.push({ Adapter, serverName, server, parsedAsyncAPI }) } @@ -80,7 +89,7 @@ export default class Glee extends EventEmitter { */ setClusterAdapter(Adapter: typeof GleeClusterAdapter) { this._clusterAdapter = { - Adapter + Adapter, } } @@ -89,8 +98,8 @@ export default class Glee extends EventEmitter { * @param {String} [channel] The channel you want to scope the middleware to. * @param {Function|GleeRouter} ...middlewares A function or GleeRouter to use as a middleware. */ - use(...middlewares: GenericMiddleware[]): void; - use(channel: string, ...middlewares: GenericMiddleware[]): void; + use(...middlewares: GenericMiddleware[]): void + use(channel: string, ...middlewares: GenericMiddleware[]): void use(channel: string | GenericMiddleware, ...middlewares: GenericMiddleware[]): void { // eslint-disable-line @typescript-eslint/no-unused-vars this._router.use(...arguments) // eslint-disable-line prefer-rest-params } @@ -100,9 +109,13 @@ export default class Glee extends EventEmitter { * @param {String} [channel] The channel you want to scope the middleware to. * @param {Function|GleeRouter} ...middlewares A function or GleeRouter to use as a middleware. */ - useOutbound(...middlewares: GenericMiddleware[]): void; - useOutbound(channel: string, ...middlewares: GenericMiddleware[]): void; - useOutbound(channel: string | GenericMiddleware, ...middlewares: GenericMiddleware[]): void { // eslint-disable-line @typescript-eslint/no-unused-vars + useOutbound(...middlewares: GenericMiddleware[]): void + useOutbound(channel: string, ...middlewares: GenericMiddleware[]): void + useOutbound( + channel: string | GenericMiddleware, + ...middlewares: GenericMiddleware[] + ): void { + // eslint-disable-line @typescript-eslint/no-unused-vars this._router.useOutbound(...arguments) // eslint-disable-line prefer-rest-params } @@ -127,7 +140,7 @@ export default class Glee extends EventEmitter { async connect(): Promise { const promises = [] - this._adapters.forEach(a => { + this._adapters.forEach((a) => { a.instance = new a.Adapter(this, a.serverName, a.server, a.parsedAsyncAPI) promises.push(a.instance.connect()) }) @@ -154,7 +167,11 @@ export default class Glee extends EventEmitter { * @param {String} serverName The name of the server this message is coming from. * @param {GleeConnection} [connection] The connection used when receiving the message. Its type is unknown and must be handled by the adapters. */ - injectMessage(message: GleeMessage, serverName: string, connection: GleeConnection) { + injectMessage( + message: GleeMessage, + serverName: string, + connection: GleeConnection + ) { message.serverName = serverName message.connection = connection message.setInbound() @@ -201,29 +218,39 @@ export default class Glee extends EventEmitter { * @param {GleeMessage} message The message to pass to the middlewares. * @private */ - private _processMessage(middlewares: ChannelMiddlewareTuple[], errorMiddlewares: ChannelErrorMiddlewareTuple[], message: GleeMessage): void { - const mws = - middlewares - .filter(mw => matchChannel(mw.channel, message.channel)) - .map(mw => (msg: GleeMessage, next: MiddlewareCallback) => { - const msgForMiddleware: GleeMessage = duplicateMessage(msg) - msgForMiddleware.params = getParams(mw.channel, msgForMiddleware.channel) - - msgForMiddleware.on('send', (m: GleeMessage) => { - m.setOutbound() - this._processMessage( - this._router.getOutboundMiddlewares(), - this._router.getOutboundErrorMiddlewares(), - m - ) - }) - - mw.fn.call(mw.fn, msgForMiddleware, (err: Error, newMessage: GleeMessage) => { + private _processMessage( + middlewares: ChannelMiddlewareTuple[], + errorMiddlewares: ChannelErrorMiddlewareTuple[], + message: GleeMessage + ): void { + const mws = middlewares + .filter((mw) => matchChannel(mw.channel, message.channel)) + .map((mw) => (msg: GleeMessage, next: MiddlewareCallback) => { + const msgForMiddleware: GleeMessage = duplicateMessage(msg) + msgForMiddleware.params = getParams( + mw.channel, + msgForMiddleware.channel + ) + + msgForMiddleware.on('send', (m: GleeMessage) => { + m.setOutbound() + this._processMessage( + this._router.getOutboundMiddlewares(), + this._router.getOutboundErrorMiddlewares(), + m + ) + }) + + mw.fn.call( + mw.fn, + msgForMiddleware, + (err: Error, newMessage: GleeMessage) => { const nextMessage = newMessage || msgForMiddleware nextMessage.channel = message.channel // This is to avoid the channel to be modified. next(err, nextMessage) - }) - }) + } + ) + }) async.seq(...mws)(message, (err: Error, msg: GleeMessage) => { if (err) { @@ -237,7 +264,10 @@ export default class Glee extends EventEmitter { debug('Outbound pipeline finished. Sending message...') debug(msg) this._adapters.forEach((a: AdapterRecord) => { - if (a.instance && (!msg.serverName || msg.serverName === a.serverName)) { + if ( + a.instance && + (!msg.serverName || msg.serverName === a.serverName) + ) { a.instance.send(msg).catch((e: Error) => { this._processError(errorMiddlewares, e, msg) }) @@ -258,14 +288,25 @@ export default class Glee extends EventEmitter { * @param {GleeMessage} message The message to pass to the middlewares. * @private */ - private _processError(errorMiddlewares: ChannelErrorMiddlewareTuple[], error: Error, message: GleeMessage): void { - const emws = errorMiddlewares.filter(emw => matchChannel(emw.channel, message.channel)) + private _processError( + errorMiddlewares: ChannelErrorMiddlewareTuple[], + error: Error, + message: GleeMessage + ): void { + const emws = errorMiddlewares.filter((emw) => + matchChannel(emw.channel, message.channel) + ) if (!emws.length) return this._execErrorMiddleware(emws, 0, error, message) } - private _execErrorMiddleware(emws: ChannelErrorMiddlewareTuple[], index: number, error: Error, message: GleeMessage) { + private _execErrorMiddleware( + emws: ChannelErrorMiddlewareTuple[], + index: number, + error: Error, + message: GleeMessage + ) { const emwsLength = emws.length emws[(index + emwsLength) % emwsLength].fn(error, message, (err: Error) => { if (!emws[index + 1]) return diff --git a/src/lib/index.d.ts b/src/lib/index.d.ts index 6d4202ef1..4b3d3c5a1 100644 --- a/src/lib/index.d.ts +++ b/src/lib/index.d.ts @@ -8,17 +8,23 @@ type WebSocketServerType = 'native' | 'socket.io' type HttpServerType = 'native' type QueryParam = { [key: string]: string } | { [key: string]: string[] } -export type AuthFunction = ({serverName, parsedAsyncAPI}: {serverName: string, parsedAsyncAPI: AsyncAPIDocument}) => Promise +export type AuthFunction = ({ + serverName, + parsedAsyncAPI, +}: { + serverName: string + parsedAsyncAPI: AsyncAPIDocument +}) => Promise export interface MqttAuthConfig { - cert?: string - username?: string - password?: string - clientId?: string + cert?: string + username?: string + password?: string + clientId?: string } export interface WsAuthConfig { - token?: string + token?: string } export interface HttpAuthConfig { @@ -35,17 +41,17 @@ export interface KafkaAuthConfig { } export type GleeClusterAdapterConfig = { - adapter?: string | typeof GleeClusterAdapter, - name?: string, - url: string, + adapter?: string | typeof GleeClusterAdapter + name?: string + url: string } export type WebsocketAdapterConfig = { server?: { - httpServer?: any, - adapter?: WebSocketServerType | typeof GleeAdapter, - port?: number, - }, + httpServer?: any + adapter?: WebSocketServerType | typeof GleeAdapter + port?: number + } client?: { query?: any auth?: WsAuthConfig | AuthFunction @@ -54,12 +60,12 @@ export type WebsocketAdapterConfig = { export type HttpAdapterConfig = { server: { - httpServer?: any, + httpServer?: any port?: number - }, + } client?: { - auth?: HttpAuthConfig | AuthFunction, - query?: QueryParam, + auth?: HttpAuthConfig | AuthFunction + query?: QueryParam body?: any } } @@ -72,43 +78,43 @@ export type KafkaAdapterConfig = { } export type CoreGleeConfig = { - gleeDir?: string, - lifecycleDir?: string, - functionsDir?: string, - asyncapiFilePath?: string, + gleeDir?: string + lifecycleDir?: string + functionsDir?: string + asyncapiFilePath?: string } export type GleeConfig = { glee?: CoreGleeConfig - ws?: WebsocketAdapterConfig, - cluster?: GleeClusterAdapterConfig, - mqtt?: MqttAdapterConfig, + ws?: WebsocketAdapterConfig + cluster?: GleeClusterAdapterConfig + mqtt?: MqttAdapterConfig http?: HttpAdapterConfig kafka?: KafkaAdapterConfig } export type GleeFunctionReturn = { - send?: GleeFunctionReturnSend[], - reply?: GleeFunctionReturnReply[], + send?: GleeFunctionReturnSend[] + reply?: GleeFunctionReturnReply[] broadcast?: GleeFunctionReturnBroadcast[] } export type GleeFunctionEvent = { - glee: Glee, - serverName: string, - connection?: GleeConnection, - payload?: any, - query?: QueryParam, - headers?: { [key: string]: string }, + glee: Glee + serverName: string + connection?: GleeConnection + payload?: any + query?: QueryParam + headers?: { [key: string]: string } channel?: string } export type GleeFunctionReturnSend = { - payload?: any, - query?: QueryParam, - headers?: { [key: string]: string }, - channel?: string, - server?: string, + payload?: any + query?: QueryParam + headers?: { [key: string]: string } + channel?: string + server?: string } export type GleeFunctionReturnReply = GleeFunctionReturnSend diff --git a/src/lib/lifecycleEvents.ts b/src/lib/lifecycleEvents.ts index 36e088290..3d1a984ae 100644 --- a/src/lib/lifecycleEvents.ts +++ b/src/lib/lifecycleEvents.ts @@ -1,19 +1,23 @@ import { stat } from 'fs/promises' import walkdir from 'walkdir' -import { GleeFunctionEvent, GleeFunctionReturn, GleeFunctionReturnSend } from './index.js' +import { + GleeFunctionEvent, + GleeFunctionReturn, + GleeFunctionReturnSend, +} from './index.js' import { logInfoMessage } from './logger.js' import GleeMessage from './message.js' import { arrayHasDuplicates } from './util.js' import { pathToFileURL } from 'url' interface IEvent { - fn: (event: GleeFunctionEvent) => GleeFunctionReturn, - channels: string[], - servers: string[], + fn: (event: GleeFunctionEvent) => GleeFunctionReturn + channels: string[] + servers: string[] } export const events: Map = new Map() -export async function register (dir: string) { +export async function register(dir: string) { try { const statsDir = await stat(dir) if (!statsDir.isDirectory()) return @@ -23,26 +27,31 @@ export async function register (dir: string) { try { const files = await walkdir.async(dir, { return_object: true }) - return await Promise.all(Object.keys(files).map(async (filePath) => { - try { - const { - default: fn, - lifecycleEvent, - channels, - servers - } = await import(pathToFileURL(filePath).href) + return await Promise.all( + Object.keys(files).map(async (filePath) => { + try { + const { + default: fn, + lifecycleEvent, + channels, + servers, + } = await import(pathToFileURL(filePath).href) - if (!events.has(lifecycleEvent)) events.set(lifecycleEvent, []) + if (!events.has(lifecycleEvent)) events.set(lifecycleEvent, []) - events.set(lifecycleEvent, [...events.get(lifecycleEvent), { - fn, - channels, - servers, - }]) - } catch (e) { - console.error(e) - } - })) + events.set(lifecycleEvent, [ + ...events.get(lifecycleEvent), + { + fn, + channels, + servers, + }, + ]) + } catch (e) { + console.error(e) + } + }) + ) } catch (e) { console.error(e) } @@ -54,43 +63,46 @@ export async function run(lifecycleEvent: string, params: GleeFunctionEvent) { try { const connectionChannels = params.connection.channels const connectionServer = params.connection.serverName - const handlers = events.get(lifecycleEvent) - .filter(info => { - if (info.channels && !arrayHasDuplicates([ - ...connectionChannels, - ...(info.channels) - ])) { - return false - } + const handlers = events.get(lifecycleEvent).filter((info) => { + if ( + info.channels && + !arrayHasDuplicates([...connectionChannels, ...info.channels]) + ) { + return false + } - if (info.servers) { - return info.servers.includes(connectionServer) - } + if (info.servers) { + return info.servers.includes(connectionServer) + } - return true - }) + return true + }) if (!handlers.length) return logInfoMessage(`Running ${lifecycleEvent} lifecycle event...`, { - highlightedWords: [lifecycleEvent] + highlightedWords: [lifecycleEvent], }) - const responses = await Promise.all(handlers.map(info => info.fn(params))) + const responses = await Promise.all(handlers.map((info) => info.fn(params))) - responses.forEach(res => { + responses.forEach((res) => { res?.send?.forEach((event: GleeFunctionReturnSend) => { try { - params.glee.send(new GleeMessage({ - payload: event.payload, - headers: event.headers, - channel: event.channel, - serverName: event.server, - connection: params.connection, - query: event.query - })) + params.glee.send( + new GleeMessage({ + payload: event.payload, + headers: event.headers, + channel: event.channel, + serverName: event.server, + connection: params.connection, + query: event.query, + }) + ) } catch (e) { - console.error(`The ${lifecycleEvent} lifecycle function failed to send an event to channel ${event.channel}.`) + console.error( + `The ${lifecycleEvent} lifecycle function failed to send an event to channel ${event.channel}.` + ) console.error(e) } }) diff --git a/src/lib/logger.ts b/src/lib/logger.ts index 7fc3c595b..83589db15 100644 --- a/src/lib/logger.ts +++ b/src/lib/logger.ts @@ -11,21 +11,21 @@ import GleeError from '../errors/glee-error.js' export { chalk } interface ILogLineWithIconOptions { - iconColor?: string, - textColor?: string, - highlightedWords?: string[], - disableEmojis?: boolean, - emptyLinesBefore?: number, - emptyLinesAfter?: number, + iconColor?: string + textColor?: string + highlightedWords?: string[] + disableEmojis?: boolean + emptyLinesBefore?: number + emptyLinesAfter?: number } interface ILogOptions { - highlightedWords?: string[], + highlightedWords?: string[] } interface ILogErrorOptions { - highlightedWords?: string[], - showStack?: boolean, + highlightedWords?: string[] + showStack?: boolean } const TSLogo = chalk.bgHex('#3178C6').white(' TS') @@ -33,7 +33,7 @@ const TSLogo = chalk.bgHex('#3178C6').white(' TS') const highlightWords = (words: string[], text: string) => { let result = text - words.filter(Boolean).forEach(word => { + words.filter(Boolean).forEach((word) => { result = result.split(word).join(chalk.white(word)) }) @@ -59,43 +59,96 @@ export const logWelcome = ({ const bgPrimary = chalk.bgHex(primaryColor) const fgPrimary = chalk.hex(primaryColor) const fgWarning = chalk.yellow - - const pkg = JSON.parse(readFileSync(resolve(dirname(fileURLToPath(import.meta.url)), '../../package.json')).toString()) + + const pkg = JSON.parse( + readFileSync( + resolve(dirname(fileURLToPath(import.meta.url)), '../../package.json') + ).toString() + ) logEmptyLines(1) console.log(bgPrimary.black(` Glee ${pkg.version} \n`)) if (dev) { console.log(fgPrimary('{}'), chalk.gray('Running in development mode...')) } - console.log(fgPrimary('↙↗'), chalk.gray(wordWrap(`Selected server(s): ${servers.join(', ')}`, { width: 37, indent: ' ' }).trim())) + console.log( + fgPrimary('↙↗'), + chalk.gray( + wordWrap(`Selected server(s): ${servers.join(', ')}`, { + width: 37, + indent: ' ', + }).trim() + ) + ) if (showAppDir) { - console.log(fgPrimary('./'), chalk.gray(wordWrap(`App directory: ${dir}`, { width: 37, indent: ' ', cut: true }).trim())) + console.log( + fgPrimary('./'), + chalk.gray( + wordWrap(`App directory: ${dir}`, { + width: 37, + indent: ' ', + cut: true, + }).trim() + ) + ) } if (showFunctionsDir) { - console.log(fgPrimary('𝑓×'), chalk.gray(wordWrap(`Functions directory: ${functionsDir}`, { width: 37, indent: ' ', cut: true }).trim())) + console.log( + fgPrimary('𝑓×'), + chalk.gray( + wordWrap(`Functions directory: ${functionsDir}`, { + width: 37, + indent: ' ', + cut: true, + }).trim() + ) + ) } if (experimentalFlags.has('JAVA')) { - console.log(emojis.unicode(':coffee:'), fgWarning('Java experimental support has been enabled')) + console.log( + emojis.unicode(':coffee:'), + fgWarning('Java experimental support has been enabled') + ) } console.log(chalk.gray('─'.repeat(40))) } -export const logLineWithIcon = (icon: string, text: string, { iconColor = '#0ff', textColor = '#999', highlightedWords = [], disableEmojis = false, emptyLinesBefore = 0, emptyLinesAfter = 0 }: ILogLineWithIconOptions = {}) => { +export const logLineWithIcon = ( + icon: string, + text: string, + { + iconColor = '#0ff', + textColor = '#999', + highlightedWords = [], + disableEmojis = false, + emptyLinesBefore = 0, + emptyLinesAfter = 0, + }: ILogLineWithIconOptions = {} +) => { const iconColorFn = chalk.hex(iconColor) const textColorFn = chalk.hex(textColor) icon = !disableEmojis ? emojis.unicode(icon) : icon if (emptyLinesBefore) logEmptyLines(emptyLinesBefore) - console.log(iconColorFn(icon), textColorFn(highlightWords(highlightedWords, text))) + console.log( + iconColorFn(icon), + textColorFn(highlightWords(highlightedWords, text)) + ) if (emptyLinesAfter) logEmptyLines(emptyLinesAfter) } -export const logInfoMessage = (text: string, { highlightedWords = [] }: ILogOptions = {}) => { +export const logInfoMessage = ( + text: string, + { highlightedWords = [] }: ILogOptions = {} +) => { logLineWithIcon('ⓘ ', text, { highlightedWords, }) } -export const logWarningMessage = (text: string, { highlightedWords = [] }: ILogOptions = {}) => { +export const logWarningMessage = ( + text: string, + { highlightedWords = [] }: ILogOptions = {} +) => { logLineWithIcon(':warning: ', text, { highlightedWords, textColor: '#fa0', @@ -104,11 +157,23 @@ export const logWarningMessage = (text: string, { highlightedWords = [] }: ILogO export const logJSON = (json: object | Array, { error = false } = {}) => { const logFn = error ? console.error : console.log - logFn(util.inspect(json, { depth: null, sorted: true, breakLength: 40, colors: true })) + logFn( + util.inspect(json, { + depth: null, + sorted: true, + breakLength: 40, + colors: true, + }) + ) } export const logInboundMessage = (message: GleeMessage) => { - console.log(chalk.reset.blue('↙'), chalk.yellow(message.channel), 'was received from', chalk.gray(message.serverName)) + console.log( + chalk.reset.blue('↙'), + chalk.yellow(message.channel), + 'was received from', + chalk.gray(message.serverName) + ) logJSON(message.payload) } @@ -116,20 +181,35 @@ export const logOutboundMessage = (message: GleeMessage) => { const icon = message.broadcast ? '⇶' : '↗' const serverName = message.serverName || 'all servers' const verb = message.broadcast ? 'broadcasted' : 'sent' - console.log(chalk.reset.magenta(icon), chalk.yellow(message.channel), 'was', verb ,'to', chalk.gray(serverName)) + console.log( + chalk.reset.magenta(icon), + chalk.yellow(message.channel), + 'was', + verb, + 'to', + chalk.gray(serverName) + ) logJSON(message.payload) } -export const logErrorLine = (message: string, { highlightedWords = [] }: ILogOptions = {}) => { +export const logErrorLine = ( + message: string, + { highlightedWords = [] }: ILogOptions = {} +) => { const icon = chalk.reset.red('x') - const msg = chalk.gray(emojis.unicode(highlightWords(highlightedWords, message))) + const msg = chalk.gray( + emojis.unicode(highlightWords(highlightedWords, message)) + ) console.error(`${icon} ${msg}`) } -export const logError = (error: GleeError | Error, options: ILogErrorOptions = {}) => { +export const logError = ( + error: GleeError | Error, + options: ILogErrorOptions = {} +) => { const { showStack = true } = options logErrorLine(error.message, options) - + if (error instanceof GleeError) { if (typeof error.details === 'string') { console.error(chalk.gray(emojis.unicode(error.details))) @@ -139,7 +219,9 @@ export const logError = (error: GleeError | Error, options: ILogErrorOptions = { } if (showStack && error.stack) { - console.error(chalk.gray(error.stack.substring(error.stack.indexOf('\n') + 1))) + console.error( + chalk.gray(error.stack.substring(error.stack.indexOf('\n') + 1)) + ) } } @@ -147,8 +229,16 @@ export const logTypeScriptMessage = (message: string) => { console.log(TSLogo, message) } -export const logTypeScriptError = (code: number, message: string, fileName: string, line: number, character: number) => { - const fileInfo = `${chalk.cyan(fileName)}:${chalk.yellow(line + 1)}:${chalk.yellow(character + 1)}` +export const logTypeScriptError = ( + code: number, + message: string, + fileName: string, + line: number, + character: number +) => { + const fileInfo = `${chalk.cyan(fileName)}:${chalk.yellow( + line + 1 + )}:${chalk.yellow(character + 1)}` const error = chalk.red('error') const errorCode = chalk.gray(`TS${code}:`) console.error(`${TSLogo} ${fileInfo} - ${error} ${errorCode} ${message}`) diff --git a/src/lib/message.ts b/src/lib/message.ts index eb9896642..efa436692 100644 --- a/src/lib/message.ts +++ b/src/lib/message.ts @@ -5,21 +5,21 @@ type MessageHeaders = { [key: string]: any } type QueryParam = { [key: string]: string } | { [key: string]: string[] } interface IGleeMessageConstructor { - payload?: any, - headers?: MessageHeaders, - channel?: string, - serverName?: string, - connection?: GleeConnection, - broadcast?: boolean, - cluster?: boolean, + payload?: any + headers?: MessageHeaders + channel?: string + serverName?: string + connection?: GleeConnection + broadcast?: boolean + cluster?: boolean query?: QueryParam } interface IReply { - payload?: any, - headers?: { [key: string]: any }, - channel?: string, - query?: QueryParam, + payload?: any + headers?: { [key: string]: any } + channel?: string + query?: QueryParam } class GleeMessage extends EventEmitter { @@ -48,7 +48,7 @@ class GleeMessage extends EventEmitter { * @param {Boolean} [options.cluster=false] Whether the message is from a cluster adapter or not. * @param {Object} [options.query] The query parameters to send or receive query when using the HTTP protocol. */ - constructor ({ + constructor({ payload, headers, channel, @@ -56,7 +56,7 @@ class GleeMessage extends EventEmitter { connection, broadcast = false, cluster = false, - query + query, }: IGleeMessageConstructor) { super() @@ -146,7 +146,7 @@ class GleeMessage extends EventEmitter { * @param {String} [options.channel] The channel where the reply should go to. * @param {Object} [options.query] The new message query parameters. Pass a falsy value if you don't want to change them. */ - reply ({ payload, headers, channel, query } : IReply) { + reply({ payload, headers, channel, query }: IReply) { if (payload) this._payload = payload if (query) this._query = query @@ -163,7 +163,9 @@ class GleeMessage extends EventEmitter { if (typeof channel === 'string') { this._channel = channel } else { - return console.error('GleeMessage.reply(): when specified, "channel" must be a string.') + return console.error( + 'GleeMessage.reply(): when specified, "channel" must be a string.' + ) } } diff --git a/src/lib/router.ts b/src/lib/router.ts index 964a91e02..1df49bb54 100644 --- a/src/lib/router.ts +++ b/src/lib/router.ts @@ -1,28 +1,30 @@ import { ErrorMiddleware, Middleware } from '../middlewares/index.js' export type ChannelMiddlewareTuple = { - channel: string, - fn: Middleware, + channel: string + fn: Middleware } export type ChannelErrorMiddlewareTuple = { - channel: string, - fn: ErrorMiddleware, + channel: string + fn: ErrorMiddleware } export type GenericMiddleware = Middleware | ErrorMiddleware -export type GenericChannelMiddlewareTuple = ChannelMiddlewareTuple | ChannelErrorMiddlewareTuple +export type GenericChannelMiddlewareTuple = + | ChannelMiddlewareTuple + | ChannelErrorMiddlewareTuple class GleeRouter { - private middlewares:ChannelMiddlewareTuple[] - private outboundMiddlewares:ChannelMiddlewareTuple[] - private errorMiddlewares:ChannelErrorMiddlewareTuple[] - private outboundErrorMiddlewares:ChannelErrorMiddlewareTuple[] + private middlewares: ChannelMiddlewareTuple[] + private outboundMiddlewares: ChannelMiddlewareTuple[] + private errorMiddlewares: ChannelErrorMiddlewareTuple[] + private outboundErrorMiddlewares: ChannelErrorMiddlewareTuple[] /** * Instantiates a GleeRouter. */ - constructor () { + constructor() { this.middlewares = [] this.outboundMiddlewares = [] this.errorMiddlewares = [] @@ -36,12 +38,16 @@ class GleeRouter { * @param {String} [channel] The channel you want to scope the middleware to. * @param {GenericMiddleware} middleware A middleware function. */ - use(...middlewares: GenericMiddleware[]) : void; - use(channel: string, ...middlewares: GenericMiddleware[]) : void; - use(channel: string | GenericMiddleware, ...middlewares: GenericMiddleware[]) : void { - const mws: GenericChannelMiddlewareTuple[] = this.middlewaresToChannelMiddlewaresTuples(channel, ...middlewares) - - mws.forEach(mw => { + use(...middlewares: GenericMiddleware[]): void + use(channel: string, ...middlewares: GenericMiddleware[]): void + use( + channel: string | GenericMiddleware, + ...middlewares: GenericMiddleware[] + ): void { + const mws: GenericChannelMiddlewareTuple[] = + this.middlewaresToChannelMiddlewaresTuples(channel, ...middlewares) + + mws.forEach((mw) => { if (mw.fn.length <= 2) { this.addMiddlewares([mw as ChannelMiddlewareTuple]) } else { @@ -56,12 +62,16 @@ class GleeRouter { * @param {String} [channel] The channel you want to scope the middleware to. * @param {Function|GleeRouter} ...middlewares A function or GleeRouter to use as a middleware. */ - useOutbound(...middlewares: GenericMiddleware[]): void; - useOutbound(channel: string, ...middlewares: GenericMiddleware[]): void; - useOutbound(channel: string | GenericMiddleware, ...middlewares: GenericMiddleware[]) { - const mws: GenericChannelMiddlewareTuple[] = this.middlewaresToChannelMiddlewaresTuples(channel, ...middlewares) - - mws.forEach(mw => { + useOutbound(...middlewares: GenericMiddleware[]): void + useOutbound(channel: string, ...middlewares: GenericMiddleware[]): void + useOutbound( + channel: string | GenericMiddleware, + ...middlewares: GenericMiddleware[] + ) { + const mws: GenericChannelMiddlewareTuple[] = + this.middlewaresToChannelMiddlewaresTuples(channel, ...middlewares) + + mws.forEach((mw) => { if (mw.fn.length <= 2) { this.addOutboundMiddlewares([mw as ChannelMiddlewareTuple]) } else { @@ -70,13 +80,21 @@ class GleeRouter { }) } - private middlewaresToChannelMiddlewaresTuples(channel: string | GenericMiddleware, ...middlewares: GenericMiddleware[]): GenericChannelMiddlewareTuple[] { + private middlewaresToChannelMiddlewaresTuples( + channel: string | GenericMiddleware, + ...middlewares: GenericMiddleware[] + ): GenericChannelMiddlewareTuple[] { const realChannel = typeof channel === 'string' ? channel : undefined - const allMiddlewares: GenericMiddleware[] = realChannel ? middlewares : [channel as GenericMiddleware].concat(middlewares) - return allMiddlewares.map(fn => ({ - channel: realChannel, - fn, - } as GenericChannelMiddlewareTuple)) + const allMiddlewares: GenericMiddleware[] = realChannel + ? middlewares + : [channel as GenericMiddleware].concat(middlewares) + return allMiddlewares.map( + (fn) => + ({ + channel: realChannel, + fn, + } as GenericChannelMiddlewareTuple) + ) } /** @@ -119,10 +137,16 @@ class GleeRouter { * @param {String} [channel] The scope channel. * @private */ - private _addMiddlewares (target:GenericChannelMiddlewareTuple[], middlewares:GenericChannelMiddlewareTuple[], channel:string) { - middlewares.forEach(mw => { + private _addMiddlewares( + target: GenericChannelMiddlewareTuple[], + middlewares: GenericChannelMiddlewareTuple[], + channel: string + ) { + middlewares.forEach((mw) => { if (channel) { - const compoundchannel = mw.channel ? `${channel}/${mw.channel}` : channel + const compoundchannel = mw.channel + ? `${channel}/${mw.channel}` + : channel target.push({ ...mw, ...{ channel: compoundchannel } }) } else { target.push(mw) @@ -136,7 +160,7 @@ class GleeRouter { * @param {Array} middlewares The middlewares to add to the collection. * @param {String} [channel] The scope channel. */ - addMiddlewares(middlewares: ChannelMiddlewareTuple[], channel?:string) { + addMiddlewares(middlewares: ChannelMiddlewareTuple[], channel?: string) { this._addMiddlewares(this.middlewares, middlewares, channel) } @@ -146,7 +170,10 @@ class GleeRouter { * @param {Array} middlewares The middlewares to add to the collection. * @param {String} [channel] The scope channel. */ - addOutboundMiddlewares (middlewares:ChannelMiddlewareTuple[], channel?:string) { + addOutboundMiddlewares( + middlewares: ChannelMiddlewareTuple[], + channel?: string + ) { this._addMiddlewares(this.outboundMiddlewares, middlewares, channel) } @@ -156,7 +183,10 @@ class GleeRouter { * @param {Array} errorMiddlewares The middlewares to add to the collection. * @param {String} [channel] The scope channel. */ - addErrorMiddlewares(errorMiddlewares:ChannelErrorMiddlewareTuple[], channel?:string) { + addErrorMiddlewares( + errorMiddlewares: ChannelErrorMiddlewareTuple[], + channel?: string + ) { this._addMiddlewares(this.errorMiddlewares, errorMiddlewares, channel) } @@ -166,8 +196,15 @@ class GleeRouter { * @param {Array} errorMiddlewares The middlewares to add to the collection. * @param {String} [channel] The scope channel. */ - addOutboundErrorMiddlewares (errorMiddlewares:ChannelErrorMiddlewareTuple[], channel?:string) { - this._addMiddlewares(this.outboundErrorMiddlewares, errorMiddlewares, channel) + addOutboundErrorMiddlewares( + errorMiddlewares: ChannelErrorMiddlewareTuple[], + channel?: string + ) { + this._addMiddlewares( + this.outboundErrorMiddlewares, + errorMiddlewares, + channel + ) } } diff --git a/src/lib/servers.ts b/src/lib/servers.ts index cc8c9a898..309680d06 100644 --- a/src/lib/servers.ts +++ b/src/lib/servers.ts @@ -2,13 +2,13 @@ import { getParsedAsyncAPI } from './asyncapiFile.js' export async function getSelectedServerNames(): Promise { const parsedAsyncAPI = await getParsedAsyncAPI() - + if (!process.env.GLEE_SERVER_NAMES) { return parsedAsyncAPI.serverNames() } const arrayOfNames = process.env.GLEE_SERVER_NAMES.split(',') - return parsedAsyncAPI.serverNames().filter(name => { + return parsedAsyncAPI.serverNames().filter((name) => { return arrayOfNames.includes(name) }) } diff --git a/src/lib/util.ts b/src/lib/util.ts index e793aa530..c2ac3aaad 100644 --- a/src/lib/util.ts +++ b/src/lib/util.ts @@ -7,9 +7,9 @@ import { GleeFunctionEvent } from './index.js' import GleeMessage from './message.js' interface IValidateDataReturn { - errors?: void | betterAjvErrors.IOutputError[], - humanReadableError?: void | betterAjvErrors.IOutputError[], - isValid: boolean | PromiseLike, + errors?: void | betterAjvErrors.IOutputError[] + humanReadableError?: void | betterAjvErrors.IOutputError[] + isValid: boolean | PromiseLike } /** @@ -19,7 +19,10 @@ interface IValidateDataReturn { * @param {String} path The path. * @param {String} channel The channel. */ -export const getParams = (path: string, channel: string): { [key: string]: string } | null => { +export const getParams = ( + path: string, + channel: string +): { [key: string]: string } | null => { if (path === undefined) return {} const keys = [] @@ -28,10 +31,15 @@ export const getParams = (path: string, channel: string): { [key: string]: strin if (result === null) return null - return keys.map((key, index) => ({ [key.name]: result[index + 1] })).reduce((prev, val) => ({ - ...prev, - ...val, - }), {}) + return keys + .map((key, index) => ({ [key.name]: result[index + 1] })) + .reduce( + (prev, val) => ({ + ...prev, + ...val, + }), + {} + ) } /** @@ -50,7 +58,7 @@ export const duplicateMessage = (message: GleeMessage): GleeMessage => { connection: message.connection, broadcast: message.broadcast, cluster: message.cluster, - query: message.query + query: message.query, }) if (message.isInbound()) { @@ -71,7 +79,7 @@ export const duplicateMessage = (message: GleeMessage): GleeMessage => { * @return {Boolean} */ export const matchChannel = (path: string, channel: string): boolean => { - return (getParams(path, channel) !== null) + return getParams(path, channel) !== null } /** @@ -82,7 +90,10 @@ export const matchChannel = (path: string, channel: string): boolean => { * @param {Object} schema A JSON Schema definition * @returns Object */ -export const validateData = (data: any, schema: object): IValidateDataReturn => { +export const validateData = ( + data: any, + schema: object +): IValidateDataReturn => { const ajv = new Ajv({ allErrors: true, jsonPointers: true }) const validation = ajv.compile(schema) const isValid = validation(data) @@ -105,10 +116,13 @@ export const validateData = (data: any, schema: object): IValidateDataReturn => } export const arrayHasDuplicates = (array: any[]) => { - return (new Set(array)).size !== array.length + return new Set(array).size !== array.length } -export const gleeMessageToFunctionEvent = (message: GleeMessage, glee: Glee): GleeFunctionEvent => { +export const gleeMessageToFunctionEvent = ( + message: GleeMessage, + glee: Glee +): GleeFunctionEvent => { return { payload: message.payload, query: message.query, @@ -120,7 +134,10 @@ export const gleeMessageToFunctionEvent = (message: GleeMessage, glee: Glee): Gl } as GleeFunctionEvent } -export const isRemoteServer = (parsedAsyncAPI: AsyncAPIDocument, serverName: string): boolean => { +export const isRemoteServer = ( + parsedAsyncAPI: AsyncAPIDocument, + serverName: string +): boolean => { const remoteServers = parsedAsyncAPI.extension('x-remoteServers') if (remoteServers) { return remoteServers.includes(serverName) @@ -130,7 +147,10 @@ export const isRemoteServer = (parsedAsyncAPI: AsyncAPIDocument, serverName: str export const resolveFunctions = async (object: any) => { for (const key in object) { - if (typeof object[String(key)] === 'object' && !Array.isArray(object[String(key)])) { + if ( + typeof object[String(key)] === 'object' && + !Array.isArray(object[String(key)]) + ) { await resolveFunctions(object[String(key)]) } else if (typeof object[String(key)] === 'function' && key !== 'auth') { object[String(key)] = await object[String(key)]() diff --git a/src/middlewares/errorLogger.ts b/src/middlewares/errorLogger.ts index d2c0fa229..7286b087e 100644 --- a/src/middlewares/errorLogger.ts +++ b/src/middlewares/errorLogger.ts @@ -6,10 +6,12 @@ import { MiddlewareCallback } from './index.js' export default (err: Error, message: GleeMessage, next: MiddlewareCallback) => { if (err instanceof GleeError) { if (message && message.isInbound()) { - err.message = 'You have received a malformed event or there has been error processing it. Please review the error below:' + err.message = + 'You have received a malformed event or there has been error processing it. Please review the error below:' logError(err, { showStack: false }) } else if (message && message.isOutbound()) { - err.message = 'One of your functions is producing a malformed event or there has been an error processing it. Please review the error below:' + err.message = + 'One of your functions is producing a malformed event or there has been an error processing it. Please review the error below:' logError(err, { showStack: false }) } } else { diff --git a/src/middlewares/existsInAsyncAPI.ts b/src/middlewares/existsInAsyncAPI.ts index 6222c5302..93604b6b2 100644 --- a/src/middlewares/existsInAsyncAPI.ts +++ b/src/middlewares/existsInAsyncAPI.ts @@ -2,8 +2,12 @@ import { AsyncAPIDocument } from '@asyncapi/parser' import { MiddlewareCallback } from './index.js' import GleeMessage from '../lib/message.js' -export default (asyncapi: AsyncAPIDocument) => (event: GleeMessage, next: MiddlewareCallback) => { - if (typeof event.channel !== 'string') return next(new Error(`Invalid channel name: ${event.channel}.`)) - if (asyncapi.channel(event.channel)) return next() - next(new Error(`Channel ${event.channel} is not defined in the AsyncAPI file.`)) -} +export default (asyncapi: AsyncAPIDocument) => + (event: GleeMessage, next: MiddlewareCallback) => { + if (typeof event.channel !== 'string') + {return next(new Error(`Invalid channel name: ${event.channel}.`))} + if (asyncapi.channel(event.channel)) return next() + next( + new Error(`Channel ${event.channel} is not defined in the AsyncAPI file.`) + ) + } diff --git a/src/middlewares/index.d.ts b/src/middlewares/index.d.ts index 30b82eb2f..9e6f271fd 100644 --- a/src/middlewares/index.d.ts +++ b/src/middlewares/index.d.ts @@ -1,5 +1,12 @@ import GleeMessage from '../lib/message.js' -export type Middleware = (message: GleeMessage, next: MiddlewareCallback) => void -export type ErrorMiddleware = (error: Error, message: GleeMessage, next: MiddlewareCallback) => void +export type Middleware = ( + message: GleeMessage, + next: MiddlewareCallback +) => void +export type ErrorMiddleware = ( + error: Error, + message: GleeMessage, + next: MiddlewareCallback +) => void export type MiddlewareCallback = (error?: Error, message?: GleeMessage) => void diff --git a/src/middlewares/validate.ts b/src/middlewares/validate.ts index b225fa46f..ac982dc9d 100644 --- a/src/middlewares/validate.ts +++ b/src/middlewares/validate.ts @@ -4,13 +4,19 @@ import GleeMessage from '../lib/message.js' import { validateData } from '../lib/util.js' import { MiddlewareCallback } from './index.js' -export default (schema: Schema) => (event: GleeMessage, next: MiddlewareCallback) => { - const { humanReadableError, errors, isValid } = validateData(event.payload, schema) - if (!isValid) { - return next(new GleeError({ - humanReadableError, - errors, - })) +export default (schema: Schema) => + (event: GleeMessage, next: MiddlewareCallback) => { + const { humanReadableError, errors, isValid } = validateData( + event.payload, + schema + ) + if (!isValid) { + return next( + new GleeError({ + humanReadableError, + errors, + }) + ) + } + next() } - next() -} diff --git a/src/middlewares/validateConnection.ts b/src/middlewares/validateConnection.ts index 1612405ed..061bfb7c3 100644 --- a/src/middlewares/validateConnection.ts +++ b/src/middlewares/validateConnection.ts @@ -5,7 +5,11 @@ export default (event: GleeMessage, next: MiddlewareCallback) => { if (!event.connection) return next() const { connection, channel } = event if (!connection.hasChannel(channel)) { - return next(new Error(`Can't send a message to channel ${channel} using this connection.`)) + return next( + new Error( + `Can't send a message to channel ${channel} using this connection.` + ) + ) } next() } diff --git a/src/registerAdapters.ts b/src/registerAdapters.ts index cd2d3b790..9666e94d6 100644 --- a/src/registerAdapters.ts +++ b/src/registerAdapters.ts @@ -11,14 +11,20 @@ import HttpServerAdapter from './adapters/http/server.js' import HttpClientAdapter from './adapters/http/client.js' import KafkaAdapter from './adapters/kafka/index.js' -export default async (app: Glee, parsedAsyncAPI: AsyncAPIDocument, config: GleeConfig) => { +export default async ( + app: Glee, + parsedAsyncAPI: AsyncAPIDocument, + config: GleeConfig +) => { const serverNames = await getSelectedServerNames() - serverNames.forEach(serverName => { + serverNames.forEach((serverName) => { const server = parsedAsyncAPI.server(serverName) if (!server) { - throw new Error(`Server "${serverName}" is not defined in the AsyncAPI file.`) + throw new Error( + `Server "${serverName}" is not defined in the AsyncAPI file.` + ) } registerAdapterForServer(serverName, server, app, parsedAsyncAPI, config) @@ -27,7 +33,13 @@ export default async (app: Glee, parsedAsyncAPI: AsyncAPIDocument, config: GleeC if (config.cluster) registerAdapterForCluster(app, config.cluster) } -function registerAdapterForServer(serverName: string, server: Server, app: Glee, parsedAsyncAPI: AsyncAPIDocument, config: GleeConfig) { +function registerAdapterForServer( + serverName: string, + server: Server, + app: Glee, + parsedAsyncAPI: AsyncAPIDocument, + config: GleeConfig +) { const protocol = server.protocol() const remoteServers = parsedAsyncAPI.extension('x-remoteServers') if (['mqtt', 'mqtts', 'secure-mqtt'].includes(protocol)) { @@ -50,7 +62,7 @@ function registerAdapterForServer(serverName: string, server: Server, app: Glee, app.addAdapter(WebsocketClientAdapter, { serverName, server, - parsedAsyncAPI + parsedAsyncAPI, }) } else { if (!configWsAdapter || configWsAdapter === 'native') { @@ -72,33 +84,35 @@ function registerAdapterForServer(serverName: string, server: Server, app: Glee, parsedAsyncAPI, }) } else { - throw new Error(`Unknown value for websocket.adapter found in glee.config.js: ${config.ws.server.adapter}. Allowed values are 'native-websocket', 'socket.io', or a reference to a custom Glee adapter.`) + throw new Error( + `Unknown value for websocket.adapter found in glee.config.js: ${config.ws.server.adapter}. Allowed values are 'native-websocket', 'socket.io', or a reference to a custom Glee adapter.` + ) } - } - }else if (['http', 'https'].includes(protocol)) { - if(remoteServers && remoteServers.includes(serverName)){ + } else if (['http', 'https'].includes(protocol)) { + if (remoteServers && remoteServers.includes(serverName)) { app.addAdapter(HttpClientAdapter, { serverName, server, - parsedAsyncAPI + parsedAsyncAPI, }) - }else{ + } else { app.addAdapter(HttpServerAdapter, { serverName, server, - parsedAsyncAPI + parsedAsyncAPI, }) } - } - - else { + } else { // TODO: Improve error message with link to repo encouraging the developer to contribute. throw new Error(`Protocol "${server.protocol()}" is not supported yet.`) } } -function registerAdapterForCluster(app: Glee, config: GleeClusterAdapterConfig) { +function registerAdapterForCluster( + app: Glee, + config: GleeClusterAdapterConfig +) { const adapter = config.adapter if (!adapter || adapter === 'redis') {