Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在 Kotlin Coroutines 中使用 launch、async、Channel 和 Flow #5

Open
cnwutianhao opened this issue Jan 5, 2024 · 0 comments
Open
Labels

Comments

@cnwutianhao
Copy link
Owner

在 Kotlin 中,有多种实现异步处理(线程)的方法。老实说,很多开发人员不知道在使用 Kotlin 开发应用程序时应该使用什么。下面,我将详细介绍并说明 launch、async、Channel 和 Flow 的用途。

一、操作环境

Kotlin:1.9.21

二、目标

阐明协程异步处理的正确使用

三、各种方法的定义

通过查阅资料,我整理了一个表格:

方法 方法定义 返回值 概述
launch
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job
Job 无法返回任意结果的协程。
async
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>
Deferred 返回任意单个结果。
Channel#send
public suspend fun send(element: E)
将结果发送到 Channel。
Channel#receive
public suspend fun receive(): E
Channel 实例化时在泛型中指定的类型 从 Channel 中检索数据。
flow function (Flow Builder)
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
Flow 对象 当构建器生成一个实例时,它会返回由泛型指定的数据类型的数据。
FlowCollector#emit
public suspend fun emit(value: T)
向 Flow 发送数值。
Flow#collect
public suspend fun Flow<*>.collect(): Unit
在 Flow 接收数据的那个地方,输出由 collect 扩展函数收集到的值。

下面通过实际情况来理解它们。

四、launch

  1. 概述

    launch函数会创建并执行一个线程。由于它在主线程之外运行,所以需要等待launch的线程完成。可以使用一个属性来检查线程的状态,需要巧妙地利用它来等待线程的结束。

    检查状态的属性如下:

    方法 解释
    Job#isActive 如果线程正在运行且尚未取消,则返回 true。
    Job#isCancelled 当线程被取消时返回 true。
    Job#isCompleted 当线程完成执行时返回 true。
  2. 流程图

    graph LR
        A[开始] --> B[启动协程]
        B --> C[执行协程代码块]
        C --> D[异步操作1]
        C --> E[异步操作2]
        D --> F[处理异步操作1结果]
        E --> G[处理异步操作2结果]
        F --> H[结束]
        G --> H[结束]
    
    Loading
  3. 示例代码

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() {
        val job = CoroutineScope(Dispatchers.Default).launch {
            repeat(5) { count ->
                delay(500)
                println(stories[count])
            }
        }
    
        while (!job.isCompleted) {
            Thread.sleep(100)
        }
    }
    庙里有个老和尚在讲故事
    1.从前有座山
    2.山里有座庙
    3.庙里有个盆
    4.盆里有个钵

五、async

  1. 概述

    与 launch 的区别在于它可以返回一个返回值。返回值类型没有特殊限制,因此可以返回任何值。此外,在使用 launch 的情况下,我们使用属性来判断处理是否已完成。而在使用 async 的情况下,返回值是一个 Deferred 对象,它提供了一种等待处理完成的方法。此外,还可以进行取消操作,这是一种协作式的取消。

    以下是接收方法结束和返回值的方法:

    方法 解释
    Deferred#await 等待异步处理完成。
    Deferred#getComplete 可以获取返回值。返回类型没有限制,因此可以指定任何类型。
  2. 流程图

    graph LR
        A[开始] --> B[启动协程]
        B --> C[执行异步操作]
        C --> D[等待异步操作完成]
        D --> E[获取异步操作结果]
        E --> F[结束]
    
    Loading
  3. 示例代码

    Basic:

    import kotlinx.coroutines.*
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() {
        val job = CoroutineScope(Dispatchers.Default).launch {
            repeat(5) { count ->
                val deffer = async {
                    delay(500)
                    stories[count]
                }
    
                println("等待第${count + 1}个异步完成")
                deffer.await()
                println(deffer.getCompleted())
            }
        }
    
        // 等待协程完成
        while (!job.isCompleted) {
            Thread.sleep(100)
        }
    }
    等待第1个异步完成
    庙里有个老和尚在讲故事
    等待第2个异步完成
    1.从前有座山
    等待第3个异步完成
    2.山里有座庙
    等待第4个异步完成
    3.庙里有个盆
    等待第5个异步完成
    4.盆里有个钵

    Cancel:

    import kotlinx.coroutines.*
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() {
        val job = CoroutineScope(Dispatchers.Default).launch {
            repeat(5) { count ->
    
                val deffer = async {
                    // 第二次尝试时被取消
                    if (count == 2) {
                        cancel("取消")
                    }
                    delay(500)
                    stories[count]
                }
    
                println("等待第${count + 1}个异步完成")
                deffer.await()
                println(deffer.getCompleted())
            }
        }
    
        // 等待协程完成
        while (!job.isCompleted || !job.isCancelled) {
            Thread.sleep(100)
        }
    }
    等待第1个异步完成
    庙里有个老和尚在讲故事
    等待第2个异步完成
    1.从前有座山
    等待第3个异步完成

六、Channel

  1. 概述

    Channel是一个可在主程序和协程中使用的容器。它不仅仅是一个容器,还可以让接收数据的线程等待,并让将数据放入Channel的线程等待发送,因为Channel中包含了数据。我们需要对数据进行流量控制,例如让线程等待。这就是Channel被称为热流的原因,它的特点是无论是否有接收者(取出值的处理)都会执行发送过程(放入数据)。

    因此,需要采取以下预防措施。如果发送的次数没有收到数据,就会发生内存泄漏。换句话说,发送方的处理会继续进行,直到收到为止!

    以下是发送和接收的方法:

    方法 解释
    Channel#send 向 Channel 发送数据。
    Channel#receive 从 Channel 接收数据。
  2. 流程图

    graph LR
        A[开始] --> B[创建Channel]
        B --> C[发送数据至Channel]
        C --> D[接收Channel中的数据]
        D --> E[处理接收到的数据]
        E --> F[循环接收数据]
        F --> D[继续接收数据]
        E --> G[结束]
    
    Loading
  3. 示例代码

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() = runBlocking {
        // 初始化 Channel(只能发送和接收 String)
        val channel = Channel<String>()
    
        // 调用异步处理
        launch {
            stories.forEach {
                // 发送字符串到 Channel
                channel.send(it)
                println("发送:$it")
                // 1秒等待
                delay(1000)
            }
        }
    
        // 重复5次,就像发送到 Channel 5次一样
        repeat(5) {
            // 从 Channel 接收
            val story = channel.receive()
            println("接收:$story")
        }
    
        println("结束")
    }
    发送:庙里有个老和尚在讲故事
    接收:庙里有个老和尚在讲故事
    发送:1.从前有座山
    接收:1.从前有座山
    发送:2.山里有座庙
    接收:2.山里有座庙
    发送:3.庙里有个盆
    接收:3.庙里有个盆
    发送:4.盆里有个钵
    接收:4.盆里有个钵
    结束

七、Channel Buffer

  1. 概述

    Channel 是数据的容器,但到目前为止它只能存储单条数据。但是,可以通过设置缓冲区来存储多个数据。

  2. 流程图

    graph LR
        A[开始] --> B[创建Channel]
        B --> C[设置Channel缓冲区大小]
        C --> D[发送数据至Channel]
        D --> E[将数据放入缓冲区]
        E --> F[接收Channel中的数据]
        F --> G[从缓冲区获取数据]
        G --> H[处理接收到的数据]
        H --> I[循环接收数据]
        I --> F[继续接收数据]
        H --> J[结束]
    
    Loading
  3. 示例代码

    实例化 Channel 时,可以通过将整数传递给构造函数来设置缓冲区。

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() = runBlocking {
        // 初始化 Channel(只能发送和接收 String)
        val channel = Channel<String>(5)
    
        // 调用异步处理
        launch {
            stories.forEach {
                // 发送字符串到 Channel
                channel.send(it)
                println("发送:$it")
            }
        }
    
        // 重复5次,就像发送到 Channel 5次一样
        repeat(5) {
            Thread.sleep(1000)
            // 从 Channel 接收
            val story = channel.receive()
            println("接收:$story")
        }
        println("结束")
    }
    发送:庙里有个老和尚在讲故事
    发送:1.从前有座山
    发送:2.山里有座庙
    发送:3.庙里有个盆
    发送:4.盆里有个钵
    接收:庙里有个老和尚在讲故事
    接收:1.从前有座山
    接收:2.山里有座庙
    接收:3.庙里有个盆
    接收:4.盆里有个钵
    结束

八、Channel Cancel&Close

  1. 概述

    前面提到过 Channel 可能会导致内存泄漏,但是可以通过使用 cancel 和 close 来避免这种情况。

    Cancel 和 Close 的方法说明如下:

    方法 解释
    Channel#cancel 取消接收元素。 关闭 Channel 并删除所有缓冲的发送元素。 如果取消,将会发生异常(java.util.concurrent.CancellationException)。
    Channel#close 关闭 Channel。从现在开始,即使你调用它,它也会返回 false。
  2. 流程图

    Cancel:

    graph LR
        A[开始] --> B[创建Channel]
        B --> C[发送数据至Channel]
        C --> D[接收Channel中的数据]
        D --> E[处理接收到的数据]
        E --> F[检查取消状态]
        F --> G[取消Channel接收操作]
        G --> H[结束]
    
    Loading

    Close:

    graph LR
        A[开始] --> B[创建Channel]
        B --> C[发送数据至Channel]
        C --> D[接收Channel中的数据]
        D --> E[处理接收到的数据]
        E --> F[检查Channel是否关闭]
        F --> G[关闭Channel]
        G --> H[结束]
    
    Loading
  3. 示例代码

    Cancel:

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() = runBlocking {
        // 初始化 Channel(只能发送和接收 String)
        val channel = Channel<String>()
    
        // 调用异步处理
        launch {
            stories.forEach {
    
                // 发送字符串到 Channel
                channel.send(it)
                println("发送:$it")
                // 0.5秒等待
                delay(500)
    
                // 中途被打断
                if (stories.indexOf(it) == 2) {
                    channel.cancel()
                }
            }
        }
    
        // 重复5次,就像发送到 Channel 5次一样
        repeat(5) {
            // 从 Channel 接收
            val story = channel.receive()
            println("接收:$story")
        }
    
        println("结束")
    }
    发送:庙里有个老和尚在讲故事
    接收:庙里有个老和尚在讲故事
    发送:1.从前有座山
    接收:1.从前有座山
    发送:2.山里有座庙
    接收:2.山里有座庙
    Exception in thread "main" java.util.concurrent.CancellationException: Channel was cancelled

    Close:

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun main() = runBlocking {
        // 初始化 Channel(只能发送和接收 String)
        val channel = Channel<String>()
    
        // 调用异步处理
        launch {
            stories.forEach {
    
                // 发送字符串到 Channel
                channel.send(it)
                println("发送:$it")
                // 0.5秒等待
                delay(500)
    
                // 中途被打断
                if (stories.indexOf(it) == 2) {
                    channel.close()
                }
            }
        }
    
        // 重复5次,就像发送到 Channel 5次一样
        repeat(5) {
            // 从 Channel 接收
            val story = channel.receive()
            println("接收:$story")
        }
    
        println("结束")
    }
    发送:庙里有个老和尚在讲故事
    接收:庙里有个老和尚在讲故事
    发送:1.从前有座山
    接收:1.从前有座山
    发送:2.山里有座庙
    接收:2.山里有座庙
    Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed

九、Flow

  1. 概述

    Flow 称为冷流,其行为与热流 Channel 有很大不同。与 Channel 不同的是,除非确定了接收处理​​(collect方法),否则 Flow 不会执行发送处理(emit方法)。结果,Flow 将不会运行。因此,不会发生内存泄漏。此外,取消操作实现了协作式取消。

  2. 流程图

    graph LR
        A[开始] --> B[创建Flow]
        B --> C[定义Flow的数据流]
        C --> D[收集Flow的数据]
        D --> E[处理收集到的数据]
        E --> F[循环收集数据]
        F --> D[继续收集数据]
        E --> G[结束]
    
    Loading
  3. 示例代码

    Flow 的基本使用:

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun teller() = flow {
        repeat(stories.count()) {
            Thread.sleep(1000)
            emit(stories[it])
            println("${it + 1}次调用了emit")
        }
    }
    
    fun main() = runBlocking {
        launch {
            for (i in 1..3) {
                println("${i}次在main方法中进行延迟处理")
                delay(100)
            }
        }
    
        val collector = teller()
    
        collector.collect { value ->
            println(value)
            Thread.sleep(100)
        }
    
        println("结束")
    }
    庙里有个老和尚在讲故事
    第1次调用了emit
    1.从前有座山
    第2次调用了emit
    2.山里有座庙
    第3次调用了emit
    3.庙里有个盆
    第4次调用了emit
    4.盆里有个钵
    第5次调用了emit
    结束
    第1次在main方法中进行延迟处理
    第2次在main方法中进行延迟处理
    第3次在main方法中进行延迟处理

    协作式取消:

    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.withTimeoutOrNull
    
    val stories = arrayOf(
        "庙里有个老和尚在讲故事",
        "1.从前有座山",
        "2.山里有座庙",
        "3.庙里有个盆",
        "4.盆里有个钵"
    )
    
    fun teller() = flow {
    
        repeat(stories.count()) {
            Thread.sleep(1000)
            emit(stories[it])
            println("${it + 1}次调用了emit")
        }
    }
    
    fun main() = runBlocking {
    
        val collector = teller()
    
        // 如果时间超过2.5秒,则取消
        withTimeoutOrNull(2500) {
            collector.collect { value ->
                println(value)
                Thread.sleep(100)
            }
        }
    
        println("结束")
    }
    庙里有个老和尚在讲故事
    第1次调用了emit
    1.从前有座山
    第2次调用了emit
    结束

十、总结

总结以上所述内容并考虑使用场景时,可以如下所示:

类型 使用场景
launch 当你想要启动一个协程而不需要返回值时
async 当需要获得任意返回值并且希望并行执行多个任务并等待它们完成时
Channel 当需要实时处理具有相同类型的多个返回值时
Flow 当需要确保以相同类型的多个返回值必须完全处理时
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant