首先让我们从一个小例子开始,界面只有一个输入框和一个图片,用户输入后把用户的输入转换成二维码显示给他。
// in XXFragment
override fun onViewCreated() {
super.onViewCreated()
var job: Job? = null
binding.input.doAfterTextChanged {
job?.cancel()
job = lifecycleScope.launchWhenResumed {
delay(1200L)
generateQRCode()
}
}
}
private suspend fun generateQRCode() = withContext(Dispatchers.Main.immediate) {
val url = binding.input.text // 我在主线程
val image = withContext(Dispatchers.Default) { // 穿上马甲我在后台
runCatching {
QRCode(value.trim()).render().nativeImage() as Bitmap
}.getOrNull()
}
binding.imageView.setImageBitmap(image) // 脱了马甲我又回主线程
}
- Dispatcher到底是什么,它是如何完成线程切换的?
- 我们常用的Dispatcher有哪些,它们有哪些区别?
- Dispatchers.Default是什么*,*大家常用Dispatchers.IO切换后台线程,他们有什么区别,我们应该用哪个?
- Dispatchers.Main和Dispatchers.Main.immediate有什么区别?
- launchWhenResumed起什么作用,它是如何工作的?
Dispatcher定义与线程切换实现
Dispatcher意为调度器,是Kotlin协程中的重要概念,它决定协程与线程如何进行管理,如何调度执行。我们知道协程以有限状态机管理,所谓挂起函数的挂起,其实意味着停在某个状态机节点上,而不阻塞线程。
在讲Dispatcher工作过程,或者说协程启动过程前,我们首先回顾此前Kotlin Coroutines系列分享中Suspend专题的内容。原有挂起函数中代码编译后是一个继承自SuspendLambda的状态机实现,通过多次调用invokeSuspend方法实现执行。
SuspendLambda就是编译后的协程代码,它继承自Continuation。后面分析中我们需要用到Continuation这个概念。
源代码
fun main(args: Array<String>) = runBlocking {
val a = 1
val b = 2
val result1 = compute1(a, b)
val result2 = compute2(a, b)
println(result1)
println(result2)
}
suspend fun compute1(a: Int, b: Int) = withContext(Dispatchers.Default) {
return@withContext a + b
}
suspend fun compute2(a: Int, b: Int) = withContext(Dispatchers.Default) {
return@withContext a - b
}
编译结果(反编译整理后的伪代码,省略了无关内容)
final class MainKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
public final java.lang.Object invokeSuspend(@org.jetbrains.annotations.NotNull java.lang.Object r7) {
switch(this.label) {
case 0:
// 计算compute1的代码
return
case 1:
// 计算compute2的代码
return
case 2:
// 输出println
return
default:
throw new java.lang.IllegalStateException()
}
}
下面这张图就是协程启动的整体流程。
launch { // 1. launch方法开始
...
}
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
public fun intercepted(): Continuation<Any?> =
intercepted // 2. 获取Dispatcher,并封装为DispatchedContinuation
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) { // 3.判断是否需要dispatch 一般返回true
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this) // 4. 走dispatch进行分发
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result) // 5. 直接执行
}
}
}
}
}
resume时CoroutineContext保存了协程上下文,其中包括当前dispatcher,所以可以保证挂起恢复后仍然是同一个线程(池)。
常见Dispatcher种类
我们上面用Dispatchers.Main和Dispatchers.Default来切换线程,这里我们一起看一下我们熟悉和不熟悉的Dispatcher有哪些
Dispatchers.Main
Dispatchers.Main在主线程执行任务,即安卓的MainThread,Swing中的EventDispatchThread,JavaFX中的Application Thread等等。它是通过SPI获取实现的()。
SPI实现因IO操作在安卓有性能问题,导致早期协程库因性能问题被人诟病。后来协程库版本升级,官方通过引入FastServiceLoader解决了这个问题。
在安卓中Dispatchers.Main由HandlerContext实现,它的dispatch方法就是用Handler直接post
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
Dispatchers.Main vs Dispatchers.Main.immediate
Dispatchers.Main vs Dispatchers.Main.immediate
Dispatchers.Main.immediate
就是对Dispatchers.Main的一个小优化,当当前已经在主线程的情况下,它就不再post到下一次循环中了,而是原地执行。androidx系列的库都已经使用上了,比如我们常用的lifecycleScope的Context就是*SupervisorJob
*() + Dispatchers.Main.immediate
。
注意我们前文所述,isDispatchNeeded为false时代码会原地执行,那么我们看HandlerContext的实现
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
如果当前在同一个线程,则不走dispatch(post),而是直接原地执行。
如果你是个初学者,无脑用Dispatchers.Main就没有问题。但如果你理解了immediate做了什么,那么用Dispatchers.Main.immediate也是个不错的选择。
Dispatchers.Default vs Dispatchers.IO
我开头的例子中,在进行二维码的计算的时候,使用Dispatchers.Default切换到了后台线程,而大家常用的都是Dispatchers.IO,他们有什么联系和区别呢?
(上图没有表现的是,DefaultIoScheduler实际上与DefaultScheduler共享线程池,但这是一项优化,我们分析时可以暂时忽略)
Dispatchers.Default(DefaultScheduler)和Dispatchers.IO(DefaultIoScheduler)最终都继承自kotlinx.coroutines.ExecutorCoroutineDispatcher
,它持有一个线程池对象用来执行任务,也就是说,二者都是线程池实现,不同也就局限于线程池的参数不同。他们的参数都是通过读JVM环境变量得到的,在我的手机上测试这些变量都没有被设置,即都走默认值。
项目 | 适用 | 线程数 |
---|---|---|
Dispatchers.Default | CPU密集型 | max(CPU核心数, 2) |
Dispatchers.IO | IO密集型 | max(64, CPU核心数) |
可以看到,Dispatchers.Default创建CPU核心数个线程,而Dispatchers.IO创建64个线程执行任务,远多于Dispatchers.Default的线程数。
这是因为Dispatchers.Default适用于计算任务,创建太多线程也不会加快计算速度,反而增加上下文切换成本。而Dispatchers.IO执行阻塞式任务,同时执行多个任务对CPU并没有太大负担。
所以我们日常使用中,当处理计算任务,如计算高斯模糊,计算图片裁剪时,应使用Dispatchers.Default,而执行阻塞任务,如网络请求文件读写,应使用Dispatchers.IO。
Dispatchers.UnConfined
confined意为受限的,Dispatchers.UnConfined为非受限的调度器
行为比较特殊,它会在当前线程启动任务,而挂起返回时直接在对应线程执行任务。该特性的实现是通过isDispatchNeeded固定返回false实现的。
internal object Unconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
...
}
记得我们前面所说,isDispatchNeeded返回false意味着什么?意味着它会立即原地继续执行。Dispatchers.UnConfined不判断线程直接返回false,即它会在挂起返回线程执行。
首先看受限的Dispatcher执行情况
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
launch {
println("Main: I'm working in thread ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
println("IO output")
}
println("Main: After delay in thread ${Thread.currentThread().name}")
}
}
Main: I'm working in thread main @coroutine#2
IO output
Main: After delay in thread main @coroutine#2
而对于Dispatchers.Unconfined
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
launch(Dispatchers.Unconfined) {
println("Unconfined: I'm working in thread ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
println("IO output")
}
println("Unconfined: After delay in thread ${Thread.currentThread().name}")
}
}
Unconfined: I'm working in thread main @coroutine#2
IO output
Unconfined: After delay in thread DefaultDispatcher-worker-1 @coroutine#2
launchWhenResumed——PausingDispatcher
回顾开头的例子,我们还用了launchWhenResumed来启动协程,它可以保证生命周期到达resume之后才执行,且pause后暂停,resume后再恢复,使用方便,能避免很多生命周期导致的问题。
override fun onViewCreated() {
super.onViewCreated()
var job: Job? = null
binding.input.doAfterTextChanged {
job?.cancel()
job = lifecycleScope.launchWhenResumed {
delay(1200L)
generateQRCode()
}
}
}
其关键的实现就在于PausingDispatcher。PausingDispatcher中有一个队列来执行任务,任务先入队,检查当前状态,若满足才执行。
internal class PausingDispatcher : CoroutineDispatcher() {
internal val dispatchQueue = DispatchQueue()
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatchQueue.dispatchAndEnqueue(context, block) // 入队执行
}
...
}
@AnyThread
@SuppressLint("WrongThread") // false negative, we are checking the thread
fun dispatchAndEnqueue(context: CoroutineContext, runnable: Runnable) {
// 实际调用enqueue
}
@MainThread
private fun enqueue(runnable: Runnable) {
check(queue.offer(runnable)) { // 入队
"cannot enqueue any more runnables"
}
drainQueue() // 尝试执行
}
internal class DispatchQueue {
fun drainQueue() {
if (isDraining) {
return
}
try {
isDraining = true
while (queue.isNotEmpty()) {
if (!canRun()) {
break
}
queue.poll()?.run()
}
} finally {
isDraining = false
}
}
@MainThread
fun canRun() = finished || !paused
}
至此,我们知道暂停其实就是只入队不执行,这样在生命周期暂停时,监听代码不会继续执行。但监听注册并没有取消,他们只是被放到了一个队列中,待resume时执行。
拓展阅读
自定义一个Dispatcher
听完前文的分享,我们已经初步揭开了Dispatcher的神秘面纱,那接下来我们就尝试自己实现一个Dispatcher吧!
基于HandlerThread的Dispatcher
private val computeThread = HandlerThread("compute")
val Dispatchers.SingleCompute = HandlerContext(computeThread.looper.asHandler())
基于线程池的Dispatcher
官方有个拓展方法,可以快速获取基于线程池的Dispatcher。
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
ExecutorCoroutineDispatcherImpl(this)
即我们使用时
private val executor = Executors.newFixedThreadPool(15)
val Dispatchers.MyCompute = executor.asCoroutineDispatcher()
限制同时执行任务数
虽然我们已经学会了如何自己借助线程池实现一个Dispatcher,但定义过多的线程池并不加快执行,有害无益。如果只是想限制同时执行的任务数,官方新增下面的方法(实验API)。
val Dispatchers.MyCompute = Dispatchers.Default.limitedParallelism(3)
新的Dispatcher仍然用Dispatchers.Default分发任务,但会限制同时执行的任务数。其实Dispatchers.IO就是这样实现的。
最后修改于 2023-04-23