Skip to content

Commit

Permalink
Merge branch 'series/3.5.x' into topic/worker-thread-cries-wolf
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge authored Jan 9, 2024
2 parents 05145d9 + e93ee75 commit c884c71
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
task: Runnable,
fallback: Scheduler): Runnable
private[effect] def canExecuteBlockingCode(): Boolean
private[effect] def prepareForBlocking(): Unit
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
} else {
if (child == null) {
// bottomed out
removed
removed + 1
} else {
// note this can cause the bound to go negative, which is fine
child.packInternal(bound - 1, removed + 1, parent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ private[effect] final class WorkStealingThreadPool(
}
}

/**
* Prepares the current thread for running blocking code. This should be called only if
* [[canExecuteBlockingCode]] returns `true`.
*/
private[effect] def prepareForBlocking(): Unit = {
val thread = Thread.currentThread()
val worker = thread.asInstanceOf[WorkerThread]
worker.prepareForBlocking()
}

/**
* Schedules a fiber for execution on this thread pool originating from an external thread (a
* thread which is not owned by this thread pool).
Expand Down
30 changes: 17 additions & 13 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private final class WorkerThread(
}

/**
* A mechanism for executing support code before executing a blocking action.
* Support code that must be run before executing a blocking action on this thread.
*
* The current thread creates a replacement worker thread (or reuses a cached one) that will
* take its place in the pool and does a complete transfer of ownership of the data structures
Expand All @@ -792,17 +792,11 @@ private final class WorkerThread(
* continue, it will be cached for a period of time instead. Finally, the `blocking` flag is
* useful when entering nested blocking regions. In this case, there is no need to spawn a
* replacement worker thread.
*
* @note
* There is no reason to enclose any code in a `try/catch` block because the only way this
* code path can be exercised is through `IO.delay`, which already handles exceptions.
*/
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
def prepareForBlocking(): Unit = {
if (blocking) {
// This `WorkerThread` is already inside an enclosing blocking region.
// There is no need to spawn another `WorkerThread`. Instead, directly
// execute the blocking action.
thunk
// This `WorkerThread` has already been prepared for blocking.
// There is no need to spawn another `WorkerThread`.
} else {
// Spawn a new `WorkerThread` to take the place of this thread, as the
// current thread prepares to execute a blocking action.
Expand All @@ -815,7 +809,7 @@ private final class WorkerThread(
cedeBypass = null
}

// Logically enter the blocking region.
// Logically become a blocking thread.
blocking = true

val prefix = pool.blockerThreadPrefix
Expand Down Expand Up @@ -849,11 +843,21 @@ private final class WorkerThread(
pool.blockedWorkerThreadCounter.incrementAndGet()
clone.start()
}

thunk
}
}

/**
* A mechanism for executing support code before executing a blocking action.
*
* @note
* There is no reason to enclose any code in a `try/catch` block because the only way this
* code path can be exercised is through `IO.delay`, which already handles exceptions.
*/
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
prepareForBlocking()
thunk
}

private[this] def init(newIdx: Int): Unit = {
_index = newIdx
queue = pool.localQueues(newIdx)
Expand Down
7 changes: 5 additions & 2 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -991,10 +991,12 @@ private final class IOFiber[A](
if (ec.isInstanceOf[WorkStealingThreadPool]) {
val wstp = ec.asInstanceOf[WorkStealingThreadPool]
if (wstp.canExecuteBlockingCode()) {
wstp.prepareForBlocking()

var error: Throwable = null
val r =
try {
scala.concurrent.blocking(cur.thunk())
cur.thunk()
} catch {
case t if NonFatal(t) =>
error = t
Expand All @@ -1003,7 +1005,8 @@ private final class IOFiber[A](
}

val next = if (error eq null) succeeded(r, 0) else failed(error, 0)
runLoop(next, nextCancelation, nextAutoCede)
// reset auto-cede counter
runLoop(next, nextCancelation, runtime.autoYieldThreshold)
} else {
blockingFallback(cur)
}
Expand Down
31 changes: 31 additions & 0 deletions tests/shared/src/test/scala/cats/effect/CallbackStackSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

class CallbackStackSpec extends BaseSpec {

"CallbackStack" should {
"correctly report the number removed" in {
val stack = CallbackStack[Unit](null)
val pushed = stack.push(_ => ())
val handle = pushed.currentHandle()
pushed.clearCurrent(handle)
stack.pack(1) must beEqualTo(1)
}
}

}

0 comments on commit c884c71

Please sign in to comment.