Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework pubsub #1474

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions beacon_chain/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import
multiaddress, multicodec, crypto/crypto, crypto/secp,
protocols/identify, protocols/protocol],
libp2p/protocols/secure/[secure, secio],
libp2p/protocols/pubsub/[pubsub, floodsub, rpc/message, rpc/messages],
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub, rpc/message, rpc/messages],
libp2p/transports/tcptransport,
libp2p/stream/lpstream,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
Expand Down Expand Up @@ -51,6 +51,7 @@ type
# TODO Is this really needed?
Eth2Node* = ref object of RootObj
switch*: Switch
pubsub*: PubSub
discovery*: Eth2DiscoveryProtocol
wantedPeers*: int
peerPool*: PeerPool[Peer, PeerID]
Expand Down Expand Up @@ -211,7 +212,7 @@ const
PeerScoreHighLimit* = 1000
## Max value of peer's score

ConcurrentConnections* = 4
ConcurrentConnections* = 10
## Maximum number of active concurrent connection requests.

SeenTableTimeTimeout* =
Expand Down Expand Up @@ -274,6 +275,7 @@ template libp2pProtocol*(name: string, version: int) {.pragma.}

template `$`*(peer: Peer): string = id(peer.info)
chronicles.formatIt(Peer): $it
chronicles.formatIt(PeerID): $it

template remote*(peer: Peer): untyped =
peer.info.peerId
Expand Down Expand Up @@ -848,18 +850,20 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
inc peer.connections
debug "Peer upgraded", peer = peerId, connections = peer.connections
debug "Peer upgraded", peer = $peerId, connections = peer.connections

if peer.connections == 1:
# Libp2p may connect multiple times to the same peer - using different
# transports or both incoming and outgoing. For now, we'll count our
# transports for both incoming and outgoing. For now, we'll count our
# "fist" encounter with the peer as the true connection, leaving the
# other connections be - libp2p limits the number of concurrent
# connections to the same peer, and only one of these connections will be
# active. Nonetheless, this quirk will cause a number of odd behaviours:
# * For peer limits, we might miscount the incoming vs outgoing quota
# * Protocol handshakes are wonky: we'll not necessarily use the newly
# connected transport - instead we'll just pick a random one!

node.pubsub.subscribePeer(peerId)
await performProtocolHandshakes(peer, event.incoming)

# While performing the handshake, the peer might have been disconnected -
Expand All @@ -880,18 +884,21 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =

of ConnEventKind.Disconnected:
dec peer.connections
debug "Peer disconnected", peer = peerId, connections = peer.connections
debug "Peer disconnected", peer = $peerId, connections = peer.connections
if peer.connections == 0:
node.pubsub.unsubscribePeer(peerId)
let fut = peer.disconnectedFut
if fut != nil:
peer.disconnectedFut = nil
fut.complete()

proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
switch: Switch, ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
privKey: keys.PrivateKey, rng: ref BrHmacDrbgContext): T =
switch: Switch, pubsub: PubSub, ip: Option[ValidIpAddress],
tcpPort, udpPort: Port, privKey: keys.PrivateKey,
rng: ref BrHmacDrbgContext): T =
new result
result.switch = switch
result.pubsub = pubsub
result.wantedPeers = conf.maxPeers
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
when not defined(local_testnet):
Expand Down Expand Up @@ -932,6 +939,7 @@ template publicKey*(node: Eth2Node): keys.PublicKey =
proc startListening*(node: Eth2Node) {.async.} =
node.discovery.open()
node.libp2pTransportLoops = await node.switch.start()
await node.pubsub.start()

proc start*(node: Eth2Node) {.async.} =
for i in 0 ..< ConcurrentConnections:
Expand Down Expand Up @@ -1189,15 +1197,21 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId
# that are different from the host address (this is relevant when we
# are running behind a NAT).
var switch = newStandardSwitch(some keys.seckey, hostAddress,
triggerSelf = true, gossip = true,
sign = false, verifySignature = false,
transportFlags = {ServerFlags.ReuseAddr},
msgIdProvider = msgIdProvider,
secureManagers = [
SecureProtocol.Noise, # Only noise in ETH2!
],
rng = rng)
result = Eth2Node.init(conf, enrForkId, switch,

let pubsub = GossipSub.init(
switch = switch,
msgIdProvider = msgIdProvider,
triggerSelf = true, sign = false,
verifySignature = false).PubSub

switch.mount(pubsub)

result = Eth2Node.init(conf, enrForkId, switch, pubsub,
extIp, extTcpPort, extUdpPort,
keys.seckey.asEthKey, rng = rng)

Expand Down Expand Up @@ -1245,7 +1259,7 @@ proc subscribe*[MsgType](node: Eth2Node,
debug "Gossip msg handler error",
msg = err.msg, len = data.len, topic, msgId = gossipId(data)

await node.switch.subscribe(topic & "_snappy", execMsgHandler)
await node.pubsub.subscribe(topic & "_snappy", execMsgHandler)

proc addValidator*[MsgType](node: Eth2Node,
topic: string,
Expand All @@ -1262,7 +1276,7 @@ proc addValidator*[MsgType](node: Eth2Node,
msg = err.msg, msgId = gossipId(message.data)
return false

node.switch.addValidator(topic & "_snappy", execValidator)
node.pubsub.addValidator(topic & "_snappy", execValidator)

proc subscribe*[MsgType](node: Eth2Node,
topic: string,
Expand All @@ -1272,7 +1286,7 @@ proc subscribe*[MsgType](node: Eth2Node,
await node.subscribe(topic, msgHandler)

proc unsubscribe*(node: Eth2Node, topic: string): Future[void] =
node.switch.unsubscribeAll(topic)
node.pubsub.unsubscribeAll(topic)

proc traceMessage(fut: FutureBase, msgId: string) =
fut.addCallback do (arg: pointer):
Expand All @@ -1287,5 +1301,5 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
inc nbc_gossip_messages_sent
let
data = snappy.encode(SSZ.encode(msg))
var futSnappy = node.switch.publish(topic & "_snappy", data, 1.minutes)
var futSnappy = node.pubsub.publish(topic & "_snappy", data)
traceMessage(futSnappy, gossipId(data))
19 changes: 13 additions & 6 deletions beacon_chain/inspector.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import confutils, chronicles, chronos
import libp2p/[switch, standard_setup, multiaddress, multicodec, peerinfo]
import libp2p/crypto/crypto as lcrypto
import libp2p/crypto/secp as lsecp
import libp2p/protocols/pubsub/[pubsub, gossipsub]
import eth/p2p/discoveryv5/enr as enr
import eth/p2p/discoveryv5/[protocol, discovery_db, node]
import eth/keys as ethkeys, eth/trie/db
Expand Down Expand Up @@ -674,9 +675,14 @@ proc run(conf: InspectorConf) {.async.} =
error "Bind address is incorrect MultiAddress", address = conf.bindAddress
quit(1)

var switch = newStandardSwitch(some(seckey), hostAddress.get(),
triggerSelf = true, gossip = true,
sign = false, verifySignature = false, rng = rng)
let switch = newStandardSwitch(some(seckey), hostAddress.get(), rng = rng)

let pubsub = GossipSub.init(
switch = switch,
triggerSelf = true, sign = false,
verifySignature = false).PubSub

switch.mount(pubsub)

if len(conf.topics) > 0:
for item in conf.topics:
Expand Down Expand Up @@ -708,17 +714,18 @@ proc run(conf: InspectorConf) {.async.} =
data: seq[byte]): Future[void] {.gcsafe.} =
result = pubsubLogger(conf, switch, resolveQueue, topic, data)

discard switch.start()
discard await switch.start()
await pubsub.start()

var topicFilters = newSeq[string]()
try:
for filter in topics:
for topic in getTopics(forkDigest.get(), filter):
await switch.subscribe(topic, pubsubTrampoline)
await pubsub.subscribe(topic, pubsubTrampoline)
topicFilters.add(topic)
trace "Subscribed to topic", topic = topic
for filter in conf.customTopics:
await switch.subscribe(filter, pubsubTrampoline)
await pubsub.subscribe(filter, pubsubTrampoline)
topicFilters.add(filter)
trace "Subscribed to custom topic", topic = filter
except CatchableError as exc:
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-libp2p
2 changes: 1 addition & 1 deletion vendor/nimcrypto