Skip to content

Commit

Permalink
maps and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
tudddorrr committed Dec 16, 2024
1 parent 11fcceb commit 9cba698
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 27 deletions.
56 changes: 31 additions & 25 deletions src/socket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import authenticateSocket from './authenticateSocket'
import SocketConnection from './socketConnection'
import SocketRouter from './router/socketRouter'
import { sendMessage } from './messages/socketMessage'
import { logConnection, logConnectionClosed } from './messages/socketLogger'

type CloseConnectionOptions = {
code?: number
Expand All @@ -16,7 +17,7 @@ type CloseConnectionOptions = {

export default class Socket {
private readonly wss: WebSocketServer
private connections: SocketConnection[] = []
private connections: Map<WebSocket, SocketConnection> = new Map()
private router: SocketRouter

constructor(server: Server, private readonly em: EntityManager) {
Expand All @@ -39,32 +40,35 @@ export default class Socket {
return this.wss
}

/* v8 ignore start */
heartbeat(): void {
const interval = setInterval(() => {
this.connections.forEach((conn) => {
/* v8 ignore start */
for (const [ws, conn] of this.connections.entries()) {
if (!conn.alive) {
this.closeConnection(conn.ws, { terminate: true })
return
this.closeConnection(ws, { terminate: true })
continue
}

conn.alive = false
conn.ws.ping()
/* v8 ignore end */
})
ws.ping()
}
}, 30_000)

this.wss.on('close', () => {
clearInterval(interval)
})
}
/* v8 ignore stop */

async handleConnection(ws: WebSocket, req: IncomingMessage): Promise<void> {
logConnection(req)

await RequestContext.create(this.em, async () => {
const key = await authenticateSocket(req.headers?.authorization ?? '')
if (key) {
this.connections.push(new SocketConnection(ws, key, req))
sendMessage(this.connections.at(-1), 'v1.connected', {})
const connection = new SocketConnection(ws, key, req)
this.connections.set(ws, connection)
sendMessage(connection, 'v1.connected', {})
} else {
this.closeConnection(ws)
}
Expand All @@ -73,48 +77,50 @@ export default class Socket {

async handleMessage(ws: WebSocket, data: RawData): Promise<void> {
await RequestContext.create(this.em, async () => {
await this.router.handleMessage(this.findConnectionBySocket(ws), data)
const connection = this.connections.get(ws)
if (connection) {
await this.router.handleMessage(connection, data)
/* v8 ignore next 3 */
} else {
this.closeConnection(ws)
}
})
}

/* v8 ignore start */
handlePong(ws: WebSocket): void {
const connection = this.findConnectionBySocket(ws)
const connection = this.connections.get(ws)
if (!connection) return

connection.alive = true
if (connection.rateLimitWarnings > 0) {
connection.rateLimitWarnings--
}
}
/* v8 ignore end */
/* v8 ignore stop */

closeConnection(ws: WebSocket, options: CloseConnectionOptions = {}): void {
const terminate = options.terminate ?? false
const preclosed = options.preclosed ?? false
const code = options.code ?? 3000

if (terminate) {
ws.terminate()
} else if (!preclosed) {
ws.close(options.code ?? 3000, options.reason)
ws.close(code, options.reason)
}

this.connections = this.connections.filter((conn) => conn.ws !== ws)
const conn = this.connections.get(ws)
logConnectionClosed(conn, preclosed, code, options.reason)

this.connections.delete(ws)
}

findConnectionBySocket(ws: WebSocket): SocketConnection | undefined {
const connection = this.connections.find((conn) => conn.ws === ws)
/* v8 ignore start */
if (!connection) {
this.closeConnection(ws)
return
}
/* v8 ignore end */

return connection
return this.connections.get(ws)

Check warning on line 120 in src/socket/index.ts

View check run for this annotation

Codecov / codecov/patch

src/socket/index.ts#L120

Added line #L120 was not covered by tests
}

findConnections(filter: (conn: SocketConnection) => boolean): SocketConnection[] {
return this.connections.filter(filter)
return Array.from(this.connections.values()).filter(filter)
}
}
37 changes: 37 additions & 0 deletions src/socket/messages/socketLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { RawData } from 'ws'
import SocketConnection from '../socketConnection'
import { SocketMessageResponse } from './socketMessage'
import { IncomingMessage } from 'http'

function getSocketUrl(conn: SocketConnection | undefined): string {
if (!conn) {
return 'WSS /'
}
return `WSS /games/${conn.game.id}/${conn.playerAliasId ? `aliases/${conn.playerAliasId}/` : ''}`
}

function getSize(message: string): string {
return `${Buffer.byteLength(message)}b`
}

export function logRequest(conn: SocketConnection, rawData: RawData) {
const message = rawData.toString()
const req = JSON.parse(message)?.req
console.log(` <-- ${getSocketUrl(conn)}{${req}} ${conn.ip} ${getSize(message)}`)
}

export function logResponse(conn: SocketConnection, res: SocketMessageResponse, message: string) {
console.log(` --> ${getSocketUrl(conn)}{${res}} ${conn.ip} ${getSize(message)}`)
}

export function logConnection(req: IncomingMessage) {
console.log(` <-- WSS /open ${req.socket.remoteAddress}`)
}

export function logConnectionClosed(conn: SocketConnection | undefined, preclosed: boolean, code: number, reason?: string) {
const direction = preclosed ? '<--' : '-->'
const ip = conn?.ip ?? 'unknown'
const displayCode = preclosed ? '' : code
const displayReason = reason ?? ''
console.log(` ${direction} ${getSocketUrl(conn)}close ${ip} ${displayCode} ${displayReason}`)
}
9 changes: 7 additions & 2 deletions src/socket/messages/socketMessage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import SocketConnection from '../socketConnection'
import { logResponse } from './socketLogger'

export const requests = [
'v1.players.identify',
Expand All @@ -20,10 +21,14 @@ export type SocketMessageResponse = typeof responses[number]

export function sendMessage<T>(conn: SocketConnection, res: SocketMessageResponse, data: T) {
if (conn.ws.readyState === conn.ws.OPEN) {
conn.ws.send(JSON.stringify({
const message = JSON.stringify({
res,
data
}))
})

logResponse(conn, res, message)

conn.ws.send(message)
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/socket/router/socketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import SocketError, { sendError } from '../messages/socketError'
import { APIKeyScope } from '../../entities/api-key'
import playerListeners from '../listeners/playerListeners'
import gameChannelListeners from '../listeners/gameChannelListeners'
import { logRequest } from '../messages/socketLogger'

const socketMessageValidator = z.object({
req: z.enum(requests),
Expand All @@ -26,6 +27,8 @@ export default class SocketRouter {
constructor(readonly socket: Socket) {}

async handleMessage(conn: SocketConnection, rawData: RawData): Promise<void> {
logRequest(conn, rawData)

addBreadcrumb({
category: 'message',
message: rawData.toString(),
Expand Down
2 changes: 2 additions & 0 deletions src/socket/socketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export default class SocketConnection {
game: Game | null = null
scopes: APIKeyScope[] = []
private headers: IncomingHttpHeaders = {}
ip: string = ''

rateLimitKey: string = v4()
rateLimitWarnings: number = 0
Expand All @@ -24,6 +25,7 @@ export default class SocketConnection {
this.game = apiKey.game
this.scopes = apiKey.scopes
this.headers = req.headers
this.ip = req.socket.remoteAddress
}

async getPlayerAlias(): Promise<PlayerAlias | null> {
Expand Down

0 comments on commit 9cba698

Please sign in to comment.