Dispatchers.Default/Dispatchers.IO到底创建几个线程
你知道吗?Dispatchers.Default和Dispatchers.IO是共用线程池的。那么他们就能混用了吗?

首先让我们看Dispatchers.IO,毫无疑问它创建64个worker

// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        default.dispatch(context, block)
    }
}

继续跟进,发现Dispatchers.IO竟然将任务直接提交给Dispatchers.Default

// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {
    ...
    
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}

接下来是Default,CORE_POOL_SIZE就是CPU核心数,

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    ...
}

继续跟进,到创建Worker的地方

private fun tryCreateWorker(state: Long = controlState.value): Boolean {
    val created = createdWorkers(state) // 已创建worker数
    val blocking = blockingTasks(state) // 已提交的阻塞任务数
    val cpuWorkers = (created - blocking).coerceAtLeast(0)
    /*
     * We check how many threads are there to handle non-blocking work,
     * and create one more if we have not enough of them.
     */
    if (cpuWorkers < corePoolSize) { // worker数减去阻塞任务数小于核心数
        val newCpuWorkers = createNewWorker() // 创建Worker
        // If we've created the first cpu worker and corePoolSize > 1 then create
        // one more (second) cpu worker, so that stealing between them is operational
        if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
        if (newCpuWorkers > 0) return true
    }
    return false
}

所以结论是

项目 适用 线程数
Dispatchers.Default CPU密集型 max(CPU核心数, 2)
Dispatchers.IO IO密集型 max(64, CPU核心数)

二者实现实际是共享一个线程池,但用了两个任务队列,当一个队列空时,可根据工作窃取算法去执行另一个队列的任务。但这部分的实现还是比较复杂的,我暂时没有阅读整个流程。


最后修改于 2023-04-15