Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: add message byte batching (#235)
Browse files Browse the repository at this point in the history
* feat: add message byte batching

Adds a new setting `minSendBytes` that is `undefined` by default.

If `undefined` all messages sent through multiplexed streams will
be serialized and sent over the wire immediately.

If set to a number, it will be used as a byte value, and the
serialized bytes of all messages sent during the current tick will
be buffered up to that value.

Once either the buffer lengths hits that value or the next tick
begins, all bytes in the buffer will be sent over the wire.

* chore: add readme note
  • Loading branch information
achingbrain committed Nov 25, 2022
1 parent 618a917 commit 4e2a49d
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Creates a factory that can be used to create new muxers.

- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
- `minSendBytes` - if set, message bytes from the current tick will be batched up to this amount before being yielded by the muxer source, unless the next tick begins in which case all available bytes will be yielded
- `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
- `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
- `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"any-signal": "^3.0.0",
"benchmark": "^2.1.4",
"err-code": "^3.0.1",
"it-batched-bytes": "^1.0.0",
"it-pushable": "^3.1.0",
"it-stream-types": "^1.0.4",
"rate-limiter-flexible": "^2.3.9",
Expand Down
28 changes: 22 additions & 6 deletions src/encode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import varint from 'varint'
import { Uint8ArrayList } from 'uint8arraylist'
import { allocUnsafe } from './alloc-unsafe.js'
import { Message, MessageTypes } from './message-types.js'
import batchedBytes from 'it-batched-bytes'

const POOL_SIZE = 10 * 1024

Expand Down Expand Up @@ -55,14 +56,29 @@ const encoder = new Encoder()
/**
* Encode and yield one or more messages
*/
export async function * encode (source: Source<Message[]>) {
for await (const msgs of source) {
const list = new Uint8ArrayList()
export async function * encode (source: Source<Message[]>, minSendBytes: number = 0) {
if (minSendBytes == null || minSendBytes === 0) {
// just send the messages
for await (const messages of source) {
const list = new Uint8ArrayList()

for (const msg of msgs) {
encoder.write(msg, list)
for (const msg of messages) {
encoder.write(msg, list)
}

yield list.subarray()
}

yield list.subarray()
return
}

// batch messages up for sending
yield * batchedBytes(source, {
size: minSendBytes,
serialize: (obj, list) => {
for (const m of obj) {
encoder.write(m, list)
}
}
})
}
12 changes: 12 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ export interface MplexInit {
*/
maxUnprocessedMessageQueueSize?: number

/**
* Each byte array written into a multiplexed stream is converted to one or
* more messages which are sent as byte arrays to the remote node. Sending
* lots of small messages can be expensive - use this setting to batch up
* the serialized bytes of all messages sent during the current tick up to
* this limit to send in one go similar to Nagle's algorithm. N.b. you
* should benchmark your application carefully when using this setting as it
* may cause the opposite of the desired effect. Omit this setting to send
* all messages as they become available. (default: undefined)
*/
minSendBytes?: number

/**
* The maximum number of multiplexed streams that can be open at any
* one time. A request to open more than this will have a stream
Expand Down
2 changes: 1 addition & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class MplexStreamMuxer implements StreamMuxer {
onEnd
})

return Object.assign(encode(source), {
return Object.assign(encode(source, this._init.minSendBytes), {
push: source.push,
end: source.end,
return: source.return
Expand Down
59 changes: 59 additions & 0 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,17 @@ describe('mplex', () => {
stream.end()

const bufs: Uint8Array[] = []
const sinkDone = pDefer()

void Promise.resolve().then(async () => {
for await (const buf of muxer.source) {
bufs.push(buf)
}
sinkDone.resolve()
})

await muxer.sink(stream)
await sinkDone.promise

const messages = await all(decode()(bufs))

Expand Down Expand Up @@ -162,4 +165,60 @@ describe('mplex', () => {
expect(messages).to.have.nested.property('[0].id', id)
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
})

it('should batch bytes to send', async () => {
const minSendBytes = 10

// input bytes, smaller than batch size
const input: Uint8Array[] = [
Uint8Array.from([0, 1, 2, 3, 4]),
Uint8Array.from([0, 1, 2, 3, 4]),
Uint8Array.from([0, 1, 2, 3, 4])
]

// create the muxer
const factory = mplex({
minSendBytes
})()
const muxer = factory.createStreamMuxer({})

// collect outgoing mplex messages
const muxerFinished = pDefer()
let output: Uint8Array[] = []
void Promise.resolve().then(async () => {
output = await all(muxer.source)
muxerFinished.resolve()
})

// create a stream
const stream = await muxer.newStream()
const streamFinished = pDefer()
// send messages over the stream
void Promise.resolve().then(async () => {
await stream.sink(async function * () {
yield * input
}())
stream.close()
streamFinished.resolve()
})

// wait for all data to be sent over the stream
await streamFinished.promise

// close the muxer
await muxer.sink([])

// wait for all output to be collected
await muxerFinished.promise

// last message is unbatched
const closeMessage = output.pop()
expect(closeMessage).to.have.lengthOf(2)

// all other messages should be above or equal to the batch size
expect(output).to.have.lengthOf(2)
for (const buf of output) {
expect(buf).to.have.length.that.is.at.least(minSendBytes)
}
})
})

0 comments on commit 4e2a49d

Please sign in to comment.