diff --git a/README.md b/README.md index 68c57fbd..a1f90bcc 100644 --- a/README.md +++ b/README.md @@ -451,7 +451,11 @@ itself. ### Event: `'drain'` -A `'drain'` event is emitted whenever the `queueSize` reaches `0`. +A `'drain'` event is emitted when the current usage of the +pool is below the maximum capacity of the same. +The intended goal is to provide backpressure to the task source +so creating tasks that can not be executed at immediately can be avoided. + ### Event: `'needsDrain'` diff --git a/src/index.ts b/src/index.ts index 38ad7780..165b4f8c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -386,6 +386,17 @@ class AsynchronouslyCreatedResourcePool< onAvailable (fn : (item : T) => void) { this.onAvailableListeners.push(fn); } + + getCurrentUsage (): number { + let inFlight = 0; + for (const worker of this.readyItems) { + const currentUsage = worker.currentUsage(); + + if (Number.isFinite(currentUsage)) inFlight += currentUsage; + } + + return inFlight; + } } type ResponseCallback = (response : ResponseMessage) => void; @@ -535,12 +546,13 @@ class ThreadPool { completed : number = 0; runTime : Histogram; waitTime : Histogram; - needsDrain : boolean; + _needsDrain : boolean; start : number = performance.now(); inProcessPendingMessages : boolean = false; startingUp : boolean = false; closingUp : boolean = false; workerFailsDuringBootstrap : boolean = false; + maxCapacity: number; constructor (publicInterface : Piscina, options : Options) { this.publicInterface = publicInterface; @@ -569,11 +581,12 @@ class ThreadPool { this.workers = new AsynchronouslyCreatedResourcePool( this.options.concurrentTasksPerWorker); this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w)); + this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker; this.startingUp = true; this._ensureMinimumWorkers(); this.startingUp = false; - this.needsDrain = false; + this._needsDrain = false; } _ensureMinimumWorkers () : void { @@ -907,17 +920,21 @@ class ThreadPool { } _maybeDrain () { - const totalCapacity = this.options.maxQueue + this.pendingCapacity(); - const totalQueueSize = this.taskQueue.size + this.skipQueue.length; - - if (totalQueueSize === 0) { - this.needsDrain = false; - this.publicInterface.emit('drain'); - } - - if (totalQueueSize >= totalCapacity) { - this.needsDrain = true; + /** + * Our goal is to make it possible for user space to use the pool + * in a way where always waiting === 0, + * since we want to avoid creating tasks that can't execute + * immediately in order to provide back pressure to the task source. + */ + const { maxCapacity } = this; + const currentUsage = this.workers.getCurrentUsage(); + + if (maxCapacity === currentUsage) { + this._needsDrain = true; this.publicInterface.emit('needsDrain'); + } else if (maxCapacity > currentUsage && this._needsDrain) { + this._needsDrain = false; + this.publicInterface.emit('drain'); } } @@ -1236,7 +1253,7 @@ class Piscina extends EventEmitterAsyncResource { } get needsDrain () : boolean { - return this.#pool.needsDrain; + return this.#pool._needsDrain; } static get isWorkerThread () : boolean { diff --git a/test/post-task.ts b/test/post-task.ts index 79ce1215..bb69ab25 100644 --- a/test/post-task.ts +++ b/test/post-task.ts @@ -98,7 +98,8 @@ test('postTask() validates abortSignal', async ({ rejects }) => { test('Piscina emits drain', async ({ ok, notOk }) => { const pool = new Piscina({ - filename: resolve(__dirname, 'fixtures/eval.js') + filename: resolve(__dirname, 'fixtures/eval.js'), + maxThreads: 1 }); let drained = false; @@ -108,7 +109,7 @@ test('Piscina emits drain', async ({ ok, notOk }) => { needsDrain = pool.needsDrain; }); - await Promise.all([pool.run('123'), pool.run('123')]); + await Promise.all([pool.runTask('123'), pool.runTask('123'), pool.runTask('123')]); ok(drained); notOk(needsDrain);