Skip to content

Commit

Permalink
use split out pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and arnetheduck committed Aug 13, 2020
1 parent 58d7715 commit 1570caf
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 23 deletions.
44 changes: 29 additions & 15 deletions beacon_chain/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import
multiaddress, multicodec, crypto/crypto, crypto/secp,
protocols/identify, protocols/protocol],
libp2p/protocols/secure/[secure, secio],
libp2p/protocols/pubsub/[pubsub, floodsub, rpc/message, rpc/messages],
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub, rpc/message, rpc/messages],
libp2p/transports/tcptransport,
libp2p/stream/lpstream,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
Expand Down Expand Up @@ -51,6 +51,7 @@ type
# TODO Is this really needed?
Eth2Node* = ref object of RootObj
switch*: Switch
pubsub*: PubSub
discovery*: Eth2DiscoveryProtocol
wantedPeers*: int
peerPool*: PeerPool[Peer, PeerID]
Expand Down Expand Up @@ -211,7 +212,7 @@ const
PeerScoreHighLimit* = 1000
## Max value of peer's score

ConcurrentConnections* = 4
ConcurrentConnections* = 10
## Maximum number of active concurrent connection requests.

SeenTableTimeTimeout* =
Expand Down Expand Up @@ -274,6 +275,7 @@ template libp2pProtocol*(name: string, version: int) {.pragma.}

template `$`*(peer: Peer): string = id(peer.info)
chronicles.formatIt(Peer): $it
chronicles.formatIt(PeerID): $it

template remote*(peer: Peer): untyped =
peer.info.peerId
Expand Down Expand Up @@ -848,18 +850,20 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
inc peer.connections
debug "Peer upgraded", peer = peerId, connections = peer.connections
debug "Peer upgraded", peer = $peerId, connections = peer.connections

if peer.connections == 1:
# Libp2p may connect multiple times to the same peer - using different
# transports or both incoming and outgoing. For now, we'll count our
# transports for both incoming and outgoing. For now, we'll count our
# "fist" encounter with the peer as the true connection, leaving the
# other connections be - libp2p limits the number of concurrent
# connections to the same peer, and only one of these connections will be
# active. Nonetheless, this quirk will cause a number of odd behaviours:
# * For peer limits, we might miscount the incoming vs outgoing quota
# * Protocol handshakes are wonky: we'll not necessarily use the newly
# connected transport - instead we'll just pick a random one!

node.pubsub.subscribePeer(peerId)
await performProtocolHandshakes(peer, event.incoming)

# While performing the handshake, the peer might have been disconnected -
Expand All @@ -880,18 +884,21 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =

of ConnEventKind.Disconnected:
dec peer.connections
debug "Peer disconnected", peer = peerId, connections = peer.connections
debug "Peer disconnected", peer = $peerId, connections = peer.connections
if peer.connections == 0:
node.pubsub.unsubscribePeer(peerId)
let fut = peer.disconnectedFut
if fut != nil:
peer.disconnectedFut = nil
fut.complete()

proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
switch: Switch, ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
privKey: keys.PrivateKey, rng: ref BrHmacDrbgContext): T =
switch: Switch, pubsub: PubSub, ip: Option[ValidIpAddress],
tcpPort, udpPort: Port, privKey: keys.PrivateKey,
rng: ref BrHmacDrbgContext): T =
new result
result.switch = switch
result.pubsub = pubsub
result.wantedPeers = conf.maxPeers
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
when not defined(local_testnet):
Expand Down Expand Up @@ -932,6 +939,7 @@ template publicKey*(node: Eth2Node): keys.PublicKey =
proc startListening*(node: Eth2Node) {.async.} =
node.discovery.open()
node.libp2pTransportLoops = await node.switch.start()
await node.pubsub.start()

proc start*(node: Eth2Node) {.async.} =
for i in 0 ..< ConcurrentConnections:
Expand Down Expand Up @@ -1189,15 +1197,21 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId
# that are different from the host address (this is relevant when we
# are running behind a NAT).
var switch = newStandardSwitch(some keys.seckey, hostAddress,
triggerSelf = true, gossip = true,
sign = false, verifySignature = false,
transportFlags = {ServerFlags.ReuseAddr},
msgIdProvider = msgIdProvider,
secureManagers = [
SecureProtocol.Noise, # Only noise in ETH2!
],
rng = rng)
result = Eth2Node.init(conf, enrForkId, switch,

let pubsub = GossipSub.init(
switch = switch,
msgIdProvider = msgIdProvider,
triggerSelf = true, sign = false,
verifySignature = false).PubSub

switch.mount(pubsub)

result = Eth2Node.init(conf, enrForkId, switch, pubsub,
extIp, extTcpPort, extUdpPort,
keys.seckey.asEthKey, rng = rng)

Expand Down Expand Up @@ -1245,7 +1259,7 @@ proc subscribe*[MsgType](node: Eth2Node,
debug "Gossip msg handler error",
msg = err.msg, len = data.len, topic, msgId = gossipId(data)

await node.switch.subscribe(topic & "_snappy", execMsgHandler)
await node.pubsub.subscribe(topic & "_snappy", execMsgHandler)

proc addValidator*[MsgType](node: Eth2Node,
topic: string,
Expand All @@ -1262,7 +1276,7 @@ proc addValidator*[MsgType](node: Eth2Node,
msg = err.msg, msgId = gossipId(message.data)
return false

node.switch.addValidator(topic & "_snappy", execValidator)
node.pubsub.addValidator(topic & "_snappy", execValidator)

proc subscribe*[MsgType](node: Eth2Node,
topic: string,
Expand All @@ -1272,7 +1286,7 @@ proc subscribe*[MsgType](node: Eth2Node,
await node.subscribe(topic, msgHandler)

proc unsubscribe*(node: Eth2Node, topic: string): Future[void] =
node.switch.unsubscribeAll(topic)
node.pubsub.unsubscribeAll(topic)

proc traceMessage(fut: FutureBase, msgId: string) =
fut.addCallback do (arg: pointer):
Expand All @@ -1287,5 +1301,5 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
inc nbc_gossip_messages_sent
let
data = snappy.encode(SSZ.encode(msg))
var futSnappy = node.switch.publish(topic & "_snappy", data, 1.minutes)
var futSnappy = node.pubsub.publish(topic & "_snappy", data)
traceMessage(futSnappy, gossipId(data))
19 changes: 13 additions & 6 deletions beacon_chain/inspector.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import confutils, chronicles, chronos
import libp2p/[switch, standard_setup, multiaddress, multicodec, peerinfo]
import libp2p/crypto/crypto as lcrypto
import libp2p/crypto/secp as lsecp
import libp2p/protocols/pubsub/[pubsub, gossipsub]
import eth/p2p/discoveryv5/enr as enr
import eth/p2p/discoveryv5/[protocol, discovery_db, node]
import eth/keys as ethkeys, eth/trie/db
Expand Down Expand Up @@ -674,9 +675,14 @@ proc run(conf: InspectorConf) {.async.} =
error "Bind address is incorrect MultiAddress", address = conf.bindAddress
quit(1)

var switch = newStandardSwitch(some(seckey), hostAddress.get(),
triggerSelf = true, gossip = true,
sign = false, verifySignature = false, rng = rng)
let switch = newStandardSwitch(some(seckey), hostAddress.get(), rng = rng)

let pubsub = GossipSub.init(
switch = switch,
triggerSelf = true, sign = false,
verifySignature = false).PubSub

switch.mount(pubsub)

if len(conf.topics) > 0:
for item in conf.topics:
Expand Down Expand Up @@ -708,17 +714,18 @@ proc run(conf: InspectorConf) {.async.} =
data: seq[byte]): Future[void] {.gcsafe.} =
result = pubsubLogger(conf, switch, resolveQueue, topic, data)

discard switch.start()
discard await switch.start()
await pubsub.start()

var topicFilters = newSeq[string]()
try:
for filter in topics:
for topic in getTopics(forkDigest.get(), filter):
await switch.subscribe(topic, pubsubTrampoline)
await pubsub.subscribe(topic, pubsubTrampoline)
topicFilters.add(topic)
trace "Subscribed to topic", topic = topic
for filter in conf.customTopics:
await switch.subscribe(filter, pubsubTrampoline)
await pubsub.subscribe(filter, pubsubTrampoline)
topicFilters.add(filter)
trace "Subscribed to custom topic", topic = filter
except CatchableError as exc:
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-libp2p
2 changes: 1 addition & 1 deletion vendor/nimcrypto

0 comments on commit 1570caf

Please sign in to comment.