Skip to content
This repository has been archived by the owner on May 29, 2023. It is now read-only.

Commit

Permalink
new protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Hugo Arregui authored and hugoArregui committed Apr 14, 2022
1 parent 9aa5dcf commit 0662a13
Show file tree
Hide file tree
Showing 15 changed files with 4,561 additions and 1,471 deletions.
124 changes: 71 additions & 53 deletions src/controllers/handlers/ws-room-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ import { upgradeWebSocketResponse } from "@well-known-components/http-server/dis
import { IHttpServerComponent } from "@well-known-components/interfaces"
import { WebSocket } from "ws"
import { GlobalContext } from "../../types"
import * as proto from "../proto/broker"
import {
MessageType,
MessageHeader,
MessageTypeMap,
SystemMessage,
IdentityMessage
} from "../proto/ws_pb"

const connectionsPerRoom = new Map<string, Set<WebSocket>>()

function getConnectionsList(roomId: string): Set<WebSocket> {
let set = connectionsPerRoom.get(roomId)
if (!set) {
Expand Down Expand Up @@ -41,65 +48,76 @@ export async function websocketRoomHandler(

ws.on("message", (message) => {
const data = message as Buffer
const msgType = proto.CoordinatorMessage.deserializeBinary(data).getType()

if (msgType === proto.MessageType.PING) {
ws.send(data)
} else if (msgType === proto.MessageType.TOPIC) {
const topicMessage = proto.TopicMessage.deserializeBinary(data)

const topicFwMessage = new proto.TopicFWMessage()
topicFwMessage.setType(proto.MessageType.TOPIC_FW)
topicFwMessage.setFromAlias(alias)
topicFwMessage.setBody(topicMessage.getBody_asU8())

const topicData = topicFwMessage.serializeBinary()
let msgType = MessageType.UNKNOWN_MESSAGE_TYPE as MessageTypeMap[keyof MessageTypeMap]
try {
msgType = MessageHeader.deserializeBinary(data).getType()
} catch (err) {
logger.error('cannot deserialize message header')
return
}

// Reliable/unreliable data
connections.forEach(($) => {
if (ws !== $) {
$.send(topicData)
switch (msgType) {
case MessageType.UNKNOWN_MESSAGE_TYPE: {
logger.log('unsupported message')
break
}
case MessageType.SYSTEM: {
try {
const message = SystemMessage.deserializeBinary(data)
message.setFromAlias(alias)

// Reliable/unreliable data
connections.forEach(($) => {
if (ws !== $) {
$.send(message.serializeBinary())
}
})
} catch (e) {
logger.error(`cannot process system message ${e}`)
}
})
} else if (msgType === proto.MessageType.TOPIC_IDENTITY) {
const topicMessage = proto.TopicIdentityMessage.deserializeBinary(data)

const topicFwMessage = new proto.TopicIdentityFWMessage()
topicFwMessage.setType(proto.MessageType.TOPIC_IDENTITY_FW)
topicFwMessage.setFromAlias(alias)
topicFwMessage.setIdentity(aliasToUserId.get(alias)!)
topicFwMessage.setRole(proto.Role.CLIENT)
topicFwMessage.setBody(topicMessage.getBody_asU8())

const topicData = topicFwMessage.serializeBinary()

// Reliable/unreliable data
connections.forEach(($) => {
if (ws !== $) {
$.send(topicData)
break
}
case MessageType.IDENTITY: {
try {
const message = IdentityMessage.deserializeBinary(data)
message.setFromAlias(alias)
message.setIdentity(userId)

// Reliable/unreliable data
connections.forEach(($) => {
if (ws !== $) {
$.send(message.serializeBinary())
}
})
} catch (e) {
logger.error(`cannot process identity message ${e}`)
}
})
break
}
default: {
logger.log(`ignoring msgType ${msgType}`)
break
}
}
})

setTimeout(() => {
const welcome = new proto.WelcomeMessage()
welcome.setType(proto.MessageType.WELCOME)
welcome.setAlias(alias)
const data = welcome.serializeBinary()

ws.send(data)
}, 100)

ws.on("error", (error) => {
logger.error(error)
ws.close()
connections.delete(ws)
})

ws.on("close", () => {
logger.info("Websocket closed")
connections.delete(ws)
ws.on("error", (error) => {
logger.error(error)
ws.close()
const room = connectionsPerRoom.get(roomId)
if (room) {
room.delete(ws)
}
})

ws.on("close", () => {
logger.info("Websocket closed")
const room = connectionsPerRoom.get(roomId)
if (room) {
room.delete(ws)
}
})
})
})
}
139 changes: 0 additions & 139 deletions src/controllers/handlers/ws-rooms-handler.ts

This file was deleted.

33 changes: 33 additions & 0 deletions src/controllers/proto/bff.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";

package protocol;

enum MessageType {
UNKNOWN_MESSAGE_TYPE = 0;
HEARTBEAT = 1;
SUBSCRIPTION = 2;
TOPIC = 3;
}

message MessageHeader {
MessageType type = 1;
}

message HeartBeatMessage {
MessageType type = 1;
double time = 2;
bytes data = 3;
}

// NOTE: topics is a space separated string in the format specified by Format
message SubscriptionMessage {
MessageType type = 1;
bytes topics = 2;
}

message TopicMessage {
MessageType type = 1;
uint64 from_alias = 2;
string topic = 3;
bytes body = 4;
}
Loading

0 comments on commit 0662a13

Please sign in to comment.