From be33ad6ac7d1cac5ade6bbf8e45541f65705fa25 Mon Sep 17 00:00:00 2001 From: vladopajic Date: Fri, 14 Feb 2025 15:36:29 +0100 Subject: [PATCH] chore: specify raising exceptions in `daemon` module (#1249) --- libp2p/daemon/daemonapi.nim | 288 ++++++++++++++++++++++++++++++------ libp2p/daemon/transpool.nim | 18 ++- libp2p/utility.nim | 17 --- libp2p/wire.nim | 4 +- tests/commoninterop.nim | 16 +- tests/testdaemon.nim | 4 +- 6 files changed, 273 insertions(+), 74 deletions(-) diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index bd93efabe5..e1278befa9 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -158,7 +158,7 @@ type key*: PublicKey P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {. - gcsafe, raises: [CatchableError] + gcsafe, async: (raises: [CatchableError]) .} P2PPubSubCallback* = proc( api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage @@ -485,7 +485,11 @@ proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalErro if initProtoBuffer(error).getRequiredField(1, result).isErr(): raise newException(DaemonLocalError, "Error message is missing!") -proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = +proc recvMessage( + conn: StreamTransport +): Future[seq[byte]] {. + async: (raises: [TransportIncompleteError, TransportError, CancelledError]) +.} = var size: uint length: int @@ -508,13 +512,19 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = result = buffer -proc newConnection*(api: DaemonAPI): Future[StreamTransport] {.raises: [LPError].} = - result = connect(api.address) +proc newConnection*( + api: DaemonAPI +): Future[StreamTransport] {. + async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]) +.} = + await connect(api.address) -proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] = - result = transp.closeWait() +proc closeConnection*( + api: DaemonAPI, transp: StreamTransport +): Future[void] {.async: (raises: [CancelledError]).} = + await transp.closeWait() -proc socketExists(address: MultiAddress): Future[bool] {.async.} = +proc socketExists(address: MultiAddress): Future[bool] {.async: (raises: []).} = try: var transp = await connect(address) await transp.closeWait() @@ -534,7 +544,9 @@ else: proc getProcessId(): int = result = int(posix.getpid()) -proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.} = +proc getSocket( + pattern: string, count: ptr int +): Future[MultiAddress] {.async: (raises: [ValueError, LPError]).} = var sockname = "" var pid = $getProcessId() sockname = pattern % [pid, $(count[])] @@ -562,7 +574,35 @@ proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.} closeSocket(sock) # This is forward declaration needed for newDaemonApi() -proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} +proc listPeers*( + api: DaemonAPI +): Future[seq[PeerInfo]] {. + async: ( + raises: [ + ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError, + CancelledError, LPError, + ] + ) +.} + +template exceptionToAssert(body: untyped): untyped = + block: + var res: type(body) + when defined(nimHasWarnBareExcept): + {.push warning[BareExcept]: off.} + try: + res = body + except OSError as exc: + raise exc + except IOError as exc: + raise exc + except Defect as exc: + raise exc + except Exception as exc: + raiseAssert exc.msg + when defined(nimHasWarnBareExcept): + {.pop.} + res proc copyEnv(): StringTableRef = ## This procedure copy all environment variables into StringTable. @@ -586,7 +626,14 @@ proc newDaemonApi*( peersRequired = 2, logFile = "", logLevel = IpfsLogLevel.Debug, -): Future[DaemonAPI] {.async.} = +): Future[DaemonAPI] {. + async: ( + raises: [ + ValueError, DaemonLocalError, CancelledError, LPError, OSError, IOError, + AsyncError, + ] + ) +.} = ## Initialize connection to `go-libp2p-daemon` control socket. ## ## ``flags`` - set of P2PDaemonFlags. @@ -780,7 +827,7 @@ proc newDaemonApi*( result = api -proc close*(stream: P2PStream) {.async.} = +proc close*(stream: P2PStream) {.async: (raises: [DaemonLocalError]).} = ## Close ``stream``. if P2PStreamFlags.Closed notin stream.flags: await stream.transp.closeWait() @@ -789,7 +836,9 @@ proc close*(stream: P2PStream) {.async.} = else: raise newException(DaemonLocalError, "Stream is already closed!") -proc close*(api: DaemonAPI) {.async.} = +proc close*( + api: DaemonAPI +) {.async: (raises: [TransportOsError, LPError, ValueError, OSError, CancelledError]).} = ## Shutdown connections to `go-libp2p-daemon` control socket. # await api.pool.close() # Closing all pending servers. @@ -827,7 +876,9 @@ template withMessage(m, body: untyped): untyped = proc transactMessage( transp: StreamTransport, pb: ProtoBuffer -): Future[ProtoBuffer] {.async.} = +): Future[ProtoBuffer] {. + async: (raises: [DaemonLocalError, TransportError, CancelledError]) +.} = let length = pb.getLen() let res = await transp.write(pb.getPtr(), length) if res != length: @@ -845,7 +896,11 @@ proc getPeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} = discard pb.getRepeatedField(2, result.addresses) -proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = +proc identity*( + api: DaemonAPI +): Future[PeerInfo] {. + async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]) +.} = ## Get Node identity information var transp = await api.newConnection() try: @@ -860,7 +915,7 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = proc connect*( api: DaemonAPI, peer: PeerId, addresses: seq[MultiAddress], timeout = 0 -) {.async.} = +) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} = ## Connect to remote peer with id ``peer`` and addresses ``addresses``. var transp = await api.newConnection() try: @@ -870,7 +925,9 @@ proc connect*( except CatchableError: await api.closeConnection(transp) -proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} = +proc disconnect*( + api: DaemonAPI, peer: PeerId +) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} = ## Disconnect from remote peer with id ``peer``. var transp = await api.newConnection() try: @@ -882,7 +939,12 @@ proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} = proc openStream*( api: DaemonAPI, peer: PeerId, protocols: seq[string], timeout = 0 -): Future[P2PStream] {.async.} = +): Future[P2PStream] {. + async: ( + raises: + [MaInvalidAddress, TransportError, CancelledError, LPError, DaemonLocalError] + ) +.} = ## Open new stream to peer ``peer`` using one of the protocols in ## ``protocols``. Returns ``StreamTransport`` for the stream. var transp = await api.newConnection() @@ -903,9 +965,9 @@ proc openStream*( stream.flags.incl(Outbound) stream.transp = transp result = stream - except CatchableError as exc: + except ResultError[ProtoError]: await api.closeConnection(transp) - raise exc + raise newException(DaemonLocalError, "Wrong message type!") proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = var api = getUserData[DaemonAPI](server) @@ -927,11 +989,28 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = proc addHandler*( api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback -) {.async, raises: [LPError].} = +) {. + async: ( + raises: [ + MaInvalidAddress, DaemonLocalError, TransportError, CancelledError, LPError, + ValueError, + ] + ) +.} = ## Add stream handler ``handler`` for set of protocols ``protocols``. var transp = await api.newConnection() let maddress = await getSocket(api.pattern, addr api.ucounter) var server = createStreamServer(maddress, streamHandler, udata = api) + + var removeHandler = proc(): Future[void] {. + async: (raises: [CancelledError, TransportError]) + .} = + for item in protocols: + api.handlers.del(item) + server.stop() + server.close() + await server.join() + try: for item in protocols: api.handlers[item] = handler @@ -939,17 +1018,28 @@ proc addHandler*( var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols)) pb.withMessage: api.servers.add(P2PServer(server: server, address: maddress)) - except CatchableError as exc: - for item in protocols: - api.handlers.del(item) - server.stop() - server.close() - await server.join() - raise exc + except DaemonLocalError as e: + await removeHandler() + raise e + except TransportError as e: + await removeHandler() + raise e + except CancelledError as e: + await removeHandler() + raise e finally: await api.closeConnection(transp) -proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = +proc listPeers*( + api: DaemonAPI +): Future[seq[PeerInfo]] {. + async: ( + raises: [ + ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError, + CancelledError, LPError, + ] + ) +.} = ## Get list of remote peers to which we are currently connected. var transp = await api.newConnection() try: @@ -964,7 +1054,14 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = finally: await api.closeConnection(transp) -proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async.} = +proc cmTagPeer*( + api: DaemonAPI, peer: PeerId, tag: string, weight: int +) {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Tag peer with id ``peer`` using ``tag`` and ``weight``. var transp = await api.newConnection() try: @@ -974,7 +1071,14 @@ proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async. finally: await api.closeConnection(transp) -proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} = +proc cmUntagPeer*( + api: DaemonAPI, peer: PeerId, tag: string +) {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Remove tag ``tag`` from peer with id ``peer``. var transp = await api.newConnection() try: @@ -984,7 +1088,14 @@ proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} = finally: await api.closeConnection(transp) -proc cmTrimPeers*(api: DaemonAPI) {.async.} = +proc cmTrimPeers*( + api: DaemonAPI +) {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Trim all connections. var transp = await api.newConnection() try: @@ -1058,7 +1169,12 @@ proc getDhtMessageType( proc dhtFindPeer*( api: DaemonAPI, peer: PeerId, timeout = 0 -): Future[PeerInfo] {.async.} = +): Future[PeerInfo] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Find peer with id ``peer`` and return peer information ``PeerInfo``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1073,7 +1189,12 @@ proc dhtFindPeer*( proc dhtGetPublicKey*( api: DaemonAPI, peer: PeerId, timeout = 0 -): Future[PublicKey] {.async.} = +): Future[PublicKey] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get peer's public key from peer with id ``peer``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1088,7 +1209,12 @@ proc dhtGetPublicKey*( proc dhtGetValue*( api: DaemonAPI, key: string, timeout = 0 -): Future[seq[byte]] {.async.} = +): Future[seq[byte]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get value associated with ``key``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1103,7 +1229,12 @@ proc dhtGetValue*( proc dhtPutValue*( api: DaemonAPI, key: string, value: seq[byte], timeout = 0 -) {.async.} = +) {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Associate ``value`` with ``key``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1116,7 +1247,14 @@ proc dhtPutValue*( finally: await api.closeConnection(transp) -proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} = +proc dhtProvide*( + api: DaemonAPI, cid: Cid, timeout = 0 +) {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Provide content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1131,7 +1269,12 @@ proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} = proc dhtFindPeersConnectedToPeer*( api: DaemonAPI, peer: PeerId, timeout = 0 -): Future[seq[PeerInfo]] {.async.} = +): Future[seq[PeerInfo]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Find peers which are connected to peer with id ``peer``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1157,7 +1300,12 @@ proc dhtFindPeersConnectedToPeer*( proc dhtGetClosestPeers*( api: DaemonAPI, key: string, timeout = 0 -): Future[seq[PeerId]] {.async.} = +): Future[seq[PeerId]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get closest peers for ``key``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1183,7 +1331,12 @@ proc dhtGetClosestPeers*( proc dhtFindProviders*( api: DaemonAPI, cid: Cid, count: uint32, timeout = 0 -): Future[seq[PeerInfo]] {.async.} = +): Future[seq[PeerInfo]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get ``count`` providers for content with id ``cid``. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1209,7 +1362,12 @@ proc dhtFindProviders*( proc dhtSearchValue*( api: DaemonAPI, key: string, timeout = 0 -): Future[seq[seq[byte]]] {.async.} = +): Future[seq[seq[byte]]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Search for value with ``key``, return list of values found. ## ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value @@ -1232,7 +1390,14 @@ proc dhtSearchValue*( finally: await api.closeConnection(transp) -proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = +proc pubsubGetTopics*( + api: DaemonAPI +): Future[seq[string]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get list of topics this node is subscribed to. var transp = await api.newConnection() try: @@ -1245,7 +1410,14 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = finally: await api.closeConnection(transp) -proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.async.} = +proc pubsubListPeers*( + api: DaemonAPI, topic: string +): Future[seq[PeerId]] {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get list of peers we are connected to and which also subscribed to topic ## ``topic``. var transp = await api.newConnection() @@ -1260,7 +1432,14 @@ proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.asyn finally: await api.closeConnection(transp) -proc pubsubPublish*(api: DaemonAPI, topic: string, value: seq[byte]) {.async.} = +proc pubsubPublish*( + api: DaemonAPI, topic: string, value: seq[byte] +) {. + async: ( + raises: + [DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError] + ) +.} = ## Get list of peer identifiers which are subscribed to topic ``topic``. var transp = await api.newConnection() try: @@ -1280,7 +1459,13 @@ proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage = discard pb.getField(5, result.signature) discard pb.getField(6, result.key) -proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = +proc pubsubLoop( + api: DaemonAPI, ticket: PubsubTicket +) {. + async: ( + raises: [TransportIncompleteError, TransportError, CancelledError, CatchableError] + ) +.} = while true: var pbmessage = await ticket.transp.recvMessage() if len(pbmessage) == 0: @@ -1296,7 +1481,12 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = proc pubsubSubscribe*( api: DaemonAPI, topic: string, handler: P2PPubSubCallback -): Future[PubsubTicket] {.async: (raises: [CatchableError]).} = +): Future[PubsubTicket] {. + async: ( + raises: + [MaInvalidAddress, TransportError, LPError, CancelledError, DaemonLocalError] + ) +.} = ## Subscribe to topic ``topic``. var transp = await api.newConnection() try: @@ -1308,7 +1498,13 @@ proc pubsubSubscribe*( ticket.transp = transp asyncSpawn pubsubLoop(api, ticket) result = ticket - except CatchableError as exc: + except DaemonLocalError as exc: + await api.closeConnection(transp) + raise exc + except TransportError as exc: + await api.closeConnection(transp) + raise exc + except CancelledError as exc: await api.closeConnection(transp) raise exc diff --git a/libp2p/daemon/transpool.nim b/libp2p/daemon/transpool.nim index fbb42c09de..52de06ffb7 100644 --- a/libp2p/daemon/transpool.nim +++ b/libp2p/daemon/transpool.nim @@ -55,7 +55,7 @@ proc newPool*( address: TransportAddress, poolsize: int = DefaultPoolSize, bufferSize = DefaultStreamBufferSize, -): Future[TransportPool] {.async.} = +): Future[TransportPool] {.async: (raises: [CancelledError]).} = ## Establish pool of connections to address ``address`` with size ## ``poolsize``. var pool = new TransportPool @@ -80,7 +80,9 @@ proc newPool*( pool.state = Connected result = pool -proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} = +proc acquire*( + pool: TransportPool +): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} = ## Acquire non-busy connection from pool ``pool``. var transp: StreamTransport if pool.state in {Connected}: @@ -102,7 +104,9 @@ proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} = raise newException(TransportPoolError, "Pool is not ready!") result = transp -proc release*(pool: TransportPool, transp: StreamTransport) = +proc release*( + pool: TransportPool, transp: StreamTransport +) {.async: (raises: [TransportPoolError]).} = ## Release connection ``transp`` back to pool ``pool``. if pool.state in {Connected, Closing}: var found = false @@ -118,7 +122,9 @@ proc release*(pool: TransportPool, transp: StreamTransport) = else: raise newException(TransportPoolError, "Pool is not ready!") -proc join*(pool: TransportPool) {.async.} = +proc join*( + pool: TransportPool +) {.async: (raises: [TransportPoolError, CancelledError]).} = ## Waiting for all connection to become available. if pool.state in {Connected, Closing}: while true: @@ -130,7 +136,9 @@ proc join*(pool: TransportPool) {.async.} = elif pool.state == Connecting: raise newException(TransportPoolError, "Pool is not ready!") -proc close*(pool: TransportPool) {.async.} = +proc close*( + pool: TransportPool +) {.async: (raises: [TransportPoolError, CancelledError]).} = ## Closes transports pool ``pool`` and release all resources. if pool.state == Connected: pool.state = Closing diff --git a/libp2p/utility.nim b/libp2p/utility.nim index cdfd19ca5d..77d77c9d3f 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -71,23 +71,6 @@ proc capLen*[T](s: var seq[T], length: Natural) = if s.len > length: s.setLen(length) -template exceptionToAssert*(body: untyped): untyped = - block: - var res: type(body) - when defined(nimHasWarnBareExcept): - {.push warning[BareExcept]: off.} - try: - res = body - except CatchableError as exc: - raise exc - except Defect as exc: - raise exc - except Exception as exc: - raiseAssert exc.msg - when defined(nimHasWarnBareExcept): - {.pop.} - res - template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped = ## This template provides a convenient way to work with `Option` types in Nim. ## It allows you to execute a block of code (`body`) only when the `Option` is not empty. diff --git a/libp2p/wire.nim b/libp2p/wire.nim index ef133ab48c..af4fb3f458 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -67,7 +67,9 @@ proc connect*( child: StreamTransport = nil, flags = default(set[SocketFlags]), localAddress: Opt[MultiAddress] = Opt.none(MultiAddress), -): Future[StreamTransport] {.async.} = +): Future[StreamTransport] {. + async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]) +.} = ## Open new connection to remote peer with address ``ma`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` is size of internal buffer for transport. diff --git a/tests/commoninterop.nim b/tests/commoninterop.nim index 5d9edd299d..5c40449117 100644 --- a/tests/commoninterop.nim +++ b/tests/commoninterop.nim @@ -185,7 +185,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) = let daemonPeer = await daemonNode.identity() var testFuture = newFuture[void]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + proc daemonHandler( + api: DaemonAPI, stream: P2PStream + ) {.async: (raises: [CatchableError]).} = check string.fromBytes(await stream.transp.readLp()) == "test 1" discard await stream.transp.writeLp("test 2") check string.fromBytes(await stream.transp.readLp()) == "test 3" @@ -227,7 +229,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) = let daemonPeer = await daemonNode.identity() var testFuture = newFuture[string]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + proc daemonHandler( + api: DaemonAPI, stream: P2PStream + ) {.async: (raises: [CatchableError]).} = # We should perform `readLp()` instead of `readLine()`. `readLine()` # here reads actually length prefixed string. var line = await stream.transp.readLine() @@ -351,7 +355,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) = let daemonPeer = await daemonNode.identity() var testFuture = newFuture[string]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + proc daemonHandler( + api: DaemonAPI, stream: P2PStream + ) {.async: (raises: [CatchableError]).} = # We should perform `readLp()` instead of `readLine()`. `readLine()` # here reads actually length prefixed string. var line = await stream.transp.readLine() @@ -485,7 +491,9 @@ proc relayInteropTests*(name: string, relayCreator: SwitchCreator) = # TODO: This Future blocks the daemonHandler after sending the last message. # It exists because there's a strange behavior where stream.close sends # a Rst instead of Fin. We should investigate this at some point. - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + proc daemonHandler( + api: DaemonAPI, stream: P2PStream + ) {.async: (raises: [CatchableError]).} = check "line1" == string.fromBytes(await stream.transp.readLp()) discard await stream.transp.writeLp("line2") check "line3" == string.fromBytes(await stream.transp.readLp()) diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index e0358fab47..27658899e8 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -28,7 +28,9 @@ proc connectStreamTest(): Future[bool] {.async.} = var testFuture = newFuture[string]("test.future") - proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + proc streamHandler( + api: DaemonAPI, stream: P2PStream + ) {.async: (raises: [CatchableError]).} = var line = await stream.transp.readLine() testFuture.complete(line)