Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the transfer of WorkerThread data structures when blocking #2769

Merged
merged 8 commits into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,26 @@ lazy val core = crossProject(JSPlatform, JVMPlatform)
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.LocalQueue.stealInto"),
// introduced by #2673, Cross platform weak bag implementation
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.WorkerThread.monitor")
)
"cats.effect.unsafe.WorkerThread.monitor"),
// introduced by #2769, Simplify the transfer of WorkerThread data structures when blocking
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread$Data")
) ++ {
if (isDotty.value) {
// Scala 3 specific exclusions
Comment on lines +456 to +457
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know. Thanks.

Seq(
// introduced by #2769, Simplify the transfer of WorkerThread data structures when blocking
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.WorkStealingThreadPool.localQueuesForwarder"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.WorkerThread.NullData")
)
} else Seq()
}
)
.jvmSettings(
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
case _: Throwable =>
}

val localQueues = threadPool.localQueuesForwarder
val localQueues = threadPool.localQueues
var i = 0
val len = localQueues.length

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ private[effect] final class WorkStealingThreadPool(
* References to worker threads and their local queues.
*/
private[this] val workerThreads: Array[WorkerThread] = new Array(threadCount)
private[this] val localQueues: Array[LocalQueue] = new Array(threadCount)
private[this] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount)
private[unsafe] val localQueues: Array[LocalQueue] = new Array(threadCount)
private[unsafe] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount)
private[unsafe] val fiberBags: Array[WeakBag[IOFiber[_]]] = new Array(threadCount)

/**
* Atomic variable for used for publishing changes to the references in the `workerThreads`
Expand Down Expand Up @@ -118,8 +119,9 @@ private[effect] final class WorkStealingThreadPool(
parkedSignals(i) = parkedSignal
val index = i
val fiberBag = new WeakBag[IOFiber[_]]()
fiberBags(i) = fiberBag
val thread =
new WorkerThread(index, queue, parkedSignal, externalQueue, null, fiberBag, this)
new WorkerThread(index, queue, parkedSignal, externalQueue, fiberBag, this)
workerThreads(i) = thread
i += 1
}
Expand Down Expand Up @@ -584,9 +586,6 @@ private[effect] final class WorkStealingThreadPool(
}
}

private[unsafe] def localQueuesForwarder: Array[LocalQueue] =
localQueues

/*
* What follows is a collection of methos used in the implementation of the
* `cats.effect.unsafe.metrics.ComputePoolSamplerMBean` interface.
Expand Down
68 changes: 26 additions & 42 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,7 @@ private final class WorkerThread(
private[this] var parked: AtomicBoolean,
// External queue used by the local queue for offloading excess fibers, as well as
// for drawing fibers when the local queue is exhausted.
private[this] var external: ScalQueue[AnyRef],
// A mutable reference to a fiber which is used to bypass the local queue
// when a `cede` operation would enqueue a fiber to the empty local queue
// and then proceed to dequeue the same fiber again from the queue. This not
// only avoids unnecessary synchronization, but also avoids notifying other
// worker threads that new work has become available, even though that's not
// true in tis case.
private[this] var cedeBypass: IOFiber[_],
private[this] val external: ScalQueue[AnyRef],
// A worker-thread-local weak bag for tracking suspended fibers.
private[this] var fiberBag: WeakBag[IOFiber[_]],
// Reference to the `WorkStealingThreadPool` in which this thread operates.
Expand All @@ -75,6 +68,15 @@ private final class WorkerThread(
*/
private[this] var random: ThreadLocalRandom = _

/**
* A mutable reference to a fiber which is used to bypass the local queue when a `cede`
* operation would enqueue a fiber to the empty local queue and then proceed to dequeue the
* same fiber again from the queue. This not only avoids unnecessary synchronization, but also
* avoids notifying other worker threads that new work has become available, even though
* that's not true in tis case.
*/
private[this] var cedeBypass: IOFiber[_] = _

/**
* A flag which is set whenever a blocking code region is entered. This is useful for
* detecting nested blocking regions, in order to avoid unnecessarily spawning extra
Expand All @@ -90,8 +92,7 @@ private final class WorkerThread(
*/
private[this] var _active: IOFiber[_] = _

private val dataTransfer: ArrayBlockingQueue[WorkerThread.Data] =
new ArrayBlockingQueue(1)
private val indexTransfer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(1)

val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.incrementAndGet()

Expand Down Expand Up @@ -318,16 +319,18 @@ private final class WorkerThread(
// First of all, remove the references to data structures of the core
// pool because they have already been transferred to another thread
// which took the place of this one.
init(WorkerThread.NullData)
queue = null
parked = null
fiberBag = null

// Add this thread to the cached threads data structure, to be picked up
// by another thread in the future.
pool.cachedThreads.add(this)
try {
// Wait up to 60 seconds (should be configurable in the future) for
// another thread to wake this thread up.
var data = dataTransfer.poll(60L, TimeUnit.SECONDS)
if (data eq null) {
var newIdx: Integer = indexTransfer.poll(60L, TimeUnit.SECONDS)
if (newIdx eq null) {
// The timeout elapsed and no one woke up this thread. Try to remove
// the thread from the cached threads data structure.
if (pool.cachedThreads.remove(this)) {
Expand All @@ -338,12 +341,12 @@ private final class WorkerThread(
// Someone else concurrently stole this thread from the cached
// data structure and will transfer the data soon. Time to wait
// for it again.
data = dataTransfer.take()
init(data)
newIdx = indexTransfer.take()
init(newIdx)
}
} else {
// Some other thread woke up this thread. Time to take its place.
init(data)
init(newIdx)
}
} catch {
case _: InterruptedException =>
Expand Down Expand Up @@ -597,11 +600,9 @@ private final class WorkerThread(
if (cached ne null) {
// There is a cached worker thread that can be reused.
val idx = index
val data = new WorkerThread.Data(idx, queue, parked, external, cedeBypass, fiberBag)
cedeBypass = null
pool.replaceWorker(idx, cached)
// Transfer the data structures to the cached thread and wake it up.
cached.dataTransfer.offer(data)
cached.indexTransfer.offer(idx)
} else {
// Spawn a new `WorkerThread`, a literal clone of this one. It is safe to
// transfer ownership of the local queue and the parked signal to the new
Expand All @@ -614,8 +615,7 @@ private final class WorkerThread(
// for unparking.
val idx = index
val clone =
new WorkerThread(idx, queue, parked, external, cedeBypass, fiberBag, pool)
cedeBypass = null
new WorkerThread(idx, queue, parked, external, fiberBag, pool)
pool.replaceWorker(idx, clone)
clone.start()
}
Expand All @@ -624,13 +624,11 @@ private final class WorkerThread(
}
}

private[this] def init(data: WorkerThread.Data): Unit = {
_index = data.index
queue = data.queue
parked = data.parked
external = data.external
cedeBypass = data.cedeBypass
fiberBag = data.fiberBag
private[this] def init(newIdx: Int): Unit = {
_index = newIdx
queue = pool.localQueues(newIdx)
parked = pool.parkedSignals(newIdx)
fiberBag = pool.fiberBags(newIdx)
}

/**
Expand All @@ -647,17 +645,3 @@ private final class WorkerThread(
def getSuspendedFiberCount(): Int =
fiberBag.size
}

private object WorkerThread {
final class Data(
val index: Int,
val queue: LocalQueue,
val parked: AtomicBoolean,
val external: ScalQueue[AnyRef],
val cedeBypass: IOFiber[_],
val fiberBag: WeakBag[IOFiber[_]]
)

private[WorkerThread] val NullData: Data =
new Data(-1, null, null, null, null, null)
}