Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: emit drain as soon as capacity is available #367

Merged
merged 6 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,11 @@ itself.

### Event: `'drain'`

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.
A `'drain'` event is emitted when the current usage of the
pool is below the maximum capacity of the same.
The intended goal is to provide backpressure to the task source
so creating tasks that can not be executed at immediately can be avoided.


### Event: `'needsDrain'`

Expand Down
43 changes: 30 additions & 13 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,17 @@ class AsynchronouslyCreatedResourcePool<
onAvailable (fn : (item : T) => void) {
this.onAvailableListeners.push(fn);
}

getCurrentUsage (): number {
let inFlight = 0;
for (const worker of this.readyItems) {
const currentUsage = worker.currentUsage();

if (Number.isFinite(currentUsage)) inFlight += currentUsage;
}

return inFlight;
}
}

type ResponseCallback = (response : ResponseMessage) => void;
Expand Down Expand Up @@ -521,11 +532,12 @@ class ThreadPool {
completed : number = 0;
runTime : Histogram;
waitTime : Histogram;
needsDrain : boolean;
_needsDrain : boolean;
start : number = performance.now();
inProcessPendingMessages : boolean = false;
startingUp : boolean = false;
workerFailsDuringBootstrap : boolean = false;
maxCapacity: number;

constructor (publicInterface : Piscina, options : Options) {
this.publicInterface = publicInterface;
Expand Down Expand Up @@ -554,11 +566,12 @@ class ThreadPool {
this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
this.options.concurrentTasksPerWorker);
this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));
this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;

this.startingUp = true;
this._ensureMinimumWorkers();
this.startingUp = false;
this.needsDrain = false;
this._needsDrain = false;
}

_ensureMinimumWorkers () : void {
Expand Down Expand Up @@ -880,17 +893,21 @@ class ThreadPool {
}

_maybeDrain () {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
const totalQueueSize = this.taskQueue.size + this.skipQueue.length;

if (totalQueueSize === 0) {
this.needsDrain = false;
this.publicInterface.emit('drain');
}

if (totalQueueSize >= totalCapacity) {
this.needsDrain = true;
/**
* Our goal is to make it possible for user space to use the pool
* in a way where always waiting === 0,
* since we want to avoid creating tasks that can't execute
* immediately in order to provide back pressure to the task source.
*/
const { maxCapacity } = this;
const currentUsage = this.workers.getCurrentUsage();

if (maxCapacity === currentUsage) {
this._needsDrain = true;
this.publicInterface.emit('needsDrain');
} else if (maxCapacity > currentUsage && this._needsDrain) {
this._needsDrain = false;
this.publicInterface.emit('drain');
}
}

Expand Down Expand Up @@ -1123,7 +1140,7 @@ class Piscina extends EventEmitterAsyncResource {
}

get needsDrain () : boolean {
return this.#pool.needsDrain;
return this.#pool._needsDrain;
}

static get isWorkerThread () : boolean {
Expand Down
5 changes: 3 additions & 2 deletions test/post-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ test('postTask() validates abortSignal', async ({ rejects }) => {

test('Piscina emits drain', async ({ ok, notOk }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1
});

let drained = false;
Expand All @@ -108,7 +109,7 @@ test('Piscina emits drain', async ({ ok, notOk }) => {
needsDrain = pool.needsDrain;
});

await Promise.all([pool.run('123'), pool.run('123')]);
await Promise.all([pool.runTask('123'), pool.runTask('123'), pool.runTask('123')]);

ok(drained);
notOk(needsDrain);
Expand Down