diff --git a/.pinned b/.pinned index 88d897cbc5..d54a1d4b31 100644 --- a/.pinned +++ b/.pinned @@ -1,16 +1,16 @@ -bearssl;https://github.com/status-im/nim-bearssl@#f4c4233de453cb7eac0ce3f3ffad6496295f83ab +bearssl;https://github.com/status-im/nim-bearssl@#a647994910904b0103a05db3a5ec1ecfc4d91a88 chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a -chronos;https://github.com/status-im/nim-chronos@#6525f4ce1d1a7eba146e5f1a53f6f105077ae686 +chronos;https://github.com/status-im/nim-chronos@#75d030ff71264513fb9701c75a326cd36fcb4692 dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823 -faststreams;https://github.com/status-im/nim-faststreams@#6112432b3a81d9db116cd5d64c39648881cfff29 -httputils;https://github.com/status-im/nim-http-utils@#e88e231dfcef4585fe3b2fbd9b664dbd28a88040 -json_serialization;https://github.com/status-im/nim-json-serialization@#e5b18fb710c3d0167ec79f3b892f5a7a1bc6d1a4 -metrics;https://github.com/status-im/nim-metrics@#0a6477268e850d7bc98347b3875301524871765f +faststreams;https://github.com/status-im/nim-faststreams@#b42daf41d8eb4fbce40add6836bed838f8d85b6f +httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f +json_serialization;https://github.com/status-im/nim-json-serialization@#a7d815ed92f200f490c95d3cfd722089cc923ce6 +metrics;https://github.com/status-im/nim-metrics@#21e99a2e9d9f80e68bef65c80ef781613005fccb nimcrypto;https://github.com/cheatfate/nimcrypto@#24e006df85927f64916e60511620583b11403178 -secp256k1;https://github.com/status-im/nim-secp256k1@#c7f1a37d9b0f17292649bfed8bf6cef83cf4221f -serialization;https://github.com/status-im/nim-serialization@#60a5bd8ac0461dfadd3069fd9c01a7734f205995 -stew;https://github.com/status-im/nim-stew@#23da07c9b59c0ba3d4efa7e4e6e2c4121ae5a156 +secp256k1;https://github.com/status-im/nim-secp256k1@#fd173fdff863ce2e211cf64c9a03bc7539fe40b0 +serialization;https://github.com/status-im/nim-serialization@#d77417cba6896c26287a68e6a95762e45a1b87e5 +stew;https://github.com/status-im/nim-stew@#7184d2424dc3945657884646a72715d494917aad testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#da8398c45cafd5bd7772da1fc96e3924a18d3823 -websock;https://github.com/status-im/nim-websock@#acbe30e9ca1e51dcbbfe4c552ee6f16c7eede538 +websock;https://github.com/status-im/nim-websock@#691f069b209d372b1240d5ae1f57fb7bbafeaba7 zlib;https://github.com/status-im/nim-zlib@#6a6670afba6b97b29b920340e2641978c05ab4d8 \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index 07600735e3..1c4140189b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,6 +1,5 @@ -# nim-libp2p documentation +# nim-libp2p examples -Welcome to the nim-libp2p documentation! +In this folder, you'll find the sources of the [nim-libp2p website](https://status-im.github.io/nim-libp2p/docs/) -Here, you'll find [tutorials](tutorial_1_connect.md) to help you get started, as well as -the [full reference](https://status-im.github.io/nim-libp2p/master/libp2p.html). +We recommand to follow the tutorials on the website, but feel free to grok the sources here! diff --git a/examples/index.md b/examples/index.md new file mode 100644 index 0000000000..07600735e3 --- /dev/null +++ b/examples/index.md @@ -0,0 +1,6 @@ +# nim-libp2p documentation + +Welcome to the nim-libp2p documentation! + +Here, you'll find [tutorials](tutorial_1_connect.md) to help you get started, as well as +the [full reference](https://status-im.github.io/nim-libp2p/master/libp2p.html). diff --git a/examples/tutorial_2_customproto.nim b/examples/tutorial_2_customproto.nim index be418a7a14..0bf6c197a3 100644 --- a/examples/tutorial_2_customproto.nim +++ b/examples/tutorial_2_customproto.nim @@ -32,7 +32,7 @@ proc new(T: typedesc[TestProto]): T = # We must close the connections ourselves when we're done with it await conn.close() - return T(codecs: @[TestCodec], handler: handle) + return T.new(codecs = @[TestCodec], handler = handle) ## This is a constructor for our `TestProto`, that will specify our `codecs` and a `handler`, which will be called for each incoming peer asking for this protocol. ## In our handle, we simply read a message from the connection and `echo` it. diff --git a/examples/tutorial_3_protobuf.nim b/examples/tutorial_3_protobuf.nim index 2af7efe61b..4ba7ac98f2 100644 --- a/examples/tutorial_3_protobuf.nim +++ b/examples/tutorial_3_protobuf.nim @@ -107,7 +107,7 @@ type metricGetter: MetricCallback proc new(_: typedesc[MetricProto], cb: MetricCallback): MetricProto = - let res = MetricProto(metricGetter: cb) + var res: MetricProto proc handle(conn: Connection, proto: string) {.async, gcsafe.} = let metrics = await res.metricGetter() @@ -115,8 +115,8 @@ proc new(_: typedesc[MetricProto], cb: MetricCallback): MetricProto = await conn.writeLp(asProtobuf.buffer) await conn.close() - res.codecs = @["/metric-getter/1.0.0"] - res.handler = handle + res = MetricProto.new(@["/metric-getter/1.0.0"], handle) + res.metricGetter = cb return res proc fetch(p: MetricProto, conn: Connection): Future[MetricList] {.async.} = diff --git a/examples/tutorial_5_discovery.nim b/examples/tutorial_5_discovery.nim index ce02e19df5..8890877360 100644 --- a/examples/tutorial_5_discovery.nim +++ b/examples/tutorial_5_discovery.nim @@ -36,7 +36,7 @@ proc new(T: typedesc[DumbProto], nodeNumber: int): T = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = echo "Node", nodeNumber, " received: ", string.fromBytes(await conn.readLp(1024)) await conn.close() - return T(codecs: @[DumbCodec], handler: handle) + return T.new(codecs = @[DumbCodec], handler = handle) ## ## Bootnodes ## The first time a p2p program is ran, he needs to know how to join diff --git a/examples/tutorial_6_game.nim b/examples/tutorial_6_game.nim index ffbf09a7b7..f3be6d372e 100644 --- a/examples/tutorial_6_game.nim +++ b/examples/tutorial_6_game.nim @@ -157,7 +157,7 @@ proc new(T: typedesc[GameProto], g: Game): T = # The handler of a protocol must wait for the stream to # be finished before returning await conn.join() - return T(codecs: @["/tron/1.0.0"], handler: handle) + return T.new(codecs = @["/tron/1.0.0"], handler = handle) proc networking(g: Game) {.async.} = # Create our switch, similar to the GossipSub example and diff --git a/libp2p.nimble b/libp2p.nimble index ac1407905a..85ba6bf918 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -139,9 +139,13 @@ task install_pinned, "Reads the lockfile": # Remove the automatically installed deps # (inefficient you say?) - let allowedDirectories = toInstall.mapIt(it[0] & "-" & it[1].split('@')[1]) - for dependency in listDirs("nimbledeps/pkgs"): - if dependency.extractFilename notin allowedDirectories: + let nimblePkgs = + if system.dirExists("nimbledeps/pkgs"): "nimbledeps/pkgs" + else: "nimbledeps/pkgs2" + for dependency in listDirs(nimblePkgs): + let filename = dependency.extractFilename + if toInstall.anyIt(filename.startsWith(it[0]) and + filename.endsWith(it[1].split('#')[^1])) == false: rmDir(dependency) task unpin, "Restore global package use": diff --git a/libp2p/builders.nim b/libp2p/builders.nim index fdff75ba0a..5ff5169706 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -61,6 +61,7 @@ type autonat: bool circuitRelay: Relay rdv: RendezVous + services: seq[Service] proc new*(T: type[SwitchBuilder]): T {.public.} = ## Creates a SwitchBuilder @@ -199,6 +200,10 @@ proc withRendezVous*(b: SwitchBuilder, rdv: RendezVous = RendezVous.new()): Swit b.rdv = rdv b +proc withServices*(b: SwitchBuilder, services: seq[Service]): SwitchBuilder = + b.services = services + b + proc build*(b: SwitchBuilder): Switch {.raises: [Defect, LPError], public.} = @@ -254,7 +259,8 @@ proc build*(b: SwitchBuilder): Switch connManager = connManager, ms = ms, nameResolver = b.nameResolver, - peerStore = peerStore) + peerStore = peerStore, + services = b.services) if b.autonat: let autonat = Autonat.new(switch) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index b7ac3d87aa..e411d40ad0 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -110,6 +110,13 @@ proc new*(C: type ConnManager, proc connCount*(c: ConnManager, peerId: PeerId): int = c.conns.getOrDefault(peerId).len +proc connectedPeers*(c: ConnManager, dir: Direction): seq[PeerId] = + var peers = newSeq[PeerId]() + for peerId, conns in c.conns: + if conns.anyIt(it.dir == dir): + peers.add(peerId) + return peers + proc addConnEventHandler*(c: ConnManager, handler: ConnEventHandler, kind: ConnEventKind) = @@ -537,3 +544,4 @@ proc close*(c: ConnManager) {.async.} = await conn.close() trace "Closed ConnManager" + diff --git a/libp2p/dial.nim b/libp2p/dial.nim index e0d78b906f..60d63e35aa 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -36,7 +36,8 @@ method connect*( method connect*( self: Dial, - addrs: seq[MultiAddress]): Future[PeerId] {.async, base.} = + address: MultiAddress, + allowUnknownPeerId = false): Future[PeerId] {.async, base.} = ## Connects to a peer and retrieve its PeerId doAssert(false, "Not implemented!") diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index d3c0826ddf..4e39733a46 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -219,11 +219,23 @@ method connect*( method connect*( self: Dialer, - addrs: seq[MultiAddress], - ): Future[PeerId] {.async.} = + address: MultiAddress, + allowUnknownPeerId = false): Future[PeerId] {.async.} = ## Connects to a peer and retrieve its PeerId - return (await self.internalConnect(Opt.none(PeerId), addrs, false)).peerId + let fullAddress = parseFullAddress(address) + if fullAddress.isOk: + return (await self.internalConnect( + Opt.some(fullAddress.get()[0]), + @[fullAddress.get()[1]], + false)).peerId + else: + if allowUnknownPeerId == false: + raise newException(DialFailedError, "Address without PeerID and unknown peer id disabled!") + return (await self.internalConnect( + Opt.none(PeerId), + @[address], + false)).peerId proc negotiateStream( self: Dialer, diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index e2797a6a36..bded5a5322 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[strutils, sequtils] +import std/[strutils, sequtils, tables] import chronos, chronicles, stew/byteutils import stream/connection, protocols/protocol @@ -21,7 +21,7 @@ logScope: topics = "libp2p multistream" const - MsgSize* = 64*1024 + MsgSize* = 1024 Codec* = "/multistream/1.0.0" MSCodec* = "\x13" & Codec & "\n" @@ -33,17 +33,20 @@ type MultiStreamError* = object of LPError - HandlerHolder* = object + HandlerHolder* = ref object protos*: seq[string] protocol*: LPProtocol match*: Matcher + openedStreams: CountTable[PeerId] MultistreamSelect* = ref object of RootObj handlers*: seq[HandlerHolder] codec*: string proc new*(T: typedesc[MultistreamSelect]): T = - T(codec: MSCodec) + T( + codec: MSCodec, + ) template validateSuffix(str: string): untyped = if str.endsWith("\n"): @@ -169,9 +172,22 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy for h in m.handlers: if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms): trace "found handler", conn, protocol = ms - await conn.writeLp(ms & "\n") - conn.protocol = ms - await h.protocol.handler(conn, ms) + + var protocolHolder = h + let maxIncomingStreams = protocolHolder.protocol.maxIncomingStreams + if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= maxIncomingStreams: + debug "Max streams for protocol reached, blocking new stream", + conn, protocol = ms, maxIncomingStreams + return + protocolHolder.openedStreams.inc(conn.peerId) + try: + await conn.writeLp(ms & "\n") + conn.protocol = ms + await protocolHolder.protocol.handler(conn, ms) + finally: + protocolHolder.openedStreams.inc(conn.peerId, -1) + if protocolHolder.openedStreams[conn.peerId] == 0: + protocolHolder.openedStreams.del(conn.peerId) return debug "no handlers", conn, protocol = ms await conn.write(Na) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index a5273794de..9f28449c61 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -414,7 +414,8 @@ method close*(m: Yamux) {.async.} = let channels = toSeq(m.channels.values()) for channel in channels: await channel.reset(true) - await m.connection.write(YamuxHeader.goAway(NormalTermination)) + try: await m.connection.write(YamuxHeader.goAway(NormalTermination)) + except CatchableError as exc: trace "failed to send goAway", msg=exc.msg await m.connection.close() trace "Closed yamux" diff --git a/libp2p/protocols/connectivity/autonat.nim b/libp2p/protocols/connectivity/autonat.nim index 6408a7e040..47bd3a4d7b 100644 --- a/libp2p/protocols/connectivity/autonat.nim +++ b/libp2p/protocols/connectivity/autonat.nim @@ -32,6 +32,7 @@ const type AutonatError* = object of LPError + AutonatUnreachableError* = object of LPError MsgType* = enum Dial = 0 @@ -45,21 +46,21 @@ type InternalError = 300 AutonatPeerInfo* = object - id: Option[PeerId] - addrs: seq[MultiAddress] + id*: Option[PeerId] + addrs*: seq[MultiAddress] AutonatDial* = object - peerInfo: Option[AutonatPeerInfo] + peerInfo*: Option[AutonatPeerInfo] AutonatDialResponse* = object status*: ResponseStatus text*: Option[string] ma*: Option[MultiAddress] - AutonatMsg = object - msgType: MsgType - dial: Option[AutonatDial] - response: Option[AutonatDialResponse] + AutonatMsg* = object + msgType*: MsgType + dial*: Option[AutonatDial] + response*: Option[AutonatDialResponse] proc encode*(msg: AutonatMsg): ProtoBuffer = result = initProtoBuffer() @@ -119,7 +120,7 @@ proc encode*(r: AutonatDialResponse): ProtoBuffer = result.write(3, bufferResponse.buffer) result.finish() -proc decode(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] = +proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] = var msgTypeOrd: uint32 pbDial: ProtoBuffer @@ -202,31 +203,45 @@ type Autonat* = ref object of LPProtocol sem: AsyncSemaphore switch*: Switch + dialTimeout: Duration + +method dialMe*(a: Autonat, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()): + Future[MultiAddress] {.base, async.} = + + proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} = + if autonatMsg.isNone() or + autonatMsg.get().msgType != DialResponse or + autonatMsg.get().response.isNone() or + (autonatMsg.get().response.get().status == Ok and + autonatMsg.get().response.get().ma.isNone()): + raise newException(AutonatError, "Unexpected response") + else: + autonatMsg.get().response.get() + + let conn = + try: + if addrs.len == 0: + await a.switch.dial(pid, @[AutonatCodec]) + else: + await a.switch.dial(pid, addrs, AutonatCodec) + except CatchableError as err: + raise newException(AutonatError, "Unexpected error when dialling", err) -proc dialMe*(a: Autonat, pid: PeerId, ma: MultiAddress|seq[MultiAddress]): - Future[MultiAddress] {.async.} = - let addrs = when ma is MultiAddress: @[ma] else: ma - let conn = await a.switch.dial(pid, addrs, AutonatCodec) defer: await conn.close() await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs) - let msgOpt = AutonatMsg.decode(await conn.readLp(1024)) - if msgOpt.isNone() or - msgOpt.get().msgType != DialResponse or - msgOpt.get().response.isNone(): - raise newException(AutonatError, "Unexpected response") - let response = msgOpt.get().response.get() - if response.status != ResponseStatus.Ok: - raise newException(AutonatError, "Bad status " & - $response.status & " " & - response.text.get("")) - if response.ma.isNone(): - raise newException(AutonatError, "Missing address") - return response.ma.get() + let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) + return case response.status: + of ResponseStatus.Ok: + response.ma.get() + of ResponseStatus.DialError: + raise newException(AutonatUnreachableError, "Peer could not dial us back") + else: + raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get("")) proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = try: await a.sem.acquire() - let ma = await a.switch.dialer.tryDial(conn.peerId, addrs) + let ma = await a.switch.dialer.tryDial(conn.peerId, addrs).wait(a.dialTimeout) if ma.isSome: await conn.sendResponseOk(ma.get()) else: @@ -284,8 +299,8 @@ proc handleDial(a: Autonat, conn: Connection, msg: AutonatMsg): Future[void] = return conn.sendResponseError(DialRefused, "No dialable address") return a.tryDial(conn, toSeq(addrs)) -proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1): T = - let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize)) +proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T = + let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout) autonat.init() autonat diff --git a/libp2p/protocols/connectivity/relay/client.nim b/libp2p/protocols/connectivity/relay/client.nim index b12a728dc2..e92d21b720 100644 --- a/libp2p/protocols/connectivity/relay/client.nim +++ b/libp2p/protocols/connectivity/relay/client.nim @@ -25,6 +25,7 @@ import ./relay, ../../../multiaddress, ../../../stream/connection +export options logScope: topics = "libp2p relay relay-client" diff --git a/libp2p/protocols/protocol.nim b/libp2p/protocols/protocol.nim index ee3c39a625..5103264ad8 100644 --- a/libp2p/protocols/protocol.nim +++ b/libp2p/protocols/protocol.nim @@ -12,9 +12,14 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import chronos +import chronos, stew/results import ../stream/connection +export results + +const + DefaultMaxIncomingStreams* = 10 + type LPProtoHandler* = proc ( conn: Connection, @@ -26,11 +31,17 @@ type codecs*: seq[string] handler*: LPProtoHandler ## this handler gets invoked by the protocol negotiator started*: bool + maxIncomingStreams: Opt[int] method init*(p: LPProtocol) {.base, gcsafe.} = discard method start*(p: LPProtocol) {.async, base.} = p.started = true method stop*(p: LPProtocol) {.async, base.} = p.started = false +proc maxIncomingStreams*(p: LPProtocol): int = + p.maxIncomingStreams.get(DefaultMaxIncomingStreams) + +proc `maxIncomingStreams=`*(p: LPProtocol, val: int) = + p.maxIncomingStreams = Opt.some(val) func codec*(p: LPProtocol): string = assert(p.codecs.len > 0, "Codecs sequence was empty!") @@ -40,3 +51,16 @@ func `codec=`*(p: LPProtocol, codec: string) = # always insert as first codec # if we use this abstraction p.codecs.insert(codec, 0) + +proc new*( + T: type LPProtocol, + codecs: seq[string], + handler: LPProtoHandler, # default(Opt[int]) or Opt.none(int) don't work on 1.2 + maxIncomingStreams: Opt[int] | int = Opt[int]()): T = + T( + codecs: codecs, + handler: handler, + maxIncomingStreams: + when maxIncomingStreams is int: Opt.some(maxIncomingStreams) + else: maxIncomingStreams + ) diff --git a/libp2p/services/autonatservice.nim b/libp2p/services/autonatservice.nim new file mode 100644 index 0000000000..0b26fd081c --- /dev/null +++ b/libp2p/services/autonatservice.nim @@ -0,0 +1,159 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[options, deques, sequtils] +import chronos, metrics +import ../switch +import ../protocols/[connectivity/autonat] +import ../utils/heartbeat +import ../crypto/crypto + +declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"]) + +type + AutonatService* = ref object of Service + newConnectedPeerHandler: PeerEventHandler + scheduleHandle: Future[void] + networkReachability: NetworkReachability + confidence: Option[float] + answers: Deque[NetworkReachability] + autonat: Autonat + statusAndConfidenceHandler: StatusAndConfidenceHandler + rng: ref HmacDrbgContext + scheduleInterval: Option[Duration] + askNewConnectedPeers: bool + numPeersToAsk: int + maxQueueSize: int + minConfidence: float + dialTimeout: Duration + + NetworkReachability* {.pure.} = enum + NotReachable, Reachable, Unknown + + StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].} + +proc new*( + T: typedesc[AutonatService], + autonat: Autonat, + rng: ref HmacDrbgContext, + scheduleInterval: Option[Duration] = none(Duration), + askNewConnectedPeers = true, + numPeersToAsk: int = 5, + maxQueueSize: int = 10, + minConfidence: float = 0.3, + dialTimeout = 30.seconds): T = + return T( + scheduleInterval: scheduleInterval, + networkReachability: Unknown, + confidence: none(float), + answers: initDeque[NetworkReachability](), + autonat: autonat, + rng: rng, + askNewConnectedPeers: askNewConnectedPeers, + numPeersToAsk: numPeersToAsk, + maxQueueSize: maxQueueSize, + minConfidence: minConfidence, + dialTimeout: dialTimeout) + +proc networkReachability*(self: AutonatService): NetworkReachability {.inline.} = + return self.networkReachability + +proc callHandler(self: AutonatService) {.async.} = + if not isNil(self.statusAndConfidenceHandler): + await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) + +proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} = + + if ans == Unknown: + return + + if self.answers.len == self.maxQueueSize: + self.answers.popFirst() + self.answers.addLast(ans) + + self.networkReachability = Unknown + self.confidence = none(float) + const reachabilityPriority = [Reachable, NotReachable] + for reachability in reachabilityPriority: + let confidence = self.answers.countIt(it == reachability) / self.maxQueueSize + libp2p_autonat_reachability_confidence.set(value = confidence, labelValues = [$reachability]) + if self.confidence.isNone and confidence >= self.minConfidence: + self.networkReachability = reachability + self.confidence = some(confidence) + + trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence + +proc askPeer(self: AutonatService, s: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = + trace "Asking for reachability", peerId = $peerId + let ans = + try: + discard await self.autonat.dialMe(peerId).wait(self.dialTimeout) + Reachable + except AutonatUnreachableError: + trace "dialMe answer is not reachable", peerId = $peerId + NotReachable + except AsyncTimeoutError: + trace "dialMe timed out", peerId = $peerId + Unknown + except CatchableError as err: + trace "dialMe unexpected error", peerId = $peerId, errMsg = $err.msg + Unknown + await self.handleAnswer(ans) + if not isNil(self.statusAndConfidenceHandler): + await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) + return ans + +proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = + var peers = switch.connectedPeers(Direction.Out) + self.rng.shuffle(peers) + var answersFromPeers = 0 + for peer in peers: + if answersFromPeers >= self.numPeersToAsk: + break + elif (await askPeer(self, switch, peer)) != Unknown: + answersFromPeers.inc() + +proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} = + heartbeat "Schedule AutonatService run", interval: + await service.run(switch) + +method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = + let hasBeenSetup = await procCall Service(self).setup(switch) + if hasBeenSetup: + if self.askNewConnectedPeers: + self.newConnectedPeerHandler = proc (peerId: PeerId, event: PeerEvent): Future[void] {.async.} = + discard askPeer(self, switch, peerId) + await self.callHandler() + switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) + if self.scheduleInterval.isSome(): + self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get()) + return hasBeenSetup + +method run*(self: AutonatService, switch: Switch) {.async, public.} = + await askConnectedPeers(self, switch) + await self.callHandler() + + +method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} = + let hasBeenStopped = await procCall Service(self).stop(switch) + if hasBeenStopped: + if not isNil(self.scheduleHandle): + self.scheduleHandle.cancel() + self.scheduleHandle = nil + if not isNil(self.newConnectedPeerHandler): + switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) + return hasBeenStopped + +proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) = + self.statusAndConfidenceHandler = statusAndConfidenceHandler diff --git a/libp2p/switch.nim b/libp2p/switch.nim index b1a7d489ed..964ddd7c32 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -74,6 +74,28 @@ type peerStore*: PeerStore nameResolver*: NameResolver started: bool + services*: seq[Service] + + Service* = ref object of RootObj + inUse: bool + + +method setup*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} = + if self.inUse: + warn "service setup has already been called" + return false + self.inUse = true + return true + +method run*(self: Service, switch: Switch) {.base, async, gcsafe.} = + doAssert(false, "Not implemented!") + +method stop*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} = + if not self.inUse: + warn "service is already stopped" + return false + self.inUse = false + return true proc addConnEventHandler*(s: Switch, handler: ConnEventHandler, @@ -108,6 +130,9 @@ method addTransport*(s: Switch, t: Transport) = s.transports &= t s.dialer.addTransport(t) +proc connectedPeers*(s: Switch, dir: Direction): seq[PeerId] = + s.connManager.connectedPeers(dir) + proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} = ## returns true if the peer has one or more ## associated connections @@ -130,10 +155,15 @@ method connect*( method connect*( s: Switch, - addrs: seq[MultiAddress]): Future[PeerId] = + address: MultiAddress, + allowUnknownPeerId = false): Future[PeerId] = ## Connects to a peer and retrieve its PeerId + ## + ## If the P2P part is missing from the MA and `allowUnknownPeerId` is set + ## to true, this will discover the PeerId while connecting. This exposes + ## you to MiTM attacks, so it shouldn't be used without care! - s.dialer.connect(addrs) + s.dialer.connect(address, allowUnknownPeerId) method dial*( s: Switch, @@ -289,6 +319,9 @@ proc stop*(s: Switch) {.async, public.} = if not a.finished: a.cancel() + for service in s.services: + discard await service.stop(s) + await s.ms.stop() trace "Switch stopped" @@ -330,6 +363,9 @@ proc start*(s: Switch) {.async, gcsafe, public.} = await s.ms.start() + for service in s.services: + discard await service.setup(s) + s.started = true debug "Started libp2p node", peer = s.peerInfo @@ -341,7 +377,8 @@ proc newSwitch*(peerInfo: PeerInfo, connManager: ConnManager, ms: MultistreamSelect, nameResolver: NameResolver = nil, - peerStore = PeerStore.new()): Switch + peerStore = PeerStore.new(), + services = newSeq[Service]()): Switch {.raises: [Defect, LPError], public.} = if secureManagers.len == 0: raise newException(LPError, "Provide at least one secure manager") @@ -353,8 +390,10 @@ proc newSwitch*(peerInfo: PeerInfo, connManager: connManager, peerStore: peerStore, dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver), - nameResolver: nameResolver) + nameResolver: nameResolver, + services: services) switch.connManager.peerStore = peerStore switch.mount(identity) + return switch diff --git a/libp2p/varint.nim b/libp2p/varint.nim index e881a317a0..2d82f26c56 100644 --- a/libp2p/varint.nim +++ b/libp2p/varint.nim @@ -58,10 +58,7 @@ type SomeVarint* = PBSomeVarint | LPSomeVarint SomeUVarint* = PBSomeUVarint | LPSomeUVarint -template toUleb(x: uint64): uint64 = x -template toUleb(x: uint32): uint32 = x -template toUleb(x: uint16): uint16 = x -template toUleb(x: uint8): uint8 = x +template toUleb[T: uint64|uint32|uint16|uint8|uint](x: T): T = x func toUleb(x: zint64): uint64 = let v = cast[uint64](x) diff --git a/mkdocs.yml b/mkdocs.yml index e9cdaacd69..94e4069275 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -20,8 +20,8 @@ markdown_extensions: - pymdownx.superfences theme: - logo: https://docs.libp2p.io/images/logo_small.png - favicon: https://docs.libp2p.io/images/logo_small.png + logo: https://libp2p.io/img/logo_small.png + favicon: https://libp2p.io/img/logo_small.png name: material features: - navigation.instant @@ -41,7 +41,7 @@ theme: nav: - Tutorials: - - 'Introduction': README.md + - 'Introduction': index.md - 'Simple connection': tutorial_1_connect.md - 'Create a custom protocol': tutorial_2_customproto.md - 'Protobuf': tutorial_3_protobuf.md diff --git a/nimble.lock b/nimble.lock index a2fc36c53a..2c0d986664 100644 --- a/nimble.lock +++ b/nimble.lock @@ -2,53 +2,68 @@ "version": 1, "packages": { "unittest2": { - "version": "0.0.4", - "vcsRevision": "f180f596c88dfd266f746ed6f8dbebce39c824db", - "url": "https://github.com/status-im/nim-unittest2.git", + "version": "0.0.5", + "vcsRevision": "da8398c45cafd5bd7772da1fc96e3924a18d3823", + "url": "https://github.com/status-im/nim-unittest2", "downloadMethod": "git", "dependencies": [], "checksums": { - "sha1": "fa309c41eaf6ef57895b9e603f2620a2f6e11780" + "sha1": "b3f8493a4948989ef3e645a38b23aad77e851e26" + } + }, + "testutils": { + "version": "0.5.0", + "vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34", + "url": "https://github.com/status-im/nim-testutils", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897" } }, "stew": { "version": "0.1.0", - "vcsRevision": "6ad35b876fb6ebe0dfee0f697af173acc47906ee", - "url": "https://github.com/status-im/nim-stew.git", + "vcsRevision": "7184d2424dc3945657884646a72715d494917aad", + "url": "https://github.com/status-im/nim-stew", "downloadMethod": "git", - "dependencies": [], + "dependencies": [ + "unittest2" + ], "checksums": { - "sha1": "46d58c4feb457f3241e3347778334e325dce5268" + "sha1": "f3125ed2fd126dfd3edbaea14275abd9fa57d703" } }, "bearssl": { - "version": "0.1.5", - "vcsRevision": "ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7", + "version": "0.2.0", + "vcsRevision": "a647994910904b0103a05db3a5ec1ecfc4d91a88", "url": "https://github.com/status-im/nim-bearssl", "downloadMethod": "git", "dependencies": [ "unittest2" ], "checksums": { - "sha1": "383abd5becc77bf8e365b780a29d20529e1d9c4c" + "sha1": "d634751df2716ea9975912a2d5d0a090bb6bcfa9" } }, "httputils": { "version": "0.3.0", - "vcsRevision": "689da19e9e9cfff4ced85e2b25c6b2b5598ed079", - "url": "https://github.com/status-im/nim-http-utils.git", + "vcsRevision": "a85bd52ae0a956983ca6b3267c72961d2ec0245f", + "url": "https://github.com/status-im/nim-http-utils", "downloadMethod": "git", "dependencies": [ - "stew" + "stew", + "unittest2" ], "checksums": { - "sha1": "4ad3ad68d13c50184180ab4b2eacc0bd7ed2ed44" + "sha1": "92933b21bcd29335f68e377e2b2193fa331e28b3" } }, "chronos": { "version": "3.0.11", - "vcsRevision": "17fed89c99beac5a92d3668d0d3e9b0e4ac13936", - "url": "https://github.com/status-im/nim-chronos.git", + "vcsRevision": "75d030ff71264513fb9701c75a326cd36fcb4692", + "url": "https://github.com/status-im/nim-chronos", "downloadMethod": "git", "dependencies": [ "stew", @@ -57,52 +72,27 @@ "unittest2" ], "checksums": { - "sha1": "f6fffc87571e5f76af2a77c4ebcc0e00909ced4e" - } - }, - "metrics": { - "version": "0.0.1", - "vcsRevision": "71e0f0e354e1f4c59e3dc92153989c8b723c3440", - "url": "https://github.com/status-im/nim-metrics", - "downloadMethod": "git", - "dependencies": [ - "chronos" - ], - "checksums": { - "sha1": "86da251fe532ef2163da30343688ab1c148c0340" - } - }, - "testutils": { - "version": "0.4.2", - "vcsRevision": "aa6e5216f4b4ab5aa971cdcdd70e1ec1203cedf2", - "url": "https://github.com/status-im/nim-testutils", - "downloadMethod": "git", - "dependencies": [ - "unittest2" - ], - "checksums": { - "sha1": "94427e0cce0e0c5841edcd3a6530b4e6b857a3cb" + "sha1": "57a674ba3c1a57a694fa7810d93ceb68f338a861" } }, "faststreams": { "version": "0.3.0", - "vcsRevision": "1b561a9e71b6bdad1c1cdff753418906037e9d09", - "url": "https://github.com/status-im/nim-faststreams.git", + "vcsRevision": "b42daf41d8eb4fbce40add6836bed838f8d85b6f", + "url": "https://github.com/status-im/nim-faststreams", "downloadMethod": "git", "dependencies": [ "stew", - "testutils", "chronos", "unittest2" ], "checksums": { - "sha1": "97edf9797924af48566a0af8267203dc21d80c77" + "sha1": "62f7ac8fb200a8ecb9e6c63f5553a7dad66ae613" } }, "serialization": { "version": "0.1.0", - "vcsRevision": "fcd0eadadde0ee000a63df8ab21dc4e9f015a790", - "url": "https://github.com/status-im/nim-serialization.git", + "vcsRevision": "d77417cba6896c26287a68e6a95762e45a1b87e5", + "url": "https://github.com/status-im/nim-serialization", "downloadMethod": "git", "dependencies": [ "faststreams", @@ -110,70 +100,72 @@ "stew" ], "checksums": { - "sha1": "fef59519892cac70cccd81b612085caaa5e3e6cf" + "sha1": "e17244c6654de22254acb9bcf71d8ddbeca8b2aa" + } + }, + "metrics": { + "version": "0.0.1", + "vcsRevision": "21e99a2e9d9f80e68bef65c80ef781613005fccb", + "url": "https://github.com/status-im/nim-metrics", + "downloadMethod": "git", + "dependencies": [ + "chronos" + ], + "checksums": { + "sha1": "ab1c994bbcd6b04f2500f05d8ea4e463f33dd310" + } + }, + "nimcrypto": { + "version": "0.5.4", + "vcsRevision": "24e006df85927f64916e60511620583b11403178", + "url": "https://github.com/cheatfate/nimcrypto", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "a4db2105de265930f1578bb7957f49fa39b10d9b" } }, "json_serialization": { "version": "0.1.0", - "vcsRevision": "c5f0e2465e8375dfc7aa0f56ccef67cb680bc6b0", - "url": "https://github.com/status-im/nim-json-serialization.git", + "vcsRevision": "a7d815ed92f200f490c95d3cfd722089cc923ce6", + "url": "https://github.com/status-im/nim-json-serialization", "downloadMethod": "git", "dependencies": [ "serialization", "stew" ], "checksums": { - "sha1": "d89d79d0679a3a41b350e3ad4be56c0308cc5ec6" + "sha1": "50fc34a992ef3df68a7bee88af096bb8ed42572f" } }, "chronicles": { - "version": "0.10.2", - "vcsRevision": "1682096306ddba8185dcfac360a8c3f952d721e4", - "url": "https://github.com/status-im/nim-chronicles.git", + "version": "0.10.3", + "vcsRevision": "32ac8679680ea699f7dbc046e8e0131cac97d41a", + "url": "https://github.com/status-im/nim-chronicles", "downloadMethod": "git", "dependencies": [ "testutils", "json_serialization" ], "checksums": { - "sha1": "9a5bebb76b0f7d587a31e621d260119279e91c76" - } - }, - "asynctest": { - "version": "0.3.1", - "vcsRevision": "5347c59b4b057443a014722aa40800cd8bb95c69", - "url": "https://github.com/status-im/asynctest.git", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "53e0b610d13700296755a4ebe789882cae47a3b9" - } - }, - "nimcrypto": { - "version": "0.5.4", - "vcsRevision": "a5742a9a214ac33f91615f3862c7b099aec43b00", - "url": "https://github.com/cheatfate/nimcrypto", - "downloadMethod": "git", - "dependencies": [], - "checksums": { - "sha1": "f76c87707cd4e96355b8bb6ef27e7f8b0aac1e08" + "sha1": "79f09526d4d9b9196dd2f6a75310d71a890c4f88" } }, "zlib": { "version": "0.1.0", - "vcsRevision": "74cdeb54b21bededb5a515d36f608bc1850555a2", + "vcsRevision": "6a6670afba6b97b29b920340e2641978c05ab4d8", "url": "https://github.com/status-im/nim-zlib", "downloadMethod": "git", "dependencies": [ "stew" ], "checksums": { - "sha1": "01d330dc4c1924e56b1559ee73bc760e526f635c" + "sha1": "2621e46369be2a6846713e8c3d681a5bba3e0325" } }, "websock": { "version": "0.1.0", - "vcsRevision": "73edde4417f7b45003113b7a34212c3ccd95b9fd", + "vcsRevision": "691f069b209d372b1240d5ae1f57fb7bbafeaba7", "url": "https://github.com/status-im/nim-websock", "downloadMethod": "git", "dependencies": [ @@ -181,36 +173,35 @@ "httputils", "chronicles", "stew", - "asynctest", "nimcrypto", "bearssl", "zlib" ], "checksums": { - "sha1": "ec2b137543f280298ca48de9ed4461a033ba88d3" + "sha1": "c71edfce064e7c0cadde0e687c6edc0caaf9ec07" } }, "dnsclient": { - "version": "0.1.2", - "vcsRevision": "fbb76f8af8a33ab818184a7d4406d9fee20993be", - "url": "https://github.com/ba0f3/dnsclient.nim.git", + "version": "0.3.2", + "vcsRevision": "fcd7443634b950eaea574e5eaa00a628ae029823", + "url": "https://github.com/ba0f3/dnsclient.nim", "downloadMethod": "git", "dependencies": [], "checksums": { - "sha1": "663239a914c814204b30dda6e0902cc0fbd0b8ee" + "sha1": "146aa4a8d512a3a786c5bf54311b79900166d9d7" } }, "secp256k1": { "version": "0.5.2", - "vcsRevision": "5340cf188168d6afcafc8023770d880f067c0b2f", - "url": "https://github.com/status-im/nim-secp256k1.git", + "vcsRevision": "fd173fdff863ce2e211cf64c9a03bc7539fe40b0", + "url": "https://github.com/status-im/nim-secp256k1", "downloadMethod": "git", "dependencies": [ "stew", "nimcrypto" ], "checksums": { - "sha1": "ae9cbea4487be94a06653ffee075a7f1bd1e231e" + "sha1": "657c79f6f2b1b6da92a9cda81ffc9f95d26443cb" } } } diff --git a/tests/config.nims b/tests/config.nims index 3ec031e767..ab493670dd 100644 --- a/tests/config.nims +++ b/tests/config.nims @@ -1,3 +1,5 @@ import ../config.nims --threads:on +--d:metrics +--d:withoutPCRE diff --git a/tests/stubs/autonatstub.nim b/tests/stubs/autonatstub.nim new file mode 100644 index 0000000000..e05da826ab --- /dev/null +++ b/tests/stubs/autonatstub.nim @@ -0,0 +1,44 @@ +{.used.} + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import chronos +import ../../libp2p/protocols/connectivity/autonat +import ../../libp2p/peerid +import ../../libp2p/multiaddress + +type + AutonatStub* = ref object of Autonat + answer*: Answer + dials: int + expectedDials: int + finished*: Future[void] + + Answer* = enum + Reachable, + NotReachable, + Unknown + +proc new*(T: typedesc[AutonatStub], expectedDials: int): T = + return T(dials: 0, expectedDials: expectedDials, finished: newFuture[void]()) + +method dialMe*( + self: AutonatStub, + pid: PeerId, + addrs: seq[MultiAddress] = newSeq[MultiAddress]()): + Future[MultiAddress] {.async.} = + + self.dials += 1 + + if self.dials == self.expectedDials: + self.finished.complete() + case self.answer: + of Reachable: + return MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + of NotReachable: + raise newException(AutonatUnreachableError, "") + of Unknown: + raise newException(AutonatError, "") diff --git a/tests/stubs.nim b/tests/stubs/torstub.nim similarity index 98% rename from tests/stubs.nim rename to tests/stubs/torstub.nim index aae661de44..ca5fe97769 100644 --- a/tests/stubs.nim +++ b/tests/stubs/torstub.nim @@ -7,7 +7,7 @@ else: import tables import chronos, stew/[byteutils, endians2, shims/net] -import ../libp2p/[stream/connection, +import ../../libp2p/[stream/connection, protocols/connectivity/relay/utils, transports/tcptransport, transports/tortransport, diff --git a/tests/testautonat.nim b/tests/testautonat.nim index ce44b581a8..7542cddf52 100644 --- a/tests/testautonat.nim +++ b/tests/testautonat.nim @@ -2,6 +2,8 @@ import std/options import chronos import ../libp2p/[ + transports/tcptransport, + upgrademngrs/upgrade, builders, protocols/connectivity/autonat ], @@ -34,7 +36,7 @@ suite "Autonat": teardown: checkTrackers() - asyncTest "Simple test": + asyncTest "dialMe returns public address": let src = newStandardSwitch() dst = createAutonatSwitch() @@ -43,9 +45,10 @@ suite "Autonat": await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) let ma = await Autonat.new(src).dialMe(dst.peerInfo.peerId, dst.peerInfo.addrs) + check ma in src.peerInfo.addrs await allFutures(src.stop(), dst.stop()) - asyncTest "Simple failed test": + asyncTest "dialMe handles dial error msg": let src = newStandardSwitch() dst = makeAutonatServicePrivate() @@ -54,6 +57,33 @@ suite "Autonat": await dst.start() await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) - expect AutonatError: + expect AutonatUnreachableError: discard await Autonat.new(src).dialMe(dst.peerInfo.peerId, dst.peerInfo.addrs) await allFutures(src.stop(), dst.stop()) + + asyncTest "Timeout is triggered in autonat handle": + let + src = newStandardSwitch() + dst = newStandardSwitch() + autonat = Autonat.new(dst, dialTimeout = 1.seconds) + doesNothingListener = TcpTransport.new(upgrade = Upgrade()) + + dst.mount(autonat) + await src.start() + await dst.start() + await doesNothingListener.start(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]) + + await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) + let conn = await src.dial(dst.peerInfo.peerId, @[AutonatCodec]) + let buffer = AutonatDial(peerInfo: some(AutonatPeerInfo( + id: some(src.peerInfo.peerId), + # we ask to be dialed in the does nothing listener instead + addrs: doesNothingListener.addrs + ))).encode().buffer + await conn.writeLp(buffer) + let response = AutonatMsg.decode(await conn.readLp(1024)).get().response.get() + check: + response.status == DialError + response.text.get() == "Timeout exceeded!" + response.ma.isNone() + await allFutures(doesNothingListener.stop(), src.stop(), dst.stop()) diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim new file mode 100644 index 0000000000..0d0c5f3bb4 --- /dev/null +++ b/tests/testautonatservice.nim @@ -0,0 +1,251 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import std/options +import chronos, metrics +import unittest2 +import ../libp2p/[builders, + switch, + services/autonatservice, + protocols/connectivity/autonat] +import ./helpers +import stubs/autonatstub + +proc createSwitch(autonatSvc: Service = nil, withAutonat = true): Switch = + var builder = SwitchBuilder.new() + .withRng(newRng()) + .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) + .withTcpTransport() + .withMplex() + .withNoise() + + if withAutonat: + builder = builder.withAutonat() + + if autonatSvc != nil: + builder = builder.withServices(@[autonatSvc]) + + return builder.build() + +suite "Autonat Service": + teardown: + checkTrackers() + + asyncTest "Peer must be not reachable": + + let autonatStub = AutonatStub.new(expectedDials = 3) + autonatStub.answer = NotReachable + + let autonatService = AutonatService.new(autonatStub, newRng()) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await autonatStub.finished + + check autonatService.networkReachability() == NetworkReachability.NotReachable + check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 + + await allFuturesThrowing( + switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Peer must be reachable": + + let autonat = Autonat.new(switch = nil) + + let autonatService = AutonatService.new(autonat, newRng(), some(1.seconds)) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + let awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() >= 0.3: + if not awaiter.finished: + awaiter.complete() + + autonat.switch = switch1 + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await awaiter + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 + + await allFuturesThrowing( + switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Peer must be not reachable and then reachable": + + let autonatStub = AutonatStub.new(expectedDials = 6) + autonatStub.answer = NotReachable + + let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds)) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + let awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: + if not awaiter.finished: + autonatStub.answer = Reachable + awaiter.complete() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await awaiter + + check autonatService.networkReachability() == NetworkReachability.NotReachable + check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 + + await autonatStub.finished + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 + + await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Peer must be reachable when one connected peer has autonat disabled": + let autonat = Autonat.new(switch = nil) + + let autonatService = AutonatService.new(autonat, newRng(), some(1.seconds), maxQueueSize = 2) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch(withAutonat = false) + let switch3 = createSwitch() + let switch4 = createSwitch() + + let awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: + if not awaiter.finished: + awaiter.complete() + + autonat.switch = switch1 + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await awaiter + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 1 + + await allFuturesThrowing( + switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Unknown answers must be ignored": + + let autonatStub = AutonatStub.new(expectedDials = 6) + autonatStub.answer = NotReachable + + let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds), maxQueueSize = 3) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + let awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: + if not awaiter.finished: + autonatStub.answer = Unknown + awaiter.complete() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await awaiter + + check autonatService.networkReachability() == NetworkReachability.NotReachable + check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3 + + await autonatStub.finished + + check autonatService.networkReachability() == NetworkReachability.NotReachable + check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 1/3 + + await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Calling setup and stop twice must work": + + let switch = createSwitch() + let autonatService = AutonatService.new(AutonatStub.new(expectedDials = 0), newRng(), some(1.seconds)) + + check (await autonatService.setup(switch)) == true + check (await autonatService.setup(switch)) == false + + check (await autonatService.stop(switch)) == true + check (await autonatService.stop(switch)) == false + + await allFuturesThrowing(switch.stop()) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 6bdf1aa40a..a29993d53b 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -278,6 +278,79 @@ suite "Multistream select": await handlerWait.wait(30.seconds) + asyncTest "e2e - streams limit": + let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] + let blocker = newFuture[void]() + + # Start 5 streams which are blocked by `blocker` + # Try to start a new one, which should fail + # Unblock the 5 streams, check that we can open a new one + proc testHandler(conn: Connection, + proto: string): + Future[void] {.async, gcsafe.} = + await blocker + await conn.writeLp("Hello!") + await conn.close() + + var protocol: LPProtocol = LPProtocol.new( + @["/test/proto/1.0.0"], + testHandler, + maxIncomingStreams = 5 + ) + + protocol.handler = testHandler + let msListen = MultistreamSelect.new() + msListen.addHandler("/test/proto/1.0.0", protocol) + + let transport1 = TcpTransport.new(upgrade = Upgrade()) + await transport1.start(ma) + + proc acceptedOne(c: Connection) {.async.} = + await msListen.handle(c) + await c.close() + + proc acceptHandler() {.async, gcsafe.} = + while true: + let conn = await transport1.accept() + asyncSpawn acceptedOne(conn) + + var handlerWait = acceptHandler() + + let msDial = MultistreamSelect.new() + let transport2 = TcpTransport.new(upgrade = Upgrade()) + + proc connector {.async.} = + let conn = await transport2.dial(transport1.addrs[0]) + check: (await msDial.select(conn, "/test/proto/1.0.0")) == true + check: string.fromBytes(await conn.readLp(1024)) == "Hello!" + await conn.close() + + # Fill up the 5 allowed streams + var dialers: seq[Future[void]] + for _ in 0..<5: + dialers.add(connector()) + + # This one will fail during negotiation + expect(CatchableError): + try: waitFor(connector().wait(1.seconds)) + except AsyncTimeoutError as exc: + check false + raise exc + # check that the dialers aren't finished + check: (await dialers[0].withTimeout(10.milliseconds)) == false + + # unblock the dialers + blocker.complete() + await allFutures(dialers) + + # now must work + waitFor(connector()) + + await transport2.stop() + await transport1.stop() + + await handlerWait.cancelAndWait() + asyncTest "e2e - ls": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] diff --git a/tests/testnative.nim b/tests/testnative.nim index cc04fab66c..fb1f0de9dd 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -41,4 +41,5 @@ import testtcptransport, testrendezvous, testdiscovery, testyamux, - testautonat + testautonat, + testautonatservice diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 3cbb66d7ed..92a7d9f533 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -10,6 +10,7 @@ import ../libp2p/[errors, builders, stream/bufferstream, stream/connection, + multicodec, multiaddress, peerinfo, crypto/crypto, @@ -213,12 +214,40 @@ suite "Switch": "dnsaddr=" & $switch1.peerInfo.addrs[0] & "/p2p/" & $switch1.peerInfo.peerId, ] - check: (await switch2.connect(@[MultiAddress.init("/dnsaddr/test.io/").tryGet()])) == switch1.peerInfo.peerId + check: (await switch2.connect(MultiAddress.init("/dnsaddr/test.io/").tryGet(), true)) == switch1.peerInfo.peerId await switch2.disconnect(switch1.peerInfo.peerId) # via direct ip check not switch2.isConnected(switch1.peerInfo.peerId) - check: (await switch2.connect(switch1.peerInfo.addrs)) == switch1.peerInfo.peerId + check: (await switch2.connect(switch1.peerInfo.addrs[0], true)) == switch1.peerInfo.peerId + + await switch2.disconnect(switch1.peerInfo.peerId) + + await allFuturesThrowing( + switch1.stop(), + switch2.stop() + ) + + asyncTest "e2e connect to peer with known PeerId": + let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + await switch1.start() + await switch2.start() + + # via direct ip + check not switch2.isConnected(switch1.peerInfo.peerId) + + # without specifying allow unknown, will fail + expect(DialFailedError): + discard await switch2.connect(switch1.peerInfo.addrs[0]) + + # with invalid PeerId, will fail + let fakeMa = concat(switch1.peerInfo.addrs[0], MultiAddress.init(multiCodec("p2p"), PeerId.random.tryGet().data).tryGet()).tryGet() + expect(CatchableError): + discard (await switch2.connect(fakeMa)) + + # real thing works + check (await switch2.connect(switch1.peerInfo.fullAddrs.tryGet()[0])) == switch1.peerInfo.peerId await switch2.disconnect(switch1.peerInfo.peerId) diff --git a/tests/testtortransport.nim b/tests/testtortransport.nim index de9050e5a8..2431458e61 100644 --- a/tests/testtortransport.nim +++ b/tests/testtortransport.nim @@ -14,9 +14,9 @@ import ../libp2p/[stream/connection, multiaddress, builders] -import ./helpers, ./stubs, ./commontransport +import ./helpers, ./stubs/torstub, ./commontransport -const torServer = initTAddress("127.0.0.1", 9050.Port) +const torServer = initTAddress("127.0.0.1", 9050.Port) var stub: TorServerStub var startFut: Future[void] suite "Tor transport": diff --git a/tools/pinner.nim b/tools/pinner.nim index 702870d396..3417454e3e 100644 --- a/tools/pinner.nim +++ b/tools/pinner.nim @@ -13,8 +13,15 @@ createDir("nimbledeps") discard execCmd("nimble install -dy") var allDeps: Table[string, string] -for (_, dependency) in walkDir("nimbledeps/pkgs"): - let fileContent = parseJson(readFile(dependency & "/nimblemeta.json")) +let nimblePkgs = + if dirExists("nimbledeps/pkgs"): "nimbledeps/pkgs" + else: "nimbledeps/pkgs2" +for (_, dependency) in walkDir(nimblePkgs): + let + jsonContent = parseJson(readFile(dependency & "/nimblemeta.json")) + fileContent = + if "metaData" in jsonContent: jsonContent["metaData"] + else: jsonContent let url = fileContent.getOrDefault("url").getStr("") var version = fileContent.getOrDefault("vcsRevision").getStr("") var packageName = dependency.split('/')[^1].split('-')[0]