Skip to content

Commit

Permalink
fixes #23212; Asyncdispatch leaks under --mm:arc (#24556)
Browse files Browse the repository at this point in the history
Fixes #23212

Inspired by [this chronos
PR](status-im/nim-chronos#243)

(cherry picked from commit f29234b)
  • Loading branch information
nitely authored and narimiran committed Jan 15, 2025
1 parent 0c14372 commit ede6540
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 44 deletions.
97 changes: 54 additions & 43 deletions lib/pure/asyncmacro.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,39 @@ proc newCallWithLineInfo(fromNode: NimNode; theProc: NimNode, args: varargs[NimN
result = newCall(theProc, args)
result.copyLineInfo(fromNode)

template createCb(retFutureSym, iteratorNameSym,
strName, identName, futureVarCompletions: untyped) =
type
ClosureIt[T] = iterator(f: Future[T]): owned(FutureBase)

template createCb(futTyp, strName, identName, futureVarCompletions: untyped) =
bind finished
var nameIterVar = iteratorNameSym
proc identName {.closure, stackTrace: off.} =
{.push stackTrace: off.}
proc identName(fut: Future[futTyp], it: ClosureIt[futTyp]) {.effectsOf: it.} =
try:
if not nameIterVar.finished:
var next = nameIterVar()
if not it.finished:
var next = it(fut)
# Continue while the yielded future is already finished.
while (not next.isNil) and next.finished:
next = nameIterVar()
if nameIterVar.finished:
next = it(fut)
if it.finished:
break

if next == nil:
if not retFutureSym.finished:
if not fut.finished:
let msg = "Async procedure ($1) yielded `nil`, are you await'ing a `nil` Future?"
raise newException(AssertionDefect, msg % strName)
else:
{.gcsafe.}:
next.addCallback cast[proc() {.closure, gcsafe.}](identName)
next.addCallback(cast[proc() {.closure, gcsafe.}](proc =
identName(fut, it)))
except:
futureVarCompletions
if retFutureSym.finished:
if fut.finished:
# Take a look at tasyncexceptions for the bug which this fixes.
# That test explains it better than I can here.
raise
else:
retFutureSym.fail(getCurrentException())
identName()
fut.fail(getCurrentException())
{.pop.}

proc createFutureVarCompletions(futureVarIdents: seq[NimNode], fromNode: NimNode): NimNode =
result = newNimNode(nnkStmtList, fromNode)
Expand All @@ -68,7 +71,7 @@ proc createFutureVarCompletions(futureVarIdents: seq[NimNode], fromNode: NimNode
)
)

proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode, futureVarIdents: seq[NimNode]): NimNode =
proc processBody(ctx: Context; node, needsCompletionSym, retFutParamSym: NimNode, futureVarIdents: seq[NimNode]): NimNode =
result = node
case node.kind
of nnkReturnStmt:
Expand All @@ -80,14 +83,14 @@ proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode,
ctx.hasRet = true
if node[0].kind == nnkEmpty:
if ctx.inTry == 0:
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutureSym, newIdentNode("result"))
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutParamSym, newIdentNode("result"))
else:
result.add newAssignment(needsCompletionSym, newLit(true))
else:
let x = processBody(ctx, node[0], needsCompletionSym, retFutureSym, futureVarIdents)
let x = processBody(ctx, node[0], needsCompletionSym, retFutParamSym, futureVarIdents)
if x.kind == nnkYieldStmt: result.add x
elif ctx.inTry == 0:
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutureSym, x)
result.add newCallWithLineInfo(node, newIdentNode("complete"), retFutParamSym, x)
else:
result.add newAssignment(newIdentNode("result"), x)
result.add newAssignment(needsCompletionSym, newLit(true))
Expand All @@ -100,18 +103,18 @@ proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode,
of nnkTryStmt:
if result[^1].kind == nnkFinally:
inc ctx.inTry
result[0] = processBody(ctx, result[0], needsCompletionSym, retFutureSym, futureVarIdents)
result[0] = processBody(ctx, result[0], needsCompletionSym, retFutParamSym, futureVarIdents)
dec ctx.inTry
for i in 1 ..< result.len:
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutureSym, futureVarIdents)
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutParamSym, futureVarIdents)
if ctx.inTry == 0 and ctx.hasRet:
let finallyNode = copyNimNode(result[^1])
let stmtNode = newNimNode(nnkStmtList)
for child in result[^1]:
stmtNode.add child
stmtNode.add newIfStmt(
( needsCompletionSym,
newCallWithLineInfo(node, newIdentNode("complete"), retFutureSym,
newCallWithLineInfo(node, newIdentNode("complete"), retFutParamSym,
newIdentNode("result")
)
)
Expand All @@ -120,10 +123,10 @@ proc processBody(ctx: Context; node, needsCompletionSym, retFutureSym: NimNode,
result[^1] = finallyNode
else:
for i in 0 ..< result.len:
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutureSym, futureVarIdents)
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutParamSym, futureVarIdents)
else:
for i in 0 ..< result.len:
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutureSym, futureVarIdents)
result[i] = processBody(ctx, result[i], needsCompletionSym, retFutParamSym, futureVarIdents)

# echo result.repr

Expand Down Expand Up @@ -229,29 +232,21 @@ proc asyncSingleProc(prc: NimNode): NimNode =
# transformation even more complex.
let body2 = extractDocCommentsAndRunnables(prc.body)

# -> var retFuture = newFuture[T]()
var retFutureSym = genSym(nskVar, "retFuture")
var subRetType =
if returnType.kind == nnkEmpty: newIdentNode("void")
else: baseType
outerProcBody.add(
newVarStmt(retFutureSym,
newCall(
newNimNode(nnkBracketExpr, prc.body).add(
newIdentNode("newFuture"),
subRetType),
newLit(prcName)))) # Get type from return type of this proc

# -> iterator nameIter(): FutureBase {.closure.} =
let retFutParamSym = genSym(nskParam, "retFutParamSym")

# -> iterator nameIter(retFutParam: Future[T]): FutureBase {.closure.} =
# -> {.push warning[resultshadowed]: off.}
# -> var result: T
# -> {.pop.}
# -> <proc_body>
# -> complete(retFuture, result)
# -> complete(retFutParam, result)
var iteratorNameSym = genSym(nskIterator, $prcName & " (Async)")
var needsCompletionSym = genSym(nskVar, "needsCompletion")
var ctx = Context()
var procBody = processBody(ctx, prc.body, needsCompletionSym, retFutureSym, futureVarIdents)
var procBody = processBody(ctx, prc.body, needsCompletionSym, retFutParamSym, futureVarIdents)
# don't do anything with forward bodies (empty)
if procBody.kind != nnkEmpty:
# fix #13899, defer should not escape its original scope
Expand All @@ -275,9 +270,11 @@ proc asyncSingleProc(prc: NimNode): NimNode =

var `needsCompletionSym` = false
procBody.add quote do:
complete(`retFutureSym`, `resultIdent`)
complete(`retFutParamSym`, `resultIdent`)

var closureIterator = newProc(iteratorNameSym, [quote do: owned(FutureBase)],
var retFutureTyp = newNimNode(nnkBracketExpr, prc).add(newIdentNode("Future")).add(subRetType)
var retFutureParam = newNimNode(nnkIdentDefs, prc).add(retFutParamSym).add(retFutureTyp).add(newEmptyNode())
var closureIterator = newProc(iteratorNameSym, [quote do: owned(FutureBase), retFutureParam],
procBody, nnkIteratorDef)
closureIterator.pragma = newNimNode(nnkPragma, lineInfoFrom = prc.body)
closureIterator.addPragma(newIdentNode("closure"))
Expand All @@ -287,17 +284,31 @@ proc asyncSingleProc(prc: NimNode): NimNode =
closureIterator.addPragma(newIdentNode("gcsafe"))
outerProcBody.add(closureIterator)

# -> createCb(retFuture)
# -> createCb()
# NOTE: The NimAsyncContinueSuffix is checked for in asyncfutures.nim to produce
# friendlier stack traces:
var cbName = genSym(nskProc, prcName & NimAsyncContinueSuffix)
var procCb = getAst createCb(retFutureSym, iteratorNameSym,
newStrLitNode(prcName),
cbName,
createFutureVarCompletions(futureVarIdents, nil)
)
var procCb = getAst createCb(
subRetType,
newStrLitNode(prcName),
cbName,
createFutureVarCompletions(futureVarIdents, nil)
)
outerProcBody.add procCb

# -> var retFuture = newFuture[T]()
let retFutureSym = genSym(nskVar, "retFuture")
outerProcBody.add(
newVarStmt(retFutureSym,
newCall(
newNimNode(nnkBracketExpr, prc.body).add(
newIdentNode("newFuture"),
subRetType),
newLit(prcName)))) # Get type from return type of this proc

# -> cb(retFuture, nameIter)
outerProcBody.add newCall(cbName, retFutureSym, iteratorNameSym)

# -> return retFuture
outerProcBody.add newNimNode(nnkReturnStmt, prc.body[^1]).add(retFutureSym)

Expand Down
2 changes: 1 addition & 1 deletion tests/arc/tasyncleak.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
discard """
outputsub: "(allocCount: 4050, deallocCount: 4048)"
outputsub: "(allocCount: 4011, deallocCount: 4009)"
cmd: "nim c --gc:orc -d:nimAllocStats $file"
"""

Expand Down
28 changes: 28 additions & 0 deletions tests/async/t23212.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
discard """
valgrind: true
cmd: '''nim c --mm:arc -d:nimAllocStats -d:useMalloc $file'''
output: '''1000'''
"""

import std/asyncdispatch

var count: int

proc stuff() {.async.} =
#echo count, 1
await sleepAsync(1)
#echo count, 2
count.inc

for _ in 0..<1000:
asyncCheck stuff()

while hasPendingOperations(): poll()

echo count

setGlobalDispatcher(nil)

import std/importutils
privateAccess(AllocStats)
doAssert getAllocStats().allocCount - getAllocStats().deallocCount < 10

0 comments on commit ede6540

Please sign in to comment.