Skip to content

Commit

Permalink
feat: add topic validators to pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed May 23, 2020
1 parent acb5d28 commit 5712fd1
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 15 deletions.
64 changes: 55 additions & 9 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ class BasicPubSub extends Pubsub {
* @returns {string} message id as string
*/
this.defaultMsgIdFn = (msg) => utils.msgId(msg.from, msg.seqno)

/**
* Topic validator map
*
* Keyed by topic
* Topic validators are functions with the following input:
* (topic: string, peer: Peer, message: rpc.RPC)
*/
this.topicValidators = new Map()
}

/**
Expand Down Expand Up @@ -183,17 +192,14 @@ class BasicPubSub extends Pubsub {
this.seenCache.put(msgID)

// Ensure the message is valid before processing it
let isValid
let error

try {
isValid = await this.validate(message)
const isValid = await this.validate(peer, message)
if (!isValid) {
this.log('Message is invalid, dropping it.')
return
}
} catch (err) {
error = err
}

if (error || !isValid) {
this.log('Message could not be validated, dropping it. isValid=%s', isValid, error)
this.log('Error in message validation, dropping it. %O', err)
return
}

Expand All @@ -203,6 +209,46 @@ class BasicPubSub extends Pubsub {
this._handleRpcControl(peer, rpc)
}

/**
* Validates the given message.
* @param {Peer} peer
* @param {rpc.RPC.Message} message
* @returns {Promise<Boolean>}
*/
async validate (peer, message) {
const isValid = await super.validate(message)
if (!isValid) {
return false
}
for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic)
if (validatorFn) {
const result = validatorFn(topic, peer, message)
if (!this._processTopicValidatorResult(topic, peer, message, result)) {
return false
}
}
}
return true
}

/**
* Coerces topic validator result to determine message validity
*
* Defaults to true if truthy
*
* Override this method to provide custom topic validator result processing (eg: scoring)
*
* @param {String} topic
* @param {Peer} peer
* @param {rpc.RPC.Message} message
* @param {unknown} result
* @returns {Promise<Boolean>}
*/
_processTopicValidatorResult (topic, peer, message, result) {
return Boolean(result)
}

/**
* @param {rpc.RPC.Message} msg
*/
Expand Down
14 changes: 9 additions & 5 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ describe('2 nodes', () => {

it('Subscribe to a topic', async () => {
const topic = 'Z'
await new Promise((resolve) => setTimeout(resolve, 2000))
nodes[0].subscribe(topic)
nodes[1].subscribe(topic)

// await subscription change
const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => {
nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))
})
const [evt0] = await Promise.all([
new Promise(resolve => nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))),
new Promise(resolve => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)))
])

const [changedPeerId, changedTopics, changedSubs] = evt0

expectSet(nodes[0].subscriptions, [topic])
expectSet(nodes[1].subscriptions, [topic])
Expand Down Expand Up @@ -134,7 +136,9 @@ describe('2 nodes', () => {
nodes[1].subscribe(topic)

// await subscription change and heartbeat
await new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve))
await Promise.all(
nodes.map(n => new Promise(resolve => n.once('pubsub:subscription-change', resolve)))
)
await Promise.all([
new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)),
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
Expand Down
73 changes: 72 additions & 1 deletion test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe('Pubsub', () => {

// Get the first message sent to _publish, and validate it
const signedMessage = gossipsub._publish.getCall(0).lastArg[0]
const isValid = await gossipsub.validate(signedMessage)
const isValid = await gossipsub.validate({}, signedMessage)

expect(isValid).to.eql(true)
})
Expand Down Expand Up @@ -152,4 +152,75 @@ describe('Pubsub', () => {
await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerId), { timeout: 1000 })
})
})

describe('topic validators', () => {
it('should filter messages by topic validator', async () => {
// use onRpcMessage.callCount to see if a message is valid or not
// a valid message will trigger onRpcMessage
sinon.stub(gossipsub, '_processRpcMessage')
// Disable strict signing
sinon.stub(gossipsub, 'strictSigning').value(false)
const filteredTopic = 't'
const peerStr = 'QmAnotherPeer'
gossipsub.peers.set(peerStr, {})

// Set a trivial topic validator
gossipsub.topicValidators.set(filteredTopic, (topic, peer, message) => {
return message.data.equals(Buffer.from('a message'))
})

// valid case
const validRpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
data: Buffer.from('a message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process valid message
gossipsub._onRpc(peerStr, validRpc)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(1)

// invalid case
const invalidRpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
data: Buffer.from('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process invalid message
gossipsub._onRpc(peerStr, invalidRpc)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(1)

// remove topic validator
gossipsub.topicValidators.delete(filteredTopic)

// another invalid case
const invalidRpc2 = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
data: Buffer.from('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process previously invalid message, now is valid
gossipsub._onRpc(peerStr, invalidRpc2)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(2)
// cleanup
gossipsub.peers.delete(peerStr)
})
})
})

0 comments on commit 5712fd1

Please sign in to comment.