diff --git a/CHANGES.md b/CHANGES.md index baee6c4340..943280e796 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,17 @@ # Change log for kotlinx.coroutines +## Version 1.4.2 + +* Fixed `StackOverflowError` in `Job.toString` when `Job` is observed in its intermediate state (#2371). +* Improved liveness and latency of `Dispatchers.Default` and `Dispatchers.IO` in low-loaded mode (#2381). +* Improved performance of consecutive `Channel.cancel` invocations (#2384). +* `SharingStarted` is now `fun` interface (#2397). +* Additional lint settings for `SharedFlow` to catch programmatic errors early (#2376). +* Fixed bug when mutex and semaphore were not released during cancellation (#2390, thanks to @Tilps for reproducing). +* Some corner cases in cancellation propagation between coroutines and listenable futures are repaired (#1442, thanks to @vadimsemenov). +* Fixed unconditional cast to `CoroutineStackFrame` in exception recovery that triggered failures of instrumented code (#2386). +* Platform-specific dependencies are removed from `kotlinx-coroutines-javafx` (#2360). + ## Version 1.4.1 This is a patch release with an important fix to the `SharedFlow` implementation. diff --git a/README.md b/README.md index 7bd8e5a74b..77de32bbdc 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0) -[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.4.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.1) +[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.4.2) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.4.2) [![Kotlin](https://img.shields.io/badge/kotlin-1.4.0-blue.svg?logo=kotlin)](http://kotlinlang.org) [![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/) @@ -86,7 +86,7 @@ Add dependencies (you can also add other modules that you need): org.jetbrains.kotlinx kotlinx-coroutines-core - 1.4.1 + 1.4.2 ``` @@ -104,7 +104,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' } ``` @@ -130,7 +130,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2") } ``` @@ -152,7 +152,7 @@ In common code that should get compiled for different platforms, you can add dep ```groovy commonMain { dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2") } } ``` @@ -163,7 +163,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) module as dependency when using `kotlinx.coroutines` on Android: ```groovy -implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1' +implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2' ``` This gives you access to Android [Dispatchers.Main] @@ -190,7 +190,7 @@ packagingOptions { ### JS [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.1/jar) +[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.4.2/jar) (follow the link to get the dependency declaration snippet). You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. @@ -198,7 +198,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli ### Native [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.1/jar) +[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.4.2/jar) (follow the link to get the dependency declaration snippet). Only single-threaded code (JS-style) on Kotlin/Native is currently supported. diff --git a/build.gradle b/build.gradle index 79c7f3553e..938d42e7a1 100644 --- a/build.gradle +++ b/build.gradle @@ -33,6 +33,10 @@ buildscript { throw new IllegalArgumentException("'kotlin_snapshot_version' should be defined when building with snapshot compiler") } } + // These three flags are enabled in train builds for JVM IR compiler testing + ext.jvm_ir_enabled = rootProject.properties['enable_jvm_ir'] != null + ext.jvm_ir_api_check_enabled = rootProject.properties['enable_jvm_ir_api_check'] != null + ext.native_targets_enabled = rootProject.properties['disable_native_targets'] == null // Determine if any project dependency is using a snapshot version ext.using_snapshot_version = build_snapshot_train @@ -323,3 +327,12 @@ knit { } knitPrepare.dependsOn getTasksByName("dokka", true) + +// Disable binary compatibility check for JVM IR compiler output by default +if (jvm_ir_enabled) { + subprojects { project -> + configure(tasks.matching { it.name == "apiCheck" }) { + enabled = enabled && jvm_ir_api_check_enabled + } + } +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 1ffa02d1ae..9163cf5af1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ # # Kotlin -version=1.4.1-SNAPSHOT +version=1.4.2-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.4.0 diff --git a/gradle/compile-jvm-multiplatform.gradle b/gradle/compile-jvm-multiplatform.gradle index e72d30511e..44b0cbedba 100644 --- a/gradle/compile-jvm-multiplatform.gradle +++ b/gradle/compile-jvm-multiplatform.gradle @@ -6,8 +6,12 @@ sourceCompatibility = 1.6 targetCompatibility = 1.6 kotlin { - targets { - fromPreset(presets.jvm, 'jvm') + jvm { + if (rootProject.ext.jvm_ir_enabled) { + compilations.all { + kotlinOptions.useIR = true + } + } } sourceSets { jvmTest.dependencies { diff --git a/gradle/compile-jvm.gradle b/gradle/compile-jvm.gradle index caa5c45f60..bd2ae14775 100644 --- a/gradle/compile-jvm.gradle +++ b/gradle/compile-jvm.gradle @@ -9,6 +9,12 @@ apply plugin: 'org.jetbrains.kotlin.jvm' sourceCompatibility = 1.6 targetCompatibility = 1.6 +if (rootProject.ext.jvm_ir_enabled) { + kotlin.target.compilations.all { + kotlinOptions.useIR = true + } +} + dependencies { testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" // Workaround to make addSuppressed work in tests diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 974e246283..6d1fab3d69 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -17,8 +17,11 @@ import kotlin.coroutines.* * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws * [IllegalArgumentException], because Futures don't have a way to start lazily. * - * The created coroutine is cancelled when the resulting future completes successfully, fails, or - * is cancelled. + * When the created coroutine [isCompleted][Job.isCompleted], it will try to + * *synchronously* complete the returned Future with the same outcome. This will + * succeed, barring a race with external cancellation of returned [ListenableFuture]. + * + * Cancellation is propagated bidirectionally. * * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be * added/overlaid by passing [context]. @@ -32,8 +35,10 @@ import kotlin.coroutines.* * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging * facilities. * - * Note that the error and cancellation semantics of [future] are _subtly different_ than - * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details. + * Note that the error and cancellation semantics of [future] are _subtly different_ than [asListenableFuture]'s. + * In particular, any exception that happens in the coroutine after returned future is + * successfully cancelled will be passed to the [CoroutineExceptionHandler] from the [context]. + * See [ListenableFutureCoroutine] for details. * * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context. * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. @@ -46,14 +51,9 @@ public fun CoroutineScope.future( ): ListenableFuture { require(!start.isLazy) { "$start start is not supported" } val newContext = newCoroutineContext(context) - val future = SettableFuture.create() - val coroutine = ListenableFutureCoroutine(newContext, future) - future.addListener( - coroutine, - MoreExecutors.directExecutor()) + val coroutine = ListenableFutureCoroutine(newContext) coroutine.start(start, coroutine, block) - // Return hides the SettableFuture. This should prevent casting. - return object: ListenableFuture by future {} + return coroutine.future } /** @@ -70,7 +70,7 @@ public fun CoroutineScope.future( * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel], * it will cancel the returned `Deferred`. * - * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the + * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred` * will complete with a different outcome than `this` `ListenableFuture`. @@ -152,7 +152,8 @@ public fun ListenableFuture.asDeferred(): Deferred { deferred.invokeOnCompletion { cancel(false) } - return deferred + // Return hides the CompletableDeferred. This should prevent casting. + return object : Deferred by deferred {} } /** @@ -166,7 +167,7 @@ public fun ListenableFuture.asDeferred(): Deferred { * state - a serious fundamental bug. */ private fun ExecutionException.nonNullCause(): Throwable { - return this.cause!! + return this.cause!! } /** @@ -195,13 +196,21 @@ private fun ExecutionException.nonNullCause(): Throwable { * * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation * semantics. See [Job] for a description of coroutine cancellation semantics. See - * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and + * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and * corner cases of this method. */ public fun Deferred.asListenableFuture(): ListenableFuture { - val outerFuture = OuterFuture(this) - outerFuture.afterInit() - return outerFuture + val listenableFuture = JobListenableFuture(this) + // This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred. + // The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel(). + invokeOnCompletion { throwable -> + if (throwable == null) { + listenableFuture.complete(getCompleted()) + } else { + listenableFuture.completeExceptionallyOrCancel(throwable) + } + } + return listenableFuture } /** @@ -215,7 +224,6 @@ public fun Deferred.asListenableFuture(): ListenableFuture { * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well. * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or * [kotlinx.coroutines.NonCancellable]. - * */ public suspend fun ListenableFuture.await(): T { try { @@ -255,8 +263,7 @@ private class ToContinuation( continuation.cancel() } else { try { - continuation.resumeWith( - Result.success(Uninterruptibles.getUninterruptibly(futureToObserve))) + continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve)) } catch (e: ExecutionException) { // ExecutionException is the only kind of exception that can be thrown from a gotten // Future. Anything else showing up here indicates a very fundamental bug in a @@ -271,57 +278,46 @@ private class ToContinuation( * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to * completion. * - * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback. - * See [run] for details. Both types are implemented by this object to save an allocation. + * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`. + * By documented contract, a [Future] has been cancelled if + * and only if its `isCancelled()` method returns true. + * + * Any error that occurs after successfully cancelling a [ListenableFuture] will be passed + * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit + * it to return an error after it is successfully cancelled. + * + * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully + * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to + * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the + * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that + * the [Deferred] pointing to the task will be used to observe any error outcome occurring after + * cancellation. + * + * This may be counterintuitive, but it maintains the error and cancellation contracts of both + * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point + * to the same running task. */ private class ListenableFutureCoroutine( - context: CoroutineContext, - private val future: SettableFuture -) : AbstractCoroutine(context), Runnable { + context: CoroutineContext +) : AbstractCoroutine(context) { - /** - * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if - * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if - * and only if its `isCancelled()` method returns true. - * - * Any error that occurs after successfully cancelling a [ListenableFuture] - * created by submitting the returned object as a [Runnable] to an `Executor` will be passed - * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit - * it to return an error after it is successfully cancelled. - * - * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully - * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to - * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the - * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that - * the [Deferred] pointing to the task will be used to observe any error outcome occurring after - * cancellation. - * - * This may be counterintuitive, but it maintains the error and cancellation contracts of both - * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point - * to the same running task. - */ - override fun run() { - if (future.isCancelled) { - cancel() - } - } + // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture. + @JvmField val future = JobListenableFuture(this) override fun onCompleted(value: T) { - future.set(value) + future.complete(value) } - // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation? override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!future.setException(cause) && !handled) { - // prevents loss of exception that was not handled by parent & could not be set to SettableFuture + if (!future.completeExceptionallyOrCancel(cause) && !handled) { + // prevents loss of exception that was not handled by parent & could not be set to JobListenableFuture handleCoroutineException(context, cause) } } } /** - * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with - * it. + * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it. * * This setup allows the returned [ListenableFuture] to maintain the following properties: * @@ -333,130 +329,154 @@ private class ListenableFutureCoroutine( * - Fully correct cancellation and listener happens-after obeying [Future] and * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve. * The best way to be correct, especially given the fun corner cases from - * [AsyncFuture.setAsync], is to just use an [AsyncFuture]. - * - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture] - * around its input [deferred] as a state engine to establish happens-after-completion. This - * could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the + * [AbstractFuture.setFuture], is to just use an [AbstractFuture]. + * - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture] + * around coroutine's result as a state engine to establish happens-after-completion. This + * could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the * cost of the implementation's readability. */ -private class OuterFuture(private val deferred: Deferred): ListenableFuture { - val innerFuture = DeferredListenableFuture(deferred) +private class JobListenableFuture(private val jobToCancel: Job): ListenableFuture { + /** + * Serves as a state machine for [Future] cancellation. + * + * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and + * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to + * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned. + * + * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled]. + */ + private val auxFuture = SettableFuture.create() - // Adding the listener after initialization resolves partial construction hairpin problem. - // - // This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may - // have completed earlier if it got cancelled! See DeferredListenableFuture. - fun afterInit() { - deferred.invokeOnCompletion { - innerFuture.complete() - } - } + /** + * When the attached coroutine [isCompleted][Job.isCompleted] successfully + * its outcome should be passed to this method. + * + * This should succeed barring a race with external cancellation. + */ + fun complete(result: T): Boolean = auxFuture.set(result) + + /** + * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled] + * its outcome should be passed to this method. + * + * This method will map coroutine's exception into corresponding Future's exception. + * + * This should succeed barring a race with external cancellation. + */ + // CancellationException is wrapped into `Cancelled` to preserve original cause and message. + // All the other exceptions are delegated to SettableFuture.setException. + fun completeExceptionallyOrCancel(t: Throwable): Boolean = + if (t is CancellationException) auxFuture.set(Cancelled(t)) else auxFuture.setException(t) /** * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to * [Job.isCancelled]. * - * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate - * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class. + * When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture] + * contains [CancellationException]. * - * See [DeferredListenableFuture.cancel]. + * See [cancel]. */ override fun isCancelled(): Boolean { // This expression ensures that isCancelled() will *never* return true when isDone() returns false. // In the case that the deferred has completed with cancellation, completing `this`, its // reaching the "cancelled" state with a cause of CancellationException is treated as the - // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and + // same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and // this Future hasn't itself been successfully cancelled, the Future will return // isCancelled() == false. This is the only discovered way to reconcile the two different // cancellation contracts. - return isDone - && (innerFuture.isCancelled - || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException) + return auxFuture.isCancelled || (isDone && Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled) } /** - * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that - * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes - * happens-after ordering for completion of the [Deferred] input to [OuterFuture]. + * Waits for [auxFuture] to complete by blocking, then uses its `result` + * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException]. + * This establishes happens-after ordering for completion of the entangled coroutine. * - * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after - * contract of [Future] to be correctly followed. If this method were to directly use - * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this - * [ListenableFuture] is created from might be in an incomplete state when used by `get()`. + * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally. + * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine. + * + * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after + * contract of [Future] to be correctly followed. */ override fun get(): T { - return getInternal(innerFuture.get()) + return getInternal(auxFuture.get()) } /** See [get()]. */ override fun get(timeout: Long, unit: TimeUnit): T { - return getInternal(innerFuture.get(timeout, unit)) + return getInternal(auxFuture.get(timeout, unit)) } /** See [get()]. */ - private fun getInternal(deferred: Deferred): T { - if (deferred.isCancelled) { - val exception = deferred.getCompletionExceptionOrNull() - if (exception is kotlinx.coroutines.CancellationException) { - throw exception - } else { - throw ExecutionException(exception) - } - } else { - return deferred.getCompleted() - } + private fun getInternal(result: Any): T = if (result is Cancelled) { + throw CancellationException().initCause(result.exception) + } else { + // We know that `auxFuture` can contain either `T` or `Cancelled`. + @Suppress("UNCHECKED_CAST") + result as T } override fun addListener(listener: Runnable, executor: Executor) { - innerFuture.addListener(listener, executor) + auxFuture.addListener(listener, executor) } override fun isDone(): Boolean { - return innerFuture.isDone - } - - override fun cancel(mayInterruptIfRunning: Boolean): Boolean { - return innerFuture.cancel(mayInterruptIfRunning) - } -} - -/** - * Holds a delegate deferred, and serves as a state machine for [Future] cancellation. - * - * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and - * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to - * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when - * returned. - */ -private class DeferredListenableFuture( - private val deferred: Deferred -) : AbstractFuture>() { - - fun complete() { - set(deferred) + return auxFuture.isDone } /** - * Tries to cancel the task. This is fundamentally racy. + * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy. * - * For any given call to `cancel()`, if [deferred] is already completed, the call will complete - * this Future with it, and fail to cancel. Otherwise, the - * call to `cancel()` will try to cancel this Future: if and only if cancellation of this - * succeeds, [deferred] will have its [Deferred.cancel] called. + * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture] + * succeeds, [jobToCancel] will have its [Job.cancel] called. * - * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves - * in a particular way. [deferred] may also be in its "cancelling" state while this + * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves + * in a particular way. [jobToCancel] may also be in its "cancelling" state while this * ListenableFuture is complete and cancelled. - * - * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure - * that certain combinations of cancelled/cancelling states can't be observed. */ override fun cancel(mayInterruptIfRunning: Boolean): Boolean { - return if (super.cancel(mayInterruptIfRunning)) { - deferred.cancel() + // TODO: call jobToCancel.cancel() _before_ running the listeners. + // `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of + // `jobToCancel` until after auxFuture's listeners have already run. + // Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized. + return if (auxFuture.cancel(mayInterruptIfRunning)) { + jobToCancel.cancel() true } else { false } } + + override fun toString(): String = buildString { + append(super.toString()) + append("[status=") + if (isDone) { + try { + when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) { + is Cancelled -> append("CANCELLED, cause=[${result.exception}]") + else -> append("SUCCESS, result=[$result") + } + } catch (e: CancellationException) { + // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message. + append("CANCELLED") + } catch (e: ExecutionException) { + append("FAILURE, cause=[${e.cause}]") + } catch (t: Throwable) { + // Violation of Future's contract, should never happen. + append("UNKNOWN, cause=[${t.javaClass} thrown from get()]") + } + } else { + append("PENDING, delegate=[$auxFuture]") + } + } } + +/** + * A wrapper for `Coroutine`'s [CancellationException]. + * + * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately, + * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this + * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture]. + */ +private class Cancelled(@JvmField val exception: CancellationException) diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index a9a7f7ba9d..dc2d99d7f7 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guava import com.google.common.util.concurrent.* import kotlinx.coroutines.* import org.junit.* +import org.junit.Ignore import org.junit.Test import java.util.concurrent.* import java.util.concurrent.CancellationException @@ -315,6 +316,28 @@ class ListenableFutureTest : TestBase() { finish(4) } + @Test + @Ignore // TODO: propagate cancellation before running listeners. + fun testAsListenableFuturePropagatesCancellationBeforeRunningListeners() = runTest { + expect(1) + val deferred = async(context = Dispatchers.Unconfined) { + try { + delay(Long.MAX_VALUE) + } finally { + expect(3) // Cancelled. + } + } + val asFuture = deferred.asListenableFuture() + asFuture.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor()) + assertFalse(asFuture.isDone) + expect(2) + asFuture.cancel(false) + assertTrue(asFuture.isDone) + assertTrue(asFuture.isCancelled) + assertFailsWith { deferred.await() } + finish(5) + } + @Test fun testFutureCancellation() = runTest { val future = awaitFutureWithCancel(true) @@ -333,15 +356,18 @@ class ListenableFutureTest : TestBase() { val outputCancellationException = assertFailsWith { asFuture.get() } - assertEquals(outputCancellationException.message, "Foobar") - assertTrue(outputCancellationException.cause is OutOfMemoryError) - assertEquals(outputCancellationException.cause?.message, "Foobaz") + val cause = outputCancellationException.cause + assertNotNull(cause) + assertEquals(cause.message, "Foobar") + assertTrue(cause.cause is OutOfMemoryError) + assertEquals(cause.cause?.message, "Foobaz") } @Test fun testNoFutureCancellation() = runTest { val future = awaitFutureWithCancel(false) assertFalse(future.isCancelled) + @Suppress("BlockingMethodInNonBlockingContext") assertEquals(42, future.get()) finish(4) } @@ -354,7 +380,7 @@ class ListenableFutureTest : TestBase() { assertTrue(asDeferredAsFuture.isCancelled) assertFailsWith { - val value: Int = asDeferredAsFuture.await() + asDeferredAsFuture.await() } } @@ -379,7 +405,7 @@ class ListenableFutureTest : TestBase() { assertTrue(asDeferred.isCancelled) assertFailsWith { - val value: Int = asDeferred.await() + asDeferred.await() } } @@ -433,7 +459,10 @@ class ListenableFutureTest : TestBase() { @Test fun testFutureCompletedWithNullFastPathAsDeferred() = runTest { val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) - val future = executor.submit(Callable { null }).also { it.get() } + val future = executor.submit(Callable { null }).also { + @Suppress("BlockingMethodInNonBlockingContext") + it.get() + } assertNull(future.asDeferred().await()) } @@ -494,8 +523,10 @@ class ListenableFutureTest : TestBase() { val future = future(Dispatchers.Unconfined) { try { delay(Long.MAX_VALUE) - } finally { + expectUnreached() + } catch (e: CancellationException) { expect(2) + throw e } } @@ -507,17 +538,19 @@ class ListenableFutureTest : TestBase() { @Test fun testExceptionOnExternalCancellation() = runTest(expected = {it is TestException}) { - expect(1) val result = future(Dispatchers.Unconfined) { try { + expect(1) delay(Long.MAX_VALUE) - } finally { - expect(2) + expectUnreached() + } catch (e: CancellationException) { + expect(3) throw TestException() } } + expect(2) result.cancel(true) - finish(3) + finish(4) } @Test @@ -540,12 +573,120 @@ class ListenableFutureTest : TestBase() { finish(3) } + /** This test ensures that we never pass [CancellationException] to [CoroutineExceptionHandler]. */ + @Test + fun testCancellationExceptionOnExternalCancellation() = runTest { + expect(1) + // No parent here (NonCancellable), so nowhere to propagate exception + val result = future(NonCancellable + Dispatchers.Unconfined) { + try { + delay(Long.MAX_VALUE) + } finally { + expect(2) + throw TestCancellationException() // this exception cannot be handled + } + } + assertTrue(result.cancel(true)) + finish(3) + } + + @Test + fun testCancellingFutureContextJobCancelsFuture() = runTest { + expect(1) + val supervisorJob = SupervisorJob() + val future = future(context = supervisorJob) { + expect(2) + try { + delay(Long.MAX_VALUE) + expectUnreached() + } catch (e: CancellationException) { + expect(4) + throw e + } + } + yield() + expect(3) + supervisorJob.cancel(CancellationException("Parent cancelled", TestException())) + supervisorJob.join() + assertTrue(future.isDone) + assertTrue(future.isCancelled) + val thrown = assertFailsWith { future.get() } + val cause = thrown.cause + assertNotNull(cause) + assertTrue(cause is CancellationException) + assertEquals("Parent cancelled", cause.message) + assertTrue(cause.cause is TestException) + finish(5) + } + + @Test + fun testFutureChildException() = runTest { + val future = future(context = NonCancellable + Dispatchers.Unconfined) { + val foo = async { delay(Long.MAX_VALUE); 42 } + val bar = async { throw TestException() } + foo.await() + bar.await() + } + future.checkFutureException() + } + + @Test + fun testFutureIsDoneAfterChildrenCompleted() = runTest { + expect(1) + val testException = TestException() + // Don't propagate exception to the test and use different dispatchers as we are going to block test thread. + val future = future(context = NonCancellable + Dispatchers.Default) { + val foo = async { + try { + delay(Long.MAX_VALUE) + 42 + } finally { + withContext(NonCancellable) { + delay(200) + } + } + } + foo.invokeOnCompletion { + expect(3) + } + val bar = async { throw testException } + foo.await() + bar.await() + } + yield() + expect(2) + // Blocking get should succeed after internal coroutine completes. + val thrown = assertFailsWith { future.get() } + expect(4) + assertEquals(testException, thrown.cause) + finish(5) + } + + @Test + @Ignore // TODO: propagate cancellation before running listeners. + fun testFuturePropagatesCancellationBeforeRunningListeners() = runTest { + expect(1) + val future = future(context = Dispatchers.Unconfined) { + try { + delay(Long.MAX_VALUE) + } finally { + expect(3) // Cancelled. + } + } + future.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor()) + assertFalse(future.isDone) + expect(2) + future.cancel(false) + assertTrue(future.isDone) + assertTrue(future.isCancelled) + finish(5) + } + private inline fun ListenableFuture<*>.checkFutureException() { val e = assertFailsWith { get() } val cause = e.cause!! assertTrue(cause is T) } + @Suppress("SuspendFunctionOnCoroutineScope") private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture { val latch = CountDownLatch(1) val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index b86076fca1..dcd837f7b2 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1064,6 +1064,7 @@ public final class kotlinx/coroutines/flow/LintKt { } public abstract interface class kotlinx/coroutines/flow/MutableSharedFlow : kotlinx/coroutines/flow/FlowCollector, kotlinx/coroutines/flow/SharedFlow { + public abstract fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getSubscriptionCount ()Lkotlinx/coroutines/flow/StateFlow; public abstract fun resetReplayCache ()V public abstract fun tryEmit (Ljava/lang/Object;)Z diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index f98f6a529c..314eea350b 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -5,8 +5,12 @@ apply plugin: 'org.jetbrains.kotlin.multiplatform' apply from: rootProject.file("gradle/compile-jvm-multiplatform.gradle") apply from: rootProject.file("gradle/compile-common.gradle") + +if (rootProject.ext.native_targets_enabled) { + apply from: rootProject.file("gradle/compile-native-multiplatform.gradle") +} + apply from: rootProject.file("gradle/compile-js-multiplatform.gradle") -apply from: rootProject.file("gradle/compile-native-multiplatform.gradle") apply from: rootProject.file('gradle/publish-npm-js.gradle') /* ========================================================================== @@ -52,8 +56,11 @@ static boolean isNativeDarwin(String name) { return ["ios", "macos", "tvos", "wa static boolean isNativeOther(String name) { return ["linux", "mingw"].any { name.startsWith(it) } } defineSourceSet("concurrent", ["common"]) { it in ["jvm", "native"] } -defineSourceSet("nativeDarwin", ["native"]) { isNativeDarwin(it) } -defineSourceSet("nativeOther", ["native"]) { isNativeOther(it) } + +if (rootProject.ext.native_targets_enabled) { + defineSourceSet("nativeDarwin", ["native"]) { isNativeDarwin(it) } + defineSourceSet("nativeOther", ["native"]) { isNativeOther(it) } +} /* ========================================================================== */ @@ -129,7 +136,7 @@ def configureNativeSourceSetPreset(name, preset) { } // :KLUDGE: Idea.active: Configure platform libraries for native source sets when working in IDEA -if (Idea.active) { +if (Idea.active && rootProject.ext.native_targets_enabled) { def manager = project.ext.hostManager def linuxPreset = kotlin.presets.linuxX64 def macosPreset = kotlin.presets.macosX64 @@ -183,6 +190,13 @@ jvmTest { exclude '**/*StressTest.*' } systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test + + // TODO: JVM IR generates different stacktrace so temporary disable stacktrace tests + if (rootProject.ext.jvm_ir_enabled) { + filter { + excludeTestsMatching('kotlinx.coroutines.exceptions.StackTraceRecovery*') + } + } } jvmJar { diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index b7deaccb72..6ef1a8daea 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -133,6 +133,10 @@ private class LazyDeferredCoroutine( * which means that if the original [coroutineContext], in which `withContext` was invoked, * is cancelled by the time its dispatcher starts to execute the code, * it discards the result of `withContext` and throws [CancellationException]. + * + * The cancellation behaviour described above is enabled if and only if the dispatcher is being changed. + * For example, when using `withContext(NonCancellable) { ... }` there is no change in dispatcher and + * this call will not be cancelled neither on entry to the block inside `withContext` nor on exit from it. */ public suspend fun withContext( context: CoroutineContext, diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 020d00a32c..5f21299e58 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1151,8 +1151,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren override fun invoke(cause: Throwable?) { parent.continueCompleting(state, child, proposedUpdate) } - override fun toString(): String = - "ChildCompletion[$child, $proposedUpdate]" } private class AwaitContinuation( @@ -1350,6 +1348,7 @@ internal abstract class JobNode( override val isActive: Boolean get() = true override val list: NodeList? get() = null override fun dispose() = (job as JobSupport).removeNode(this) + override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]" } internal class NodeList : LockFreeLinkedListHead(), Incomplete { @@ -1384,7 +1383,6 @@ private class InvokeOnCompletion( private val handler: CompletionHandler ) : JobNode(job) { override fun invoke(cause: Throwable?) = handler.invoke(cause) - override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]" } private class ResumeOnCompletion( @@ -1392,7 +1390,6 @@ private class ResumeOnCompletion( private val continuation: Continuation ) : JobNode(job) { override fun invoke(cause: Throwable?) = continuation.resume(Unit) - override fun toString() = "ResumeOnCompletion[$continuation]" } private class ResumeAwaitOnCompletion( @@ -1411,7 +1408,6 @@ private class ResumeAwaitOnCompletion( continuation.resume(state.unboxState() as T) } } - override fun toString() = "ResumeAwaitOnCompletion[$continuation]" } internal class DisposeOnCompletion( @@ -1419,7 +1415,6 @@ internal class DisposeOnCompletion( private val handle: DisposableHandle ) : JobNode(job) { override fun invoke(cause: Throwable?) = handle.dispose() - override fun toString(): String = "DisposeOnCompletion[$handle]" } private class SelectJoinOnCompletion( @@ -1431,7 +1426,6 @@ private class SelectJoinOnCompletion( if (select.trySelect()) block.startCoroutineCancellable(select.completion) } - override fun toString(): String = "SelectJoinOnCompletion[$select]" } private class SelectAwaitOnCompletion( @@ -1443,7 +1437,6 @@ private class SelectAwaitOnCompletion( if (select.trySelect()) job.selectAwaitCompletion(select, block) } - override fun toString(): String = "SelectAwaitOnCompletion[$select]" } // -------- invokeOnCancellation nodes @@ -1463,7 +1456,6 @@ private class InvokeOnCancelling( override fun invoke(cause: Throwable?) { if (_invoked.compareAndSet(0, 1)) handler.invoke(cause) } - override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]" } internal class ChildHandleNode( @@ -1472,7 +1464,6 @@ internal class ChildHandleNode( ) : JobCancellingNode(parent), ChildHandle { override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) - override fun toString(): String = "ChildHandle[$childJob]" } // Same as ChildHandleNode, but for cancellable continuation @@ -1483,7 +1474,5 @@ internal class ChildContinuation( override fun invoke(cause: Throwable?) { child.parentCancelled(child.getContinuationCancellationCause(job)) } - override fun toString(): String = - "ChildContinuation[$child]" } diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 8edd2b310c..87bd43714d 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -635,6 +635,7 @@ internal abstract class AbstractChannel( cancelInternal(cause) final override fun cancel(cause: CancellationException?) { + if (isClosedForReceive) return // Do not create an exception if channel is already cancelled cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled")) } diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index a75d466199..9ceb77ddc2 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -26,6 +26,7 @@ internal open class ChannelCoroutine( } final override fun cancel(cause: CancellationException?) { + if (isClosedForReceive) return // Do not create an exception if channel is already cancelled cancelInternal(cause ?: defaultCancellationException()) } diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index 762cdcad1b..63b285abc3 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -201,7 +201,7 @@ public fun BroadcastChannel.asFlow(): Flow = flow { */ @Deprecated( message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel", - replaceWith = ReplaceWith("shareIn(scope, 0, SharingStarted.Lazily)"), + replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"), level = DeprecationLevel.WARNING ) public fun Flow.broadcastIn( diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index a3075b927a..75f9e710f7 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -147,6 +147,17 @@ public interface SharedFlow : Flow { * Use the `MutableSharedFlow(...)` constructor function to create an implementation. */ public interface MutableSharedFlow : SharedFlow, FlowCollector { + /** + * Emits a [value] to this shared flow, suspending on buffer overflow if the shared flow was created + * with the default [BufferOverflow.SUSPEND] strategy. + * + * See [tryEmit] for a non-suspending variant of this function. + * + * This method is **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. + */ + override suspend fun emit(value: T) + /** * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was * emitted successfully. When this function returns `false`, it means that the call to a plain [emit] @@ -155,6 +166,9 @@ public interface MutableSharedFlow : SharedFlow, FlowCollector { * A shared flow configured with a [BufferOverflow] strategy other than [SUSPEND][BufferOverflow.SUSPEND] * (either [DROP_OLDEST][BufferOverflow.DROP_OLDEST] or [DROP_LATEST][BufferOverflow.DROP_LATEST]) never * suspends on [emit], and thus `tryEmit` to such a shared flow always returns `true`. + * + * This method is **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. */ public fun tryEmit(value: T): Boolean @@ -190,6 +204,9 @@ public interface MutableSharedFlow : SharedFlow, FlowCollector { * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow] * to an initial value, just update its [value][MutableStateFlow.value]. * + * This method is **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. + * * **Note: This is an experimental api.** This function may be removed or renamed in the future. */ @ExperimentalCoroutinesApi diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt index 675233765d..2691d4bd87 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt @@ -38,7 +38,7 @@ public enum class SharingCommand { /** * A strategy for starting and stopping the sharing coroutine in [shareIn] and [stateIn] operators. * - * This interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and + * This functional interface provides a set of built-in strategies: [Eagerly], [Lazily], [WhileSubscribed], and * supports custom strategies by implementing this interface's [command] function. * * For example, it is possible to define a custom strategy that starts the upstream only when the number @@ -46,11 +46,9 @@ public enum class SharingCommand { * that it looks like a built-in strategy on the use-site: * * ``` - * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int): SharingStarted = - * object : SharingStarted { - * override fun command(subscriptionCount: StateFlow): Flow = - * subscriptionCount - * .map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP } + * fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) = + * SharingStarted { subscriptionCount: StateFlow -> + * subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP } * } * ``` * @@ -74,7 +72,7 @@ public enum class SharingCommand { * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running). * The failure of the `command` flow cancels the sharing coroutine and the upstream flow. */ -public interface SharingStarted { +public fun interface SharingStarted { public companion object { /** * Sharing is started immediately and never stops. diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index a9a4ed3d24..45641ca92d 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -160,6 +160,9 @@ public interface MutableStateFlow : StateFlow, MutableSharedFlow { * The current value of this state flow. * * Setting a value that is [equal][Any.equals] to the previous one does nothing. + * + * This property is **thread-safe** and can be safely updated from concurrent coroutines without + * external synchronization. */ public override var value: T @@ -170,6 +173,9 @@ public interface MutableStateFlow : StateFlow, MutableSharedFlow { * This function use a regular comparison using [Any.equals]. If both [expect] and [update] are equal to the * current [value], this function returns `true`, but it does not actually change the reference that is * stored in the [value]. + * + * This method is **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. */ public fun compareAndSet(expect: T, update: T): Boolean } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index bbdebd08b9..d276e5100a 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -137,7 +137,7 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus } catch (e: AbortFlowException) { e.checkOwnership(owner = this@unsafeFlow) } finally { - if (!second.isClosedForReceive) second.cancel() + second.cancel() } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt index 7a70fbf7f2..9aa240d8a9 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt @@ -2,12 +2,13 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("unused") +@file:Suppress("unused", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.internal.InlineOnly /** * Applying [cancellable][Flow.cancellable] to a [SharedFlow] has no effect. @@ -79,4 +80,61 @@ public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = replaceWith = ReplaceWith("currentCoroutineContext()") ) public val FlowCollector<*>.coroutineContext: CoroutineContext - get() = noImpl() \ No newline at end of file + get() = noImpl() + +@Deprecated( + message = "SharedFlow never completes, so this operator has no effect.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this") +) +@InlineOnly +public inline fun SharedFlow.catch(noinline action: suspend FlowCollector.(cause: Throwable) -> Unit): Flow = + (this as Flow).catch(action) + +@Deprecated( + message = "SharedFlow never completes, so this operator has no effect.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this") +) +@InlineOnly +public inline fun SharedFlow.retry( + retries: Long = Long.MAX_VALUE, + noinline predicate: suspend (cause: Throwable) -> Boolean = { true } +): Flow = + (this as Flow).retry(retries, predicate) + +@Deprecated( + message = "SharedFlow never completes, so this operator has no effect.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this") +) +@InlineOnly +public inline fun SharedFlow.retryWhen(noinline predicate: suspend FlowCollector.(cause: Throwable, attempt: Long) -> Boolean): Flow = + (this as Flow).retryWhen(predicate) + +@Suppress("DeprecatedCallableAddReplaceWith") +@Deprecated( + message = "SharedFlow never completes, so this terminal operation never completes.", + level = DeprecationLevel.WARNING +) +@InlineOnly +public suspend inline fun SharedFlow.toList(): List = + (this as Flow).toList() + +@Suppress("DeprecatedCallableAddReplaceWith") +@Deprecated( + message = "SharedFlow never completes, so this terminal operation never completes.", + level = DeprecationLevel.WARNING +) +@InlineOnly +public suspend inline fun SharedFlow.toSet(): Set = + (this as Flow).toSet() + +@Suppress("DeprecatedCallableAddReplaceWith") +@Deprecated( + message = "SharedFlow never completes, so this terminal operation never completes.", + level = DeprecationLevel.WARNING +) +@InlineOnly +public suspend inline fun SharedFlow.count(): Int = + (this as Flow).count() diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index 1f4942a358..caf87f143e 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -23,7 +23,7 @@ internal const val MODE_ATOMIC = 0 * **DO NOT CHANGE THE CONSTANT VALUE**. It is being into the user code from [suspendCancellableCoroutine]. */ @PublishedApi -internal const val MODE_CANCELLABLE = 1 +internal const val MODE_CANCELLABLE: Int = 1 /** * Cancellable dispatch mode for [suspendCancellableCoroutineReusable]. diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 9bb2ce3d29..f9362cff11 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -16,7 +16,7 @@ internal open class ScopeCoroutine( context: CoroutineContext, @JvmField val uCont: Continuation // unintercepted continuation ) : AbstractCoroutine(context, true), CoroutineStackFrame { - final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame? + final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame final override fun getStackTraceElement(): StackTraceElement? = null final override val isScopedCoroutine: Boolean get() = true diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 99c54f8417..81d3745e62 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -339,7 +339,6 @@ internal class SelectBuilderImpl( if (trySelect()) resumeSelectWithException(job.getCancellationException()) } - override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]" } @PublishedApi diff --git a/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt b/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt index edcf123b0a..d691c725b5 100644 --- a/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt +++ b/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt @@ -36,7 +36,7 @@ internal class UnbiasedSelectBuilderImpl(uCont: Continuation) : val clauses = arrayListOf<() -> Unit>() @PublishedApi - internal fun handleBuilderException(e: Throwable) = instance.handleBuilderException(e) + internal fun handleBuilderException(e: Throwable): Unit = instance.handleBuilderException(e) @PublishedApi internal fun initSelectResult(): Any? { diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 73aaab5fbf..707c4640bc 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -201,7 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { // try lock val update = if (owner == null) EMPTY_LOCKED else Empty(owner) if (_state.compareAndSet(state, update)) { // locked - cont.resume(Unit) + // TODO implement functional type in LockCont as soon as we get rid of legacy JS + cont.resume(Unit) { unlock(owner) } return@sc } } diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 84b7f4f8a2..c342bb3009 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se if (addAcquireToQueue(cont)) return@sc val p = _availablePermits.getAndDecrement() if (p > 0) { // permit acquired - cont.resume(Unit) + cont.resume(Unit, onCancellationRelease) return@sc } } @@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se // On CAS failure -- the cell must be either PERMIT or BROKEN // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair - // The following resume must always succeed, since continuation was not published yet and we don't have - // to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled - cont.resume(Unit) + /// This continuation is not yet published, but still can be cancelled via outer job + cont.resume(Unit, onCancellationRelease) return true } assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt index 9020f5f311..42cdb1e19f 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt @@ -187,11 +187,9 @@ class ShareInTest : TestBase() { } @Suppress("TestFunctionName") - private fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int): SharingStarted = - object : SharingStarted { - override fun command(subscriptionCount: StateFlow): Flow = - subscriptionCount - .map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP } + private fun SharingStarted.Companion.WhileSubscribedAtLeast(threshold: Int) = + SharingStarted { subscriptionCount -> + subscriptionCount.map { if (it >= threshold) SharingCommand.START else SharingCommand.STOP } } private class FlowState { diff --git a/kotlinx-coroutines-core/common/test/sync/MutexTest.kt b/kotlinx-coroutines-core/common/test/sync/MutexTest.kt index c5d0ccf187..4f428bc4b0 100644 --- a/kotlinx-coroutines-core/common/test/sync/MutexTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/MutexTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.sync +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlin.test.* @@ -106,4 +107,4 @@ class MutexTest : TestBase() { assertFalse(mutex.holdsLock(firstOwner)) assertFalse(mutex.holdsLock(secondOwner)) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index bd16f49af0..58792ced31 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -41,7 +41,6 @@ private class CancelFutureOnCompletion( // interruption flag and it will cause spurious failures elsewhere future.cancel(false) } - override fun toString() = "CancelFutureOnCompletion[$future]" } private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() { diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt index 97f9978139..d08f41bf8a 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt @@ -11,13 +11,13 @@ import kotlinx.coroutines.* private typealias Node = LockFreeLinkedListNode @PublishedApi -internal const val UNDECIDED = 0 +internal const val UNDECIDED: Int = 0 @PublishedApi -internal const val SUCCESS = 1 +internal const val SUCCESS: Int = 1 @PublishedApi -internal const val FAILURE = 2 +internal const val FAILURE: Int = 2 @PublishedApi internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE") diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 62cf80f7f8..ad61224b52 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -721,7 +721,19 @@ internal class CoroutineScheduler( } assert { localQueue.size == 0 } workerCtl.value = PARKED // Update value once - while (inStack()) { // Prevent spurious wakeups + /* + * inStack() prevents spurious wakeups, while workerCtl.value == PARKED + * prevents the following race: + * + * - T2 scans the queue, adds itself to the stack, goes to rescan + * - T2 suspends in 'workerCtl.value = PARKED' line + * - T1 pops T2 from the stack, claims workerCtl, suspends + * - T2 fails 'while (inStack())' check, goes to full rescan + * - T2 adds itself to the stack, parks + * - T1 unparks T2, bails out with success + * - T2 unparks and loops in 'while (inStack())' + */ + while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups if (isTerminated || state == WorkerState.TERMINATED) break tryReleaseCpu(WorkerState.PARKING) interrupted() // Cleanup interruptions diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt index 1fe0d8386d..3a55f8c4f2 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt @@ -77,4 +77,4 @@ class BlockingCoroutineDispatcherMixedStealingStressTest : SchedulerTestBase() { cpuBlocker.await() } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt index bb713b258d..027f3c514d 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() { } } } -} \ No newline at end of file + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val mutex = Mutex() + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + mutex.lock() + mutex.unlock() + } + mutex.withLock { + job.cancel() + } + job.join() + assertFalse { mutex.isLocked } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt index 374a1e3d7c..2ceed64b95 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync import kotlinx.coroutines.* import org.junit.Test -import kotlin.test.assertEquals +import kotlin.test.* class SemaphoreStressTest : TestBase() { @Test @@ -90,4 +90,21 @@ class SemaphoreStressTest : TestBase() { } } } + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val semaphore = Semaphore(1) + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + semaphore.acquire() + semaphore.release() + } + semaphore.withPermit { + job.cancel() + } + job.join() + assertTrue { semaphore.availablePermits == 1 } + } + } } diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md index 5525f9129f..fc9637a9d5 100644 --- a/kotlinx-coroutines-debug/README.md +++ b/kotlinx-coroutines-debug/README.md @@ -61,7 +61,7 @@ stacktraces will be dumped to the console. ### Using as JVM agent Debug module can also be used as a standalone JVM agent to enable debug probes on the application startup. -You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.1.jar`. +You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.4.2.jar`. Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines. When used as Java agent, `"kotlinx.coroutines.debug.enable.creation.stack.trace"` system property can be used to control [DebugProbes.enableCreationStackTraces] along with agent startup. @@ -138,8 +138,8 @@ Coroutine "coroutine#2":DeferredCoroutine{Active}@289d1c02, state: SUSPENDED Dumping only deferred "coroutine#2":DeferredCoroutine{Active}, continuation is SUSPENDED at line kotlinx.coroutines.DeferredCoroutine.await$suspendImpl(Builders.common.kt:99) - "coroutine#3":DeferredCoroutine{Active}, continuation is SUSPENDED at line ExampleKt.computeOne(Example.kt:14) - "coroutine#4":DeferredCoroutine{Active}, continuation is SUSPENDED at line ExampleKt.computeTwo(Example.kt:19) + "coroutine#3":DeferredCoroutine{Active}, continuation is SUSPENDED at line ExampleKt.computeOne(Example.kt:14) + "coroutine#4":DeferredCoroutine{Active}, continuation is SUSPENDED at line ExampleKt.computeTwo(Example.kt:19) ``` ### Status of the API diff --git a/kotlinx-coroutines-debug/build.gradle b/kotlinx-coroutines-debug/build.gradle index ab7f28c6a8..2a11bbb38c 100644 --- a/kotlinx-coroutines-debug/build.gradle +++ b/kotlinx-coroutines-debug/build.gradle @@ -28,6 +28,16 @@ dependencies { api "net.java.dev.jna:jna-platform:$jna_version" } +// TODO: JVM IR generates different stacktrace so temporary disable stacktrace tests +if (rootProject.ext.jvm_ir_enabled) { + tasks.named('test', Test) { + filter { +// excludeTest('kotlinx.coroutines.debug.CoroutinesDumpTest', 'testCreationStackTrace') + excludeTestsMatching('kotlinx.coroutines.debug.DebugProbesTest') + } + } +} + jar { manifest { attributes "Premain-Class": "kotlinx.coroutines.debug.AgentPremain" diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md index afcd4a3b3b..6022955254 100644 --- a/kotlinx-coroutines-test/README.md +++ b/kotlinx-coroutines-test/README.md @@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines. Add `kotlinx-coroutines-test` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.2' } ``` diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index 9c1251fe21..c2bbff22a8 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -110,7 +110,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { . `app/build.gradle` file: ```groovy -implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1" +implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2" ``` You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties index c4aa67585e..98898bc9dc 100644 --- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties @@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m # org.gradle.parallel=true kotlin_version=1.4.0 -coroutines_version=1.4.1 +coroutines_version=1.4.2 android.useAndroidX=true android.enableJetifier=true diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties index c4aa67585e..98898bc9dc 100644 --- a/ui/kotlinx-coroutines-android/example-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties @@ -21,7 +21,7 @@ org.gradle.jvmargs=-Xmx1536m # org.gradle.parallel=true kotlin_version=1.4.0 -coroutines_version=1.4.1 +coroutines_version=1.4.2 android.useAndroidX=true android.enableJetifier=true diff --git a/ui/kotlinx-coroutines-javafx/build.gradle.kts b/ui/kotlinx-coroutines-javafx/build.gradle.kts index 112441e0ed..e850e3940e 100644 --- a/ui/kotlinx-coroutines-javafx/build.gradle.kts +++ b/ui/kotlinx-coroutines-javafx/build.gradle.kts @@ -3,13 +3,20 @@ */ plugins { - id("org.openjfx.javafxplugin") + id("org.openjfx.javafxplugin") version "0.0.9" } javafx { version = version("javafx") modules = listOf("javafx.controls") - configuration = "compile" + configuration = "compileOnly" +} + +sourceSets { + test.configure { + compileClasspath += configurations.compileOnly + runtimeClasspath += configurations.compileOnly + } } val JDK_18: String? by lazy {