Skip to content

Commit

Permalink
Extendable messageHanlers
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed Feb 3, 2021
1 parent 9fd8f41 commit 356370d
Showing 1 changed file with 33 additions and 22 deletions.
55 changes: 33 additions & 22 deletions src/y-websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,33 @@ const messageQueryAwareness = 3
const messageAwareness = 1
const messageAuth = 2

/**
* encoder, decoder, provider, emitSynced, messageType
* @type {Array<function(encoding.Encoder, decoding.Decoder, WebsocketProvider, boolean, number):void>}
*/
const messageHandlers = []

messageHandlers[messageSync] = (encoder, decoder, provider, emitSynced, messageType) => {
encoding.writeVarUint(encoder, messageSync)
const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider)
if (emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 && !provider.synced) {
provider.synced = true
}
}

messageHandlers[messageQueryAwareness] = (encoder, decoder, provider, emitSynced, messageType) => {
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())))
}

messageHandlers[messageAwareness] = (encoder, decoder, provider, emitSynced, messageType) => {
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider)
}

messageHandlers[messageAuth] = (encoder, decoder, provider, emitSynced, messageType) => {
authProtocol.readAuthMessage(decoder, provider.doc, permissionDeniedHandler)
}

const reconnectTimeoutBase = 1200
const maxReconnectTimeout = 2500
// @todo - this should depend on awareness.outdatedTime
Expand All @@ -47,28 +74,11 @@ const readMessage = (provider, buf, emitSynced) => {
const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder)
switch (messageType) {
case messageSync: {
encoding.writeVarUint(encoder, messageSync)
const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, provider.doc, provider)
if (emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 && !provider.synced) {
provider.synced = true
}
break
}
case messageQueryAwareness:
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(provider.awareness, Array.from(provider.awareness.getStates().keys())))
break
case messageAwareness:
awarenessProtocol.applyAwarenessUpdate(provider.awareness, decoding.readVarUint8Array(decoder), provider)
break
case messageAuth:
authProtocol.readAuthMessage(decoder, provider.doc, permissionDeniedHandler)
break
default:
console.error('Unable to compute message')
return encoder
const messageHandler = provider.messageHandlers[messageType]
if (/** @type {any} */ (messageHandler)) {
messageHandler(encoder, decoder, provider, emitSynced, messageType)
} else {
console.error('Unable to compute message')
}
return encoder
}
Expand Down Expand Up @@ -198,6 +208,7 @@ export class WebsocketProvider extends Observable {
this.wsconnecting = false
this.bcconnected = false
this.wsUnsuccessfulReconnects = 0
this.messageHandlers = messageHandlers.slice()
this.mux = mutex.createMutex()
/**
* @type {boolean}
Expand Down

0 comments on commit 356370d

Please sign in to comment.