Skip to content

Commit

Permalink
feat: Add stream option to limit inbound message size (#349)
Browse files Browse the repository at this point in the history
* Add stream option to limit inbound message size

* give empty default to opts

* add the option in gossibsub opts to set the inbound max len

* add max inbout size validation spec

* fix test
  • Loading branch information
g11tech committed Oct 28, 2022
1 parent 19507d9 commit 3475242
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 6 deletions.
21 changes: 18 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
*/
maxOutboundBufferSize?: number

/**
* Specify max size to skip decoding messages whose data
* section exceeds this size.
*
*/
maxInboundDataLength?: number

/**
* If provided, only allow topics in this list
*/
Expand Down Expand Up @@ -772,7 +779,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G

this.log('create inbound stream %s', id)

const inboundStream = new InboundStream(stream)
const inboundStream = new InboundStream(stream, { maxDataLength: this.opts.maxInboundDataLength })
this.streamsInbound.set(id, inboundStream)

this.pipePeerReadStream(peerId, inboundStream.source).catch((err) => this.log(err))
Expand Down Expand Up @@ -923,11 +930,19 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
}
})
} catch (err) {
this.log.error(err)
this.onPeerDisconnected(peerId)
this.handlePeerReadStreamError(err as Error, peerId)
}
}

/**
* Handle error when read stream pipe throws, less of the functional use but more
* to for testing purposes to spy on the error handling
* */
private handlePeerReadStreamError(err: Error, peerId: PeerId): void {
this.log.error(err)
this.onPeerDisconnected(peerId)
}

/**
* Handles an rpc request from a peer
*/
Expand Down
11 changes: 9 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ type OutboundStreamOpts = {
maxBufferSize?: number
}

type InboundStreamOpts = {
/** Max size in bytes for reading messages from the stream */
maxDataLength?: number
}

export class OutboundStream {
private readonly pushable: Pushable<Uint8Array>
private readonly closeController: AbortController
Expand Down Expand Up @@ -54,11 +59,13 @@ export class InboundStream {
private readonly rawStream: Stream
private readonly closeController: AbortController

constructor(rawStream: Stream) {
constructor(rawStream: Stream, opts: InboundStreamOpts = {}) {
this.rawStream = rawStream
this.closeController = new AbortController()

this.source = abortableSource(pipe(this.rawStream, decode()), this.closeController.signal, { returnOnAbort: true })
this.source = abortableSource(pipe(this.rawStream, decode(opts)), this.closeController.signal, {
returnOnAbort: true
})
}

close(): void {
Expand Down
38 changes: 37 additions & 1 deletion test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ describe('gossip', () => {
init: {
scoreParams: {
IPColocationFactorThreshold: GossipsubDhi + 3
}
},
maxInboundDataLength: 4000000
}
})
})
Expand Down Expand Up @@ -79,6 +80,41 @@ describe('gossip', () => {
nodeASpy.pushGossip.restore()
})

it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
const nodeB = nodes[1]

const twoNodes = [nodeA, nodeB]
const topic = 'Z'
// add subscriptions to each node
twoNodes.forEach((n) => n.pubsub.subscribe(topic))

// every node connected to every other
await connectAllPubSubNodes(twoNodes)

// wait for subscriptions to be transmitted
await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change')))

// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat')))

// set spy. NOTE: Forcing private property to be public
const nodeBSpy = nodeB.pubsub as Partial<GossipSub> as SinonStubbedInstance<{
handlePeerReadStreamError: GossipSub['handlePeerReadStreamError']
}>
sinon.spy(nodeBSpy, 'handlePeerReadStreamError')

// This should lead to handlePeerReadStreamError at nodeB
await nodeA.pubsub.publish(topic, new Uint8Array(5000000))
await pEvent(nodeA.pubsub, 'gossipsub:heartbeat')
const expectedError = nodeBSpy.handlePeerReadStreamError.getCalls()[0]?.args[0]
expect(expectedError !== undefined && (expectedError as unknown as { code: string }).code, 'ERR_MSG_DATA_TOO_LONG')

// unset spy
nodeBSpy.handlePeerReadStreamError.restore()
})

it('should send piggyback control into other sent messages', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
Expand Down

0 comments on commit 3475242

Please sign in to comment.