Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactors to remove .closure., .gcsafe for .async. procs, and added callback compatibility to daemonapi #1240

Merged
merged 5 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions libp2p/daemon/daemonapi.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type

PubsubTicket* = ref object
topic*: string
handler*: P2PPubSubCallback
handler*: P2PPubSubCallback2
transp*: StreamTransport

PubSubMessage* = object
Expand All @@ -162,8 +162,10 @@ type
.}
P2PPubSubCallback* = proc(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.gcsafe, async: (raises: [CatchableError]).}

): Future[bool] {.gcsafe, raises: [CatchableError].}
P2PPubSubCallback2* = proc(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async: (raises: [CatchableError]).}
DaemonError* = object of LPError
DaemonRemoteError* = object of DaemonError
DaemonLocalError* = object of DaemonError
Expand Down Expand Up @@ -1480,7 +1482,7 @@ proc pubsubLoop(
break

proc pubsubSubscribe*(
api: DaemonAPI, topic: string, handler: P2PPubSubCallback
api: DaemonAPI, topic: string, handler: P2PPubSubCallback2
): Future[PubsubTicket] {.
async: (
raises:
Expand Down Expand Up @@ -1508,6 +1510,18 @@ proc pubsubSubscribe*(
await api.closeConnection(transp)
raise exc

proc pubsubSubscribe*(
api: DaemonAPI, topic: string, handler: P2PPubSubCallback
): Future[PubsubTicket] {.
async: (raises: [CatchableError]), deprecated: "Use P2PPubSubCallback2 instead"
.} =
proc wrap(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
): Future[bool] {.async: (raises: [CatchableError]).} =
await handler(api, ticket, message)

await pubsubSubscribe(api, topic, wrap)

proc shortLog*(pinfo: PeerInfo): string =
## Get string representation of ``PeerInfo`` object.
result = newStringOfCap(128)
Expand Down
2 changes: 1 addition & 1 deletion libp2p/nameresolving/dnsresolver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ proc getDnsResponse(

proc datagramDataReceived(
transp: DatagramTransport, raddr: TransportAddress
): Future[void] {.async: (raises: []), closure.} =
): Future[void] {.async: (raises: []).} =
receivedDataFuture.complete()

let sock =
Expand Down
6 changes: 5 additions & 1 deletion libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,13 @@ method getOrCreatePeer*(
peer[].codec = protoNegotiated
return peer[]

proc getConn(): Future[Connection] {.async: (raises: [GetConnDialError]).} =
proc getConn(): Future[Connection] {.
async: (raises: [CancelledError, GetConnDialError])
.} =
try:
return await p.switch.dial(peerId, protosToDial)
except CancelledError as exc:
raise exc
except CatchableError as e:
raise (ref GetConnDialError)(parent: e)

Expand Down
8 changes: 4 additions & 4 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type
PubSubPeerEvent* = object
kind*: PubSubPeerEventKind

GetConn* = proc(): Future[Connection] {.gcsafe, async: (raises: [GetConnDialError]).}
GetConn* =
proc(): Future[Connection] {.async: (raises: [CancelledError, GetConnDialError]).}
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].}
# have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
Expand Down Expand Up @@ -123,9 +124,8 @@ type
# The max number of elements allowed in the non-priority queue.
disconnected: bool

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.
gcsafe, async: (raises: [])
.}
RPCHandler* =
proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.async: (raises: []).}

when defined(libp2p_agents_metrics):
func shortAgent*(p: PubSubPeer): string =
Expand Down
2 changes: 1 addition & 1 deletion libp2p/transports/quictransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ method dial*(
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[P2PConnection] {.async, gcsafe.} =
): Future[P2PConnection] {.async.} =
let connection = await dial(initTAddress(address).tryGet)
return transport.wrapConnection(connection)

Expand Down
4 changes: 2 additions & 2 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ suite "GossipSub":
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
handler = proc(topic: string, data: seq[byte]) {.async.} =
seen.mgetOrPut(peerName, 0).inc()
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
Expand Down Expand Up @@ -772,7 +772,7 @@ suite "GossipSub":
var handler: TopicHandler
capture dialer, i:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
handler = proc(topic: string, data: seq[byte]) {.async.} =
try:
if peerName notin seen:
seen[peerName] = 0
Expand Down
4 changes: 2 additions & 2 deletions tests/pubsub/testgossipsub2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ suite "GossipSub":
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
handler = proc(topic: string, data: seq[byte]) {.async.} =
seen.mgetOrPut(peerName, 0).inc()
info "seen up", count = seen.len
check topic == "foobar"
Expand Down Expand Up @@ -272,7 +272,7 @@ suite "GossipSub":
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(topic: string, data: seq[byte]) {.async, closure.} =
handler = proc(topic: string, data: seq[byte]) {.async.} =
seen.mgetOrPut(peerName, 0).inc()
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
Expand Down
6 changes: 5 additions & 1 deletion tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ randomize()
type TestGossipSub* = ref object of GossipSub

proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
proc getConn(): Future[Connection] {.async: (raises: [GetConnDialError]).} =
proc getConn(): Future[Connection] {.
async: (raises: [CancelledError, GetConnDialError])
.} =
try:
return await p.switch.dial(peerId, GossipSubCodec_12)
except CancelledError as exc:
raise exc
except CatchableError as e:
raise (ref GetConnDialError)(parent: e)

Expand Down
4 changes: 2 additions & 2 deletions tests/testping.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ suite "Ping":
transport1 = TcpTransport.new(upgrade = Upgrade())
transport2 = TcpTransport.new(upgrade = Upgrade())

proc handlePing(peer: PeerId) {.async, closure.} =
proc handlePing(peer: PeerId) {.async.} =
inc pingReceivedCount

pingProto1 = Ping.new()
Expand Down Expand Up @@ -96,7 +96,7 @@ suite "Ping":
asyncTest "bad ping data ack":
type FakePing = ref object of LPProtocol
let fakePingProto = FakePing()
proc fakeHandle(conn: Connection, proto: string) {.async, closure.} =
proc fakeHandle(conn: Connection, proto: string) {.async.} =
var
buf: array[32, byte]
fakebuf: array[32, byte]
Expand Down
Loading