Dispatcher拆解
这是公司里的协程系列分享课程,不过这里只有Dispatcher部分,因为这部分是我做的。

首先让我们从一个小例子开始,界面只有一个输入框和一个图片,用户输入后把用户的输入转换成二维码显示给他。

显示二维码demo

// in XXFragment

override fun onViewCreated() {
    super.onViewCreated()
    var job: Job? = null
    binding.input.doAfterTextChanged {
        job?.cancel()
        job = lifecycleScope.launchWhenResumed {
            delay(1200L)
            generateQRCode()
        }
    }
}

private suspend fun generateQRCode() = withContext(Dispatchers.Main.immediate) {
    val url = binding.input.text // 我在主线程
    val image = withContext(Dispatchers.Default) { // 穿上马甲我在后台
        runCatching { 
            QRCode(value.trim()).render().nativeImage() as Bitmap
        }.getOrNull()
    }
    binding.imageView.setImageBitmap(image) // 脱了马甲我又回主线程
}
  1. Dispatcher到底是什么,它是如何完成线程切换的?
  2. 我们常用的Dispatcher有哪些,它们有哪些区别?
  3. Dispatchers.Default是什么*,*大家常用Dispatchers.IO切换后台线程,他们有什么区别,我们应该用哪个?
  4. Dispatchers.Main和Dispatchers.Main.immediate有什么区别?
  5. launchWhenResumed起什么作用,它是如何工作的?

Dispatcher定义与线程切换实现

Dispatcher意为调度器,是Kotlin协程中的重要概念,它决定协程与线程如何进行管理,如何调度执行。我们知道协程以有限状态机管理,所谓挂起函数的挂起,其实意味着停在某个状态机节点上,而不阻塞线程。

在讲Dispatcher工作过程,或者说协程启动过程前,我们首先回顾此前Kotlin Coroutines系列分享中Suspend专题的内容。原有挂起函数中代码编译后是一个继承自SuspendLambda的状态机实现,通过多次调用invokeSuspend方法实现执行。

SuspendLambda就是编译后的协程代码,它继承自Continuation。后面分析中我们需要用到Continuation这个概念。

源代码

fun main(args: Array<String>) = runBlocking {
    val a = 1
    val b = 2
    val result1 = compute1(a, b)
    val result2 = compute2(a, b)
    println(result1)
    println(result2)
}

suspend fun compute1(a: Int, b: Int) = withContext(Dispatchers.Default) {
    return@withContext a + b
}

suspend fun compute2(a: Int, b: Int) = withContext(Dispatchers.Default) {
    return@withContext a - b
}

编译结果(反编译整理后的伪代码,省略了无关内容)

final class MainKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {

    public final java.lang.Object invokeSuspend(@org.jetbrains.annotations.NotNull java.lang.Object r7) {
    switch(this.label) {
    case 0:
    // 计算compute1的代码
    return
    case 1:
    // 计算compute2的代码
    return
    case 2:
    // 输出println
    return
    default:
        throw new java.lang.IllegalStateException()
    }
}

下面这张图就是协程启动的整体流程。

协程启动过程

launch { // 1. launch方法开始
    ...
}

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    public fun intercepted(): Continuation<Any?> =
        intercepted // 2. 获取Dispatcher,并封装为DispatchedContinuation
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
}

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) { // 3.判断是否需要dispatch 一般返回true
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this) // 4. 走dispatch进行分发
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result) // 5. 直接执行
                }
            }
        }
    }
}

resume时CoroutineContext保存了协程上下文,其中包括当前dispatcher,所以可以保证挂起恢复后仍然是同一个线程(池)。

常见Dispatcher种类

我们上面用Dispatchers.Main和Dispatchers.Default来切换线程,这里我们一起看一下我们熟悉和不熟悉的Dispatcher有哪些

Dispatcher种类

Dispatchers.Main

Dispatchers.Main在主线程执行任务,即安卓的MainThread,Swing中的EventDispatchThread,JavaFX中的Application Thread等等。它是通过SPI获取实现的()。

Dispatchers.Main类图

SPI实现因IO操作在安卓有性能问题,导致早期协程库因性能问题被人诟病。后来协程库版本升级,官方通过引入FastServiceLoader解决了这个问题

在安卓中Dispatchers.Main由HandlerContext实现,它的dispatch方法就是用Handler直接post

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    
    ...
}

Dispatchers.Main vs Dispatchers.Main.immediate

Dispatchers.Main vs Dispatchers.Main.immediate

Dispatchers.Main.immediate就是对Dispatchers.Main的一个小优化,当当前已经在主线程的情况下,它就不再post到下一次循环中了,而是原地执行。androidx系列的库都已经使用上了,比如我们常用的lifecycleScope的Context就是*SupervisorJob*() + Dispatchers.Main.immediate

注意我们前文所述,isDispatchNeeded为false时代码会原地执行,那么我们看HandlerContext的实现

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}

如果当前在同一个线程,则不走dispatch(post),而是直接原地执行。

如果你是个初学者,无脑用Dispatchers.Main就没有问题。但如果你理解了immediate做了什么,那么用Dispatchers.Main.immediate也是个不错的选择。

Dispatchers.Default vs Dispatchers.IO

我开头的例子中,在进行二维码的计算的时候,使用Dispatchers.Default切换到了后台线程,而大家常用的都是Dispatchers.IO,他们有什么联系和区别呢?

Dispatcher IO 类图

(上图没有表现的是,DefaultIoScheduler实际上与DefaultScheduler共享线程池,但这是一项优化,我们分析时可以暂时忽略)

Dispatchers.Default(DefaultScheduler)和Dispatchers.IO(DefaultIoScheduler)最终都继承自kotlinx.coroutines.ExecutorCoroutineDispatcher,它持有一个线程池对象用来执行任务,也就是说,二者都是线程池实现,不同也就局限于线程池的参数不同。他们的参数都是通过读JVM环境变量得到的,在我的手机上测试这些变量都没有被设置,即都走默认值。

项目 适用 线程数
Dispatchers.Default CPU密集型 max(CPU核心数, 2)
Dispatchers.IO IO密集型 max(64, CPU核心数)

可以看到,Dispatchers.Default创建CPU核心数个线程,而Dispatchers.IO创建64个线程执行任务,远多于Dispatchers.Default的线程数。

这是因为Dispatchers.Default适用于计算任务,创建太多线程也不会加快计算速度,反而增加上下文切换成本。而Dispatchers.IO执行阻塞式任务,同时执行多个任务对CPU并没有太大负担。

所以我们日常使用中,当处理计算任务,如计算高斯模糊,计算图片裁剪时,应使用Dispatchers.Default,而执行阻塞任务,如网络请求文件读写,应使用Dispatchers.IO。

Dispatchers.UnConfined

confined意为受限的,Dispatchers.UnConfined为非受限的调度器

行为比较特殊,它会在当前线程启动任务,而挂起返回时直接在对应线程执行任务。该特性的实现是通过isDispatchNeeded固定返回false实现的。

internal object Unconfined : CoroutineDispatcher() {
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    ...
}

记得我们前面所说,isDispatchNeeded返回false意味着什么?意味着它会立即原地继续执行。Dispatchers.UnConfined不判断线程直接返回false,即它会在挂起返回线程执行。

首先看受限的Dispatcher执行情况

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch {
        println("Main: I'm working in thread ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {
            println("IO output")
        }
        println("Main: After delay in thread ${Thread.currentThread().name}")
    }
}
Main: I'm working in thread main @coroutine#2
IO output
Main: After delay in thread main @coroutine#2

而对于Dispatchers.Unconfined

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        println("Unconfined: I'm working in thread ${Thread.currentThread().name}")
        withContext(Dispatchers.IO) {
            println("IO output")
        }
        println("Unconfined: After delay in thread ${Thread.currentThread().name}")
    }
}
Unconfined: I'm working in thread main @coroutine#2
IO output
Unconfined: After delay in thread DefaultDispatcher-worker-1 @coroutine#2

launchWhenResumed——PausingDispatcher

回顾开头的例子,我们还用了launchWhenResumed来启动协程,它可以保证生命周期到达resume之后才执行,且pause后暂停,resume后再恢复,使用方便,能避免很多生命周期导致的问题。

override fun onViewCreated() {
    super.onViewCreated()
    var job: Job? = null
    binding.input.doAfterTextChanged {
        job?.cancel()
        job = lifecycleScope.launchWhenResumed {
            delay(1200L)
            generateQRCode()
        }
    }
}

其关键的实现就在于PausingDispatcher。PausingDispatcher中有一个队列来执行任务,任务先入队,检查当前状态,若满足才执行。

PausingDispatcher

internal class PausingDispatcher : CoroutineDispatcher() {
    internal val dispatchQueue = DispatchQueue()
    
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchQueue.dispatchAndEnqueue(context, block) // 入队执行
    }
    
    ...
}


@AnyThread
@SuppressLint("WrongThread") // false negative, we are checking the thread
fun dispatchAndEnqueue(context: CoroutineContext, runnable: Runnable) {
    // 实际调用enqueue
}

@MainThread
private fun enqueue(runnable: Runnable) {
    check(queue.offer(runnable)) { // 入队
        "cannot enqueue any more runnables"
    }
    drainQueue() // 尝试执行
}

internal class DispatchQueue {
    fun drainQueue() {
        if (isDraining) {
            return
        }
        try {
            isDraining = true
            while (queue.isNotEmpty()) {
                if (!canRun()) {
                    break
                }
                queue.poll()?.run()
            }
        } finally {
            isDraining = false
        }
    }
    @MainThread
    fun canRun() = finished || !paused
}

至此,我们知道暂停其实就是只入队不执行,这样在生命周期暂停时,监听代码不会继续执行。但监听注册并没有取消,他们只是被放到了一个队列中,待resume时执行。

拓展阅读

自定义一个Dispatcher

听完前文的分享,我们已经初步揭开了Dispatcher的神秘面纱,那接下来我们就尝试自己实现一个Dispatcher吧!

基于HandlerThread的Dispatcher

private val computeThread = HandlerThread("compute")

val Dispatchers.SingleCompute = HandlerContext(computeThread.looper.asHandler())

基于线程池的Dispatcher

官方有个拓展方法,可以快速获取基于线程池的Dispatcher。

public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
    ExecutorCoroutineDispatcherImpl(this)

即我们使用时

private val executor = Executors.newFixedThreadPool(15)

val Dispatchers.MyCompute = executor.asCoroutineDispatcher()

限制同时执行任务数

虽然我们已经学会了如何自己借助线程池实现一个Dispatcher,但定义过多的线程池并不加快执行,有害无益。如果只是想限制同时执行的任务数,官方新增下面的方法(实验API)。

val Dispatchers.MyCompute = Dispatchers.Default.limitedParallelism(3)

新的Dispatcher仍然用Dispatchers.Default分发任务,但会限制同时执行的任务数。其实Dispatchers.IO就是这样实现的。


最后修改于 2023-04-23