Kotlin 1.5 协程笔记

yuan
·
// in CoroutineScope
launch {
    val users = loadContributorsSuspend(req) //  suspend 方法
    updateResults(users, startTime)
}

suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
    ......
}

这里的 launch 方法是 CoroutineScope 的一个方法,它能够创建一个协程,Kotlin 里协程被视作“轻量化”的线程,它是可以挂起 (suspend) 的。suspend 修饰的方法叫作 suspend 方法,例如这里的 loadContributorsSuspend()。suspend 方法只能在协程中被调用,或者被另一个 suspend 方法调用。

在一个协程内调用 suspend 方法(例如调用一个发起网络请求的 suspend 方法),该协程会被挂起,变为 suspended 状态,并被从当前线程中移除,保存在内存当中,当前线程可以继续执行其它任务。当该协程准备好继续执行时(例如网络请求完成并返回),协程会重新加入一个线程(不一定是之前的那个线程),继续执行内部剩余部分的代码。

runBlocking, launchasync 都能用于创建一个协程,区别在于:

  • runBlocking 主要是用于桥接起普通方法和 suspend 方法,桥接起 blocking world 和 non-blocking world. 通常被用作创建最顶层的协程。它的调用会引起当前线程阻塞;

  • async()launch() 一样,是 CoroutineScope 的一个方法,它也能够创建一个协程,并且会返回一个 Deferred 对象。Deferred 是一个泛型类,可以对其调用 await() 方法用于获取 async() 所返回的数据。对 await() 方法调用会引起调用该方法的协程(也就是父协程)挂起。对于 Collection<Deferred<T>> 类,Kotlin 还提供了一个 awaitAll() 方法,用于获取一组 Deferred 对象的返回结果:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferreds: List<Deferred<Int>> = (1..3).map {
        async {
            delay(1000L * it)
            println("Loading $it")
            it
        }
    }
    val sum = deferreds.awaitAll().sum()
    println("$sum")
}

当然,Collection<Job<T>> 也有与之对应的 joinAll() 方法。

  • launch() 方法也能够创建一个协程,并且返回一个 Job 对象。JobDeferred 的父类。如果父协程需要等待该协程执行完毕,可以对其调用 join() 方法达到与 Deferred.await() 类似的效果,但区别是 join() 方法不返回值。 当然,因为 DeferredJob 的子类,所以 Deferred 也可以调用 join() 方法,只是这种情况下没有必要使用 Deferred,直接用 Job 即可。

  • async() 会将异常包装在 Deferred 对象里返回,launch() 则会抛出未捕获的异常。

要注意的是,launch()async() 是创建协程,并返回一个 Job, 这里的 Job 和协程不是一回事。在 JVM 上,Kotlin 的协程本质是一个被管理着的线程。

runBlocking(), launch(), async() 以及后面会提到的 withContext() coroutineScope() 等方法,它们都接收一个 block 参数,该参数被定义为 CoroutineScope 的一个扩展方法,所以在这些 block 内部可以直接调用 CoroutineScope 的实例方法,其中就包括 launch(), async().

Channel

Kotlin 的 coroutine 包里提供了个叫作 Channel 的类,用于在各协程之间通信。Channel 里可以存放任意类型的数据。提供的操作就是生产者方 send, 消费者方 receive, 很常见的生产-消费模式。

Kotlin 里的 Channel 是个接口,继承了 SendChannelReceiveChannel,前者有一个 send() 和一个 close() 方法,后者有一个 receive() 方法。send() 方法和 receive() 方法都是 suspend 方法。

这里主要记录一下各种类型 Channel 的异同。

  • Unlimited Channel: 容量无限,因此 send() 方法不会被挂起,但如果内存不足的话会抛出异常 OutOfMemoryException;当 channel 是空的的时候,receive() 方法会被挂起;

  • Buffered Channel: 设定了容量的 Channel,send() 方法在 channel 满的时候会挂起;

  • Rendezvous Channel: 相当于容量为 0 的 Buffered Channel。当 send() 方法被调用,并且 receive() 方法没有被调用时,send() 方法会被挂起,直到 receive() 方法被调用;同样,当 receive() 方法被调用,并且 send() 方法没有被调用的时候,receive() 方法会被挂起;

  • Conflated Channel: send() 方法连续调用,并且 channel 里的元素没有被消费的话,先前的元素会被后来的元素所覆盖,消费方只能取到最后放进去的元素。

Kotlin 提供了一个工厂方法 Channel() 用来创建不同类型的 Channel。不传任何参数时,默认创建的是 Rendezvous Channel;当参数为任意正整数时,创建的是 Buffered Channel,参数为该 Channel 的容量;当参数为 Channel.UNLIMITED, Channel.CONFLATED 等用于指定类型的常量时,创建的就是对应类型的 Channel。

延迟执行

launch()async() 的 start 参数可以用来指定协程的启动模式。async(start = CoroutineStart.LAZY) 可以让协程延迟执行,直到 start() 方法被调用。

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
            val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
            // some computation
            one.start() // start the first one
            two.start() // start the second one
            println("all coroutine started")
            println("The answer is ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    println("do something useful one")
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    println("do something useful two")
    return 29
}

运行这段代码会发现总运行时间为 1 秒左右。但是如果把代码中的两处 start() 调用删掉,会发现用时为 2 秒左右。因为 await() 会挂起当前协程,使两个协程按 await() 的调用顺序执行。再同时删除掉 async() 的参数的话,运行时间又变回 1 秒左右了。

取消和超时

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion 
    println("main: Now I can quit.")    
}

这里的 job 会在执行第 3 次时被取消。join() 方法的调用,是为了确保当前协程退出前,job 里的代码执行完毕(包括资源释放)。cancel() 方法不能在 join() 之后调用,因为先调用 join(),当前协程会等待 job 执行完毕,而 job 内部耗时很长,如果等待其执行完毕,那么后面的 cancel() 就完全没有必要了。而如果 job 内部是个死循环,则 cancel() 方法永远执行不到。

由于 cancel()join() 常常同时使用,Kotlin 另外提供了个 cancelAndJoin() 的快捷方法。

kotlinx.coroutines 包里的所有 suspend 方法都是可取消的 (cancellable),这些方法会检查协程的取消动作 (cancellation),并在取消时抛出 CancellationException 异常。但如果一个协程内没有任何 suspend 方法的调用,则无法取消。可以理解为需要被取消的是协程内部的 suspend 方法。

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

上面的代码会一直执行到 5 次打印结束。如果你想让这种没有 suspend 方法调用的代码可取消,一种办法是加入一个 suspend 方法,yield() 通常可以起到这个作用。另一个办法是加入 isActive 作为判断条件。

while (i < 5 && isActive) {
        ........
}

前面提到取消协程会抛出 CancellationException 异常,如果需要在协程取消时释放资源,可以把释放资源的代码写在 finally 块里。

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

任何企图在 finally 里执行的 suspend 方法,在当被捕获的异常为 CancellationException 时,都会同样抛出 CancellationException,这通常不是什么问题。因为通常来说,释放资源的代码都是非阻塞 (non-blocking) 的。但在某些罕见的情况下,如果你需要在 finally 里调用 suspend 方法,可以使用 withContext(NonCancellable) {...}

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L)
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

如果想为某 suspend 方法的调用设置超时时间,可以使用 withTimeout() 方法

withTimeout(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
}

该方法会在其内部的 suspend 方法执行超时时将其取消,并抛出 TimeoutCancellationException,该异常是 CancellationException 的子类。但这里有个区别是:前者被引发时,会输出异常堆栈信息,后者不会有任何输出。

因为我们会为 CancellationExceptionfinally 块里编写资源释放的代码,所以没有必要再专门为 TimeoutCancellationException 写一遍。如果有专门针对该子类异常的处理逻辑,可以写在 try {...} catch (e: TimeoutCancellationException) {...} 里边,或者使用 withTimeoutOrNull() 方法,该方法在超时时不会抛出异常,取而代之的是返回一个 null 值。

结构化并发 (Structured Concurrency)

协程是可以嵌套的,每一个协程自己界定了一个范围 (CoroutineScope),限制了在其范围内的子协程的寿命。父协程被取消,所有子协程都将被取消;父协程的结束,也必须等待所有子协程结束。

coroutineScope() 方法可用于在不创建协程的情况下创建一个 CoroutineScope,继承了当前上下文 (CoroutineContext) 中除了 Job 元素以外的所有的元素 (Element). 它是个 suspend 方法,会挂起当前的协程,直至内部协程执行完毕。

fun main() {
    runBlocking {
        coroutineScope {
            launch {
                delay(2000)
                println("- World 2")
            }
            launch {
                delay(1000)
                println("- World")
            }
            println("Hello")
        }
        println("Done")
    }
}

输出

Hello
- World
- World 2
Done

可以看到打印 Hello 之后没有接着打印 Done,而是等待内部的两个协程执行完。如果去掉这个 coroutingScope 的调用,则会输出:

Hello
Done
- World
- World 2

Kotlin 另外提供了一个 CoroutineScope() 方法,它与 coroutineScope() 的区别在于,前者是用于创建并返回一个 CoroutineScope 实例,可以用一个变量指向它,在需要的地方调用它的 launch() 等方法来创建协程。后者则是直接在当前上下文当中就地创建一个 CoroutineScope (并没有创建协程)并执行其中的代码。

考虑如下代码

// The result type of somethingUsefulOneAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

// note that we don't have `runBlocking` to the right of `main` in this example
fun main() {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

这里在 runBlocking() 之外使用了 GlobalScope.async(),它创建的协程与其它协程没有任何层级关系。假如 two 执行出错,抛出异常,one 会仍然在后台执行。在结构化并发的写法中,这种情况可以避免。

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        println("The answer is ${concurrentSum()}")
    }
    println("Completed in $time ms")
}

这里使用 coroutineScope() 创建了一个 CoroutineScope, 在其内部创建的协程算作该 coroutine scope 的子协程,如果 concurrentSum() 内部有任何代码抛出异常,它内部创建的所有协程都会被取消。这也是“结构化并发”的一个特点。结构化并发有助于管理协程的生命周期和资源占用。

Context 和 Dispatcher

Kotlin 的协程总是运行在一个叫 CoroutineContext 的环境里,CoroutineContext 里包含的主要元素 (element) 有协程本身的 Job, CoroutineDispatcher, CoroutineExceptionHandlerCoroutineName.

CoroutineDispatcher 是实现了 CoroutineContext 的一个抽象类,它可以用来决定协程该运行在哪个线程或哪个线程池里的线程上。

launchasync 接受一个可选的 CoroutineContext 参数,该参数用于显式地指定协程及其上下文 (Context) 中的其它元素 (element) 所使用的 dispatcher. 不指定参数的时候,新的协程从父级的 CoroutineScope 里继承 context 和 dispatcher。

Dispatchers.Default 是 CoroutineDispatcher 的其中一种实现。当你给 async() 或者 launch() 方法传递参数 Dispatchers.Default 时,它会从 JVM 的一个共享线程池里获取空闲的线程。这个共享线程池的线程数量与 CPU 的核心数量相等。但是当 CPU 只有一个核心时,线程池里会有两个线程。只要并发量足够大,便可以在日志中打印线程 ID 来观察到效果。同时我们会看到,启动协程的线程,和协程由 suspended 状态恢复后所在的线程很多时候并不是同一个线程。

实际上 Dispatchers.Default 比较适用于 CPU 计算密集型的任务。另外 Kotlin 还提供了 Dispatchers.IO,从名字上很容易看出,这个 Dispatcher 适用于 IO 密集型的任务。如果希望协程只在主线程当中运行,可以使用 Dispatchers.MainMainScope() 工厂方法创建的 CoroutineScope 默认使用 Dispatchers.Main.

还有一个 withContext() 方法,它的作用是创建一个协程,然后把父协程挂起,等待代码块执行完毕,并返回结果。其作用可以认为与 async(){}.await() 效果相同。实际上,当你写出 async(CoroutineDispatcher){}.await() 这样的代码时,编译器会提示你把这段代码用 withContext(CoroutineDispatcher) 来替代。

使用 Dispatchers.Unconfined 时,新的协程会先在创建协程的那个线程中执行,直到遇到协程内部的 suspend 方法,之后的代码会在运行 suspend 代码的那个线程上执行。总之就是很随意。适用于不消耗 CPU,又不更新任何共享数据(比如 UI 状态)的操作。(这是官方说法。我暂时也想不到适用于哪里。)

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }    
}
// Unconfined      : I'm working in thread main @coroutine#2
// main runBlocking: I'm working in thread main @coroutine#3
// Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor@coroutine#2
// main runBlocking: After delay in thread main @coroutine#3

Flow

Flow 的用法有点类似 sequence, 也有个 flow builder.

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}

不同的是 flow 得运行在协程里。确切地说,是 flow 的 terminal operator (示例里的 collect())得运行在协程里。至于什么是 terminal operator 后面会提到。

RestrictsSuspension

另外,sequence 有个特点是:在其内部无法执行自身以外的 suspend 方法。想要理解这一点,需要先搞清楚 RestrictsSuspension 这个注解的作用。官方文档里写着:

Classes and interfaces marked with this annotation are restricted when used as receivers for extension suspend functions. These suspend extensions can only invoke other member or extension suspend functions on this particular receiver and are restricted from calling arbitrary suspension functions.

意思是,被该注解标记的类和接口,它们的扩展 suspend 方法 (extension suspend function) 无法调用定义于其外部的任何其它 suspend 方法。用一段代码来理解:

@RestrictsSuspension
class RestrictsSuspensionClass {
    suspend fun a() {
        println("RestrictsSuspensionClass fun a")
        NormalClass().a() // (1)
        delay(1) // (2)
    }
}

suspend fun RestrictsSuspensionClass.b() {
    println("RestrictsSuspensionClass fun b")
    NormalClass().a() // (3)
    this.a() // (4)
    delay(1) // (5)
}

class NormalClass {
    suspend fun a() {
        println("NormalClass fun a")
    }
}

这段代码里,(1) 和 (2) 能通过编译,是因为这两处的调用在 RestrictsSuspensionClass 的 fun a() 内部,这个 fun a() 不是一个扩展方法,它对其它 suspend 方法的调用不受限制。

(3) 无法通过编译,是因为 fun RestrictsSuspensionClass.b() 是一个扩展方法,并且 NormalClass().a() 是一个定义于 NormalClass 之上的 suspend 方法。

(4) 也是一个 suspend 方法,但它是 RestrictsSuspensionClass 自己的方法,所以可以通过编译。

(5) 无法通过编译的原因和 (3) 一样,delay() 是一个定义于 RestrictsSuspensionClass 之外的 suspend 方法。

现在再回头看,sequence 这个 sequence builder 接受的参数是个 lambda,并且该 lambda 被定义为 SequenceScope 的成员方法。而 SequenceScope 本身又被 RestrictsSuspension 这个注解标记,所以无法调用 delay() 方法。如果需要类似 sequence 的特性,而又想在其内部调用任意 suspend 方法,则需要考虑使用 flow.

和 sequence 一样,flow 也是惰性的,在调用 collect() 等 terminal operator 之前,其内部的代码不会被执行。

flow 的取消

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

执行这段代码,会发现它只打印了 1 和 2.

创建 flow 的方法

除了上述示例中用到的 flow builder 以外,还有以下创建 flow 的方式:

val flow1  = flowOf(1, 2, 3)
(1..3).asFlow()
sequenceOf(1, 2, 3).asFlow()

Transformation operator

flow 有许多 transformation operators, 包括在 Iterable 里常见的 map(), filter() 等。还有个更基础的 transform()

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }

Size-limiting operator

flow 还有个 take() 方法叫作 size-limiting operator, 它在取得足够多的结果之后会取消该 flow 的执行,同时像协程的取消操作一样,flow 的取消会抛出一个异常。

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

Terminal operator

toList(), toSet(), first(), reduce(), fold() 这些从 flow 里取结果的方法,包括前面提到的 collect(),叫作 terminal operator.

Flow 的上下文

Flow 执行的上下文默认为调用该 flow 的 terminal operator 时所在的上下文,这种特性叫作“上下文保留” (context preservation).

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}

fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")

如果想要改变 flow 的执行上下文,需要使用 flowOn() 方法。

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}

fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")

缓冲 (Buffering)

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000) // pretend we are asynchronously waiting 1 second
        log("processing before emit")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(2000) // pretend we are processing it for 2 seconds
            log(value) 
        } 
    }   
    println("Collected in $time ms")
}

这段代码在执行时,先 delay 1 秒,接着执行 collector (emit 里的代码),耗时 2 秒,再 delay 1 秒,接着执行 collector … 总耗时 9 秒。如果用上 buffer(),会开启另一个协程去执行 collector,然后直接返回,执行下一个 1 秒的 delay。这样由于后两个 1 秒的 delay 花费的时间包含在了更花时间的 emit 里,最终总耗时 7 秒:

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().buffer().collect { value -> 
            delay(2000) // pretend we are processing it for 2 seconds
            log(value) 
        } 
    }   
    println("Collected in $time ms")
}

合流 (Conflation)

conflate() 的表现和 buffer() 差不多,区别在于,使用 buffer() 时,如果单个 collector 耗时太长,后续的所有 collector 会一直等待,直到挨个执行完毕。而使用 conflate() 时,单个 collector 如果耗时太长,只会有最新的一个 collector 在等待,后续 emit 的 collector 会覆盖之前的。行为跟前面提到的 conflated channel 一样。

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(2000) // pretend we are processing it for 2 seconds
                log(value)
            }
    }
    log("Collected in $time ms")
}

处理最后一个值

Flow 还提供了一系列 xxxLatest 的 operator, 例如:

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .collectLatest { value ->
                delay(2000) // pretend we are processing it for 2 seconds
                log(value)
            }
    }
    log("Collected in $time ms")
}

行为与 conflate() 比较像,区别在于,使用 xxxLatest, 后续 emit 的 collector 不是覆盖之前的,而是将当前的 collector 取消,直接开始执行新 emit 的 collector. 当然如果 collector 执行得足够快,在下一个 collector 被 emit 之前执行完,就不会被取消。

同样类似的 operator 还有 flatMapLatest(), mapLatest()transformLatest().

Zip

zip() 可以把两个 flow 用你指定的方式合并成成一个 flow:

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print

Sequence 也支持同样的操作

Combine

combine() 的行为和 zip() 差不多,区别在于如果两个 flow 的执行时间不一样,zip() 会按顺序将两个 flow 的执行结果一一对应起来,而 combine() 则是无论哪个 flow 先执行完,都会去另一个 flow 里取当前值。

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

可以将以上代码里的 zip() 改成 combine() 对比输出结果。

Flow 展开

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(300) } // a number every 300 ms
        .map { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

以上代码的 collector 里,value 本身是个 flow,因为 requestFlow() 的返回值是个 flow. map() 操作之后的结果实际上是个 Flow<Flow>. 要拿到里面的字符串,就需要对其展开。这时候需要用 flatMapConcat() 方法来替换 map().

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(300) } // a number every 300 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

这样就能得到期望的结果。flatMapMerge() 则可以让 flatMapConcat() 的操作并发执行。flatMapLatest() 则类似 collectLatest().

异常的透明性

Kotlin 不建议在 flow builder 内部的 try/catch 块内调用 emit. 这违反了 flow 的 exception transparency 原则。要处理异常,可以调用 flow 的 catch() 方法,在 catch() {} 的 lambda 内针对具体的异常做处理。你可以:

  • 重新抛出这个异常;
  • emit() 把它交给外部处理;
  • 也可以忽略,或者记录在日志里,或者用任何其它方式来处理异常。
simple()
    .catch { e -> emit("Caught $e") } // emit on exception
    .collect { value -> println(value) }

catch() 方法只能捕获上游的异常,无法捕获下游的异常。

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }                 
            println(value) 
        }
}     

上面代码中的 catch() 就没有捕获 collector 里边的异常,要想达到效果,可以把抛出异常的代码放到 onEach() 里,并在 catch() 之前调用:

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()

Flow completion

Flow 在结束时有时需要执行一些操作,这些操作可以放在 finally 块里:

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

Flow 提供了另一个叫 onCompletion() 的方法来处理类似的情况:

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }

它的 lambda 有一个 Throwable? 类型的参数,可以用于判断 flow 是正常结束还是异常结束,如果是正常结束,则该参数为 null.

onCompletion() 方法不像 catch() 一样会拦截异常,异常仍然会被后面的 catch() 捕获:

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .catch { cause -> println("Caught exception again") }
        .collect { value -> println(value) }
}

至于使用 try/finally 还是 onCompletion() 的方式, 官方没有给出具体建议,由开发者根据需要自行选择。

Launching flow

Flow 的 onEach() 方法可以起到一个类似 addEventListener 的作用,把一小段代码插入 flow 的处理逻辑当中。但是 onEach() 是一个 intermediate operator, 不会触发 flow 的执行,所以需要配合 collect() 方法:

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}  

Flow 提供了一个 launchIn() 方法,该方法作用和 collect() 类似,区别是 launchIn() 会在指定上下文里启动一个协程执行。所以它接受一个类型为 CoroutineScope 的参数,用于指定执行的环境。

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}

launchIn() 的返回值是一个 Job, 你可以对其进行各种协程允许的操作,例如取消。

flow builder 会为每个 emit 自动执行 ensureActive() 用于检测取消动作,这意味着循环执行的 emit 是可以取消的。

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

但是出于性能的考量,多数的 flow operator 没有额外的取消动作检测,所以是无法取消的,例如:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

在这种情况下,如果你想要让这个 flow 变得可取消,可以使用 .onEach { currentCoroutineContext().ensureActive() },不过这个写法比较麻烦,写多了又成了重复代码,因此 Kotlin 提供了一个 cancellable() 方法:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

异常

使用 launch 创建根协程(没有父协程的协程)时,协程内部抛出的异常被当作未捕获的异常 (uncaught exception) 来对待。使用 async 创建根协程时,异常的表现根据用户的使用方式而有所不同。

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val job = GlobalScope.launch { // root coroutine with launch
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async { // root coroutine with async
        println("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

上面的代码会输出

Throwing exception from launch Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException Joined failed job Throwing exception from async Caught ArithmeticException

将代码中的 await() 调用改为 join(), 会发现异常同样无法捕获,但并没有中断后续的程序执行,之前没有输出的 "Unreached" 现在输出了。

对于 uncaught exception, Kotlin 默认的处理方式就是把异常栈打印出来。你可以通过 CoroutineExceptionHandler 来处理 uncaught exception, 从而改变这种默认处理方式,类似 Java 的 Thread.uncaughtExceptionHandler. 一般用于记录异常日志、显示一些错误信息、终止或者重启应用。

在 JVM 平台上,可以通过用 ServiceLoader 注册 CoroutineExceptionHandler 的方法重新定义 global exception handler. Global exception handler 类似 Thread.defaultUncaughtExceptionHandler, 没有特别指定 handler 时,会被默认使用。在 Android 平台上,uncaughtExceptionPreHandler 被用作 global exception handler.

CoroutineExceptionHandler 仅在 uncaught exceptions 发生时被调用。实际上,所有子协程的异常处理会被代理给父协程,直至根协程。在子协程上设置 exception handler 是无效的,不会被使用。

val handler = CoroutineExceptionHandler { _, exception -> 
    println("CoroutineExceptionHandler got $exception") 
}
val job = GlobalScope.launch(handler) { // 根协程,运行在 GlobalScope 里
    throw AssertionError()
}
val deferred = GlobalScope.async(handler) { // 同样是根协程,但这里用的是 async
    throw ArithmeticException() // 没有打印出异常信息
}
joinAll(job, deferred)
val handler = CoroutineExceptionHandler { _, exception ->
    println("CoroutineExceptionHandler got $exception")
}

val handler2 = CoroutineExceptionHandler { _, e ->
    println("CoroutineExceptionHandler 2 got $e")
}

@OptIn(DelicateCoroutinesApi::class)
fun main() {
    runBlocking {
        GlobalScope.launch(handler) {
            launch(handler2) { // exception handler 设置在子协程里无效
                throw AssertionError()
            }.join()
        }.join()
    }
}

实际上,把上面示例里的 GlobalScope.launch 的参数移除,子协程里的 handler2 仍然是无效的。同时这里的根协程如果不调用 join 方法,同样不会打印异常信息。

前面提到过,协程的取消会抛出 CancellationException, 这些异常会被所有的 exception handler 忽略。它们只应该在 debug 时的 catch 块里用作 debug 信息。当一个协程被调用 Job.cancel() 取消时,这个协程会终止,但是不会波及它的父协程。

当某个协程内部抛出除 CancellationException 以外的异常时,这个异常会往上传播给父级协程,然后,父级协程将会:1, 取消它内部的所有其它子协程;2, 取消它自己;3, 将这个异常继续往上传播给该父级协程的父级协程。这样直达根协程。

当多个子协程抛出异常时,只有第一个抛出的异常会被处理,其余的异常会作为第一个异常的 suppressed exception.

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException
            } finally {
                throw ArithmeticException() // the second exception
            }
        }
        launch {
            delay(100)
            throw IOException() // the first exception
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}

根协程对异常进行处理的触发时机,是在所有子协程运行结束以后,下面是个示例:

val handler = CoroutineExceptionHandler { _, exception ->
    println("CoroutineExceptionHandler got $exception")
}

fun main() {
    runBlocking {
        val job = GlobalScope.launch(handler) {
            launch { // the first child
                try {
                    delay(Long.MAX_VALUE)
                } finally {
                    withContext(NonCancellable) {
                        println("Children are cancelled, but exception is not handled until all children terminate")
                        delay(100)
                        println("The first child finished its non cancellable block")
                    }
                }
            }
            launch { // the second child
                delay(10)
                println("Second child throws an exception")
                throw ArithmeticException()
            }
        }
        job.join()
    }
}

你会发现 exception handler 对于异常的处理发生在两个子协程都终止之后。

为了保证结构化并发的稳定,协程异常传播的这种行为无法被覆盖。这在某些情况下是有合适的,但是在另一些情况可能就不那么合适了。比如说在一个 UI 相关的 CoroutineScope 里,一旦子协程抛出异常,该 CoroutineScope 整个都会被取消,无法启动新的协程,UI 就失去了响应。

这种情况下,可以使用 Job 的另一种实现:SupervisorJob. 它跟普通的 Job 基本相同,区别在于,子协程的异常不会影响到父协程,这样自然也就不会波及同一父协程的其它子协程。而同时,父协程又能够控制所有的子协程。或者换一种说法:异常引起的协程取消只会向下传播,而不会向上传播。下面这个例子可以说明这一点:

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                println("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        println("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

在上面的例子中,要是把 supervisor.cancel() 这一行注释掉,可以观察到 firstChild 虽然失败了,但是 secondChild 仍然在运行,supervisor 本身也在运行。

supervisorScope() 方法与 coroutineScope() 方法相同,区别在于前者的 block 创建的是 SupervisorJob, 后者的 block 创建的是 Job.

SupervisorJob 和普通的 Job 在异常处理上还有一个很大的区别是,前者由于子协程不会把异常传播到父协程,所以每个子协程得指定自己的 exception handler,根协程的 exception handler 不会对子协程的异常进行任何处理。

val handler = CoroutineExceptionHandler { _, exception ->
    println("CoroutineExceptionHandler got $exception")
}

val handler2 = CoroutineExceptionHandler { _, e ->
    println("CoroutineExceptionHandler 2 got $e")
}

fun main() = runBlocking {

    GlobalScope.launch(handler) {
        supervisorScope {
            val child = launch(handler2) {
                println("The child throws an exception")
                throw AssertionError()
            }
            println("The scope is completing")
        }
    }.join()
    println("The scope is completed")
}

把上例的 supervisorScope() 替换成 coroutineScope() 可以观察到不同的输出结果。

需要注意的是,supervisor job 只保证自己不向上传播异常,但当它的子协程是普通 job 时,某个子协程引发的异常还是会导致自身取消,从而继续取消它的所有子协程。

val scope = CoroutineScope(SupervisorJob())
val job = scope.launch {
    launch {
        // child job
    }
}

上面的例子中,child job 就是一个普通的 job, 而不是 supervisor job. 如果你希望它是一个 supervisor job,就把它用 supervisorScope(){} 包起来,或者通过 scope.launch{} 来创建,或者传递一个 SupervisorJob()launch() 方法里。

Android 中的协程编程最佳实践

注入 Dispatcher

在新建协程或者调用 withContext() 时不要硬编码 Dispatcher

// 注入 Dispatchers
class NewsRepository(
    private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    suspend fun loadNews() = withContext(defaultDispatcher) { /* ... */ }
}

// 不要硬编码 Dispatchers
class NewsRepository {
    // 像上一个示例那样注入 Dispatcher, 不要直接使用 Dispatchers.Default
    suspend fun loadNews() = withContext(Dispatchers.Default) { /* ... */ }
}

使用依赖注入模式能让代码变得更容易测试。这样就可以在单元测试和 instrumentation 测试中使用 TestCoroutineDispatcher.

viewModelScope 属性内部的 dispatcher 被硬编码成了 Dispatchers.Main, 测试时可以使用 Dispatchers.setMain() 方法将其替换成 TestCoroutineDispatcher. 详见这里

Suspend 方法应该能够在主线程当中安全地调用

Suspend 方法应当是“主线程安全”( main-safety ) 的。如果一个类在协程里进行长时间的阻塞操作,应该使用 withContext() 方法把操作从主线程当中移除。这个原则适用于 APP 当中所有的类。

class NewsRepository(private val ioDispatcher: CoroutineDispatcher) {

    // 这个操作是从服务器上获取新闻
    // 它使用了一个阻塞的 HttpURLConnection
    // 所以需要把操作移入一个 IO dispatcher,使之成为“主线程安全”的
    suspend fun fetchLatestNews(): List<Article> {
        withContext(ioDispatcher) { /* ... implementation ... */ }
    }
}

// 抓取最新的新闻以及相关联的作者
class GetLatestNewsWithAuthorsUseCase(
    private val newsRepository: NewsRepository,
    private val authorsRepository: AuthorsRepository
) {
    // 因为 newsRepository 是“主线程安全”的
    // 所以这个方法不需要将操作的协程移入另一个线程
    // 该协程只是创建了一个 list,并往里添加元素
    suspend operator fun invoke(): List<ArticleWithAuthor> {
        val news = newsRepository.fetchLatestNews()

        val response: List<ArticleWithAuthor> = mutableEmptyList()
        for (article in news) {
            val author = authorsRepository.getAuthor(article.author)
            response.add(ArticleWithAuthor(article, author))
        }
        return Result.Success(response)
    }
}

该模式能让你的 APP 更具可扩展性,因为在调用 suspend 方法时不再需要考虑该为哪种工作场景使用哪个 dispatcher, 这是属于方法提供者该考虑的事情。

ViewModel 应当创建协程(而不是暴露 suspend 方法)

// DO create coroutines in the ViewModel
class LatestNewsViewModel(
    private val getLatestNewsWithAuthors: GetLatestNewsWithAuthorsUseCase
) : ViewModel() {

    private val _uiState = MutableStateFlow<LatestNewsUiState>(LatestNewsUiState.Loading)
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    fun loadNews() {
        viewModelScope.launch {
            val latestNewsWithAuthors = getLatestNewsWithAuthors()
            _uiState.value = LatestNewsUiState.Success(latestNewsWithAuthors)
        }
    }
}

// Prefer observable state rather than suspend functions from the ViewModel
class LatestNewsViewModel(
    private val getLatestNewsWithAuthors: GetLatestNewsWithAuthorsUseCase
) : ViewModel() {
    // DO NOT do this. News would probably need to be refreshed as well.
    // Instead of exposing a single value with a suspend function, news should
    // be exposed using a stream of data as in the code snippet above.
    suspend fun loadNews() = getLatestNewsWithAuthors()
}
1
社区准则 博客 联系 社区 状态
主题