Dispatchers.Default/Dispatchers.IO到底创建几个线程
你知道吗?Dispatchers.Default和Dispatchers.IO是共用线程池的。那么他们就能混用了吗?
首先让我们看Dispatchers.IO,毫无疑问它创建64个worker
// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
override fun dispatch(context: CoroutineContext, block: Runnable) {
default.dispatch(context, block)
}
}
继续跟进,发现Dispatchers.IO竟然将任务直接提交给Dispatchers.Default
// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() {
...
override fun dispatch(context: CoroutineContext, block: Runnable) {
DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
}
}
接下来是Default,CORE_POOL_SIZE就是CPU核心数,
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
...
}
继续跟进,到创建Worker的地方
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
val created = createdWorkers(state) // 已创建worker数
val blocking = blockingTasks(state) // 已提交的阻塞任务数
val cpuWorkers = (created - blocking).coerceAtLeast(0)
/*
* We check how many threads are there to handle non-blocking work,
* and create one more if we have not enough of them.
*/
if (cpuWorkers < corePoolSize) { // worker数减去阻塞任务数小于核心数
val newCpuWorkers = createNewWorker() // 创建Worker
// If we've created the first cpu worker and corePoolSize > 1 then create
// one more (second) cpu worker, so that stealing between them is operational
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
if (newCpuWorkers > 0) return true
}
return false
}
所以结论是
项目 | 适用 | 线程数 |
---|---|---|
Dispatchers.Default | CPU密集型 | max(CPU核心数, 2) |
Dispatchers.IO | IO密集型 | max(64, CPU核心数) |
二者实现实际是共享一个线程池,但用了两个任务队列,当一个队列空时,可根据工作窃取算法去执行另一个队列的任务。但这部分的实现还是比较复杂的,我暂时没有阅读整个流程。
最后修改于 2023-04-15