refactor: emit drain as soon as capacity is available #367

Merged
merged 6 commits into from
Jul 14, 2023

metcoder95
Member

Implements insights from #126.

The new rationale for emitting the drain event is as follows:

The capacity is checkpointed every time a new task is
scheduled or appended to the queue.
If capacity is exceeded but we haven't checkpointed the check yet,
we allow the pool to see if it can flush itself soon.

At subsequent checks, we verify whether the previous checkpoint
was negative, along with the current capacity. If the capacity was exceeded,
the previous checkpoint was negative, and the capacity is above
the workload, we emit the drain event.

If the previous checkpoint is false but the capacity is still exceeded,
we checkpoint as false and wait for a further checkpoint to validate
the checks once more.

@metcoder95 metcoder95 requested a review from RafaelGSS June 30, 2023 08:53
@metcoder95 metcoder95 changed the title refactor: emit drain as soon as capacity is available refactor: emit drain as soon as capacity is available Jun 30, 2023
@RafaelGSS
Member

is it different from #368?

@metcoder95
Member Author

> is it different from #368?

Yes, as this change affects the way we decide when to emit the drain event. #368 is meant to add a new property only.

I'll rebase the other one on top of this one once merged

README.md Outdated
@@ -438,7 +438,8 @@ itself.

### Event: `'drain'`

- A `'drain'` event is emitted whenever the `queueSize` reaches `0`.
+ A `'drain'` event is emitted whenever the `queueSize` is below the
Member

I'm not sure we should do it. The docs clearly show an example:

The 'drain' event may be used to receive notification when the queue is empty and all tasks have been submitted to workers for processing. -- https://github.com/piscinajs/piscina#backpressure

Member Author

That's a good point. The current approach might not be optimal, as it requires waiting until the queue is completely empty before resuming work, which may not be ideal depending on the workload (e.g. it can extend waiting times).

The idea was to allow the pool to notify whenever there is spare capacity that can be used for scheduling new tasks. Maybe we can think of something better?

Collaborator

@ronag ronag Jul 5, 2023

IMHO this should work the same as stream.Writable or be called something else. Otherwise it is confusing. What 'drain' means is kind of established.

Member

If we change this behaviour, it should be a semver-major.

@ronag I couldn't find it in the docs, but does stream.Writable behave the same way this PR is trying to?

Member Author

> If we change this behaviour, it should be a semver-major.

Sure, it is already marked as such and points to the next branch 👍

> @ronag I couldn't find it in the docs, but does stream.Writable behave the same way this PR is trying to?

Yeah, I had the same issue when trying to check stream.Writable. So how should we move forward? 🙂

Member

@mcollina @jasnell any thoughts on this?

Collaborator

This description needs to be updated with the current intended behaviour.

@ronag
Collaborator

ronag commented Jul 7, 2023

Giving this some more thought, here is what I currently think is the "correct" behaviour (inspired by looking at Writable).

Define pending as number of tasks in-flight.
Define maxPending as maxThreads * concurrentTasksPerThread

Our goal is to make it possible for user space to use the pool in a way where always waiting === 0, since we want to avoid creating tasks that can't execute immediately in order to provide back pressure to the task source.

Hence, our logic should be:

Whenever a task is added AND pending === maxPending, set needsDrain to true.
Whenever a task finishes executing AND pending < maxPending, set needsDrain to false and emit 'drain'.

EDIT: Fixed incorrect logic.
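
The two rules above can be sketched as a small state machine. This is only an illustration under stated assumptions, not Piscina's implementation: `DrainTracker`, `taskAdded`, `taskFinished`, and the `onDrain` callback are hypothetical names, `maxPending` is assumed to be fixed at construction, and the sketch only emits when the flag was previously set (an assumption that goes slightly beyond the rule as written).

```typescript
// Hypothetical sketch of the needsDrain rule; none of these names come from Piscina.
class DrainTracker {
  private pending = 0; // number of tasks in flight
  private flag = false;
  private readonly maxPending: number; // maxThreads * concurrentTasksPerThread
  private readonly onDrain: () => void;

  constructor(maxPending: number, onDrain: () => void) {
    this.maxPending = maxPending;
    this.onDrain = onDrain;
  }

  get needsDrain(): boolean {
    return this.flag;
  }

  // Call when a task is handed to the pool.
  taskAdded(): void {
    this.pending += 1;
    if (this.pending === this.maxPending) {
      this.flag = true; // the pool is now saturated
    }
  }

  // Call when a task finishes executing.
  taskFinished(): void {
    this.pending -= 1;
    // Assumption: emit only if saturation was signalled earlier.
    if (this.pending < this.maxPending && this.flag) {
      this.flag = false;
      this.onDrain();
    }
  }
}
```

A task source would stop submitting while `needsDrain` is true and resume from the `onDrain` callback, which keeps the number of waiting tasks at zero as long as the source cooperates.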

@metcoder95
Member Author

> Our goal is to make it possible for user space to use the pool in a way where always waiting === 0, since we want to avoid creating tasks that can't execute immediately in order to provide back pressure to the task source

Got it, this was the part I was missing.

> Whenever, a task is added AND pending === maxPending, set needsDrain to true.
> Whenever, a task finishes executing AND waiting === 0, set needsDrain to false and emit 'drain'.

That resonates with me. For #368, I'll change the order in which needsDrain is set, and rebase on top of this branch to implement what we discussed.

@ronag
Collaborator

ronag commented Jul 7, 2023

Actually, change this:

> Whenever, a task finishes executing AND waiting === 0, set needsDrain to false and emit 'drain'.

To:

> Whenever, a task finishes executing AND pending < maxPending set needsDrain to false and emit 'drain'.
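
To illustrate why the two conditions differ, consider the pool right after a task finishes and a queued task has immediately taken the freed slot. The sketch below is an illustration only: the `PoolSnapshot` type and both predicate names are hypothetical, and it assumes a queued task is scheduled as soon as a slot frees up.

```typescript
// Hypothetical snapshot of pool counters; not a Piscina type.
interface PoolSnapshot {
  pending: number;    // tasks in flight
  waiting: number;    // tasks queued but not yet running
  maxPending: number; // maxThreads * concurrentTasksPerThread
}

// Earlier wording: drain once nothing is waiting.
const drainWhenQueueEmpty = (s: PoolSnapshot): boolean => s.waiting === 0;

// Revised wording: drain once there is spare execution capacity.
const drainWhenSpareCapacity = (s: PoolSnapshot): boolean =>
  s.pending < s.maxPending;

// Every slot is busy and the queue has just emptied.
const saturated: PoolSnapshot = { pending: 4, waiting: 0, maxPending: 4 };

const byQueue = drainWhenQueueEmpty(saturated);       // true
const byCapacity = drainWhenSpareCapacity(saturated); // false
```

With the earlier wording a 'drain' would fire while all slots are still busy, so a task submitted in response would have to wait. The revised wording only fires when a newly submitted task could start immediately.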

@metcoder95
Member Author

Applied the change. It depends on #368, so I'll add the remaining bits after that one is merged 👍

src/index.ts Outdated
Comment on lines 896 to 898
if (maxCapacity === currentUsage) {
// TODO: needs drain goes here
} else if (maxCapacity > currentUsage) {
Collaborator

@ronag ronag Jul 7, 2023

Suggested change
- if (maxCapacity === currentUsage) {
- // TODO: needs drain goes here
- } else if (maxCapacity > currentUsage) {
+ if (maxCapacity >= currentUsage) {
+ this._needsDrain = true
+ } else if (maxCapacity > currentUsage && this._needsDrain) {

Member Author

Waiting for #368 👍

Member Author

Done in f91a697

src/index.ts Outdated
* in a way where always waiting === 0, since we want to avoid creating tasks that can't execute
* immediately in order to provide back pressure to the task source.
*/
const maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;
Collaborator

I don't think the options object is immutable so it might be a good idea to recalculate and store this value.

Member Author

Maybe I'm missing something, but this is not exposed through the Piscina interface. In fact, the ThreadPool is hidden behind a private variable, so it should be safe enough and in theory not mutable (at least not from user-land).

Or do you mean storing maxCapacity to avoid calculating it every time?

Collaborator

I don't actually know where options comes from so maybe it's not a problem...

Member Author

Where the pool is being stored:

piscina/src/index.ts

Lines 925 to 926 in 0af59c6

#pool : ThreadPool;

Where options are passed:

this.#pool = new ThreadPool(this, options);

It should be safe, I believe

Collaborator

Actually... no... the options object comes from the Piscina constructor which is a user provided object and can be modified externally by the user.

Collaborator

@ronag ronag Jul 7, 2023

That's probably a general problem. You should make sure there is no outside reference to internal state.

Member Author

Ah, got it. Yes, you are right, as we preserve the object down the path. We'll need to either store it somewhere or make a deep copy of it to avoid external mutations.

Most likely it will deserve its own PR.
For now, I'll store the value within the ThreadPool to avoid side-effects.
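
A minimal sketch of what storing the value could look like, assuming the capacity is computed once at construction. `PoolOptions` and `CapacitySnapshot` are hypothetical stand-ins introduced for illustration; they are not Piscina's actual types, and only the two option names mentioned in this thread are used.

```typescript
// Hypothetical option shape, limited to the two fields named in this thread.
interface PoolOptions {
  maxThreads: number;
  concurrentTasksPerWorker: number;
}

// Illustrative stand-in for the internal pool; not Piscina's ThreadPool.
class CapacitySnapshot {
  readonly maxCapacity: number;

  constructor(options: PoolOptions) {
    // Compute once, so later mutation of `options` has no effect here.
    this.maxCapacity = options.maxThreads * options.concurrentTasksPerWorker;
  }
}

const userOptions: PoolOptions = { maxThreads: 4, concurrentTasksPerWorker: 2 };
const pool = new CapacitySnapshot(userOptions);

userOptions.maxThreads = 100; // external mutation after construction
const capacityAfterMutation = pool.maxCapacity; // still 8
```

Storing the derived number guards only this one value. Other reads of the shared options object would still see external changes, which is why a copy of the options was raised as a separate follow-up.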

@RafaelGSS RafaelGSS merged commit 44bbe65 into next Jul 14, 2023
metcoder95 added a commit that referenced this pull request Sep 13, 2023
Co-authored-by: Robert Nagy <ronagy@icloud.com>
@metcoder95 metcoder95 deleted the refactor/126 branch September 17, 2023 20:56

3 participants