Dispatchers.Default/Dispatchers.IO到底创建几个线程
                    
                    
            
                        你知道吗?Dispatchers.Default和Dispatchers.IO是共用线程池的。那么他们就能混用了吗?
                    
                    
                    
                首先让我们看Dispatchers.IO,毫无疑问它创建64个worker
// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        default.dispatch(context, block)
    }
}
继续跟进,发现Dispatchers.IO竟然将任务直接提交给Dispatchers.Default
// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {
    ...
    
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}
接下来是Default,CORE_POOL_SIZE就是CPU核心数,
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    ...
}
继续跟进,到创建Worker的地方
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
    val created = createdWorkers(state) // 已创建worker数
    val blocking = blockingTasks(state) // 已提交的阻塞任务数
    val cpuWorkers = (created - blocking).coerceAtLeast(0)
    /*
     * We check how many threads are there to handle non-blocking work,
     * and create one more if we have not enough of them.
     */
    if (cpuWorkers < corePoolSize) { // worker数减去阻塞任务数小于核心数
        val newCpuWorkers = createNewWorker() // 创建Worker
        // If we've created the first cpu worker and corePoolSize > 1 then create
        // one more (second) cpu worker, so that stealing between them is operational
        if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
        if (newCpuWorkers > 0) return true
    }
    return false
}
所以结论是
| 项目 | 适用 | 线程数 | 
|---|---|---|
| Dispatchers.Default | CPU密集型 | max(CPU核心数, 2) | 
| Dispatchers.IO | IO密集型 | max(64, CPU核心数) | 
二者实现实际是共享一个线程池,但用了两个任务队列,当一个队列空时,可根据工作窃取算法去执行另一个队列的任务。但这部分的实现还是比较复杂的,我暂时没有阅读整个流程。
最后修改于 2023-04-15