Skip to content

Commit

Permalink
Kotlin/Native multithreading
Browse files Browse the repository at this point in the history
* Provides newSingleThreadedContext.
* Provides Dispatchers.Main on iOS, Dispatchers.Default everywhere.
* Coroutine references (Job), all kinds of channels and StateFlow are shareable across workers.
* Each individual coroutine is confined to a single worker.
* Update Dispatchers docs to account for native-mt changes.
* Multithreaded support in select expression.

Additional fixes:
* Fixed broadcast builder with different thread
* Fixed adding a child to a frozen parent job

Fixes #462
Fixes #470
Fixes #765
Fixes #1645
Fixes #1751
Fixes #1828
Fixes #1831
Fixes #1764
Fixes #2064
Fixes #2025
Fixes #2226
Fixes #2138
Fixes #2263
  • Loading branch information
elizarov authored and qwwdfsad committed Oct 27, 2020
1 parent 83916b3 commit 4b9d15b
Show file tree
Hide file tree
Showing 109 changed files with 3,117 additions and 915 deletions.
27 changes: 17 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,24 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli
### Native

[Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as
[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.9/jar)
(follow the link to get the dependency declaration snippet).
[`kotlinx-coroutines-core`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.3.9/jar)
(follow the link to get the dependency declaration snippet). **Kotlin/Native requires Gradle version 6.0 or later**
to resolve that dependency properly into the corresponding platform-specific artifacts.

Only single-threaded code (JS-style) on Kotlin/Native is currently supported.
Kotlin/Native supports only Gradle version 4.10 and you need to enable Gradle metadata in your
`settings.gradle` file:
Kotlin/Native does not generally provide binary compatibility between versions.
You should use the same version of Kotlin/Native compiler as was used to build `kotlinx.coroutines`.

```groovy
enableFeaturePreview('GRADLE_METADATA')
```
Kotlin/Native does not support free sharing of mutable objects between threads as on JVM, so several
limitations apply to using coroutines on Kotlin/Native.
See [Sharing and background threads on Kotlin/Native](kotlin-native-sharing.md) for details.

Some functions like [newSingleThreadContext] and [runBlocking] are available only for Kotlin/JVM and Kotlin/Native
and are not available on Kotlin/JS. In order to access them from the code that is shared between JVM and Native
you need to enable granular metadata (aka HMPP) in your `gradle.properties` file:

Since Kotlin/Native does not generally provide binary compatibility between versions,
you should use the same version of Kotlin/Native compiler as was used to build `kotlinx.coroutines`.
```properties
kotlin.mpp.enableGranularSourceSetsMetadata=true
```

## Building

Expand Down Expand Up @@ -252,6 +257,8 @@ The `develop` branch is pushed to `master` during release.
[Promise.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.js.-promise/await.html
[promise]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/promise.html
[Window.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/org.w3c.dom.-window/as-coroutine-dispatcher.html
[newSingleThreadContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-single-thread-context.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[_flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

# Kotlin
version=1.3.9-SNAPSHOT
version=1.3.9-native-mt-2-SNAPSHOT
group=org.jetbrains.kotlinx
kotlin_version=1.4.0

Expand Down
184 changes: 184 additions & 0 deletions kotlin-native-sharing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# Sharing and background threads on Kotlin/Native

## Preview disclaimer

This is a preview release of sharing and backgrounds threads for coroutines on Kotlin/Native.
Details of this implementation will change in the future. See also [Known Problems](#known-problems)
at the end of this document.

## Introduction

Kotlin/Native provides an automated memory management that works with mutable data objects separately
and independently in each thread that uses Kotlin/Native runtime. Sharing data between threads is limited:

* Objects to be shared between threads can be [frozen](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/freeze.html).
This makes the whole object graph deeply immutable and allows to share it between threads.
* Mutable objects can be wrapped into [DetachedObjectGraph](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/-detached-object-graph/index.html)
on one thread and later reattached onto the different thread.

This introduces several differences between Kotlin/JVM and Kotlin/Native in terms of coroutines that must
be accounted for when writing cross-platform applications.

## Threads and dispatchers

An active coroutine has a mutable state. It cannot migrate from thread to thread. A coroutine in Kotlin/Native
is always bound to a specific thread. Coroutines that are detached from a thread are currently not supported.

`kotlinx.coroutines` provides ability to create single-threaded dispatchers for background work
via [newSingleThreadContext] function that is available for both Kotlin/JVM and Kotlin/Native. It is not
recommended shutting down such a dispatcher on Kotlin/Native via [SingleThreadDispatcher.close] function
while the application still working unless you are absolutely sure all coroutines running in this
dispatcher have completed. Unlike Kotlin/JVM, there is no backup default thread that might
execute cleanup code for coroutines that might have been still working in this dispatcher.

For interoperability with code that is using Kotlin/Native
[Worker](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/-worker/index.html)
API you can get a reference to single-threaded dispacher's worker using its [SingleThreadDispatcher.worker] property.

A [Default][Dispatchers.Default] dispatcher on Kotlin/Native contains a single background thread.
This is the dispatcher that is used by default in [GlobalScope].

> This limitation may be lifted in the future with the default dispatcher becoming multi-threaded and/or
> its coroutines becoming isolated from each other, so please do not assume that different coroutines running
> in the default dispatcher can share mutable data between themselves.
A [Main][Dispatchers.Main] dispatcher is
properly defined for all Darwin (Apple) targets, refers to the main thread, and integrates
with Core Foundation main event loop.
On Linux and Windows there is no platform-defined main thread, so [Main][Dispatchers.Main] simply refers
to the current thread that must have been either created with `newSingleThreadContext` or be running
inside [runBlocking] function.

The main thread of application has two options on using coroutines.
A backend application's main thread shall use [runBlocking].
A UI application running on one Apple's Darwin OSes shall run
its main queue event loop using `NSRunLoopRun`, `UIApplicationMain`, or ` NSApplicationMain`.
For example, that is how you can have main dispatcher in your own `main` function:

```kotlin
fun main() {
val mainScope = MainScope()
mainScope.launch { /* coroutine in the main thread */ }
CFRunLoopRun() // run event loop
}
```

## Switching threads

You switch from one dispatcher to another using a regular [withContext] function. For example, a code running
on the main thread might do:

```kotlin
// in the main thead
val result = withContext(Dispatcher.Default) {
// now executing in background thread
}
// now back to the main thread
result // use result here
```

If you capture a reference to any object that is defined in the main thread outside of `withContext` into the
block inside `withContext` then it gets automatically frozen for transfer from the main thread to the
background thread. Freezing is recursive, so you might accidentally freeze unrelated objects that are part of
main thread's mutable state and get
[InvalidMutabilityException](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/-invalid-mutability-exception/index.html)
later in unrelated parts of your code.
The easiest way to trouble-shoot it is to mark the objects that should not have been frozen using
[ensureNeverFrozen](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/ensure-never-frozen.html)
function so that you get exception in the very place they were frozen that would pinpoint the corresponding
`withContext` call in your code.

The `result` of `withContext` call can be used after `withContext` call. It gets automatically frozen
for transfer from background to the main thread, too.

A disciplined use of threads in Kotlin/Native is to transfer only immutable data between the threads.
Such code works equally well both on Kotlin/JVM and Kotlin/Native.

> Note: freezing only happens when `withContext` changes from one thread to another. If you call
> `withContext` and execution stays in the same thread, then there is not freezing and mutable data
> can be captured and operated on as usual.
The same rule on freezing applies to coroutines launched with any builder like [launch], [async], [produce], etc.

## Communication objects

All core communication and synchronization objects in `kotlin.coroutines` such as
[Job], [Deferred], [Channel], [BroadcastChannel], [Mutex], and [Semaphore] are _shareable_.
It means that they can be frozen for sharing with another thread and still continue to operate normally.
Any object that is transferred via a frozen (shared) [Deferred] or any [Channel] is also automatically frozen.

Similar rules apply to [Flow]. When an instance of a [Flow] itself is shared (frozen), then all the references that
are captured in to the lambdas in this flow operators are frozen. Regardless of whether the flow instance itself
was frozen, by default, the whole flow operates in a single thread, so mutable data can freely travel down the
flow from emitter to collector. However, when [flowOn] operator is used to change the thread, then
objects crossing the thread boundary get frozen.

Note, that if you protect any piece of mutable data with a [Mutex] or a [Semaphore] then it does not
automatically become shareable. In order to share mutable data you have to either
wrap it into [DetachedObjectGraph](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/-detached-object-graph/index.html)
or use atomic classes ([AtomicInt](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.native.concurrent/-atomic-int/index.html), etc).

## Cyclic garbage

Code working in a single thread on Kotlin/Native enjoys fully automatic memory management. Any object graph that
is not referenced anymore is automatically reclaimed even if it contains cyclic chains of references. This does
not extend to shared objects, though. Frozen immutable objects can be freely shared, even if then can contain
reference cycles, but shareable [communication objects](#communication-objects) leak if a reference cycle
to them appears. The easiest way to demonstrate it is to return a reference to a [async] coroutine as its result,
so that the resulting [Deferred] contains a reference to itself:

```kotlin
// from the main thread call coroutine in a background thread or otherwise share it
val result = GlobalScope.async {
coroutineContext // return its coroutine context that contains a self-reference
}
// now result will not be reclaimed -- memory leak
```

A disciplined use of communication objects to transfer immutable data between coroutines does not
result in any memory reclamation problems.

## Shared channels are resources

All kinds of [Channel] and [BroadcastChannel] implementations become _resources_ on Kotlin/Native when shared.
They must be closed and fully consumed in order for their memory to be reclaimed. When they are not shared, they
can be dropped in any state and will be reclaimed by memory manager, but a shared channel generally will not be reclaimed
unless closed and consumed.

This does not affect [Flow], because it is a cold abstraction. Even though [Flow] internally uses channels to transfer
data between threads, it always properly closes these channels when completing collection of data.

## Known problems

The current implementation is tested and works for all kinds of single-threaded cases and simple scenarios that
transfer data between two thread like shown in [Switching Threads](#switching-threads) section. However, it is known
to leak memory in scenarios involving concurrency under load, for example when multiple children coroutines running
in different threads are simultaneously cancelled.

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[newSingleThreadContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-single-thread-context.html
[SingleThreadDispatcher.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-single-thread-dispatcher/-single-thread-dispatcher/close.html
[SingleThreadDispatcher.worker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-single-thread-dispatcher/-single-thread-dispatcher/worker.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[GlobalScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-global-scope/index.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/launch.html
[async]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/async.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-deferred/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
<!--- INDEX kotlinx.coroutines.channels -->
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
<!--- INDEX kotlinx.coroutines.selects -->
<!--- INDEX kotlinx.coroutines.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[Semaphore]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-semaphore/index.html
<!--- END -->

6 changes: 2 additions & 4 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
protected fun onStart ()V
public final fun resumeWith (Ljava/lang/Object;)V
public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public final fun start (Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;)V
}

public final class kotlinx/coroutines/AwaitKt {
Expand Down Expand Up @@ -61,6 +60,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
public fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable;
public synthetic fun getDelegate$kotlinx_coroutines_core ()Lkotlin/coroutines/Continuation;
public final fun getResult ()Ljava/lang/Object;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
public fun initCancellability ()V
Expand Down Expand Up @@ -156,7 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
public abstract fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
Expand Down Expand Up @@ -225,8 +225,6 @@ public final class kotlinx/coroutines/CoroutineStart : java/lang/Enum {
public static final field DEFAULT Lkotlinx/coroutines/CoroutineStart;
public static final field LAZY Lkotlinx/coroutines/CoroutineStart;
public static final field UNDISPATCHED Lkotlinx/coroutines/CoroutineStart;
public final fun invoke (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
public final fun invoke (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)V
public final fun isLazy ()Z
public static fun valueOf (Ljava/lang/String;)Lkotlinx/coroutines/CoroutineStart;
public static fun values ()[Lkotlinx/coroutines/CoroutineStart;
Expand Down
20 changes: 1 addition & 19 deletions kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,6 @@ public abstract class AbstractCoroutine<in T>(
return "\"$coroutineName\":${super.nameString()}"
}

/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun start(start: CoroutineStart, block: suspend () -> T) {
initParentJob()
start(block, this)
}

/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
Expand All @@ -154,7 +137,6 @@ public abstract class AbstractCoroutine<in T>(
* * [LAZY] does nothing.
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
startAbstractCoroutine(start, receiver, this, block)
}
}
Loading

0 comments on commit 4b9d15b

Please sign in to comment.