Skip to content

Commit

Permalink
Apply checkReceivedSubscriptions to tests
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Mar 15, 2022
1 parent b9410ba commit a453afa
Showing 1 changed file with 88 additions and 39 deletions.
127 changes: 88 additions & 39 deletions test/go-gossipsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,31 @@ const {
tearDownGossipsubs,
createPeers
} = require('./utils')
const {resolve} = require('path')

EventEmitter.defaultMaxListeners = 100

const checkReceivedSubscription = (psub, peerIdStr, peerIdx) => new Promise ((resolve, reject) => {
const topic = 'pubsub:subscription-change'
let cb;
const checkReceivedSubscription = (psub, peerIdStr, topic, peerIdx) => new Promise ((resolve, reject) => {
const event = 'pubsub:subscription-change'
let cb
const t = setTimeout(() => reject(`Not received subscriptions of psub ${peerIdx}`), 1000)
cb = (peerId) => {
if (peerId.toB58String() === peerIdStr) {
clearTimeout(t)
psub.off(topic, cb)
psub.off(event, cb)
expect(Array.from(psub.topics.get(topic)).includes(peerIdStr), "topics should include the peerId").to.be.true
resolve()
}
}
psub.on(topic, cb);
psub.on(event, cb);
});

const checkReceivedSubscriptions = async (psub, peerIdStrs, topic) => {
const recvPeerIdStrs = peerIdStrs.filter((peerIdStr) => peerIdStr !== psub.peerId.toB58String())
const promises = recvPeerIdStrs.map((peerIdStr, idx) => checkReceivedSubscription(psub, peerIdStr, topic, idx))
await Promise.all(promises)
expect(Array.from(psub.topics.get(topic)).sort()).to.be.deep.equal(recvPeerIdStrs.sort())
}

/**
* Given a topic and data (and debug metadata -- sender index and msg index)
* Return a function (takes a gossipsub (and receiver index))
Expand Down Expand Up @@ -673,26 +680,40 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub multihops', async function () {
// Create 6 gossipsub nodes
// Connect nodes in a line (eg: 0 -> 1 -> 2 -> 3 ...)
// Subscribe to the topic, all nodes
// Publish a message from node 0
// Assert that the last node receives the message
const numPeers = 6
const psubs = await createGossipsubs({
number: 6,
number: numPeers,
options: { scoreParams: { IPColocationFactorThreshold: 20 } }
})
const topic = 'foobar'

for (let i = 0; i < 5; i++) {
const peerIdStrsByIdx = []
for (let i = 0; i < numPeers - 1; i++) {
await psubs[i]._libp2p.dialProtocol(psubs[i + 1]._libp2p.peerId, psubs[i].multicodecs)
}
for (let i = 0; i < numPeers; i++) {
if (i === 0) { // first
peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String()]
} else if (i > 0 && i < numPeers - 1) { // middle
peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String(), psubs[i - 1].peerId.toB58String()]
} else if (i === numPeers - 1) { // last
peerIdStrsByIdx[i] = [psubs[i - 1].peerId.toB58String()]
}
}

psubs.forEach((ps) => ps.subscribe(topic))
const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, peerIdStrsByIdx[i], topic))
psubs.forEach(ps => ps.subscribe(topic))

// wait for heartbeats to build mesh
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(subscriptionPromises)

const msg = uint8ArrayFromString(`${0} its not a flooooood ${0}`)
const owner = 0
Expand All @@ -701,6 +722,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
await results
await tearDownGossipsubs(psubs)
})

it('test gossipsub tree topology', async function () {
// Create 10 gossipsub nodes
// Connect nodes in a tree, diagram below
Expand All @@ -724,20 +746,39 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
[8] -> [9]
*/
const multicodecs = psubs[0].multicodecs
await psubs[0]._libp2p.dialProtocol(psubs[1]._libp2p.peerId, multicodecs)
await psubs[1]._libp2p.dialProtocol(psubs[2]._libp2p.peerId, multicodecs)
await psubs[1]._libp2p.dialProtocol(psubs[4]._libp2p.peerId, multicodecs)
await psubs[2]._libp2p.dialProtocol(psubs[3]._libp2p.peerId, multicodecs)
await psubs[0]._libp2p.dialProtocol(psubs[5]._libp2p.peerId, multicodecs)
await psubs[5]._libp2p.dialProtocol(psubs[6]._libp2p.peerId, multicodecs)
await psubs[5]._libp2p.dialProtocol(psubs[8]._libp2p.peerId, multicodecs)
await psubs[6]._libp2p.dialProtocol(psubs[7]._libp2p.peerId, multicodecs)
await psubs[8]._libp2p.dialProtocol(psubs[9]._libp2p.peerId, multicodecs)
const treeTopology = [
[1, 5], // 0
[2, 4], // 1
[3], // 2
[], // 3 leaf
[], // 4 leaf
[6, 8], // 5
[7], // 6
[], // 7 leaf
[9], // 8
[], // 9 leaf
]
for (let from = 0; from < treeTopology.length; from++) {
for (let to of treeTopology[from]) {
await psubs[from]._libp2p.dialProtocol(psubs[to]._libp2p.peerId, multicodecs)
}
}

psubs.forEach((ps) => ps.subscribe(topic))
const getPeerIdStrs = (idx) => {
const outbounds = treeTopology[idx]
const inbounds = []
for (let i = 0; i < treeTopology.length; i++) {
if (treeTopology[i].includes(idx)) inbounds.push(i)
}
return Array.from(new Set([...inbounds, ...outbounds])).map((i) => psubs[i].peerId.toB58String())
}

const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, getPeerIdStrs(i), topic))
psubs.forEach(ps => ps.subscribe(topic))

// wait for heartbeats to build mesh
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(subscriptionPromises)

expectSet(new Set(psubs[0].peers.keys()), [psubs[1].peerId.toB58String(), psubs[5].peerId.toB58String()])
expectSet(new Set(psubs[1].peers.keys()), [
Expand All @@ -759,6 +800,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub star topology with signed peer records', async function () {
// Create 20 gossipsub nodes with lower degrees
// Connect nodes to a center node, with the center having very low degree
Expand Down Expand Up @@ -786,16 +828,18 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
psubs[0].Dscore = 0

// build the star
await psubs.slice(1).map((ps) => psubs[0]._libp2p.dialProtocol(ps._libp2p.peerId, ps.multicodecs))

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await psubs.slice(1).map(ps => psubs[0]._libp2p.dialProtocol(ps._libp2p.peerId, ps.multicodecs))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 2)))

// build the mesh
const topic = 'foobar'
psubs.forEach((ps) => ps.subscribe(topic))
const peerIdStrs = psubs.map((psub) => psub.peerId.toB58String())
const subscriptionPromise = checkReceivedSubscriptions(psubs[0], peerIdStrs, topic)
psubs.forEach(ps => ps.subscribe(topic))

// wait a bit for the mesh to build
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 15, 25000)))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 15, 25000)))
await subscriptionPromise

// check that all peers have > 1 connection
psubs.forEach((ps) => {
Expand All @@ -816,6 +860,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub direct peers', async function () {
// Create 3 gossipsub nodes
// 2 and 3 with direct peer connections with each other
Expand Down Expand Up @@ -863,9 +908,19 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.exist()

const topic = 'foobar'
psubs.forEach((ps) => ps.subscribe(topic))
const peerIdStrs = libp2ps.map((libp2p) => libp2p.peerId.toB58String())
const subscriptionPromises = [
// peer0 connects to peer1 and peer2
checkReceivedSubscriptions(psubs[0], peerIdStrs, topic),
// peer1 connects to peer0
checkReceivedSubscription(psubs[1], peerIdStrs[0], topic, 0),
// peer2 connects to peer0
checkReceivedSubscription(psubs[2], peerIdStrs[0], topic, 0)
]
psubs.forEach(ps => ps.subscribe(topic))

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
await subscriptionPromises

let sendRecv = []
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -920,22 +975,16 @@ describe('go-libp2p-pubsub gossipsub tests', function () {

const owner = 0
const psub0 = psubs[owner]
const recvPeerIdStrs = psubs.filter((psub, j) => j !== owner).map(psub => psub.peerId.toB58String());
// make sure gossipsub 0 see others in its topic
const subPromises =
recvPeerIdStrs.map((peerIdStr, idx) => checkReceivedSubscription(psub0, peerIdStr, idx))

const peerIdStrs = psubs.filter((psub, j) => j !== owner).map(psub => psub.peerId.toB58String());
// build the (partial, unstable) mesh
const topic = 'foobar'
psubs.forEach((ps) => ps.subscribe(topic))

await Promise.all(subPromises)

expect(psub0.topics.get(topic).size).to.be.equal(numPeers - 1, 'incorrect topic peers length')
expect(Array.from(psub0.topics.get(topic)).sort()).to.be.deep.equal(recvPeerIdStrs.sort())
const subscriptionPromise = checkReceivedSubscriptions(psub0, peerIdStrs, topic)
psubs.forEach(ps => ps.subscribe(topic))

await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 1)))

await subscriptionPromise

// send messages from the star and assert they were received
let sendRecv = []
for (let i = 0; i < 20; i++) {
Expand Down

0 comments on commit a453afa

Please sign in to comment.