-
Notifications
You must be signed in to change notification settings - Fork 532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dispatcher: remove alive
in favour of nullifying queues
#3701
Dispatcher: remove alive
in favour of nullifying queues
#3701
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! I got carried away with some more thoughts but that can be another PR ...
I forgot, did we have a test for this? I have a vague recollection we sort of discovered this issue when investigating a test failure. So it would be good to add/restore that test, whatever it was 😅
doneR: AtomicBoolean): F[Unit] = | ||
for { | ||
done <- F.delay(doneR.get()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder, does this need to be an AtomicBoolean
, what if we just directly pass a done: Boolean
parameter? The final invocation of step
can set it to true
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh maybe not ... 🤔
if (st.get() ne Nil) { | ||
val list = st.getAndSet(Nil) | ||
buffer ++= list.reverse // FIFO order here is a form of fairness | ||
if (st.get() ne null) { | ||
val list = if (done) st.getAndSet(null) else st.getAndSet(Nil) | ||
if ((list ne null) && (list ne Nil)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... because looking at this here, I realize there still must be a race condition. If I remember correctly, there is still a race between the step
s running on the worker fiber, and the final step
taken when the Dispatcher
closes. I guess it's for another PR, but I think getting rid of that race condition would also help with reasoning about this implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you're exactly correct! I agree about trying to remove that race, but I recall that things got tricky with that so it will likely take more staring and more thinking 😆
// double-check after we already put things in the structure | ||
if (alive.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed that this definitely improves things. Thank you!
@armanbilge I will have to re-remember things, but I think this came up while we were trying to diagnose the flakiness in the |
Thanks! Yes, perhaps related to this issue. |
This idea came from conversations with Arman!
If workers close their own state queues as they're marked done, then we can remove the double check on alive when submitting work. If a task is submitted to a queue successfully it will be seen by that worker. Or if the worker has already closed the queue, then submitting the task will fail entirely. If workers are controlling their own queues in this way, I'm not convinced we need the alive ref at all! So I've gone so far as to remove it entirely.