Skip to content

Commit

Permalink
feat: add prettier lint config and refactor the files (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
Souvikns committed Jul 5, 2023
1 parent f3cb503 commit f65a7a0
Show file tree
Hide file tree
Showing 35 changed files with 1,038 additions and 606 deletions.
5 changes: 5 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
dist
coverage
.github
examples
node_modules
4 changes: 4 additions & 0 deletions .prettierrc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
trailingComma: "es5"
tabWidth: 2
semi: false
singleQuote: true
12 changes: 6 additions & 6 deletions src/adapters/cluster/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ class RedisClusterAdapter extends ClusterAdapter {
this._channelName = `${this.serverName}-channel`

this._publisher = createClient({
url: this.serverUrlExpanded
url: this.serverUrlExpanded,
})
const subscriber = this._publisher.duplicate()

this._publisher.on('error', err => {
this._publisher.on('error', (err) => {
this.emit('error', err)
})

Expand All @@ -41,7 +41,7 @@ class RedisClusterAdapter extends ClusterAdapter {
this.emit('close', { name: this.name(), adapter: this })
})

subscriber.on('error', err => {
subscriber.on('error', (err) => {
this.emit('error', err)
})

Expand All @@ -50,14 +50,14 @@ class RedisClusterAdapter extends ClusterAdapter {
})

subscriber.on('end', () => {
this.emit('close', { name: this.name(), adapter: this })
this.emit('close', { name: this.name(), adapter: this })
})

await Promise.all([this._publisher.connect(), subscriber.connect()])

subscriber.subscribe(this._channelName, serialized => {
subscriber.subscribe(this._channelName, (serialized) => {
const message = this.deserializeMessage(serialized)
if ( message ) this.emit('message', message)
if (message) this.emit('message', message)
})

this.emit('connect', { name: this.name(), adapter: this })
Expand Down
11 changes: 5 additions & 6 deletions src/adapters/http/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import got from 'got'
import { HttpAuthConfig, HttpAdapterConfig } from '../../lib/index.js'
import http from 'http'
class HttpClientAdapter extends Adapter {

name(): string {
return 'HTTP client'
}
Expand All @@ -28,14 +27,14 @@ class HttpClientAdapter extends Adapter {
const channelInfo = this.parsedAsyncAPI.channel(channelName)
const httpChannelBinding = channelInfo.binding('http')
const channelServers = channelInfo.servers()
const isChannelServers = !channelServers.length || channelServers.includes(message.serverName)
if (
httpChannelBinding && isChannelServers
) {
const isChannelServers =
!channelServers.length || channelServers.includes(message.serverName)
if (httpChannelBinding && isChannelServers) {
const method = httpChannelBinding.method
const url = `${serverUrl}/${channelName}`
const body: any = message.payload
const query: { [key: string]: string } | { [key: string]: string[] } = message.query
const query: { [key: string]: string } | { [key: string]: string[] } =
message.query
got({
method,
url,
Expand Down
58 changes: 33 additions & 25 deletions src/adapters/http/server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import Adapter from "../../lib/adapter.js"
import GleeMessage from "../../lib/message.js"
import http from "http"
import { validateData } from "../../lib/util.js"
import GleeError from "../../errors/glee-error.js"
import * as url from "url"
import Adapter from '../../lib/adapter.js'
import GleeMessage from '../../lib/message.js'
import http from 'http'
import { validateData } from '../../lib/util.js'
import GleeError from '../../errors/glee-error.js'
import * as url from 'url'

class HttpAdapter extends Adapter {
private httpResponses = new Map()

name(): string {
return "HTTP server"
return 'HTTP server'
}

async connect(): Promise<this> {
Expand All @@ -20,41 +20,42 @@ class HttpAdapter extends Adapter {
return this._send(message)
}

async _connect(): Promise<this> { // NOSONAR
const config = await this.resolveProtocolConfig("http")
async _connect(): Promise<this> {
// NOSONAR
const config = await this.resolveProtocolConfig('http')
const httpOptions = config?.server
const serverUrl = new URL(this.serverUrlExpanded)
const httpServer = httpOptions?.httpServer || http.createServer()
const asyncapiServerPort = serverUrl.port || 80
const optionsPort = httpOptions?.port
const port = optionsPort || asyncapiServerPort

httpServer.on("request", (req, res) => {
res.setHeader("Content-Type", "application/json")
httpServer.on('request', (req, res) => {
res.setHeader('Content-Type', 'application/json')
const bodyBuffer = []
let body: object
req.on("data", (chunk) => {
req.on('data', (chunk) => {
bodyBuffer.push(chunk)
})
req.on("end", () => {
req.on('end', () => {
body = JSON.parse(Buffer.concat(bodyBuffer).toString())
this.httpResponses.set(this.serverName, res)
let { pathname } = new URL(req.url, serverUrl)
pathname = pathname.startsWith("/") ? pathname.substring(1) : pathname
pathname = pathname.startsWith('/') ? pathname.substring(1) : pathname
if (!this.parsedAsyncAPI.channel(pathname)) {
res.end("HTTP/1.1 404 Not Found1\r\n\r\n")
res.end('HTTP/1.1 404 Not Found1\r\n\r\n')
const err = new Error(
`A client attempted to connect to channel ${pathname} but this channel is not defined in your AsyncAPI file. here`
)
this.emit("error", err)
this.emit('error', err)
return err
}
const { query } = url.parse(req.url, true)
const searchParams = { query }
const payload = body
const httpChannelBinding = this.parsedAsyncAPI
.channel(pathname)
.binding("http")
.binding('http')
if (httpChannelBinding) {
this._checkHttpBinding(
req,
Expand All @@ -65,26 +66,33 @@ class HttpAdapter extends Adapter {
payload
)
}
this.emit("connect", {
this.emit('connect', {
name: this.name(),
adapter: this,
connection: http,
channel: pathname,
})
const msg = this._createMessage(pathname, payload, searchParams)
this.emit("message", msg, http)
this.emit('message', msg, http)
})
})

httpServer.listen(port)
this.emit("server:ready", { name: this.name(), adapter: this })
this.emit('server:ready', { name: this.name(), adapter: this })
return this
}
_checkHttpBinding(req:any, res:any, pathname:any, httpChannelBinding:any, searchParams:any, payload:any) {
_checkHttpBinding(
req: any,
res: any,
pathname: any,
httpChannelBinding: any,
searchParams: any,
payload: any
) {
const { query, body, method } = httpChannelBinding
if (method && req.method !== method) {
const err = new Error(`Cannot ${req.method} ${pathname}`)
this.emit("error", err)
this.emit('error', err)
res.end(err.message)
return
}
Expand All @@ -95,7 +103,7 @@ class HttpAdapter extends Adapter {
)
if (!isValid) {
const err = new GleeError({ humanReadableError, errors })
this.emit("error", err)
this.emit('error', err)
res.end(JSON.stringify(err.errors))
return
}
Expand All @@ -107,7 +115,7 @@ class HttpAdapter extends Adapter {
)
if (!isValid) {
const err = new GleeError({ humanReadableError, errors })
this.emit("error", err)
this.emit('error', err)
res.end(JSON.stringify(err.errors))
return
}
Expand All @@ -123,7 +131,7 @@ class HttpAdapter extends Adapter {
return new GleeMessage({
payload: JSON.parse(JSON.stringify(body)),
channel: pathName,
query: JSON.parse(JSON.stringify( params.query))
query: JSON.parse(JSON.stringify(params.query)),
})
}
}
Expand Down
30 changes: 20 additions & 10 deletions src/adapters/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Kafka, SASLOptions } from 'kafkajs'
import Adapter from '../../lib/adapter.js'
import GleeMessage from '../../lib/message.js'
import {KafkaAdapterConfig, KafkaAuthConfig} from '../../lib/index.js'
import { KafkaAdapterConfig, KafkaAuthConfig } from '../../lib/index.js'

class KafkaAdapter extends Adapter {
private kafka: Kafka
Expand All @@ -11,7 +11,9 @@ class KafkaAdapter extends Adapter {
}

async connect() {
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig('kafka')
const kafkaOptions: KafkaAdapterConfig = await this.resolveProtocolConfig(
'kafka'
)
const auth: KafkaAuthConfig = await this.getAuthConfig(kafkaOptions.auth)
const securityRequirements = (this.AsyncAPIServer.security() || []).map(
(sec) => {
Expand Down Expand Up @@ -39,7 +41,10 @@ class KafkaAdapter extends Adapter {
cert: auth?.cert,
},
sasl: {
mechanism: (scramSha256SecurityReq ? 'scram-sha-256' : undefined) || (scramSha512SecurityReq ? 'scram-sha-512' : undefined) || 'plain',
mechanism:
(scramSha256SecurityReq ? 'scram-sha-256' : undefined) ||
(scramSha512SecurityReq ? 'scram-sha-512' : undefined) ||
'plain',
username: userAndPasswordSecurityReq ? auth?.username : undefined,
password: userAndPasswordSecurityReq ? auth?.password : undefined,
} as SASLOptions,
Expand All @@ -53,13 +58,16 @@ class KafkaAdapter extends Adapter {
name: this.name(),
adapter: this,
connection: consumer,
channels: this.getSubscribedChannels()
channels: this.getSubscribedChannels(),
})
}
})
await consumer.connect()
const subscribedChannels = this.getSubscribedChannels()
await consumer.subscribe({ topics: subscribedChannels, fromBeginning: true })
await consumer.subscribe({
topics: subscribedChannels,
fromBeginning: true,
})
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const msg = this._createMessage(topic, partition, message)
Expand All @@ -73,11 +81,13 @@ class KafkaAdapter extends Adapter {
await producer.connect()
await producer.send({
topic: message.channel,
messages: [{
key: message.headers.key,
value: message.payload,
timestamp: message.headers.timestamp,
}],
messages: [
{
key: message.headers.key,
value: message.payload,
timestamp: message.headers.timestamp,
},
],
})
await producer.disconnect()
}
Expand Down
Loading

0 comments on commit f65a7a0

Please sign in to comment.