Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatcher: remove alive in favour of nullifying queues #3701

Merged
merged 4 commits into from
Jun 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 52 additions & 58 deletions std/shared/src/main/scala/cats/effect/std/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,22 @@ object Dispatcher {
supervisor <- Supervisor[F](await, Some((_: Outcome[F, Throwable, _]) => true))

_ <- {
def step(state: Array[AtomicReference[List[Registration]]], await: F[Unit]): F[Unit] =
def step(
state: Array[AtomicReference[List[Registration]]],
await: F[Unit],
doneR: AtomicBoolean): F[Unit] =
for {
done <- F.delay(doneR.get())
Comment on lines +253 to +255
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, does this need to be an AtomicBoolean, what if we just directly pass a done: Boolean parameter? The final invocation of step can set it to true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh maybe not ... 🤔

regs <- F delay {
val buffer = mutable.ListBuffer.empty[Registration]
var i = 0
while (i < workers) {
val st = state(i)
if (st.get() ne Nil) {
val list = st.getAndSet(Nil)
buffer ++= list.reverse // FIFO order here is a form of fairness
if (st.get() ne null) {
val list = if (done) st.getAndSet(null) else st.getAndSet(Nil)
if ((list ne null) && (list ne Nil)) {
Comment on lines -257 to +263
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... because looking at this here, I realize there still must be a race condition. If I remember correctly, there is still a race between the steps running on the worker fiber, and the final step taken when the Dispatcher closes. I guess it's for another PR, but I think getting rid of that race condition would also help with reasoning about this implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're exactly correct! I agree about trying to remove that race, but I recall that things got tricky with that so it will likely take more staring and more thinking 😆

buffer ++= list.reverse // FIFO order here is a form of fairness
}
}
i += 1
}
Expand Down Expand Up @@ -294,7 +300,7 @@ object Dispatcher {
F.delay(latch.set(Noop)) *> // reset latch
// if we're marked as done, yield immediately to give other fibers a chance to shut us down
// we might loop on this a few times since we're marked as done before the supervisor is canceled
F.delay(doneR.get()).ifM(F.cede, step(state, await))
F.delay(doneR.get()).ifM(F.cede, step(state, await, doneR))
}

0.until(workers).toList traverse_ { n =>
Expand All @@ -303,15 +309,11 @@ object Dispatcher {
val worker = dispatcher(doneR, latch, states(n))
val release = F.delay(latch.getAndSet(Open)())
Resource.make(supervisor.supervise(worker)) { _ =>
F.delay(doneR.set(true)) *> step(states(n), F.unit) *> release
F.delay(doneR.set(true)) *> step(states(n), F.unit, doneR) *> release
}
}
}
}

// Alive is the innermost resource so that when releasing
// the very first thing we do is set dispatcher to un-alive
alive <- Resource.make(F.delay(new AtomicBoolean(true)))(ref => F.delay(ref.set(false)))
} yield {
new Dispatcher[F] {
override def unsafeRunAndForget[A](fa: F[A]): Unit = {
Expand Down Expand Up @@ -361,65 +363,57 @@ object Dispatcher {
@tailrec
def enqueue(state: AtomicReference[List[Registration]], reg: Registration): Unit = {
val curr = state.get()
val next = reg :: curr

if (!state.compareAndSet(curr, next)) enqueue(state, reg)
if (curr eq null) {
throw new IllegalStateException("dispatcher already shutdown")
} else {
val next = reg :: curr
if (!state.compareAndSet(curr, next)) enqueue(state, reg)
}
}

if (alive.get()) {
val (state, lt) = if (workers > 1) {
val rand = ThreadLocalRandom.current()
val dispatcher = rand.nextInt(workers)
val inner = rand.nextInt(workers)
val (state, lt) = if (workers > 1) {
val rand = ThreadLocalRandom.current()
val dispatcher = rand.nextInt(workers)
val inner = rand.nextInt(workers)

(states(dispatcher)(inner), latches(dispatcher))
} else {
(states(0)(0), latches(0))
}
(states(dispatcher)(inner), latches(dispatcher))
} else {
(states(0)(0), latches(0))
}

val reg = Registration(action, registerCancel _)
enqueue(state, reg)
val reg = Registration(action, registerCancel _)
enqueue(state, reg)

if (lt.get() ne Open) {
val f = lt.getAndSet(Open)
f()
}
if (lt.get() ne Open) {
val f = lt.getAndSet(Open)
f()
}

val cancel = { () =>
reg.lazySet(false)

@tailrec
def loop(): Future[Unit] = {
val state = cancelState.get()
state match {
case CancelInit =>
val promise = Promise[Unit]()
if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) {
loop()
} else {
promise.future
}
case CanceledNoToken(promise) =>
val cancel = { () =>
reg.lazySet(false)

@tailrec
def loop(): Future[Unit] = {
val state = cancelState.get()
state match {
case CancelInit =>
val promise = Promise[Unit]()
if (!cancelState.compareAndSet(state, CanceledNoToken(promise))) {
loop()
} else {
promise.future
case CancelToken(cancelToken) =>
cancelToken()
}
}
case CanceledNoToken(promise) =>
promise.future
case CancelToken(cancelToken) =>
cancelToken()
}

loop()
}

// double-check after we already put things in the structure
if (alive.get()) {
Comment on lines -412 to -413
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

(promise.future, cancel)
} else {
// we were shutdown *during* the enqueue
cancel()
throw new IllegalStateException("dispatcher already shutdown")
}
} else {
throw new IllegalStateException("dispatcher already shutdown")
loop()
}

(promise.future, cancel)
}
}
}
Expand Down