源码分析-Kotlin中协程的挂起和恢复
本文字数:12447字
预计阅读时间:32分钟
源码分析-Kotlin中协程的挂起和恢复
前言
Kotlin中的协程经过几个版本的升级已经非常成熟了,但是协程的概念目前没有一个明确且被普遍接受的定义。究其根源无论我们怎么去理解协程的概念,它最核心的点就是函数或者一段程序能够被挂起,稍后再挂起的位置恢复。所以在任何场景下探讨协程都能够落脚到挂起和恢复。本文通过源码对协程创建->挂起->恢复流程进行分析解读。希望能够帮助大家对Kotlin协程的理解起到帮助。
协程的创建
Kotlin中协程是复合协程,是为了方便开发者使用而进一步封装的API,当我们在分析的时候无从下手就是因为经过封装的协程在经过编译后才能看到它的庐山真面目。下面我们通过构造一个简单的协程并反编译成java代码查看。
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
startCoroutine()
}
private fun startCoroutine() {
val coroutine: suspend CoroutineScope.() -> Unit = {
cumpute1()
cumpute2()
}
GlobalScope.launch(block = coroutine)
}
suspend fun cumpute1() {
print("cupmpute1")
}
suspend fun cumpute2() {
print("cupmpute1")
}
}
coroutine
属性是 suspend CoroutineScope.() -> Unit
函数类型对象,反编译成java代码
final class MainActivity$startCoroutine$coroutine$1 extends SuspendLambda implements Function2 {
//状态机初始值0
int label;
public final Object invokeSuspend(Object $result) {
Object coroutine_suspend = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
...
//将label置为1 开始调用挂起函数cumpute1
this.label = 1;
if (MainActivity.this.cumpute1(this) == coroutine_suspend) {
//如果函数被挂起返回挂起标识
return coroutine_suspend;
}
break;
...
}
//将label置为2 开始调用挂起函数cumpute2
this.label = 2;
if (MainActivity.this.cumpute2(this) == coroutine_suspend) {
return coroutine_suspend;
} else {
return Unit.INSTANCE;
}
}
public final Continuation create(Object value,Continuation completion) {
...
//创建并返回一个Continuation对象
MainActivity$startCoroutine$coroutine$1 coroutine = new
MainActivity$startCoroutine$coroutine$1(completion);
return coroutine;
}
public final Object invoke(Object var1, Object var2) {
return ((MainActivity$startCoroutine$coroutine$1)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}
通过反编译代码可以看到声明的 coroutine
转换成了继承 SuspendLambda
的类,可以称之为协程体类。内部实现了两个方法
-
invokeSuspend()
内部是通过label状态来控制调用流程. -
create()
方法接收一个Continuation
对象,然后创建并返回协程体类对象。
public final Object cumpute1(Continuation completion) {
...
return Unit.INSTANCE;
}
suspend
修饰的函数经过反编译后额外接收了一个 Continuation
类型参数,这也就是为什么普通函数内不能调用 suspend
修饰的函数的原因。附上 SuspendLambda
的类图
从类图上可以看出该类的承链 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
-
Continuation
是一个接口定义了resumeWith(result:Result)
方法 和CoroutineContext
上下文属性 -
BaseContinuationImpl
是一个抽象类实现了resumeWith( result : Result )
方法 ,并声明抽象方法invokeSuspend()
和create()
方法。 -
ContinuationImpl
继承BaseContinuationImpl
构造方法接收Continuation
类型参数,内部实现intercepted()
方法将原始协程体类对象拦截包装,添加调度器实现并返回新的协程体类对象DispatchedContinuation
。
协程的挂起
接上面创建的协程代码,以 launch()
函数作为入口开始分析协程是如何挂起的。launch()
函数是 CoroutineScope
的一个扩展实现
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//合并CoroutineContext上下文
val newContext = newCoroutineContext(context)
//根据isLazy生成不同的Continuation对象
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//开始执行协程体
coroutine.start(start, coroutine, block)
return coroutine
}
launch() 函数接收三个参数
1.CoroutineContext
是协程上下文,内部通过链表实现存储,可以保存协程执行过程中的调度器,异常处理器等信息,可以理解为通过 key-value 的形式存储值的Map。
2.CoroutineStart
是一个枚举类有四种类型分别是:
-
DEFAULT 立即进入调度状态 -
LAZY 延迟启动,只有需要(start/join/await)时才开始调度 -
ATOMIC 和DEFAULT类似,且在第一个挂起点前不能被取消 -
UNDISPATCHED 立即在当前线程执行协程体,直到遇到第一个挂起点(后面取决于调度器)
3.block 协程体类对象newCoroutineContext(context)
内部做了上下文合并操作,如果上下文中没有设置线程调度器则会设置一个默认的线程调度器,isLazy 默认情况下是 false,执行创建 StandalonCoroutine
对象,StandalonCoroutine
继承 AbstractCoroutine
类,AbstractCoroutine
类主要负责协程的恢复和结果的返回,相关类图:
通过类图可以看到该类也是实现了 Continuation
接口,resumeWith()
方法内提供了协程恢复的功能(稍后分析),继续看下 start()
方法,源码如下:
//coroutine.start(start, coroutine, block)
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
//触发CotoutineStart invoke()方法
start(block, receiver, this)
}
CoroutineStart
类 invoke()
方法
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
when (this) {
//默认情况下走DEFAULT
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
CoroutineStart.ATOMIC -> block.startCoroutine(completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
CoroutineStart.LAZY -> Unit
}
这里会根据 launch()
方法中设置的枚举类型来进行分类调用,默认是 CoroutineStart.DEFAULT
。startCoroutineCancellable( completion )
接收一个 Continuation
对象,这个 Continuation
对象就是 AbstractCoroutine
派生类的实例。该方法属于链式调用,可以一步一步来分析,首先是 createCoroutineUnintercepted( completion )
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
...
return if (this is BaseContinuationImpl)
//协程体类 create() 方法构造实例
create(receiver, probeCompletion)
else
...
}
这一步主要是创建一个协程体类的实例,也就是我们协程体代码编译后生成的继承 SuspendLambda
类的实例。内部通过判断是否是BaseContinuationImpl
类型,因为 SuspendLambda
类是继承 BaseContinuationImpl
的,所以条件成立,直接调用 create()
进行实例创建,这个地方就拿到了协程体类的实例。接下来看链式调用的下一步 intercepted()
方法 。
intercepted()
//调用了ContinuationImpl的intercepted方法
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
//如果不是ContinuationImpl类别 返回自身
(this as? ContinuationImpl)?.intercepted() ?: this
public fun intercepted(): Continuation<Any?> =
//如果未经过拦截器调用则使用上下文中设置的拦截器进行拦截
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
intercepted
是拦截之 后的协程体类对象,如果为空就通过上下文中指定的拦截器对原协程体类进行拦截并返回包装好的协程体类对象,上下文中存储的拦截器是在launch()
调用的时候就设置好的,如果我们不进行指定的话,内部会设置一个默认的拦截器 Disptchers.Default
。具体实现在 launch()
调用中的第一步 newCoroutineContext(context)
方法中可以看到:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//CoroutineContext内部重载了操作符 +
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//拦截器为空的时候 将Dispatchers.Default加入到CoroutineContext中
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
接下来将会通过 Dispathchers.Default
来探究拦截器的内部实现。我们先从 Default
的定义入手:
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
通过定义可以看出 Default
是 CoroutineDispatcher
类别,实例是通过 createDefaultDispatcher()
方法进行创建,看下 createDefaultDispatcher()
内部如何创建的?
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
这里出现了一个 userCoroutinesScheduler
的标志位,它属于系统变量,默认是开启状态,也就是说 createDefaultDispatcher()
将会返回 DefaultScheduler
。这里为了后续方便理解列一下 DefaultScheduler
的继承链 :DefaultScheduler -> ExperimentalCoroutineDispatcher -> ExecutorCoroutineDispatcher -> CoroutineDispatcher -> ContinuationInterceptor
拿到拦截器DefaultScheduler
后会继续调用 interceptContinuation()
它具体实现在CoroutineDispatcher
类中:
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
流程到这里我们可以确定协程体对象在经过拦截器拦截后返回了 DispatchedContinuation
对象。DispatchedContinuation
对象构造的时候接收了两个参数 第一个是 调度器 DefaultScheduler
第二个是原始协程体类对象,现在到了链式调用的最后一步resumeCancellableWith()
是 Continuation
的一个扩展方法:
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
//判断是否是DispatchedContinuation类别
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
通过 this
判断类别来调用不同的处理方法,如果 this
不是拦截包装后的协程体类对象,则会调用 resumeWith( result )
,如果 this 是经过拦截包装后的DispatchedContinuation
类别对象,那么这里就会调用 resumeCancellableWith( result ,onCancellation )
方法,重点关注下这个方法内部做了什么?
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//判断是否需要调度器分发处理
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//分发处理
dispatcher.dispatch(context, this)
} else {
...
}
}
resumeCancellableWith()
方法中通过 dispatcher.isDispatchNeeded( context )
判断内容是否需要分发处理,这个方法在大多数情况下返回 true,如果返回 false 会立即在当前线程中恢复,可能会形成一个事件循环,造成堆栈溢出。还有就是如果当需要减少不必要的分发提高性能的时候可以通过重写该方法例如:MainCoroutineDispatcher 。dispatch()
方法在 DefaultScheduler
的父类 ExperimentalCoroutineDispatcher
中实现,内部是直接交由 CoroutineScheduler
去执行,CoroutineScheduler
实现 Executor
接口,内部封装线程池。为了方便理解看一下相关类图:
我们先看下 DispatchedContinuation
这个类,它实现了Runnable()
接口,可以直接通过线程池 execute()
方法进行执行。调度器的详细实现原理后续单独进行分析,本篇还是以分析挂起和恢复流程为主。既然是实现了Runnable()
接口,那我们就需要关注在 run()
方法里面做了什么?run()
方法的实现在 DispatchedContinuation
父类 DispatchedTask
中,源码如下
public final override fun run()
//检查状态
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
//取出原始协程体类
val continuation = delegate.continuation
//上下文
val context = continuation.context
//获取状态
val state = takeState()
withCoroutineContext(context, delegate.countOrElement) {
...
//协程体代码执行
continuation.resume(getSuccessfulResult(state))
}
}
}
}
continuation.resume()
内部调用了 resumeWith()
方法 , 具体的实现是在 BaseContinuationImpl
类中,看下实现源码
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
//循环
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//调用invokeSuspend()方法去执行协程体内代码
val outcome = invokeSuspend(param)
//如果返回挂起标识COROUTINE_SUSPENDED就直接return
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
//递归调用执行
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
内部有一个无限循环,假设我们协程体内没有挂起函数,那么将会循环执行 invokeSuspend()
方法直到结束,方法内部通过状态机依次执行。那么当遇到挂起函数的时候,也就是方法返回 COROUTINE_SUSPENDED
挂起标识,将直接 return 退出循环,同时协程体代码也会退出,因为退出的是协程体,并不会造成线程阻塞。那后面未执行的代码怎么办呢?因为之前状态机在方法执行前将 label 置为某一个状态,当挂起函数恢复执行的时候,会继续向下执行剩余代码。
接下来分析一下协程是如何恢复的。
协程的恢复
协程的恢复分析我们以 withContext
操作为例,将例子中的 cumpute1
方法内部调用 withContext
,我们先看下 withContext
方法。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
//新的context和旧的context合并
val oldContext = uCont.context
val newContext = oldContext + context
//检测是否已经完成
newContext.checkCompletion()
...
//当两者指定的调度器不一致时
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
不难看出 block
是 withContext
需要挂起的协程体,我们再看 suspendCoroutineUninterceptedOrReturn
这个方法接收一个 lambda
表达式参数是 Continuation
对象,这个 uCont
就是用来恢复调用的,但是它从哪里来的呢?withConntext
只接收两个参数,我们从反编译后的代码来一探究竟。
public final Object cumpute1(Continuation $completion) {
Object var1 = BuildersKt.withContext(
(CoroutineContext)EmptyCoroutineContext.INSTANCE,
(Function2)(new Function2((Continuation)null) { ... }),
$completion);
return var1 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var1 : Unit.INSTANCE;
}
我们发现 withContext
方法反编译后接收了三个参数,最后一个参数 completion
就是传入的原协程体对象,suspendCoroutineUninterceptedOrReturn
看不到具体的实现,如果我们想恢复原协程体对象后续操作的话,推测这里 uCont
就是传入的原协程体类对象。
继续分析 withContext
,第一步显示新的 context 和旧的 context 合并,当两者使用的调度器不一致时,DispatchedCoroutine
是 AbstractCoroutine
的子类,当协程体内操作执行完成后会调用 AbstractCoroutine
的 resumeWith()
方法,内部调用afterResume()
方法具体实现交由 afterCompletion()
抽象方法进行处理,DispatchedCoroutine
内部是实现了 afterCompletion()
抽象方法,我们来看下内部源码是如何处理的
override fun afterCompletion(state: Any?) {
afterResume(state)
}
override fun afterResume(state: Any?) {
if (tryResume()) return
//外部协程的恢复调用
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
可以很清楚的看到这里是将原协程体继续执行调用。这里需要在注意一个点,之前我们分析协程体类内部执行的时候
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
当调用的函数返回 COROUTINE_SUSPENDED
的时候直接挂起,在 withContext
中 coroutine.getResult()
会调用 trySuspend()
方法利用 CAS 进行状态变更,如果变更成功就返回true,然后 getResult
函数返回 COROUTINE_SUSPENDED
,协程体就会挂起。
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
以上就是对协程的启动流程以及挂起和恢复过程的源码分析。
总结
经过上面的流程分析,最后对Kotlin中协程的挂起和恢复的流程做个总结:
-
协程体被编译成了继承 SuspendLambda
的类,具体继承链 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation -
lanuch()
方法内部生成了AbstractCoroutine
子类对象,用来处理结果和恢复协程 -
扩展方法 startCoroutineCancellable(completion: Continuation<T>)
被调用,这个completion
就是处理结果和恢复协程的AbstractCoroutine
子类对象 -
链式调用 createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
-
createCoroutineUnintercepted(completion)
负责生成协程体类对象 -
intercepted()
将原协程体进行包装增加调度器实现返回DispatchedContinuation
对象 -
resumeCancellableWith()
任务执行 ,协程体类的resumeWith()
被调用,循环体中invokeSuspend()
连续被调用,遇到返回COROUTINE_SUSPENDED
标识的函数就退出协程体执行循环,尚未执行完成的协程体会传入挂起函数中,等待挂起函数执行完毕后,通过原协程体类对象继续执行。
引用
1.《深入理解Kotlin协程》- 霍丙乾
2.https://www.kotlincn.net/docs/reference/coroutines-overview.html
3.https://blog.csdn.net/zou8944/article/details/106447727
也许你还想看
(▼点击文章标题或封面查看)