Skip to content
This repository has been archived by the owner on May 29, 2023. It is now read-only.

Commit

Permalink
feat: implement cluster-status endpoint (#80)
Browse files Browse the repository at this point in the history
* feat: implement cluster-status endpoint

* chore: setup subscription on connected event
  • Loading branch information
0xJuancito authored Jun 10, 2022
1 parent 72c0ecd commit cdce1ca
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .env.default
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ HTTP_SERVER_PORT=3000
HTTP_SERVER_HOST=0.0.0.0
NATS_URL=localhost:4222
WS_ROOM_SERVICE_SECRET=

SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL=60000
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"dcl-crypto": "^2.3.0",
"google-protobuf": "^3.20.1",
"jsonwebtoken": "^8.5.1",
"mitt": "^3.0.0",
"nats": "^2.7.1",
"protoc-gen-dcl": "^1.0.0-20220109214200.commit-f45e34a",
"ts-proto": "^1.112.1",
Expand Down
3 changes: 3 additions & 0 deletions src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { createMessageBrokerComponent } from './ports/message-broker'
import { createRpcServer } from '@dcl/rpc'
import { httpProviderForNetwork } from '@dcl/catalyst-contracts'
import mitt from 'mitt'
import { createServiceDiscoveryComponent } from './ports/service-discovery'

const DEFAULT_ETH_NETWORK = 'ropsten'

Expand All @@ -35,6 +36,7 @@ export async function initComponents(): Promise<AppComponents> {
const fetch = await createFetchComponent()
const metrics = await createMetricsComponent(metricDeclarations, { server, config })
const messageBroker = await createMessageBrokerComponent({ config, logs })
const serviceDiscovery = await createServiceDiscoveryComponent({ messageBroker, logs, config })

// TODO: deprecate web3x and use ethersjs
const CURRENT_ETH_NETWORK = (await config.getString('ETH_NETWORK')) ?? DEFAULT_ETH_NETWORK
Expand All @@ -49,6 +51,7 @@ export async function initComponents(): Promise<AppComponents> {
metrics,
ws,
messageBroker,
serviceDiscovery,
ethereumProvider,
rpcServer,
roomsMessages: mitt(),
Expand Down
10 changes: 10 additions & 0 deletions src/controllers/handlers/cluster-status-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { IHttpServerComponent } from '@well-known-components/interfaces'
import { GlobalContext } from '../../types'

// handlers arguments only type what they need, to make unit testing easier
export async function clusterStatusHandler(context: IHttpServerComponent.DefaultContext<GlobalContext>) {
const clusterStatus = await context.components.serviceDiscovery.getClusterStatus()
return {
body: clusterStatus
}
}
2 changes: 2 additions & 0 deletions src/controllers/routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Router } from '@well-known-components/http-server'
import { GlobalContext } from '../types'
import { pingHandler } from './handlers/ping-handler'
import { clusterStatusHandler } from './handlers/cluster-status-handler'
import { statusHandler } from './handlers/status-handler'
import { websocketHandler } from './handlers/ws-handler'
import { websocketRoomHandler } from './handlers/ws-room-handler'
Expand All @@ -12,6 +13,7 @@ export async function setupRouter(_: GlobalContext): Promise<Router<GlobalContex

router.get('/ping', pingHandler)

router.get('/cluster-status', clusterStatusHandler)
router.get('/status', statusHandler)
router.get('/ws', websocketHandler)
router.get('/rpc', websocketRpcHandler)
Expand Down
14 changes: 13 additions & 1 deletion src/ports/message-broker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import { IBaseComponent } from '@well-known-components/interfaces'
import { connect, NatsConnection } from 'nats'
import { Subscription, BaseComponents } from '../types'
import mitt from 'mitt'
import { Emitter } from 'mitt'

export type MessageBrokerEvents = {
connected: void
}

export type IMessageBrokerComponent = {
publish(topic: string, message?: Uint8Array): void
subscribe(topic: string): Subscription

start(): Promise<void>
stop(): Promise<void>

events: Emitter<MessageBrokerEvents>
}

export async function createMessageBrokerComponent(
Expand All @@ -21,6 +29,8 @@ export async function createMessageBrokerComponent(
const natsConfig = { servers: `${natsUrl}` }
let natsConnection: NatsConnection

const events = mitt<MessageBrokerEvents>()

function publish(topic: string, message?: Uint8Array): void {
natsConnection.publish(topic, message)
}
Expand All @@ -43,6 +53,7 @@ export async function createMessageBrokerComponent(
async function start() {
try {
natsConnection = await connect(natsConfig)
events.emit('connected')
logger.info(`Connected to NATS: ${natsUrl}`)
} catch (error) {
logger.error(`An error occurred trying to connect to the NATS server: ${natsUrl}`)
Expand All @@ -62,6 +73,7 @@ export async function createMessageBrokerComponent(
publish,
subscribe,
start,
stop
stop,
events
}
}
77 changes: 77 additions & 0 deletions src/ports/service-discovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { IBaseComponent } from '@well-known-components/interfaces'
import { JSONCodec } from 'nats'
import { BaseComponents, Subscription } from '../types'

export type ServiceDiscoveryMessage = {
serverName: string
status: any
}

export type ClusterStatus = Map<string, any>
export type LastStatusUpdate = Map<string, number>

export type IServiceDiscoveryComponent = IBaseComponent & {
getClusterStatus(): Promise<any>
stop(): Promise<void>
}

export async function createServiceDiscoveryComponent(
components: Pick<BaseComponents, 'messageBroker' | 'logs' | 'config'>
): Promise<IServiceDiscoveryComponent> {
let healthCheckTimer: NodeJS.Timer
let subscription: Subscription

const { messageBroker, logs, config } = components
const logger = logs.getLogger('Service Discovery')
const jsonCodec = JSONCodec()

const clusterStatus = new Map<string, any>()
const lastStatusUpdate = new Map<string, number>()

messageBroker.events.on('connected', async () => {
await setupServiceDiscovery()
await setupHealthCheck()
})

async function setupServiceDiscovery() {
subscription = messageBroker.subscribe('service.discovery')
;(async () => {
for await (const message of subscription.generator) {
try {
const discoveryMsg = jsonCodec.decode(message.data) as ServiceDiscoveryMessage
clusterStatus.set(discoveryMsg.serverName, discoveryMsg.status)
lastStatusUpdate.set(discoveryMsg.serverName, Date.now())
} catch (err: any) {
logger.error(`Could not decode status discovery message: ${err.message}`)
}
}
})().catch((err: any) => logger.error(`error processing subscription message; ${err.toString()}`))
}

async function setupHealthCheck() {
const interval = await config.requireNumber('SERVICE_DISCOVERY_HEALTH_CHECK_INTERVAL')
healthCheckTimer = setInterval(() => {
for (const [serverName, lastUpdate] of lastStatusUpdate) {
const unhealthy = lastUpdate < Date.now() - interval
if (unhealthy) {
clusterStatus.delete(serverName)
lastStatusUpdate.delete(serverName)
}
}
}, interval)
}

async function getClusterStatus() {
return Object.fromEntries(clusterStatus)
}

async function stop() {
clearInterval(healthCheckTimer)
subscription?.unsubscribe()
}

return {
getClusterStatus,
stop
}
}
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { WebSocket } from 'ws'
import { HttpProvider } from 'web3x/providers'
import { RpcServer, RpcServerPort } from '@dcl/rpc'
import { Emitter } from 'mitt'
import { IServiceDiscoveryComponent } from './ports/service-discovery'

export type GlobalContext = {
components: BaseComponents
Expand All @@ -32,6 +33,7 @@ export type BaseComponents = {
metrics: IMetricsComponent<keyof typeof metricDeclarations>
ws: WebSocketComponent
messageBroker: IMessageBrokerComponent
serviceDiscovery: IServiceDiscoveryComponent
// TODO: deprecate web3x and use ethersjs
ethereumProvider: HttpProvider

Expand Down
12 changes: 9 additions & 3 deletions test/helpers/message-broker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { IBaseComponent } from '@well-known-components/interfaces'
import { IMessageBrokerComponent } from '../../src/ports/message-broker'
import { IMessageBrokerComponent, MessageBrokerEvents } from '../../src/ports/message-broker'
import { BaseComponents, NatsMsg, Subscription } from '../../src/types'
import { pushableChannel } from '@dcl/rpc/dist/push-channel'
import mitt from 'mitt'
import { Emitter } from 'mitt'

type PushableChannel = {
push(msg: NatsMsg): void
Expand All @@ -11,6 +13,7 @@ export async function createLocalMessageBrokerComponent(
_: Pick<BaseComponents, 'config' | 'logs'>
): Promise<IMessageBrokerComponent & IBaseComponent> {
const channels = new Map<string, PushableChannel>()
const events = mitt<MessageBrokerEvents>()

function publish(topic: string, data: Uint8Array): void {
channels.forEach((ch, pattern) => {
Expand Down Expand Up @@ -42,14 +45,17 @@ export async function createLocalMessageBrokerComponent(
}
}

async function start() {}
async function start() {
events.emit('connected')
}

async function stop() {}

return {
publish,
subscribe,
start,
stop
stop,
events
}
}

0 comments on commit cdce1ca

Please sign in to comment.