Android背景教程。 第5部分:科特林的协程


科特林岛

本系列中的先前文章: 关于AsyncTask关于 Loader关于 Executors和EventBus关于RxJava

因此,这个小时到了。 这是撰写整个系列文章的文章:对新方法如何“在幕后”进行解释。 如果您还不知道如何使用它,请参考以下有用的链接:


掌握了协程后,您可能想知道是什么让Kotlin提供了这个机会以及它是如何工作的。 请注意,这里我们仅关注编译阶段:您可以撰写有关执行的单独文章。

我们需要了解的第一件事是,在语料库中,实际上不存在协程。 编译器将带有suspend修饰符的函数转换为具有Continuation参数的函数。 此接口有两种方法:

abstract fun resume(value: T) abstract fun resumeWithException(exception: Throwable) 

类型T是原始挂起函数的返回类型。 这是实际发生的情况:此函数在某个线程中执行(耐心,我们也可以做到这一点),并将结果传递到该延续的恢复函数,在该上下文中调用了suspend函数。 如果该函数未收到结果并引发异常,则将引发resumeWithException,从而向调用代码引发错误。

好的,但是延续从何而来? 当然,来自Corutin的构建器! 让我们看一下创建任何协程的代码,例如,启动:

 public actual fun launch( context: CoroutineContext = DefaultDispatcher, start: CoroutineStart = CoroutineStart.DEFAULT, parent: Job? = null, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context, parent) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine } 

在这里,builder创建一个协程-AbstractCoroutine类的实例,该类又实现了Continuation接口。 start方法属于Job接口。 但是,找到启动方法的定义非常困难。 但是我们可以从另一端来到这里。 细心的读者已经注意到启动函数的第一个参数是CoroutineContext,默认情况下将其设置为DefaultDispatcher。 调度程序是控制协程执行的类,因此它们对于理解正在发生的事情绝对重要。 让我们看一下DefaultDispatcher声明:

 public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool 

因此,实际上,这是CommonPool,尽管java扩展坞告诉我们这可以更改。 什么是CommonPool?

这是一个协程管理器,使用ForkJoinPool作为ExecutorService的实现。 是的,它是:最后,您的所有lambda协程都只是可运行的,并且通过一堆棘手的转换被困在Executor中。 但是魔鬼一如既往地存在于细节中。


叉子? 还是加入?

从我的Twitter上的调查结果来看,这里我需要简要解释一下FJP是什么:)


首先,ForkJoinPool是为Java 8并行流创建的现代执行程序。最初的任务是在使用Stream API时实现高效的并行性,这实际上意味着将流拆分为处理部分数据,然后在处理完所有数据后将其合并。 为了简化,假设您有以下代码:

 IntStream .range(1, 1_000_000) .parallel() .sum() 

这样的流的数量将不会在一个流中计算,相反,ForkJoinPool将把范围递归地分成几部分(首先分为两部分,分别为500,000,然后分别为250,000,依此类推),计算每一部分的总和,并将结果合并为一个数量。 这是此过程的可视化:


线程被拆分为不同的任务,并在完成后再次合并

FJP的有效性基于“作业盗窃”算法:当特定线程用尽任务时,它将进入其他池线程的队列并窃取其任务。 为了更好地理解,您可以查看Alexei Shipilev的报告或观看演示

好吧,我们意识到协程在做什么! 但是,它们如何最终到达那里?

这在CommonPool#派发方法内部发生:

 _pool.execute(timeSource.trackTask(block)) 

从DispatchedContinuation中的resume(Value:T)方法中调用调度方法。 听起来很熟悉! 我们记得Continuation是AbstractCoroutine中实现的接口。 但是它们有什么关系?

诀窍在CoroutineDispatcher类中。 它实现ContinuationInterceptor接口,如下所示:

 public actual override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) 

看吗 您为构建者corutin提供了一个简单的块。 您不需要实现任何您一无所知的接口。 协程库负责所有这些工作。 她是
拦截执行,将连续替换为DispatchedContinuation,并将其发送给executor,以确保最高效地执行代码。

现在,我们唯一需要处理的就是如何从start方法中调用dispatch。 让我们填补这个空白。 从块的扩展功能中的startCoroutine调用resume方法:

 public fun <R, T> (suspend R.() -> T).startCoroutine( receiver: R, completion: Continuation<T> ) { createCoroutineUnchecked(receiver, completion).resume(Unit) } 

然后,在CoroutineStart枚举中由“()”运算符调用startCoroutine。 您的构建器将其作为第二个参数,默认值为CoroutineStart.DEFAULT。 仅此而已!

这就是为什么我喜欢corutin方法的原因:它不仅是一种出色的语法,而且还是一种出色的实现。

对于那些读到最后的人,他们将获得独家代理:来自Mobius会议我的报告“不需要小提琴手:我们拒绝RxJava支持Kotlin的协程”的视频。 享受:)

Source: https://habr.com/ru/post/zh-CN415335/


All Articles