Merge remote-tracking branch 'origin/unstable' into fix-yamux
lchenut committed Nov 24, 2023
2 parents 6ba7613 + ce0685c commit 3fb5827
Showing 24 changed files with 448 additions and 485 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/doc.yml
@@ -19,13 +19,13 @@ jobs:

- uses: jiro4989/setup-nim-action@v1
with:
nim-version: 'stable'
nim-version: '1.6.x'

- name: Generate doc
run: |
nim --version
nimble --version
nimble install_pinned -y
nimble install_pinned
# nim doc can "fail", but the doc is still generated
nim doc --git.url:https://github.com/status-im/nim-libp2p --git.commit:${GITHUB_REF##*/} --outdir:${GITHUB_REF##*/} --project libp2p || true
2 changes: 1 addition & 1 deletion libp2p.nimble
@@ -17,7 +17,7 @@ requires "nim >= 1.6.0",
"secp256k1",
"stew#head",
"websock",
"unittest2 >= 0.0.5 & <= 0.1.0"
"unittest2"

let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
14 changes: 12 additions & 2 deletions libp2p/builders.nim
@@ -25,7 +25,7 @@ import
muxers/[muxer, mplex/mplex, yamux/yamux],
protocols/[identify, secure/secure, secure/noise, rendezvous],
protocols/connectivity/[autonat/server, relay/relay, relay/client, relay/rtransport],
connmanager, upgrademngrs/muxedupgrade,
connmanager, upgrademngrs/muxedupgrade, observedaddrmanager,
nameresolving/nameresolver,
errors, utility

@@ -59,6 +59,7 @@ type
circuitRelay: Relay
rdv: RendezVous
services: seq[Service]
observedAddrManager: ObservedAddrManager

proc new*(T: type[SwitchBuilder]): T {.public.} =
## Creates a SwitchBuilder
@@ -201,6 +202,10 @@ proc withServices*(b: SwitchBuilder, services: seq[Service]): SwitchBuilder =
b.services = services
b

proc withObservedAddrManager*(b: SwitchBuilder, observedAddrManager: ObservedAddrManager): SwitchBuilder =
b.observedAddrManager = observedAddrManager
b

proc build*(b: SwitchBuilder): Switch
{.raises: [LPError], public.} =

@@ -223,8 +228,13 @@ proc build*(b: SwitchBuilder): Switch
protoVersion = b.protoVersion,
agentVersion = b.agentVersion)

let identify =
if b.observedAddrManager != nil:
Identify.new(peerInfo, b.sendSignedPeerRecord, b.observedAddrManager)
else:
Identify.new(peerInfo, b.sendSignedPeerRecord)

let
identify = Identify.new(peerInfo, b.sendSignedPeerRecord)
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.new(b.muxers, secureManagerInstances, ms)
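The new withObservedAddrManager hook lets callers share a single ObservedAddrManager between the switch and its Identify instance. A minimal usage sketch, assuming ObservedAddrManager and the builder helpers are all reachable from the top-level libp2p module; the address and the transport/muxer/secure choices are illustrative:

import libp2p

let
  rng = newRng()
  oam = ObservedAddrManager.new()
  switch = SwitchBuilder.new()
    .withRng(rng)
    .withAddress(MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet())
    .withTcpTransport()
    .withMplex()
    .withNoise()
    .withObservedAddrManager(oam)  # forwarded to Identify.new inside build()
    .build()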
3 changes: 2 additions & 1 deletion libp2p/errors.nim
@@ -19,7 +19,8 @@ func toException*(e: string): ref LPError =
# sadly nim needs more love for hygienic templates
# so here goes the macro, it's based on the proc/template version
# and uses quote do so it's quite readable
macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
# TODO https://github.com/nim-lang/Nim/issues/22936
macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped =
let nexclude = exclude.len
case nexclude
of 0:
3 changes: 3 additions & 0 deletions libp2p/multiaddress.nim
@@ -398,6 +398,9 @@ const
MAProtocol(
mcodec: multiCodec("quic"), kind: Marker, size: 0
),
MAProtocol(
mcodec: multiCodec("quic-v1"), kind: Marker, size: 0
),
MAProtocol(
mcodec: multiCodec("ip6zone"), kind: Length, size: 0,
coder: TranscoderIP6Zone
1 change: 1 addition & 0 deletions libp2p/multicodec.nim
@@ -193,6 +193,7 @@ const MultiCodecList = [
("https", 0x01BB),
("tls", 0x01C0),
("quic", 0x01CC),
("quic-v1", 0x01CD),
("ws", 0x01DD),
("wss", 0x01DE),
("p2p-websocket-star", 0x01DF), # not in multicodec list
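Together with the matching MAProtocol entry in multiaddress.nim above, this makes QUIC v1 multiaddresses parseable. A small hedged sketch of the effect, using the usual multiaddress helpers; the address itself is illustrative:

import libp2p/[multiaddress, multicodec]

let ma = MultiAddress.init("/ip4/192.0.2.1/udp/4433/quic-v1").tryGet()
assert ma.contains(multiCodec("quic-v1")).get(false)  # recognised instead of failing to parse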
3 changes: 3 additions & 0 deletions libp2p/muxers/yamux/yamux.nim
@@ -186,6 +186,7 @@ proc remoteClosed(channel: YamuxChannel) {.async.} =
method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} =
if not channel.closedLocally:
channel.closedLocally = true
channel.isEof = true

if channel.isReset == false and channel.sendQueue.len == 0:
await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
@@ -249,6 +250,7 @@ method readOnce*(
await channel.closedRemotely or channel.receivedData.wait()
if channel.closedRemotely.done() and channel.recvQueue.len == 0:
channel.returnedEof = true
channel.isEof = true
return 0

let toRead = min(channel.recvQueue.len, nbytes)
@@ -461,6 +463,7 @@ method handle*(m: Yamux) {.async, gcsafe.} =
if header.streamId in m.flushed:
m.flushed.del(header.streamId)
if header.streamId mod 2 == m.currentId mod 2:
debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId
raise newException(YamuxError, "Peer used our reserved stream id")
let newStream = m.createStream(header.streamId, false, m.windowSize)
if m.channels.len >= m.maxChannCount:
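The two isEof assignments make both local close and remote EOF visible to readers of the channel. A hedged sketch of the read loop this affects, assuming the standard LPStream readOnce signature; the proc and buffer handling are illustrative, not code from this repo:

import chronos
import libp2p/muxers/yamux/yamux

proc drain(channel: YamuxChannel) {.async.} =
  var buf: array[4096, byte]
  while true:
    # returns 0 once closedRemotely has fired and recvQueue is drained
    let n = await channel.readOnce(addr buf[0], buf.len)
    if n == 0:
      break  # EOF
    # process buf[0 ..< n] here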
2 changes: 1 addition & 1 deletion libp2p/protocols/connectivity/dcutr/client.nim
@@ -66,7 +66,7 @@ proc startSync*(self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs:

if peerDialableAddrs.len > self.maxDialableAddrs:
peerDialableAddrs = peerDialableAddrs[0..<self.maxDialableAddrs]
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false))
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, upgradeDir = Direction.In))
try:
discard await anyCompleted(futs).wait(self.connectTimeout)
debug "Dcutr initiator has directly connected to the remote peer."
9 changes: 7 additions & 2 deletions libp2p/protocols/connectivity/dcutr/core.nim
@@ -56,5 +56,10 @@ proc send*(conn: Connection, msgType: MsgType, addrs: seq[MultiAddress]) {.async
let pb = DcutrMsg(msgType: msgType, addrs: addrs).encode()
await conn.writeLp(pb.buffer)

proc getHolePunchableAddrs*(addrs: seq[MultiAddress]): seq[MultiAddress] =
addrs.filterIt(TCP.match(it))
proc getHolePunchableAddrs*(addrs: seq[MultiAddress]): seq[MultiAddress] {.raises: [LPError].} =
var result = newSeq[MultiAddress]()
for a in addrs:
# This is necessary to also accept addrs like /ip4/198.51.100/tcp/1234/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
if [TCP, mapAnd(TCP_DNS, P2PPattern), mapAnd(TCP_IP, P2PPattern)].anyIt(it.match(a)):
result.add(a[0..1].tryGet())
return result
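A hedged sketch of what the broadened filter accepts, assuming the dcutr core module can be imported at its file path; the addresses are illustrative. Plain TCP addresses pass, TCP addresses with a trailing /p2p/<peer-id> pass with that suffix stripped by a[0..1], and non-TCP addresses are dropped:

import libp2p/multiaddress
import libp2p/protocols/connectivity/dcutr/core

let candidates = @[
  MultiAddress.init("/ip4/203.0.113.7/tcp/4001").tryGet(),
  MultiAddress.init("/ip4/203.0.113.7/tcp/4001/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N").tryGet(),
  MultiAddress.init("/ip4/203.0.113.7/udp/4001").tryGet()  # not TCP, filtered out
]
let punchable = getHolePunchableAddrs(candidates)
# both TCP entries survive, each reduced to /ip4/203.0.113.7/tcp/4001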
2 changes: 1 addition & 1 deletion libp2p/protocols/connectivity/dcutr/server.nim
@@ -56,7 +56,7 @@ proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDi

if peerDialableAddrs.len > maxDialableAddrs:
peerDialableAddrs = peerDialableAddrs[0..<maxDialableAddrs]
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, upgradeDir = Direction.In))
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, upgradeDir = Direction.Out))
try:
discard await anyCompleted(futs).wait(connectTimeout)
debug "Dcutr receiver has directly connected to the remote peer."
1 change: 1 addition & 0 deletions libp2p/protocols/connectivity/relay/rconn.nim
@@ -47,6 +47,7 @@ proc new*(
limitDuration: uint32,
limitData: uint64): T =
let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData)
rc.dir = conn.dir
rc.initStream()
if limitDuration > 0:
proc checkDurationConnection() {.async.} =
14 changes: 10 additions & 4 deletions libp2p/protocols/identify.nim
@@ -21,6 +21,7 @@ import ../protobuf/minprotobuf,
../peerid,
../crypto/crypto,
../multiaddress,
../multicodec,
../protocols/protocol,
../utility,
../errors,
@@ -139,12 +140,13 @@ proc decodeMsg*(buf: seq[byte]): Opt[IdentifyInfo] =
proc new*(
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new(),
): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord,
observedAddrManager: ObservedAddrManager.new(),
observedAddrManager: observedAddrManager,
)
identify.init()
identify
@@ -186,8 +188,12 @@ proc identify*(self: Identify,
info.peerId = peer

info.observedAddr.withValue(observed):
if not self.observedAddrManager.addObservation(observed):
debug "Observed address is not valid", observedAddr = observed
# Currently, we use the ObservedAddrManager only to find our dialable external NAT address. Therefore, addresses
# like ".../p2p-circuit/p2p/..." and "/p2p/..." are not useful to us.
if observed.contains(multiCodec("p2p-circuit")).get(false) or P2PPattern.matchPartial(observed):
trace "Not adding address to ObservedAddrManager.", observed
elif not self.observedAddrManager.addObservation(observed):
trace "Observed address is not valid.", observedAddr = observed
return info

proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} =
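A hedged illustration of the new guard; the addresses are illustrative. Observed addresses that are relay circuits (or bare /p2p addresses) are only traced, while direct addresses still reach addObservation:

import libp2p/multiaddress

let
  viaRelay = MultiAddress.init(
    "/ip4/198.51.100.2/tcp/4001/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N/p2p-circuit").tryGet()
  direct = MultiAddress.init("/ip4/198.51.100.2/tcp/4001").tryGet()
# viaRelay contains the p2p-circuit codec, so identify skips recording it;
# direct passes the check and is handed to the ObservedAddrManager.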
10 changes: 3 additions & 7 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -79,7 +79,6 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
disconnectBadPeers: false,
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval,
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false
)
@@ -319,7 +318,7 @@ proc validateAndRelay(g: GossipSub,
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
return
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
@@ -461,9 +460,6 @@ method rpcHandler*(g: GossipSub,
let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
g.outstandingIWANTs.withValue(msgId, iwantRequest):
if iwantRequest.peer.peerId == peer.peerId:
g.outstandingIWANTs.del(msgId)

# addSeen adds salt to msgId to avoid
# remote attacking the hash function
@@ -496,14 +492,14 @@
# always validate if signature is present or required
debug "Dropping message due to failed signature verification",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
continue

if msg.seqno.len > 0 and msg.seqno.len != 8:
# if we have seqno should be 8 bytes long
debug "Dropping message due to invalid seqno length",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
continue

# g.anonymize needs no evaluation when receiving messages
16 changes: 1 addition & 15 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub,
if not g.hasSeen(msgId):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
elif msgId notin res.messageIds:
res.messageIds.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
@@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub,
messages.add(msg)
return messages

proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
let currentTime = Moment.now()
var idsToRemove = newSeq[MessageId]()
for msgId, request in g.outstandingIWANTs.pairs():
if currentTime - request.timestamp > timeoutDuration:
trace "IWANT request timed out", messageID=msgId, peer=request.peer
request.peer.behaviourPenalty += 0.1
idsToRemove.add(msgId)
for msgId in idsToRemove:
g.outstandingIWANTs.del(msgId)

proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
@@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} =
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()

checkIWANTTimeouts(g, g.parameters.iwantTimeout)
8 changes: 4 additions & 4 deletions libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -240,15 +240,15 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
trace "running scoring heartbeat", instance = cast[int](g)
g.updateScores()

proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} =
let uselessAppBytesNum = msg.data.len
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
# discard g.disconnectPeer(peer)
# debug "Peer disconnected", peer, uselessAppBytesNum
# raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")
if g.parameters.disconnectPeerAboveRateLimit:
await g.disconnectPeer(peer)
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")


for tt in msg.topicIds:
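punishInvalidMessage is now async because it may disconnect the offending peer; whether it does is controlled by two GossipSubParams fields. A hedged configuration sketch, assuming the usual GossipSub constructor and that Opt comes from stew/results; the values are illustrative:

import chronos
import stew/results
import libp2p/protocols/pubsub/gossipsub

var params = GossipSubParams.init()
params.overheadRateLimit = Opt.some((bytes: 512 * 1024, interval: 1.minutes))
params.disconnectPeerAboveRateLimit = true
# passed when constructing the router, e.g.
#   let gossip = GossipSub.init(switch = someSwitch, parameters = params)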
7 changes: 0 additions & 7 deletions libp2p/protocols/pubsub/gossipsub/types.nim
@@ -143,7 +143,6 @@ type
enablePX*: bool

bandwidthEstimatebps*: int # This is currently used only for limiting flood publishing. 0 disables flood-limiting completely
iwantTimeout*: Duration

overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool
@@ -181,7 +180,6 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange

heartbeatEvents*: seq[AsyncEvent]
outstandingIWANTs*: Table[MessageId, IWANTRequest]

MeshMetrics* = object
# scratch buffers for metrics
@@ -192,8 +190,3 @@ type
lowPeersTopics*: int64 # npeers < dlow
healthyPeersTopics*: int64 # npeers >= dlow
underDoutTopics*: int64

IWANTRequest* = object
messageId*: MessageId
peer*: PubSubPeer
timestamp*: Moment
2 changes: 1 addition & 1 deletion libp2p/transports/tcptransport.nim
@@ -219,7 +219,7 @@ method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =

try:
if self.acceptFuts.len <= 0:
self.acceptFuts = self.servers.mapIt(it.accept())
self.acceptFuts = self.servers.mapIt(Future[StreamTransport](it.accept()))

if self.acceptFuts.len <= 0:
return
2 changes: 2 additions & 0 deletions libp2p/transports/wstransport.nim
@@ -276,6 +276,8 @@ method accept*(self: WsTransport): Future[Connection] {.async, gcsafe.} =
debug "AsyncStream Error", exc = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
except TransportAbortedError as exc:
debug "Connection aborted", exc = exc.msg
except AsyncTimeoutError as exc:
debug "Timed out", exc = exc.msg
except TransportUseClosedError as exc: