Skip to content

Commit

Permalink
refactor: emit drain as soon as capacity is available (#367)
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
metcoder95 and ronag committed Sep 13, 2023
1 parent a2cf119 commit 2cf4b46
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 16 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,11 @@ itself.

### Event: `'drain'`

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.
A `'drain'` event is emitted when the current usage of the
pool is below the maximum capacity of the same.
The intended goal is to provide backpressure to the task source
so creating tasks that can not be executed at immediately can be avoided.


### Event: `'needsDrain'`

Expand Down
43 changes: 30 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,17 @@ class AsynchronouslyCreatedResourcePool<
onAvailable (fn : (item : T) => void) {
this.onAvailableListeners.push(fn);
}

getCurrentUsage (): number {
let inFlight = 0;
for (const worker of this.readyItems) {
const currentUsage = worker.currentUsage();

if (Number.isFinite(currentUsage)) inFlight += currentUsage;
}

return inFlight;
}
}

type ResponseCallback = (response : ResponseMessage) => void;
Expand Down Expand Up @@ -535,12 +546,13 @@ class ThreadPool {
completed : number = 0;
runTime : Histogram;
waitTime : Histogram;
needsDrain : boolean;
_needsDrain : boolean;
start : number = performance.now();
inProcessPendingMessages : boolean = false;
startingUp : boolean = false;
closingUp : boolean = false;
workerFailsDuringBootstrap : boolean = false;
maxCapacity: number;

constructor (publicInterface : Piscina, options : Options) {
this.publicInterface = publicInterface;
Expand Down Expand Up @@ -569,11 +581,12 @@ class ThreadPool {
this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
this.options.concurrentTasksPerWorker);
this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));
this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;

this.startingUp = true;
this._ensureMinimumWorkers();
this.startingUp = false;
this.needsDrain = false;
this._needsDrain = false;
}

_ensureMinimumWorkers () : void {
Expand Down Expand Up @@ -907,17 +920,21 @@ class ThreadPool {
}

_maybeDrain () {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
const totalQueueSize = this.taskQueue.size + this.skipQueue.length;

if (totalQueueSize === 0) {
this.needsDrain = false;
this.publicInterface.emit('drain');
}

if (totalQueueSize >= totalCapacity) {
this.needsDrain = true;
/**
* Our goal is to make it possible for user space to use the pool
* in a way where always waiting === 0,
* since we want to avoid creating tasks that can't execute
* immediately in order to provide back pressure to the task source.
*/
const { maxCapacity } = this;
const currentUsage = this.workers.getCurrentUsage();

if (maxCapacity === currentUsage) {
this._needsDrain = true;
this.publicInterface.emit('needsDrain');
} else if (maxCapacity > currentUsage && this._needsDrain) {
this._needsDrain = false;
this.publicInterface.emit('drain');
}
}

Expand Down Expand Up @@ -1236,7 +1253,7 @@ class Piscina extends EventEmitterAsyncResource {
}

get needsDrain () : boolean {
return this.#pool.needsDrain;
return this.#pool._needsDrain;
}

static get isWorkerThread () : boolean {
Expand Down
5 changes: 3 additions & 2 deletions test/post-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ test('postTask() validates abortSignal', async ({ rejects }) => {

test('Piscina emits drain', async ({ ok, notOk }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1
});

let drained = false;
Expand All @@ -108,7 +109,7 @@ test('Piscina emits drain', async ({ ok, notOk }) => {
needsDrain = pool.needsDrain;
});

await Promise.all([pool.run('123'), pool.run('123')]);
await Promise.all([pool.runTask('123'), pool.runTask('123'), pool.runTask('123')]);

ok(drained);
notOk(needsDrain);
Expand Down

0 comments on commit 2cf4b46

Please sign in to comment.