-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: emit drain
as soon as capacity is available
#367
Conversation
drain
as soon as capacity is available
is it different from #368? |
README.md
Outdated
@@ -438,7 +438,8 @@ itself. | |||
|
|||
### Event: `'drain'` | |||
|
|||
A `'drain'` event is emitted whenever the `queueSize` reaches `0`. | |||
A `'drain'` event is emitted whenever the `queueSize` is below the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we should do it. The docs clearly show an example:
The 'drain' event may be used to receive notification when the queue is empty and all tasks have been submitted to workers for processing. -- https://github.com/piscinajs/piscina#backpressure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point, and the current approach might not be the more optimal as it requires waiting until the queue is totally empty before resuming work, which can not be the best depending on the workload (e.g. extending waiting times, etc.)
The idea was to allow the pool to notify whenever there is spare capacity to be used for scheduling new tasks, maybe we can think of something better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHo this should work the same as stream.Writable
or be called something else. Otherwise it is confusing. What 'drain'
means is kind of established.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change this behaviour, it should be a semver-major.
@ronag I couldn't find it in the docs, but the behaviour stream.Writable does it the same as this PR is trying to apply?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change this behaviour, it should be a semver-major.
Sure, is already marked as so and pointing to next
branch 👍
@ronag I couldn't find it in the docs, but the behaviour stream.Writable does it the same as this PR is trying to apply?
Yeah, have the same issue when trying to check stream.Writable
, so how we should move forward? 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This description needs to be updated with the current intended behaviour.
Giving this some more thought here is what I currently think is the "correct" behaviour (inspired by looking at Writable). Define Our goal is to make it possible for user space to use the pool in a way where always Hence, our logic should be: Whenever, a task is added AND EDIT: Fixed incorrect logic. |
Got it, this was the part I was missing
That resonates to me, then for #368, I'll change the order of the |
Actually, change this:
To
|
Applied the change, it has a dependency on #368, so I'll add the remaining bits after that one is merged 👍 |
src/index.ts
Outdated
if (maxCapacity === currentUsage) { | ||
// TODO: needs drain goes here | ||
} else if (maxCapacity > currentUsage) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (maxCapacity === currentUsage) { | |
// TODO: needs drain goes here | |
} else if (maxCapacity > currentUsage) { | |
if (maxCapacity >= currentUsage) { | |
this._needsDrain = true | |
} else if (maxCapacity > currentUsage && this._needsDrain) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Waiting for #368 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in f91a697
src/index.ts
Outdated
* in a way where always waiting === 0, since we want to avoid creating tasks that can't execute | ||
* immediately in order to provide back pressure to the task source. | ||
*/ | ||
const maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the options object is immutable so it might be a good idea to recalculate and store this value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'll be missing something, but this is not exposed through the Piscina
interface, in fact, the ThreadPool
is hidden under a private variable, so it should be safe enough and in theory not mutable (at least not at user-land).
Or do you refer to store the maxCapacity
to avoid calculate it every time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't actually know where options comes from so maybe it's not a problem...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually... no... the options
object comes from the Piscina
constructor which is a user provided object and can be modified externally by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's probably a general problem. You should make sure there is no outside reference to internal state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, got it. Yes, you are right, as preserve the object down the path. We'll need to either store it somewhere or make a deep copy of it to avoid external mutations.
Most likely it will deserve its own PR.
For now, I'll store the value within the ThreadPool
to avoid side-effects.
Co-authored-by: Robert Nagy <ronagy@icloud.com>
Implements insights from #126.
The new rationale to emit
drain
event, goes as follows: