Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reenable go-gossipsub tests #201

Merged
merged 11 commits into from
Mar 18, 2022
135 changes: 110 additions & 25 deletions test/go-gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
tearDownGossipsubs,
createPeers
} from './utils'
import PeerId from 'peer-id'

/**
* These tests were translated from:
Expand All @@ -36,6 +37,35 @@ chai.use(require('dirty-chai'))

EventEmitter.defaultMaxListeners = 100

const checkReceivedSubscription = (psub: Gossipsub, peerIdStr: string, topic: string, peerIdx: number, timeout = 1000) => new Promise<void> ((resolve, reject) => {
const event = 'pubsub:subscription-change'
let cb: (peerId: PeerId, subs: RPC.ISubOpts[]) => void
const t = setTimeout(() => reject(`Not received subscriptions of psub ${peerIdx}`), timeout)
cb = (peerId, subs) => {
if (peerId.toB58String() === peerIdStr && subs[0].topicID === topic && subs[0].subscribe === true) {
clearTimeout(t)
psub.off(event, cb)
if (Array.from(psub.topics.get(topic) || []).includes(peerIdStr)) {
resolve()
} else {
reject(Error('topics should include the peerId'))
}
}
}
psub.on(event, cb);
});

const checkReceivedSubscriptions = async (psub: Gossipsub, peerIdStrs: string[], topic: string) => {
const recvPeerIdStrs = peerIdStrs.filter((peerIdStr) => peerIdStr !== psub.peerId.toB58String())
const promises = recvPeerIdStrs.map((peerIdStr, idx) => checkReceivedSubscription(psub, peerIdStr, topic, idx))
await Promise.all(promises)
expect(Array.from(psub.topics.get(topic) || []).sort()).to.be.deep.equal(recvPeerIdStrs.sort())
recvPeerIdStrs.forEach((peerIdStr) => {
const peerStream = psub.peers.get(peerIdStr)
expect(peerStream && peerStream.isWritable, "no peerstream or peerstream is not writable").to.be.true
})
}

/**
* Given a topic and data (and debug metadata -- sender index and msg index)
* Return a function (takes a gossipsub (and receiver index))
Expand Down Expand Up @@ -80,7 +110,7 @@ const awaitEvents = (emitter: EventEmitter, event: string, number: number, timeo
})
}

describe.skip('go-libp2p-pubsub gossipsub tests', function () {
describe('go-libp2p-pubsub gossipsub tests', function () {
this.timeout(100000)
afterEach(() => {
sinon.restore()
Expand Down Expand Up @@ -663,26 +693,40 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub multihops', async function () {
// Create 6 gossipsub nodes
// Connect nodes in a line (eg: 0 -> 1 -> 2 -> 3 ...)
// Subscribe to the topic, all nodes
// Publish a message from node 0
// Assert that the last node receives the message
const numPeers = 6
const psubs = await createGossipsubs({
number: 6,
number: numPeers,
options: { scoreParams: { IPColocationFactorThreshold: 20 } }
})
const topic = 'foobar'

for (let i = 0; i < 5; i++) {
for (let i = 0; i < numPeers - 1; i++) {
await psubs[i]._libp2p.dialProtocol(psubs[i + 1]._libp2p.peerId, psubs[i].multicodecs)
}
const peerIdStrsByIdx: string[][] = []
for (let i = 0; i < numPeers; i++) {
if (i === 0) { // first
peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String()]
} else if (i > 0 && i < numPeers - 1) { // middle
peerIdStrsByIdx[i] = [psubs[i + 1].peerId.toB58String(), psubs[i - 1].peerId.toB58String()]
} else if (i === numPeers - 1) { // last
peerIdStrsByIdx[i] = [psubs[i - 1].peerId.toB58String()]
}
}

psubs.forEach((ps) => ps.subscribe(topic))
const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, peerIdStrsByIdx[i], topic))
psubs.forEach(ps => ps.subscribe(topic))

// wait for heartbeats to build mesh
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(subscriptionPromises)

const msg = uint8ArrayFromString(`${0} its not a flooooood ${0}`)
const owner = 0
Expand All @@ -691,6 +735,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await results
await tearDownGossipsubs(psubs)
})

it('test gossipsub tree topology', async function () {
// Create 10 gossipsub nodes
// Connect nodes in a tree, diagram below
Expand All @@ -714,20 +759,39 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
[8] -> [9]
*/
const multicodecs = psubs[0].multicodecs
await psubs[0]._libp2p.dialProtocol(psubs[1]._libp2p.peerId, multicodecs)
await psubs[1]._libp2p.dialProtocol(psubs[2]._libp2p.peerId, multicodecs)
await psubs[1]._libp2p.dialProtocol(psubs[4]._libp2p.peerId, multicodecs)
await psubs[2]._libp2p.dialProtocol(psubs[3]._libp2p.peerId, multicodecs)
await psubs[0]._libp2p.dialProtocol(psubs[5]._libp2p.peerId, multicodecs)
await psubs[5]._libp2p.dialProtocol(psubs[6]._libp2p.peerId, multicodecs)
await psubs[5]._libp2p.dialProtocol(psubs[8]._libp2p.peerId, multicodecs)
await psubs[6]._libp2p.dialProtocol(psubs[7]._libp2p.peerId, multicodecs)
await psubs[8]._libp2p.dialProtocol(psubs[9]._libp2p.peerId, multicodecs)
const treeTopology = [
[1, 5], // 0
[2, 4], // 1
[3], // 2
[], // 3 leaf
[], // 4 leaf
[6, 8], // 5
[7], // 6
[], // 7 leaf
[9], // 8
[], // 9 leaf
]
for (let from = 0; from < treeTopology.length; from++) {
for (let to of treeTopology[from]) {
await psubs[from]._libp2p.dialProtocol(psubs[to]._libp2p.peerId, multicodecs)
}
}

const getPeerIdStrs = (idx: number): string[] => {
const outbounds = treeTopology[idx]
const inbounds = []
for (let i = 0; i < treeTopology.length; i++) {
if (treeTopology[i].includes(idx)) inbounds.push(i)
}
return Array.from(new Set([...inbounds, ...outbounds])).map((i) => psubs[i].peerId.toB58String())
}

const subscriptionPromises = psubs.map((psub, i) => checkReceivedSubscriptions(psub, getPeerIdStrs(i), topic))
psubs.forEach((ps) => ps.subscribe(topic))

// wait for heartbeats to build mesh
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 2)))
await Promise.all(subscriptionPromises)

expectSet(new Set(psubs[0].peers.keys()), [psubs[1].peerId.toB58String(), psubs[5].peerId.toB58String()])
expectSet(new Set(psubs[1].peers.keys()), [
Expand All @@ -749,13 +813,15 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub star topology with signed peer records', async function () {
// Create 20 gossipsub nodes with lower degrees
// Connect nodes to a center node, with the center having very low degree
// Subscribe to the topic, all nodes
// Assert that all nodes have > 1 connection
// Publish one message per node
// Assert that the subscribed nodes receive every message
sinon.replace(constants, 'GossipsubPrunePeers', 5 as 16)
const psubs = await createGossipsubs({
number: 20,
options: {
Expand All @@ -782,10 +848,13 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {

// build the mesh
const topic = 'foobar'
const peerIdStrs = psubs.map((psub) => psub.peerId.toB58String())
const subscriptionPromise = checkReceivedSubscriptions(psubs[0], peerIdStrs, topic)
psubs.forEach((ps) => ps.subscribe(topic))

// wait a bit for the mesh to build
await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 15, 25000)))
await subscriptionPromise

// check that all peers have > 1 connection
psubs.forEach((ps) => {
Expand All @@ -806,6 +875,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub direct peers', async function () {
// Create 3 gossipsub nodes
// 2 and 3 with direct peer connections with each other
Expand Down Expand Up @@ -845,35 +915,43 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
]
await Promise.all(psubs.map((ps) => ps.start()))
const multicodecs = psubs[0].multicodecs
// each peer connects to 2 other peers
let connectPromises = libp2ps.map((libp2p) => awaitEvents(libp2p.connectionManager, 'peer:connect', 2))
await libp2ps[0].dialProtocol(libp2ps[1].peerId, multicodecs)
await libp2ps[0].dialProtocol(libp2ps[2].peerId, multicodecs)

// verify that the direct peers connected
await delay(2000)
expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.be.ok
await Promise.all(connectPromises)

const topic = 'foobar'
psubs.forEach((ps) => ps.subscribe(topic))

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
const peerIdStrs = libp2ps.map((libp2p) => libp2p.peerId.toB58String())
let subscriptionPromises = psubs.map((psub) => checkReceivedSubscriptions(psub, peerIdStrs, topic))
psubs.forEach(ps => ps.subscribe(topic))
await Promise.all(psubs.map(ps => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
await Promise.all(subscriptionPromises)

let sendRecv = []
for (let i = 0; i < 3; i++) {
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
const owner = i
const results = Promise.all(
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
psubs.filter((_, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
)
sendRecv.push(psubs[owner].publish(topic, msg))
sendRecv.push(results)
}
await Promise.all(sendRecv)

connectPromises = [1, 2].map((i) => awaitEvents(libp2ps[i].connectionManager, 'peer:connect', 1))
// disconnect the direct peers to test reconnection
libp2ps[1].connectionManager.getAll(libp2ps[2].peerId).forEach((c) => c.close())
// need more time to disconnect/connect/send subscriptions again
subscriptionPromises = [
checkReceivedSubscription(psubs[1], peerIdStrs[2], topic, 2, 10000),
checkReceivedSubscription(psubs[2], peerIdStrs[1], topic, 1, 10000),
]
await libp2ps[1].hangUp(libp2ps[2].peerId);

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 5)))

await Promise.all(connectPromises)
await Promise.all(subscriptionPromises)
expect(libp2ps[1].connectionManager.get(libp2ps[2].peerId)).to.be.ok

sendRecv = []
Expand All @@ -889,14 +967,16 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub flood publish', async function () {
// Create 30 gossipsub nodes
// Connect in star topology
// Subscribe to the topic, all nodes
// Publish 20 messages, each from the center node
// Assert that the other nodes receive the message
const numPeers = 30;
const psubs = await createGossipsubs({
number: 30,
number: numPeers,
options: { scoreParams: { IPColocationFactorThreshold: 30 } }
})

Expand All @@ -906,17 +986,21 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
})
)

const owner = 0
const psub0 = psubs[owner]
const peerIdStrs = psubs.filter((_, j) => j !== owner).map(psub => psub.peerId.toB58String())
// build the (partial, unstable) mesh
const topic = 'foobar'
const subscriptionPromise = checkReceivedSubscriptions(psub0, peerIdStrs, topic)
psubs.forEach((ps) => ps.subscribe(topic))

await Promise.all(psubs.map((ps) => awaitEvents(ps, 'gossipsub:heartbeat', 1)))
await subscriptionPromise

// send messages from the star and assert they were received
let sendRecv = []
for (let i = 0; i < 20; i++) {
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
const owner = 0
const results = Promise.all(
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
)
Expand All @@ -926,6 +1010,7 @@ describe.skip('go-libp2p-pubsub gossipsub tests', function () {
await Promise.all(sendRecv)
await tearDownGossipsubs(psubs)
})

it('test gossipsub negative score', async function () {
// Create 20 gossipsub nodes, with scoring params to quickly lower node 0's score
// Connect densely
Expand Down