diff --git a/src/malebolgia.nim b/src/malebolgia.nim index c92e9f4..217644c 100644 --- a/src/malebolgia.nim +++ b/src/malebolgia.nim @@ -104,7 +104,7 @@ type data: array[FixedChanSize, PoolTask] var - thr: array[ThreadPoolSize-1, Thread[void]] # -1 because the main thread counts too + thr: seq[Thread[void]] chan: FixedChan globalStopToken: Atomic[bool] busyThreads: Atomic[int] @@ -153,10 +153,13 @@ proc worker() {.thread.} = # but mark it as completed either way! taskCompleted item.m[] -proc setup() = +proc malebolgiaSetup*(numThr: int) = + doAssert numThr > 0, "Malebolgia must be initialized with a thread pool size of at least one" + doAssert thr.len == 0, "Malebolgia can only be initialized once" initCond(chan.dataAvailable) initCond(chan.spaceAvailable) initLock(chan.L) + thr.setLen(numThr - 1) # -1 because the main thread counts too for i in 0..high(thr): createThread[void](thr[i], worker) proc panicStop*() = @@ -168,7 +171,10 @@ proc panicStop*() = deinitLock(chan.L) proc shouldSend(master: var Master): bool {.inline.} = - master.activeProducer or busyThreads.load(moRelaxed) < ThreadPoolSize-1 + {.cast(gcsafe).}: + let numThr = thr.len + doAssert numThr > 0, "Malebolgia must be initialized first" + master.activeProducer or busyThreads.load(moRelaxed) < numThr template spawnImplRes[T](master: var Master; fn: typed; res: T) = if stillHaveTime(master): @@ -254,6 +260,6 @@ template awaitAll*(master: var Master; body: untyped) = waitForCompletions(master) when not defined(maleSkipSetup): - setup() + malebolgiaSetup(ThreadPoolSize) include malebolgia / masterhandles diff --git a/tests/tasymmetric_load.nim b/tests/tasymmetric_load.nim new file mode 100644 index 0000000..5fa29a5 --- /dev/null +++ b/tests/tasymmetric_load.nim @@ -0,0 +1,27 @@ +import malebolgia + +import std/[os, atomics] + +malebolgiaSetup(numThr = 20) + +var + threadpool = createMaster(activeProducer = true) + busyThreads: Atomic[int] +busyThreads.store 0 + +let mainThreadId = getThreadId() + +proc findStartPositionsAndPlay() = + atomicInc busyThreads + echo "Started. Busy threads: ", busyThreads.load + + sleep(1000) + if getThreadId() == mainThreadId: + sleep(10_000) + + atomicDec busyThreads + echo "Finished. Busy threads: ", busyThreads.load + +threadpool.awaitAll: + for i in 1 .. 10000: + threadpool.spawn findStartPositionsAndPlay()