Skip to content

Commit

Permalink
Remove intrinsics usage in RibCoroutineWorker
Browse files Browse the repository at this point in the history
This commit changes dispatching logic to a much simpler one that does not require usage of intrinsics.

In order to synchronously get an instance of `bindJob`, we start the `unbindJob` coroutine undispatched (`bindJob` is a child of `unbindJob`, so we need an instance of `unbindJob` to create an instance of `bindJob`).

After saving `bindJob`, we properly dispatch in a cancellable way by simply `launch`ing a new coroutine.
  • Loading branch information
psteiger committed Nov 30, 2023
1 parent 9b5cff6 commit 7efa522
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,10 @@
package com.uber.rib.core

import com.uber.autodispose.coroutinesinterop.asScopeProvider
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -122,10 +112,18 @@ public fun CoroutineScope.bind(
worker: RibCoroutineWorker,
context: CoroutineContext = RibDispatchers.Default,
): BindWorkerHandle {
val bindJob: CompletableJob // A job that completes once worker's onStart completes
var bindJob: CompletableJob? = null // A job that completes once worker's onStart completes
val unbindJob =
launch(context, { bindJob = createBindingJob() }) { bindAndAwaitCancellation(worker, bindJob) }
return BindWorkerHandleImpl(bindJob, unbindJob)
launch(context, CoroutineStart.UNDISPATCHED) {
bindJob =
createBindingJob().also {
// launch again -- this time, we will dispatch if installed dispatcher
// tell us to (CoroutineDispatcher.isDispatchNeeded()).
launch { bindAndAwaitCancellation(worker, it) }
}
}
// !! is safe here -- outer coroutine was started undispatched.
return BindWorkerHandleImpl(bindJob!!, unbindJob)
}

/** Binds [workers] in a scope that is a child of the [CoroutineScope] receiver. */
Expand All @@ -139,46 +137,6 @@ public fun CoroutineScope.bind(
}
}

/**
* Guarantees to run synchronous [init] block exactly once in an undispatched manner.
*
* **Exceptions thrown in [init] block will be rethrown at call site.**
*/
@OptIn(ExperimentalContracts::class)
private fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
init: CoroutineScope.() -> Unit = {},
block: suspend CoroutineScope.() -> Unit,
): Job {
contract {
callsInPlace(init, InvocationKind.EXACTLY_ONCE)
callsInPlace(block, InvocationKind.AT_MOST_ONCE)
}
var initError: Throwable? = null
val job =
launch(context, CoroutineStart.UNDISPATCHED) {
runCatching(init).onFailure { initError = it }.getOrThrow()
dispatchIfNeeded()
block()
}
initError?.let { throw it }
return job
}

private suspend inline fun dispatchIfNeeded() {
suspendCoroutineUninterceptedOrReturn sc@{ cont ->
val context = cont.context
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
if (!dispatcher.isDispatchNeeded(context)) return@sc Unit
// Coroutine was not in the right context -- we'll dispatch.
context.ensureActive()
cont.intercepted().resume(Unit)
COROUTINE_SUSPENDED
}
// Don't continue if coroutine was cancelled after returning from dispatch.
coroutineContext.ensureActive()
}

private fun CoroutineScope.createBindingJob(): CompletableJob =
Job(coroutineContext.job).also {
// Cancel `unbindJob` if `bindJob` has cancelled. This is important to abort `onStart` if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class RibCoroutineWorkerTest {
worker.doOnStart { error(onStartErrorMsg) }
worker.doOnStop { error(onStopErrorMsg) }
bind(worker).join()
runCurrent()
assertThat(throwable).isInstanceOf(IllegalStateException::class.java)
assertThat(throwable).hasMessageThat().isEqualTo(onStartErrorMsg)
val suppressed = throwable?.suppressed?.firstOrNull()
Expand Down

0 comments on commit 7efa522

Please sign in to comment.