Kotlin 协程
基本概念
协程(Coroutine)是一个可以被挂起和恢复的代码执行单元。概念上它类似一个线程,在协程的代码块内的代码,与剩余的代码是并发执行的。概念上它可以被当作是一种轻量级的线程,但在使用上,又与线程有着非常多的不同。
协程是一种轻量级的异步编程模型,用于简化异步和并发任务的实现。通过挂起函数,协程实现了非阻塞的代码结构,允许以同步风格编写异步代码,从而减少回调地狱和复杂性。
挂起函数在编译时被转换为状态机,每个挂起点对应状态机中的一个状态。而协程则集成了状态机(挂起函数)、上下文和调度器等组件。
Kotlin 运行时(通过 kotlinx.coroutines
库)负责管理协程的生命周期,包括挂起、恢复、上下文切换和任务调度。
协程主要在用户态完成任务切换,避免了频繁的线程创建和销毁,不直接依赖操作系统的线程调度机制。这使协程非常轻量级,具有较小的内存开销和较低的切换成本,特别适合高并发的场景。
第一个协程
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}
launch()
launch()
用于创建一个协程,与余下的代码同时执行,所以在这个示例中,"Hello" 被先打印了出来。
delay()
是一个特殊的挂起函数,可以让协程挂起指定长度的时间(这里是 1000ms)。挂起的协程并不会阻塞承载它的线程,该线程会去执行其它任务。
runBlocking()
runBlocking()
也是一个协程创建函数,它用于桥接起非协程世界(non-coroutine world)的普通函数(fun main
)和它(协程)内部的代码块。它通常被用作创建最顶层的协程。它的调用会引起当前线程阻塞。
如果把代码中的 runBlocking()
移除,会发现 launch()
调用出错,无法编译。这是因为 launch()
是定义在 CoroutineScope
上的。而 runBlocking()
的 lambda 参数则是一个 CoroutineScope
的扩展函数,因此在其内部可以直接调用 launch()
。
挂起函数
// in CoroutineScope
launch {
val users = loadContributorsSuspend(req) // 挂起函数
updateResults(users, startTime)
}
suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
// ......
}
suspend
修饰的函数叫作挂起函数,例如这里的 loadContributorsSuspend()
。挂起函数只能在协程中被调用,或者被另一个挂起函数调用。
在一个协程内调用挂起函数(例如调用一个发起网络请求的挂起函数),该协程会变为挂起状态,并被从当前线程中移除,保存在内存当中,当前线程可以继续执行其它任务。当该协程准备好继续执行时(例如网络请求完成并返回),协程会重新加入一个线程(不一定是之前的那个线程),继续执行内部剩余部分的代码。
以下用一段代码来说明这个流程:
// 假设这是个网络请求的挂起函数
suspend fun fetchData(): String {
// 这一行是关键,它会创建一个状态,并告诉调度器"这里有个网络请求要处理"
return withContext(Dispatchers.IO) {
// 具体的网络请求代码
}
}
suspend fun example() {
// 开始时这个协程(状态机)运行在线程A上
println("开始")
// 遇到挂起点:
// 1. 保存当前状态
// 2. 告诉调度器"有个网络请求要处理"
// 3. 线程A被释放,可以去执行其他协程
val data = fetchData()
// 网络请求完成后:
// 协程(状态机)被调度到某个线程(可能是B)上继续执行
println("结束,数据:$data")
}
关键是:
- 协程是个状态机,它会告诉调度器"这里有个任务要处理"
- 调度器负责找合适的线程来处理这个任务
- 协程本身不关心具体是哪个线程在执行
所以协程跟线程没有固定的对应关系,它只是一系列可以被调度执行的状态。
async()
和 Deferred
async()
和 launch()
一样,是 CoroutineScope
的一个函数,也能够创建一个协程,并且会返回一个 Deferred
对象。Deferred
是一个泛型类,可以对其调用 await()
函数用于获取 async()
所返回的数据。对 await()
函数的调用会引起调用该方函数协程(也就是父协程)挂起。可以理解为把 Deffered 所控制的那个协程与当前协程汇合,并等待 Deffered 所控制的协程执行完毕。
suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
println("end")
}
对于 Collection<Deferred<T>>
类,Kotlin 还提供了一个 awaitAll()
函数,用于获取一组 Deferred
对象的返回结果:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
Job
launch()
创建协程后,会返回一个 Job
对象。Job
是 Deferred
的父类。如果父协程需要等待该协程执行完毕,可以对其调用 join()
函数达到与 Deferred.await()
类似的效果,但区别是 join()
函数不返回值。
suspend fun loadData(): Int {
// ...
}
fun main() = runBlocking {
val job = launch {
loadData()
}
println("waiting...")
job.join()
println("end")
}
当然,因为 Deferred
是 Job
的子类,所以 Deferred
也可以调用 join()
函数,只是这种情况下没有必要使用 Deferred
,直接用 Job
即可。
async()
会将异常包装在 Deferred
对象里返回,launch()
则会抛出未捕获的异常。
要注意的是,launch()
和 async()
是创建协程,并返回一个 Job
, 这里的 Job
和协程不是一回事。Job
跟 Deffered
是协程的一个控制器(handle),通常也被译为“句柄”。
同 Collection<Deferred<T>>
的 awaitAll()
一样,Collection<Job<T>>
也有与之对应的 joinAll()
函数。
CoroutineScope
runBlocking()
, launch()
, async()
以及后面会提到的 withContext()
coroutineScope()
等函数,它们都接收一个 lambda 参数,该 lambda 参数被定义为 CoroutineScope
的一个扩展函数,所以在这些 lambda 内部可以直接调用 CoroutineScope
的成员函数,其中就包括 launch()
,async()
。
Channel
基本概念
Kotlin 的 coroutine 包里提供了个叫作 Channel
的类,用于在各协程之间通信、共享数据。Channel 是线程安全的。Channel 里可以存放任意类型的数据。提供的操作就是生产者方 send,消费者方 receive,很常见的生产-消费模式。
Kotlin 里的 Channel
是个接口,继承了 SendChannel
和 ReceiveChannel
,前者有一个 send()
和一个 close()
函数,后者有一个 receive()
函数。send()
函数和 receive()
函数都是挂起函数。以下是用法:
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// 这里可以是很消耗 CPU 的计算,或者异步逻辑
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
注意,以上代码中的 Channel()
是个与 Channel
接口同名的工厂函数。
关闭 Channel
close()
函数用于关闭发送方的 Channel,告知接收方 Channel 已关闭。接收方有几种方式获知 Channel 是否关闭:
- for 循环会在 channel 关闭且所有数据都被消费后自动结束
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
channel.send(1)
channel.send(2)
channel.close()
}
// for 循环会在 channel 关闭且所有数据都被消费后自动结束
for (item in channel) {
println(item)
}
println("Channel is closed")
}
- 使用
receiveCatching()
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
channel.send(1)
channel.close()
}
// receiveCatching 返回 ChannelResult
val result = channel.receiveCatching()
if (result.isClosed) {
println("Channel is closed")
} else {
println("Received: ${result.getOrNull()}")
}
}
- 通过
isClosedForReceive
属性
while (!channel.isClosedForReceive) {
try {
val value = channel.receive()
println(value)
} catch (e: ClosedReceiveChannelException) {
println("Channel was closed")
break
}
}
- 捕获异常
try {
while (true) {
val value = channel.receive()
println(value)
}
} catch (e: ClosedReceiveChannelException) {
println("Channel is closed")
}
如果不调用 close()
:
- channel 会一直保持打开状态
- 接收方可能一直等待新数据
- 可能导致协程泄漏
- 无法通过 for 循环优雅地遍历 channel
所以在使用完 Channel 后调用 close()
是一个很重要的最佳实践。
Channel 的类型
-
Unlimited Channel: 容量无限,因此
send()
函数不会被挂起,但如果内存不足的话会抛出异常OutOfMemoryException
;当 channel 是空的的时候,receive()
函数会被挂起; -
Buffered Channel: 设定了容量的 Channel,
send()
函数在 channel 满的时候会挂起; -
Rendezvous Channel: 相当于容量为 0 的 Buffered Channel。当
send()
函数被调用,并且receive()
函数没有被调用时,send()
函数会被挂起,直到receive()
函数被调用;同样,当receive()
函数被调用,并且send()
函数没有被调用的时候,receive()
函数会被挂起; -
Conflated Channel:
send()
函数连续调用,并且 channel 里的元素没有被消费的话,先前的元素会被后来的元素所覆盖,消费方只能取到最后放进去的元素。
Kotlin 提供了一个工厂函数 Channel()
用来创建不同类型的 Channel。不传任何参数时,默认创建的是 Rendezvous Channel;当参数为任意正整数时,创建的是 Buffered Channel,参数为该 Channel 的容量;当参数为 Channel.UNLIMITED
, Channel.CONFLATED
等用于指定类型的常量时,创建的就是对应类型的 Channel。
延迟执行(CoroutineStart.LAZY)
launch()
和 async()
的 start 参数可以用来指定协程的启动模式。async(start = CoroutineStart.LAZY)
可以让协程延迟执行,直到 start()
或 await()
函数被调用。
fun main() {
runBlocking {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// some computation
one.start() // start the first one
two.start() // start the second one
println("all coroutine started")
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
}
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
println("do something useful one")
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
println("do something useful two")
return 29
}
运行这段代码会发现总运行时间为 1 秒左右。但是如果把代码中的两处 start()
调用删掉,会发现用时为 2 秒左右。因为两个协程直到 await()
调用时才开始执行,并且 await()
会挂起当前协程,使两个协程按 await()
的调用顺序执行。而 start()
能让协程开始执行,同时又不挂起当前协程。
但如果再同时删除掉 async()
的参数的话,默认参数是 CoroutineStart.DEFAULT
,协程在创建时会立即启动并发运行,运行时间又变回 1 秒左右了。
CoroutineStart.LAZY
配合 start()
函数使用,可以控制协程的执行时机。
取消协程
取消协程有两种方式,一种是通过流程控制,提前结束协程的执行,另一种是通过直接或间接地(例如通过 withTimeout()
函数)调用 cancel()
函数来结束协程。
cancel()
和 cancelAndJoin()
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
这里的 job 会在执行第 3 次时被取消。join()
函数的调用,是为了确保当前协程退出前,job 里的代码执行完毕(包括资源释放)。cancel()
函数不能在 join()
之后调用,因为先调用 join()
,当前协程会等待 job 执行完毕,而 job 内部耗时很长,如果等待其执行完毕,那么后面的 cancel()
就完全没有必要了。而如果 job 内部是个死循环,则 cancel()
函数永远执行不到。
由于 cancel()
和 join()
常常同时使用,Kotlin 另外提供了个 cancelAndJoin()
的快捷函数。
协程取消的流程
首先,cancel()
函数会将协程标记为取消状态。
然后,kotlinx.coroutines
包里的所有挂起函数内部都含有一个“检查点”,在执行到检查点时,会判断协程的状态,如果协程处于取消状态,则抛出 CancellationException
异常,从而中断接下来的操作。
但并不是每个挂起函数都含有检查点,例如自定义的未调用其它任何挂起函数的挂起函数:
suspend fun myFunction() {
// 这个函数本身不会检查取消状态
heavyComputation()
}
用 NonCancellable
上下文包装了的挂起函数调用:
suspend fun function1() {
withContext(NonCancellable) {
delay(100)
}
}
runInterruptible
内部的代码不检查取消状态:
suspend fun function2() = runInterruptible {
Thread.sleep(100)
}
suspendCoroutine
本身不会检查取消状态:
suspend fun function3() = suspendCoroutine<Unit> { continuation ->
continuation.resume(Unit)
}
如果一个协程内没有任何检查点,则无法取消。
取消没有检查点的协程
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
上面的代码会一直执行到 5 次打印结束。如果你想让这种没有挂起函数调用的协程可取消,一种办法是加入一个包含检查点的挂起函数。在这里,yield()
通常可以起到这个作用。yield()
会让出当前协程的执行权。
while (i < 5) {
yield()
......
}
另一个办法是加入 isActive
作为判断条件。
while (i < 5 && isActive) {
......
}
或者用 ensureActive()
:
while (i < 5) {
ensureActive()
......
}
isActive
和 ensureActive()
的区别在于,后者是个含有检查点的挂起函数,在检测到协程取消时会抛出异常,而 isActive
只用于检查状态。实际上,ensureActive()
就是所谓的检查点,而对 isActive
进行判断,是它内部的一部分逻辑。
至于选择使用 yield()
还是 ensureActive()
,取决于你是否需要协程暂停执行,以及是否需要在检查取消状态之外进行其他协程的调度。
释放资源
前面提到取消协程会抛出 CancellationException
异常,如果需要在协程取消时释放资源,可以把释放资源的代码写在 finally
块里。
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
无论如何都要执行的释放操作
任何企图在 finally
里执行的挂起函数,在当被捕获的异常为 CancellationException
时,都会同样抛出 CancellationException
,这通常不是什么问题。因为通常来说,释放资源的代码都是非阻塞 (non-blocking) 的。
但在某些罕见的情况下,如果你希望在 finally
里调用的挂起函数能够执行完,可以使用 withContext(NonCancellable) {...}
。
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
为协程设置超时
如果想为某挂起函数的调用设置超时时间,可以使用 withTimeout()
函数
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
该函数会在其内部的挂起函数执行超时时将其取消,并抛出 TimeoutCancellationException
,该异常是 CancellationException
的子类。但这里有个区别是:前者被引发时,会输出异常堆栈信息,后者不会有任何输出。这是因为在被取消的协程内部,CancellationException
被认为是一个正常的原因。
因为我们会为 CancellationException
在 finally
块里编写资源释放的代码,所以没有必要再专门为 TimeoutCancellationException
写一遍。如果有专门针对该子类异常的处理逻辑,可以写在 try {...} catch (e: TimeoutCancellationException) {...}
里边,或者使用 withTimeoutOrNull()
函数,该函数在超时时不会抛出异常,取而代之的是返回一个 null
值。
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
结构化并发 (Structured Concurrency)
协程是可以嵌套的,每一个协程自己界定了一个范围 (CoroutineScope),限制了在其范围内的子协程的寿命:父协程被取消,所有子协程都将被取消;父协程的结束,也必须等待所有子协程结束。
coroutineScope()
coroutineScope()
函数可用于在不创建协程的情况下创建一个 CoroutineScope
,继承了当前上下文 (CoroutineContext
) 中除了 Job 元素以外的所有的元素 (element)。它是个挂起函数,会挂起当前的协程,直至内部协程执行完毕。
fun main() {
runBlocking {
coroutineScope {
launch {
delay(2000)
println("- World 2")
}
launch {
delay(1000)
println("- World")
}
println("Hello")
}
println("Done")
}
}
输出
Hello
- World
- World 2
Done
可以看到打印 Hello 之后没有接着打印 Done,而是等待内部的两个协程执行完。如果去掉这个 coroutingScope()
的调用,则会输出:
Hello
Done
- World
- World 2
考虑如下代码
// The result type of somethingUsefulOneAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne()
}
// The result type of somethingUsefulTwoAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
doSomethingUsefulTwo()
}
// note that we don't have `runBlocking` to the right of `main` in this example
fun main() {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
这里在 runBlocking()
之外使用了 GlobalScope.async()
,它创建的协程与其它协程没有任何层级关系。假如 two
执行出错,抛出异常,one
会仍然在后台执行。在结构化并发的写法中,这种情况可以避免。
suspend fun concurrentSum(): Int = coroutineScope {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
one.await() + two.await()
}
fun main() = runBlocking {
val time = measureTimeMillis {
println("The answer is ${concurrentSum()}")
}
println("Completed in $time ms")
}
这里使用 coroutineScope()
创建了一个 CoroutineScope
, 在其内部创建的协程算作该 coroutine scope 的子协程,如果 concurrentSum()
内部有任何代码抛出异常,它内部创建的所有协程都会被取消。这也是“结构化并发”的一个特点。结构化并发有助于管理协程的生命周期和资源占用。
CoroutineScope()
Kotlin 另外提供了一个 CoroutineScope()
工厂函数,它与 coroutineScope()
的区别在于,前者是用于创建并返回一个 CoroutineScope
实例,可以用一个变量指向它,在需要的地方调用它的 launch()
等函数来创建协程。后者则是直接在当前上下文当中就地创建一个 CoroutineScope
(并没有创建协程)并执行其中的代码。
Context
Kotlin 的协程总是运行在一个叫 CoroutineContext 的上下文环境里,CoroutineContext 里包含的主要元素 (element) 有 Job
,CoroutineDispatcher
,CoroutineExceptionHandler
和 CoroutineName
。这些元素都实现了 CoroutineContext.Element
,而 CoroutineContext.Element
又继承了 CoroutineContext
,因此,这些元素本身也是 CoroutineContext
。
CoroutineContext
有一个叫 plus()
的 operator 函数,它可以将各个 CoroutineContext
拼起来,形成一个新的 CoroutineContext
。于是我们会见到类似这样的代码:
launch(Dispatchers.IO + Job() + CoroutineName("MyCoroutine")) {
println("Running with context: $coroutineContext")
}
所有的协程创建函数,例如 launch()
和 async()
,均接受一个可选的 CoroutineContext 参数。该参数用于显式地指定新协程所使用的 dispatcher,以及其它的 context element。不指定参数的时候,新的协程从父级的 CoroutineScope 里继承 context。
Dispatcher
CoroutineDispatcher
是实现了 CoroutineContext
的一个抽象类,它主要用于决定协程该分配到哪个线程或哪个线程池里的线程上。
Dispatchers.Default
Dispatchers.Default
是 CoroutineDispatcher 的其中一种实现。当你给 async()
或者 launch()
函数传递参数 Dispatchers.Default
时,它会从 JVM 的一个共享线程池里获取空闲的线程。这个共享线程池的线程数量与 CPU 的核心数量相等。但是当 CPU 只有一个核心时,线程池里会有两个线程。只要并发量足够大,便可以在日志中打印线程 ID 来观察到效果。同时我们会看到,启动协程的线程,和协程由挂起状态恢复后所在的线程很多时候并不是同一个线程。
实际上 Dispatchers.Default
比较适用于 CPU 计算密集型的任务。
Dispatchers.IO
Kotlin 还提供了 Dispatchers.IO
,从名字上很容易看出,这个 Dispatcher 适用于 IO 密集型的任务。
Dispatchers.Main
如果希望协程只在主线程当中运行,可以使用 Dispatchers.Main
。MainScope()
工厂函数创建的 CoroutineScope 默认使用 Dispatchers.Main
.
Dispatchers.Unconfined
使用 Dispatchers.Unconfined
时,新的协程会先在创建协程的那个线程中执行,直到遇到协程内部的挂起函数,之后的代码会在恢复线程上执行。总之就是很随意。适用于不消耗 CPU,又不更新任何共享数据(比如 UI 状态)的操作。(这是官方说法。我暂时也想不到适用于哪里。)
fun main() = runBlocking<Unit> {
launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
delay(500)
println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
launch { // context of the parent, main runBlocking coroutine
println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}
}
// Unconfined : I'm working in thread main @coroutine#2
// main runBlocking: I'm working in thread main @coroutine#3
// Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor@coroutine#2
// main runBlocking: After delay in thread main @coroutine#3
withContext()
withContext()
是一个挂起函数,用于切换协程执行的上下文,并在代码块执行完毕后返回结果。其作用与 async(){}.await()
效果类似。但 withContext()
不会启动新的协程,它只是一个挂起函数,只会切换到目标上下文中运行代码。
suspend fun example() {
println("Before withContext: ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
println("Inside withContext: ${Thread.currentThread().name}")
}
println("After withContext: ${Thread.currentThread().name}")
}
Flow
挂起函数只能异步地返回单个值,如果要异步地返回多个值,就需要用到 Flow。Flow 的用法类似 Sequence,可以当作是异步版本的 Sequence。它也有个 flow builder。
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
Flow 得运行在协程里。确切地说,是 Flow 的终结操作(terminal operator)得运行在协程里。
和 Sequence 一样,Flow 也是惰性的,在调用 collect()
等终结操作符之前,其内部的代码不会被执行。
RestrictsSuspension
Sequence 有个特点是:在其内部无法执行自身以外的挂起函数。想要理解这一点,需要先搞清楚 RestrictsSuspension
这个注解的作用。官方文档里写着:
Classes and interfaces marked with this annotation are restricted when used as receivers for extension suspend functions. These suspend extensions can only invoke other member or extension suspend functions on this particular receiver and are restricted from calling arbitrary suspension functions.
意思是,被该注解标记的类和接口,它们的扩展挂起函数 (extension suspend function) 无法调用定义于其外部的任何其它挂起函数。用一段代码来理解:
@RestrictsSuspension
class RestrictsSuspensionClass {
suspend fun a() {
println("RestrictsSuspensionClass fun a")
NormalClass().a() // (1)
delay(1) // (2)
}
}
suspend fun RestrictsSuspensionClass.b() {
println("RestrictsSuspensionClass fun b")
NormalClass().a() // (3)
this.a() // (4)
delay(1) // (5)
}
class NormalClass {
suspend fun a() {
println("NormalClass fun a")
}
}
这段代码里,(1) 和 (2) 能通过编译,是因为这两处的调用在 RestrictsSuspensionClass
的 fun a()
内部,这个 fun a()
不是一个扩展函数,它对其它挂起函数的调用不受限制。
(3) 无法通过编译,是因为 fun RestrictsSuspensionClass.b()
是一个扩展函数,并且 NormalClass().a()
是一个定义于 NormalClass 之上的挂起函数。
(4) 也是一个挂起函数,但它是 RestrictsSuspensionClass
自己的函数,所以可以通过编译。
(5) 无法通过编译的原因和 (3) 一样,delay()
是一个定义于 RestrictsSuspensionClass
之外的挂起函数。
现在再回头看,sequence()
这个 sequence builder 接受的参数是个 lambda,并且该 lambda 被定义为 SequenceScope
的扩展挂起成员函数。而 SequenceScope
本身又被 RestrictsSuspension
这个注解标记,所以无法调用 delay()
函数。如果需要类似 Sequence 的特性,而又想在其内部调用任意挂起函数,则需要考虑使用 Flow。
Flow 的取消
Flow 的取消完全依赖于协程的取消机制,Flow 本身只是在一些关键操作点(主要是 emit()
)添加了取消检查的逻辑。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}
withTimeoutOrNull()
函数会在第 250 毫秒时取消协程。执行这段代码,会发现它只打印了 1 和 2。
创建 Flow 的方法
除了上述示例中用到的 flow builder 以外,还有以下创建 flow 的方式:
val flow1 = flowOf(1, 2, 3)
(1..3).asFlow()
sequenceOf(1, 2, 3).asFlow()
Transformation operator
flow 有许多 transformation operators,包括在 Iterable 里常见的 map()
, filter()
等。还有个更基础的 transform()
:
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
Size-limiting operator
flow 还有个 take()
函数叫作 size-limiting operator, 它在取得足够多的结果之后会取消该 flow 的执行,同时像协程的取消操作一样,flow 的取消会抛出一个异常。
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
终结操作符
toList()
, toSet()
, first()
, reduce()
, fold()
这些从 flow 里取结果的函数,包括前面提到的 collect()
,叫作终结操作符(terminal operator)。
Flow 的上下文
Flow 执行的上下文默认为调用该 flow 的终结操作符时所在的协程上下文,这种特性叫作“上下文保留” (context preservation).
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")
如果想要改变 flow 的执行上下文,需要使用 flowOn()
函数。
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
fun <T> log(msg: T) = println("[${Thread.currentThread().name}] $msg")
缓冲 (Buffering)
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000) // pretend we are asynchronously waiting 1 second
log("processing before emit")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(2000) // pretend we are processing it for 2 seconds
log(value)
}
}
println("Collected in $time ms")
}
这段代码在执行时,先 delay 1 秒,接着执行 collector (emit()
里的代码),耗时 2 秒,再 delay 1 秒,接着执行 collector … 总耗时 9 秒。如果用上 buffer()
,会开启另一个协程去执行 collector,然后直接返回,执行下一个 1 秒的 delay()
。这样由于后两个 1 秒的 delay()
花费的时间包含在了更花时间的 emit()
里,最终总耗时 7 秒:
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().buffer().collect { value ->
delay(2000) // pretend we are processing it for 2 seconds
log(value)
}
}
println("Collected in $time ms")
}
合流 (Conflation)
conflate()
的表现和 buffer()
差不多,区别在于,使用 buffer()
时,如果单个 collector 耗时太长,后续的所有 collector 会一直等待,直到挨个执行完毕。而使用 conflate()
时,单个 collector 如果耗时太长,只会有最新的一个 collector 在等待,后续 emit 的 collector 会覆盖之前的。行为跟前面提到的 conflated channel 一样。
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(2000) // pretend we are processing it for 2 seconds
log(value)
}
}
log("Collected in $time ms")
}
处理最后一个值
Flow 还提供了一系列 xxxLatest 的 operator,例如:
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value ->
delay(2000) // pretend we are processing it for 2 seconds
log(value)
}
}
log("Collected in $time ms")
}
行为与 conflate()
比较像,区别在于,使用 xxxLatest, 后续 emit 的 collector 不是覆盖之前的,而是将当前的 collector 取消,直接开始执行新 emit 的 collector. 当然如果 collector 执行得足够快,在下一个 collector 被 emit 之前执行完,就不会被取消。
同样类似的 operator 还有 flatMapLatest()
, mapLatest()
和 transformLatest()
.
Zip
zip()
可以把两个 flow 用你指定的方式合并成成一个 flow:
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
Sequence 也支持同样的操作。
Combine
combine()
的行为和 zip()
差不多,区别在于如果两个 flow 的执行时间不一样,zip()
会按顺序将两个 flow 的执行结果一一对应起来,而 combine()
则是无论哪个 flow 先执行完,都会去另一个 flow 里取当前值。
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
可以将以上代码里的 zip()
改成 combine()
对比输出结果。
Flow 展开
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(300) } // a number every 300 ms
.map { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
以上代码的 collector 里,value
本身是个 flow,因为 requestFlow()
的返回值是个 flow. map()
操作之后的结果实际上是个 Flow<Flow<String>>
. 要拿到里面的字符串,就需要对其展开。这时候需要用 flatMapConcat()
函数来替换 map()
。
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(300) } // a number every 300 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
这样就能得到期望的结果。flatMapMerge()
则可以让 flatMapConcat()
的操作并发执行。flatMapLatest()
则类似 collectLatest()
.
异常的透明性
Kotlin 不建议在 flow builder 内部的 try/catch
块内调用 emit. 这违反了 flow 的 exception transparency 原则。要处理异常,可以调用 flow 的 catch()
函数,在 catch() {}
的 lambda 内针对具体的异常做处理。你可以:
- 重新抛出这个异常;
- 用
emit()
把它交给外部处理; - 也可以忽略,或者记录在日志里,或者用任何其它方式来处理异常。
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
catch()
函数只能捕获上游的异常,无法捕获下游的异常。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
上面代码中的 catch()
就没有捕获 collector 里边的异常,要想达到效果,可以把抛出异常的代码放到 onEach()
里,并在 catch()
之前调用:
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
Flow completion
Flow 在结束时有时需要执行一些操作,这些操作可以放在 finally
块里:
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
Flow 提供了另一个叫 onCompletion()
的函数来处理类似的情况:
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
它的 lambda 有一个 Throwable?
类型的参数,可以用于判断 flow 是正常结束还是异常结束,如果是正常结束,则该参数为 null
.
onCompletion()
函数不像 catch()
一样会拦截异常,异常仍然会被后面的 catch()
捕获:
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.catch { cause -> println("Caught exception again") }
.collect { value -> println(value) }
}
至于使用 try/finally
还是 onCompletion()
的方式, 官方没有给出具体建议,由开发者根据需要自行选择。
Launching flow
Flow 的 onEach()
函数可以起到一个类似 addEventListener 的作用,把一小段代码插入 flow 的处理逻辑当中。但是 onEach()
是一个 intermediate operator, 不会触发 flow 的执行,所以需要配合 collect()
函数:
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
Flow 提供了一个 launchIn()
函数,该函数作用和 collect()
类似,区别是 launchIn()
会在指定上下文里启动一个协程执行。所以它接受一个类型为 CoroutineScope
的参数,用于指定执行的环境。
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
launchIn()
的返回值是一个 Job
, 你可以对其进行各种协程允许的操作,例如取消。
flow builder 会为每个 emit 自动执行 ensureActive()
用于检测取消动作,这意味着循环执行的 emit 是可以取消的。
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
但是出于性能的考量,多数的 flow operator 没有额外的取消动作检测,所以是无法取消的,例如:
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
在这种情况下,如果你想要让这个 flow 变得可取消,可以使用 .onEach { currentCoroutineContext().ensureActive() }
,不过这个写法比较麻烦,写多了又成了重复代码,因此 Kotlin 提供了一个 cancellable()
函数:
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
异常
使用 launch()
创建协程时,如果协程内部出了异常,你无法捕获和处理该异常。
fun main() = runBlocking {
val job = launch {
throw IndexOutOfBoundsException("Error in coroutine")
}
try {
println("Before join")
job.join()
} catch (e: IndexOutOfBoundsException) {
println("Caught: ${e.message}") // 这里不会执行
}
println("end") // 这里不会执行
}
同时,因为异常导致了协程取消,子协程被取消后,异常会继续层层上向传播给所有父协程,导致父协程被取消,因此运行在父协程里的 println("end")
没有执行。
使用 async()
创建协程时,如果协程内部出了异常,你可以通过使用 try-catch 块包裹 deferred.await()
的方式来捕获异常。
fun main() = runBlocking {
val deferred = async {
throw IndexOutOfBoundsException("Error in coroutine")
}
try {
println("Before await")
deferred.await()
} catch (e: IndexOutOfBoundsException) {
println("Caught: ${e.message}") // 这里会执行
}
println("end") // 这里会执行
}
因为异常被捕获并处理了,所以这里的 println("end")
会被执行。
但这里如果调用的是 deferred.join()
,使用 async()
创建协程和使用 launch()
就没什么区别,异常同样无法捕获。join()
函数本身不会抛出异常,它只是等待协程完成。即使协程内部抛出了异常,join()
也不会将该异常传播到调用者。 因此,在 join()
外面包裹 try-catch 块是没有意义的。
根协程
通过 GlobalScope
创建根协程时,GlobalScope.launch()
创建的协程在抛出异常时,异常同样无法被捕获。同时,因为根协程运行在 runBlocking()
创建的协程之外,不构成结构化并发,所以根协程的异常并不影响 runBlocking()
创建的协程,最后的 println("end")
会执行。
@OptIn(DelicateCoroutinesApi::class) // 用于消除使用 GlobalScope 的警告
fun main() = runBlocking {
val job = GlobalScope.launch {
throw IndexOutOfBoundsException("Error in coroutine")
}
try {
println("Before join")
job.join()
} catch (e: IndexOutOfBoundsException) {
println("Caught: ${e.message}") // 这里不会执行
}
println("end") // 这里会执行
}
而使用 GlobalScope.async()
创建根协程时,同上面一样:如果使用 join()
将协程汇入当前协程,它的表现和 GlobalScope.launch()
创建的协程差不多;如果使用 await()
,则可以捕获异常并进行处理。
异常处理器(ExceptionHandler)
对于未捕获的异常,Kotlin 默认的处理方式就是把异常栈打印出来。你可以通过 CoroutineExceptionHandler
来处理未捕获的异常,从而改变这种默认处理方式,类似 Java 的 Thread.uncaughtExceptionHandler
。一般用于记录异常日志、显示一些错误信息、终止或者重启应用。
在 JVM 平台上,Kotlin 可以通过用 ServiceLoader
注册 CoroutineExceptionHandler
的方法重新定义 global exception handler。Global exception handler 类似 Thread.defaultUncaughtExceptionHandler
,没有特别指定 handler 时,会被默认使用。在 Android 平台上,uncaughtExceptionPreHandler
被用作 global exception handler。
CoroutineExceptionHandler
仅在未捕获的异常发生时被调用。实际上,所有子协程的异常处理会被代理给父协程,直至根协程。在子协程上设置 exception handler 是无效的,不会被使用。
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) { // 根协程,运行在 GlobalScope 里
throw AssertionError()
}
val deferred = GlobalScope.async(handler) { // 同样是根协程,但这里用的是 async
throw ArithmeticException() // 没有打印出异常信息
}
joinAll(job, deferred)
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val handler2 = CoroutineExceptionHandler { _, e ->
println("CoroutineExceptionHandler 2 got $e")
}
@OptIn(DelicateCoroutinesApi::class)
fun main() {
runBlocking {
GlobalScope.launch(handler) {
launch(handler2) { // exception handler 设置在子协程里无效
throw AssertionError()
}.join()
}.join()
}
}
实际上,把上面示例里的 GlobalScope.launch
的参数移除,子协程里的 handler2 仍然是无效的。同时这里的根协程如果不调用 join()
函数,同样不会打印异常信息。
异常的传播
前面提到过,协程的取消会抛出 CancellationException
,这些异常会被所有的 exception handler 忽略。它们只应该在 debug 时的 catch
块里用作 debug 信息。当一个协程被调用 Job.cancel()
取消时,这个协程会终止,但是不会波及它的父协程。
当某个协程内部抛出除 CancellationException
以外的异常时,这个异常会往上传播给父级协程,然后,父级协程将会:1,取消它内部的所有其它子协程;2,取消它自己;3,将这个异常继续往上传播给该父级协程的父级协程。这样直达根协程。
当多个子协程抛出异常时,只有第一个抛出的异常会被处理,其余的异常会作为第一个异常的 suppressed exception。
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException
} finally {
throw ArithmeticException() // the second exception
}
}
launch {
delay(100)
throw IOException() // the first exception
}
delay(Long.MAX_VALUE)
}
job.join()
}
异常处理的时机
根协程对异常进行处理的触发时机,是在所有子协程运行结束以后,下面是个示例:
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
fun main() {
runBlocking {
val job = GlobalScope.launch(handler) {
launch { // the first child
try {
delay(Long.MAX_VALUE)
} finally {
withContext(NonCancellable) {
println("Children are cancelled, but exception is not handled until all children terminate")
delay(100)
println("The first child finished its non cancellable block")
}
}
}
launch { // the second child
delay(10)
println("Second child throws an exception")
throw ArithmeticException()
}
}
job.join()
}
}
你会发现 exception handler 对于异常的处理发生在两个子协程都终止之后。
SupervisorJob
为了保证结构化并发的稳定,协程异常传播的这种行为无法被覆盖。这在某些情况下是有合适的,但是在另一些情况可能就不那么合适了。比如说在一个 UI 相关的 CoroutineScope 里,一旦子协程抛出异常,该 CoroutineScope 整个都会被取消,无法启动新的协程,UI 就失去了响应。
这种情况下,可以使用 Job
的另一种实现:SupervisorJob
。它跟普通的 Job
基本相同,区别在于,子协程的异常不会影响到父协程,这样自然也就不会波及同一父协程的其它子协程。而同时,父协程又能够控制所有的子协程。或者换一种说法:异常引起的协程取消只会向下传播,而不会向上传播。下面这个例子可以说明这一点:
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// launch the first child -- its exception is ignored for this example (don't do this in practice!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("The first child is failing")
throw AssertionError("The first child is cancelled")
}
// launch the second child
val secondChild = launch {
firstChild.join()
// Cancellation of the first child is not propagated to the second child
println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// But cancellation of the supervisor is propagated
println("The second child is cancelled because the supervisor was cancelled")
}
}
// wait until the first child fails & completes
firstChild.join()
println("Cancelling the supervisor")
supervisor.cancel()
secondChild.join()
}
}
在上面的例子中,要是把 supervisor.cancel()
这一行注释掉,可以观察到 firstChild 虽然失败了,但是 secondChild 仍然在运行,supervisor 本身也在运行。
supervisorScope()
函数与 coroutineScope()
函数相同,区别在于前者的 block 创建的是 SupervisorJob
, 后者的 block 创建的是 Job
.
SupervisorJob
和普通的 Job
在异常处理上还有一个很大的区别是,前者由于子协程不会把异常传播到父协程,所以每个子协程得指定自己的 exception handler,根协程的 exception handler 不会对子协程的异常进行任何处理。
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val handler2 = CoroutineExceptionHandler { _, e ->
println("CoroutineExceptionHandler 2 got $e")
}
fun main() = runBlocking {
GlobalScope.launch(handler) {
supervisorScope {
val child = launch(handler2) {
println("The child throws an exception")
throw AssertionError()
}
println("The scope is completing")
}
}.join()
println("The scope is completed")
}
把上例的 supervisorScope()
替换成 coroutineScope()
可以观察到不同的输出结果。
需要注意的是,supervisor job 只保证自己不向上传播异常,但当它的子协程是普通 job 时,某个子协程引发的异常还是会导致自身取消,从而继续取消它的所有子协程。
val scope = CoroutineScope(SupervisorJob())
val job = scope.launch {
launch {
// child job
}
}
上面的例子中,child job 就是一个普通的 job, 而不是 supervisor job. 如果你希望它是一个 supervisor job,就把它用 supervisorScope(){}
包起来,或者通过 scope.launch{}
来创建,或者传递一个 SupervisorJob()
到 launch()
函数里。
Android 中的协程编程最佳实践
注入 Dispatcher
在新建协程或者调用 withContext()
时不要硬编码 Dispatcher
// 注入 Dispatchers
class NewsRepository(
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
suspend fun loadNews() = withContext(defaultDispatcher) { /* ... */ }
}
// 不要硬编码 Dispatchers
class NewsRepository {
// 像上一个示例那样注入 Dispatcher, 不要直接使用 Dispatchers.Default
suspend fun loadNews() = withContext(Dispatchers.Default) { /* ... */ }
}
使用依赖注入模式能让代码变得更容易测试。这样就可以在单元测试和 instrumentation 测试中使用 TestCoroutineDispatcher
。
viewModelScope
属性内部的 dispatcher 被硬编码成了 Dispatchers.Main
,测试时可以使用 Dispatchers.setMain()
函数将其替换成 TestCoroutineDispatcher
。详见这里。
挂起函数应该能够在主线程当中安全地调用
挂起函数应当是“主线程安全”( main-safety ) 的。如果一个类在协程里进行长时间的阻塞操作,应该使用 withContext()
函数把操作从主线程当中移除。这个原则适用于 APP 当中所有的类。
class NewsRepository(private val ioDispatcher: CoroutineDispatcher) {
// 这个操作是从服务器上获取新闻
// 它使用了一个阻塞的 HttpURLConnection
// 所以需要把操作移入一个 IO dispatcher,使之成为“主线程安全”的
suspend fun fetchLatestNews(): List<Article> {
withContext(ioDispatcher) { /* ... implementation ... */ }
}
}
// 抓取最新的新闻以及相关联的作者
class GetLatestNewsWithAuthorsUseCase(
private val newsRepository: NewsRepository,
private val authorsRepository: AuthorsRepository
) {
// 因为 newsRepository 是“主线程安全”的
// 所以这个函数不需要将操作的协程移入另一个线程
// 该协程只是创建了一个 list,并往里添加元素
suspend operator fun invoke(): List<ArticleWithAuthor> {
val news = newsRepository.fetchLatestNews()
val response: List<ArticleWithAuthor> = mutableEmptyList()
for (article in news) {
val author = authorsRepository.getAuthor(article.author)
response.add(ArticleWithAuthor(article, author))
}
return Result.Success(response)
}
}
该模式能让你的 APP 更具可扩展性,因为在调用挂起函数时不再需要考虑该为哪种工作场景使用哪个 dispatcher, 这是属于函数提供者该考虑的事情。
ViewModel 应当创建协程(而不是暴露挂起函数)
// DO create coroutines in the ViewModel
class LatestNewsViewModel(
private val getLatestNewsWithAuthors: GetLatestNewsWithAuthorsUseCase
) : ViewModel() {
private val _uiState = MutableStateFlow<LatestNewsUiState>(LatestNewsUiState.Loading)
val uiState: StateFlow<LatestNewsUiState> = _uiState
fun loadNews() {
viewModelScope.launch {
val latestNewsWithAuthors = getLatestNewsWithAuthors()
_uiState.value = LatestNewsUiState.Success(latestNewsWithAuthors)
}
}
}
// Prefer observable state rather than suspend functions from the ViewModel
class LatestNewsViewModel(
private val getLatestNewsWithAuthors: GetLatestNewsWithAuthorsUseCase
) : ViewModel() {
// DO NOT do this. News would probably need to be refreshed as well.
// Instead of exposing a single value with a suspend function, news should
// be exposed using a stream of data as in the code snippet above.
suspend fun loadNews() = getLatestNewsWithAuthors()
}