协程是异步代码执行的强大工具。 它们并行工作,彼此通信并且消耗很少的资源。 似乎不用担心,协程可以引入生产中。 但是有恐惧,他们在干涉。
弗拉基米尔·伊万诺夫 (
Vladimir Ivanov )在
AppsConf上的
报告所讲述的事实是,魔鬼并不那么可怕,您今天可以使用协程:
演讲者简介 :Vladimir Ivanov(
dzigoro )是
EPAM的领先Android开发人员,具有7年的经验,喜欢Solution Architecture,React Native和iOS开发,还拥有
Google Cloud Architect证书。
您阅读的所有内容都是经验和各种研究成果的产物,因此按原样进行,没有任何保证。
协程,Kotlin和RxJava
有关信息:corutin的当前状态在发行版中(左侧Beta)。
Kotlin 1.3已发布,协程已宣布稳定,世界上有和平。

我最近在Twitter上进行了一项调查,发现使用协程的人们:
- 食品中含有13%的协程。 一切都很好;
- 25%的人在宠物项目中尝试过它们;
- 24%-什么是Kotlin?
- 38%的RxJava随处可见。
统计数据不令人满意。 我认为
RxJava对于开发人员通常使用的任务而言
过于复杂 。 协程更适合于控制异步操作。
在我以前的报告中,我谈到了如何从RxJava重构到Kotlin中的协程,因此,我不会在此进行详细介绍,而只是回顾要点。
我们为什么要使用协程?
因为如果我们使用RxJava,那么通常的实现示例如下所示:
interface ApiClientRx { fun login(auth: Authorization) : Single<GithubUser> fun getRepositories (reposUrl: String, auth: Authorization) : Single<List<GithubRepository>> }
我们有一个接口,例如,我们编写一个GitHub客户端,并希望对其执行一些操作:
- 登录用户。
- 获取GitHub存储库列表。
在这两种情况下,函数都将返回单个业务对象:GitHubUser或GitHubRepository列表。
该接口的实现代码如下:
private fun attemptLoginRx () { showProgress(true) compositeDisposable.add(apiClient.login(auth) .flatMap { user -> apiClient.getRepositories(user.repos_url, auth) } .map { list -> list.map { it.full_name } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doFinally { showProgress(false) } .subscribe( { list -> showRepositories(this, list) }, { error -> Log.e("TAG", "Failed to show repos", error) } )) }
-我们采用
CompositeDisposable,以便没有内存泄漏。
-将调用添加到第一个方法。
-我们使用方便的运算符来获取用户,例如
flatMap 。
-我们获得了其存储库列表。
-我们编写
样板 ,使其在正确的线程上
运行 。
-一切准备就绪后,我们将显示已登录用户的存储库列表。
RxJava代码难点:- 复杂性 我认为,对于两个网络调用并在UI上显示某些内容这样的简单任务,代码太复杂了。
- 未绑定的堆栈跟踪。 堆栈跟踪几乎与您编写的代码无关。
- 资源超支 。 RxJava会生成大量对象,并且性能可能会下降。
协程到0.26版之前的相同代码是什么?API更改为0.26,我们正在谈论生产。 尚无人在产品中应用0.26,但我们正在努力。
使用协程,我们的界面将发生很大变化 。 函数将停止返回任何Singles和其他帮助对象。 他们将立即返回业务对象:GitHubUser和GitHubRepository列表。 GitHubUser和GitHubRepository函数将具有
暂停修饰符。 这很好,因为暂停几乎不会使我们有任何义务:
interface ApiClient { suspend fun login(auth: Authorization) : GithubUser suspend fun getRepositories (reposUrl: String, auth: Authorization) : List<GithubRepository> }
如果您已经看过使用此接口的实现的代码,则与RxJava相比,它将发生重大变化:
private fun attemptLogin () { launch(UI) { val auth = BasicAuthorization(login, pass) try { showProgress(true) val userlnfo = async { apiClient.login(auth) }.await() val repoUrl = userlnfo.repos_url val list = async { apiClient.getRepositories(repoUrl, auth) }.await() showRepositories( this, list.map { it -> it.full_name } ) } catch (e: RuntimeException) { showToast("Oops!") } finally { showProgress(false) } } }
-主要动作发生在我们称为
协 程生成 器异步 ,等待响应并获取
userlnfo的地方 。
-我们使用来自该对象的数据。
-拨打另一个
异步电话,然后呼叫
await 。
一切看起来好像没有异步工作在发生,我们只需在该列中编写命令并执行它们即可。 最后,我们要做在UI上需要做的事情。
为什么协程更好?- 此代码更易于阅读。 它写得好像是一致的。
- 此代码的性能很可能比RxJava更好。
- 编写测试非常简单,但是稍后我们将介绍它们。
侧面2步
让我们离开题外话,还有几件事需要讨论。
第1步。withContext与启动/异步
除了
协程构建器异步之外,还有
协程构建器withContext 。
启动或
异步创建新的
Coroutine上下文 ,这并非总是必要的。 如果您有要在整个应用程序中使用的Coroutine上下文,则无需重新创建它。 您可以简单地重用现有的。 为此,您需要一个带有上下文的协程生成器。 它只是简单地重用现有的协程上下文。 它将快2-3倍,但是现在这是一个没有原则的问题。 如果确切的数字很有趣,那么
这里是有关带有基准和详细信息的
stackoverflow 问题 。
一般规则:毫无疑问在语义上适合使用withContext。 但是,如果您需要并行加载,例如几张图片或几段数据,则可以选择异步/等待。
步骤2.重构
如果您重构一个非常复杂的RxJava链怎么办? 我在生产中遇到了这个问题:
observable1.getSubject().zipWith(observable2.getSubject(), (t1, t2) -> {
我遇到了一个带有
公共主题的复杂链,每个
拉链中都有
拉链和
副作用 ,从而将其他内容发送到事件总线。 任务至少是摆脱事件总线。 我坐了一天,但无法重构代码来解决问题。
正确的决定原来是扔掉所有东西,并在4小时内用协程重写代码 。
下面的代码与我得到的非常相似:
try { val firstChunkJob = async { call1 } val secondChunkJob = async { call2 } val thirdChunkJob = async { call3 } return Result( firstChunkJob.await(), secondChunkJob.await(), thirdChunkJob.await()) } catch (e: Exception) {
-我们对第二个任务和第三个任务执行异步操作。
-我们正在等待结果并将其全部放入一个对象中。
-做完!
如果您的链条复杂且有协程,则只需重构即可。 真的很快。
是什么阻止开发人员在产品中使用协程?
我认为,作为开发人员,我们目前仅由于担心新事物而无法使用协程:
- 我们不知道该如何处理生命周期 , 活动和碎片生命周期。 在这些情况下如何使用协程?
- 没有使用Corutin解决生产中日常复杂任务的经验。
- 工具不足。 已经为RxJava编写了许多库和函数。 例如RxFCM 。 RxJava本身有很多运算符,这很好,但是协程呢?
- 我们并不真正了解如何测试协程。
如果我们摆脱了这四种恐惧,我们可以在晚上安然入睡,并在生产中使用协程。
让我们一点一点地讲。
1.生命周期管理
- 协程可以作为一次性或AsyncTask泄漏。 此问题必须手动解决。
- 为避免随机的Null指针异常,必须停止协程。
停下
您是否熟悉
Thread.stop() ? 如果您使用过它,那么时间不会太长。 在
JDK 1.1中,该方法立即被声明为过时的,因为不可能采取和停止某些代码,并且不能保证它将正确完成。 您很可能只会得到
内存损坏 。
因此,
Thread.stop()不起作用 。 您需要取消进行协作,也就是说,另一边的代码才能知道您正在取消它。
我们如何使用RxJava应用止损:
private val compositeDisposable = CompositeDisposable() fun requestSmth() { compositeDisposable.add( apiClientRx.requestSomething() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(result -> {}) } override fun onDestroy() { compositeDisposable.dispose() }
在RxJava中,我们
使用CompositeDisposable 。
-将变量
CompositeDisposable添加到片段或演示者(使用RxJava的演示者)中的活动中。
-在
onDestro中,添加
Dispose ,所有异常将自行消失。
协程的原理大致相同:
private val job: Job? = null fun requestSmth() { job = launch(UI) { val user = apiClient.requestSomething() … } } override fun onDestroy() { job?.cancel() }
考虑一个
简单任务的例子。
通常,
协程构建器会返回
工作 ,在某些情况下会
延迟执行 。
-我们可以记住这项工作。
-发出命令
“启动” 协程生成器 。 该过程开始,发生了一些事情,执行的结果被记住了。
-如果我们没有传递其他任何内容,则“启动”将启动该功能,并向我们返回该工作的链接。
-工作被记住,在onDestroy中,我们说
“取消” ,一切正常。
该方法有什么问题? 每个工作都需要一个字段。 您需要维护作业列表以将其全部取消。 该方法会导致代码重复,请不要这样做。
好消息是,我们有
其他选择 :
CompositeJob和
Lifecycle-aware job 。
CompositeJob是CompositeDisposable的类似物。 看起来像这样
: private val job: CompositeJob = CompositeJob() fun requestSmth() { job.add(launch(UI) { val user = apiClient.requestSomething() ... }) } override fun onDestroy() { job.cancel() }
-对于一个片段,我们开始一项工作。
-我们将所有
作业放到CompositeJob中,并输入命令:
“ job.cancel()for all!” 。
该方法很容易在4行中实现,无需计算类声明:
Class CompositeJob { private val map = hashMapOf<String, Job>() fun add(job: Job, key: String = job.hashCode().toString()) = map.put(key, job)?.cancel() fun cancel(key: String) = map[key]?.cancel() fun cancel() = map.forEach { _ ,u -> u.cancel() } }
您将需要:
-使用字符串键进行
映射 ,
-
添加方法,您将在其中添加工作,
-可选的
关键参数。
如果您想对同一工作使用相同的密钥-请。 如果没有,那么
hashCode将解决我们的问题。 将作业添加到我们通过的地图中,并使用相同的键取消上一个作业。 如果我们超额完成任务,那么先前的结果将使我们不感兴趣。 我们取消并再次驱动。
取消很简单:我们通过按键获取作业并取消。 整个地图的第二个取消会取消所有内容。 所有代码在半小时内分四行编写,并且可以正常工作。 如果您不想写,请参考上面的示例。
生命周期意识的工作
您是否使用过
Android Lifecycle ,
Lifecycle所有者或
观察者 ?

我们的
活动和
片段具有某些状态。 要点:
创建, 开始和
恢复 。 状态之间有不同的过渡。
LifecycleObserver允许您订阅这些转换,并在其中一个转换发生时执行某些操作。
看起来很简单:
public class MyObserver implements LifecycleObserver { @OnLifecycleEvent(Lifecycle.Event.ON_RESUME) public void connectListener() { ... } @OnLifecycleEvent(Lifecycle.Event.ON_PAUSE) public void disconnectListener() { … } }
您在方法上使用某些参数挂接注释,并通过相应的转换对其进行调用。 只需对协程使用此方法:
class AndroidJob(lifecycle: Lifecycle) : Job by Job(), LifecycleObserver { init { lifecycle.addObserver(this) } @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY) fun destroy() { Log.d("AndroidJob", "Cancelling a coroutine") cancel() } }
-您可以编写基类
AndroidJob 。
-我们会将
生命周期转移
到全班。
-LifecycleObserver接口将实现作业。
我们所需要的:
-在构造函数中,以Observer身份添加到Lifecycle。
-订阅
ON_DESTROY或我们感兴趣的其他内容。
-在ON_DESTROY中取消。
-在片段中
获取一个
parentJob 。
-调用构造函数
Joy作业或活动片段的
生命周期 。 没关系
-将此
parentJob作为
父级传递。
完成的代码如下所示:
private var parentJob = AndroidJob(lifecycle) fun do() { job = launch(UI, parent = parentJob) {
当您取消父级时,所有子协程都将被取消,并且您不再需要在片段中编写任何内容。 一切都会自动发生,不再需要ON_DESTROY。 最重要
的是不要忘记传递
parent = parentJob 。
如果使用的话,您可以编写一条简单的棉绒规则来突出显示您的名字:“哦,您忘记了父母!”
用
生命周期管理得到解决。 我们有几个工具,可让您轻松而舒适地完成所有操作。
生产中复杂的场景和非平凡的任务呢?
2.复杂的用例
复杂的场景和不平凡的任务是:
-
运算符-RxJava中的复杂运算符:flatMap,反跳等
-
错误处理-复杂的错误处理。 不只是
try..catch ,而且例如嵌套。
-
缓存 是一项艰巨的任务。 在生产中,我们遇到了一个缓存,希望获得一种工具来轻松解决协程的缓存问题。
重覆
当我们考虑协程的运算符时,第一个选择是
repeatWhen() 。
如果出现问题,而Corutin无法访问内部的服务器,则我们想通过某种指数回退重试几次。 也许是由于连接不良,我们将通过重复几次操作来获得理想的结果。
使用协程,可以轻松实现此任务:
suspend fun <T> retryDeferredWithDelay( deferred: () -> Deferred<T>, tries: Int = 3, timeDelay: Long = 1000L ): T { for (i in 1..tries) { try { return deferred().await() } catch (e: Exception) { if (i < tries) delay(timeDelay) else throw e } } throw UnsupportedOperationException() }
操作员实施:
-他带了
Deferred 。
-您将需要调用
异步方法来获取此对象。
-除了传递
Deferred之外,您还可以传递一个suspend块和任何一个
suspend函数。-for循环-您正在等待协程的结果。 如果发生某种情况并且重复计数器没有用尽,请通过
Delay再试一次。 如果没有,那就没有。
该函数可以轻松自定义:放置指数延迟或传递lambda函数,该函数将根据情况计算延迟。
使用它,就可以了!
拉链
我们也经常遇到他们。 同样,一切都很简单:
suspend fun <T1, T2, R> zip( source1: Deferred<T1>, source2: Deferred<T2>, zipper: BiFunction<T1, T2, R>): R { return zipper.apply(sourcel.await(), source2.await()) } suspend fun <T1, T2, R> Deferred<T1>.zipWith( other: Deferred<T2>, zipper: BiFunction<T1, T2, R>): R { return zip(this, other, zipper) }
-使用
拉链,并在您的Deferred上等待。
-可以使用withContext来使用suspend函数和协程生成器,而不是Deferred。 您将传达所需的上下文。
这再次起作用,我希望我消除了这种恐惧。
快取
使用RxJava在生产中是否有缓存实现? 我们使用RxCache。

在左侧的图中:
View和
ViewModel 。 右侧是数据源:网络调用和数据库。
如果我们要缓存某些内容,那么缓存将是另一个数据源。
缓存类型:
- 网络呼叫的网络源 。
- 内存缓存 。
- 具有到期时间的永久性高速缓存将存储在磁盘上,以便高速缓存在应用程序重新启动后得以幸免。
让我们为第三种情况编写一个简单的原始
缓存 。 协程生成器withContext再次救出。
launch(UI) { var data = withContext(dispatcher) { persistence.getData() } if (data == null) { data = withContext(dispatcher) { memory.getData() } if (data == null) { data = withContext(dispatcher) { network.getData() } memory.cache(url, data) persistence.cache(url, data) } } }
-使用withContext执行每个操作,然后查看是否有任何数据。
-如果
持久性数据不
存在 ,则您正在尝试从
memory.cache获取
数据 。
-如果也没有memory.cache,请与
网络来源联系并获取数据。 当然,不要忘记放入所有缓存。
这是一个非常原始的实现,有很多问题,但是如果您需要在一个地方缓存,则该方法有效。 对于生产任务,此缓存还不够。 需要更复杂的东西。
Rx具有RxCache
对于仍然使用RxJava的用户,可以使用RxCache。 我们仍然使用它。
RxCache是一个特殊的库。 允许您缓存数据并管理其生命周期。
例如,您想说这些数据将在15分钟后过期:“请在这段时间之后,不要从缓存中发送数据,而向我发送新数据。”
该库很棒,因为它以声明方式支持团队。 该声明与您对
Retrofit所做的非常相似:
public interface FeatureConfigCacheProvider { @ProviderKey("features") @LifeCache(duration = 15, timeUnit = TimeUnit.MINUTES) fun getFeatures( result: Observable<Features>, cacheName: DynamicKey ): Observable<Reply<Features>> }
-您说您有一个
CacheProvider 。
-启动一个方法,并说
LifeCache生存期
为 15分钟。 可用的密钥是
Features 。
-返回
Observable <Reply ,其中
Reply是用于处理缓存的辅助库对象。
使用非常简单:
val restObservable = configServiceRestApi.getFeatures() val features = featureConfigCacheProvider.getFeatures( restObservable, DynamicKey(CACHE_KEY) )
-从Rx缓存中,访问
RestApi 。
-
转到CacheProvider 。
-喂他一个可观察的东西。
-库本身将弄清楚该怎么做:是否去缓存,如果时间用完了,转到
Observable并执行另一项操作。
使用该库非常方便,我想为协程提供一个类似的库。
开发中的协程缓存
在EPAM内部,我们正在编写
Coroutine Cache库,该库将执行RxCache的所有功能。 我们编写了第一个版本,并在公司内部运行。 第一个版本发布后,我将很乐意将其发布在我的Twitter上。 它看起来像这样:
val restFunction = configServiceRestApi.getFeatures() val features = withCache(CACHE_KEY) { restFunction() }
我们将有一个暂停功能
getFeatures 。 我们将把该函数作为一个块传递给带有
Cache的特殊高阶函数,该函数将
找出需要完成的工作。
也许我们将使用相同的接口来支持声明性功能。
错误处理

开发人员经常发现简单的错误处理,并且通常很简单地解决。 如果您没有复杂的事物,那么在捕获中您会捕获
异常并查看发生的情况,写日志或向用户显示错误。 在用户界面上,您可以轻松地做到这一点。
在简单的情况下,一切都应该是简单的-使用协程的错误处理是通过
try-catch-finally完成的 。
在生产中,除了简单的案例外,还有:
-嵌套的
try-catch ,
-许多不同种类的
例外 ,
-网络或业务逻辑中的错误,
-用户错误。 他再次做错了事,应为一切负责。
我们必须为此做好准备。
有2个解决方案:
CoroutineExceptionHandler和带有
Result类的方法。
协程异常处理程序
这是处理复杂错误案例的特殊类。
ExceptionHandler允许您将
Exception作为错误的参数进行处理。
我们通常如何处理复杂的错误?
用户按下了某个按钮,该按钮不起作用。 他需要说出问题所在,然后将其定向到特定的操作:检查Internet,Wi-Fi,稍后再试或删除该应用程序,而不再使用它。 对用户说起来很简单:
val handler = CoroutineExceptionHandler(handler = { , error -> hideProgressDialog() val defaultErrorMsg = "Something went wrong" val errorMsg = when (error) { is ConnectionException -> userFriendlyErrorMessage(error, defaultErrorMsg) is HttpResponseException -> userFriendlyErrorMessage(Endpoint.EndpointType.ENDPOINT_SYNCPLICITY, error) is EncodingException -> "Failed to decode data, please try again" else -> defaultErrorMsg } Toast.makeText(context, errorMsg, Toast.LENGTH_SHORT).show() })
-让我们获取默认消息:“出了点问题!” 并分析异常。
-如果这是
ConnectionException,则我们从资源中获取本地化消息:“伙计,打开Wi-Fi,您的问题将消失。 我保证。”
-如果
服务器说错了 ,那么您需要告诉客户端:“注销并再次登录”,或者“不要在莫斯科这样做,请在另一个国家这样做”,或者“对不起,同志。 我所能做的就是说出问题了。”
-如果这是完全
不同的错误 ,例如
内存不足 ,我们说:“出了点问题,对不起。”
-显示所有消息。
您写入
CoroutineExceptionHandler的内容将在运行协程的同一
Dispatcher上执行。 因此,如果您提供“启动” UI命令,则UI上将发生所有事情。 您不需要单独的
调度,这非常方便。
使用很简单:
launch(uiDispatcher + handler) { ... }
有一个
加号运算符。 在Coroutine上下文中,添加一个
处理程序 ,一切正常,这非常方便。 我们使用了一段时间。
结果类
后来我们意识到可能会缺少CoroutineExceptionHandler。 由协程的工作形成的结果可以包含来自不同部分的多个数据或处理多种情况。
结果类方法有助于解决此问题:
sealed class Result { data class Success(val payload: String) : Result() data class Error(val exception: Exception) : Result() }
-在业务逻辑中,启动
Result类 。
-标记为已
密封 。
-您从该类继承了另外两个数据类:
Success和
Error 。
—
Success , .
—
Error exception.
- :
override suspend fun doTask(): Result = withContext(CommonPool) { if ( !isSessionValidForTask() ) { return@withContext Result.Error(Exception()) } … try { Result.Success(restApi.call()) } catch (e: Exception) { Result.Error(e) } }
Coroutine context — Coroutine builder withContex .
, :
— , error. .
— RestApi -.
— ,
Result.Success .
— ,
Result.Error .
- , ExceptionHandler .
Result classes , . Result classes, ExceptionHandler try-catch.
3.
, .
unit- , , . unit-.
, . , unit-, 2 :
- Replacing context . , ;
- Mocking coroutines . .
Replacing context
presenter:
val login() { launch(UI) { … } }
,
login , UI-. , ,
. , , unit-.
:
val login (val coroutineContext = UI) { launch(coroutineContext) { ... } }
— login coroutineContext. , . Kotlin , UI .
— Coroutine builder Coroutine Contex, .
unit- :
fun testLogin() { val presenter = LoginPresenter () presenter.login(Unconfined) }
—
LoginPresenter login - , , Unconfined.
—
Unconfined , , . .
Mocking coroutines
— .
Mockk unit-. unit- Kotlin, . suspend-
coEvery -.
login
githubUser :
coEvery { apiClient.login(any()) } returns githubUser
Mockito-kotlin , — . , , :
given { runBlocking { apiClient.login(any()) } }.willReturn (githubUser)
runBlocking .
given- , .
Presenter :
fun testLogin() { val githubUser = GithubUser('login') val presenter = LoginPresenter(mockApi) presenter.login (Unconfined) assertEquals(githubUser, presenter.user()) }
— -, ,
GitHubUser .
— LoginPresenter API, . .
—
presenter.login Unconfined , Presenter , .
仅此而已! .
- Rx- . . , RxJava RxJava. - — , .
- . , . Unit- — , , , . — welcome!
- . , , , , . .
有用的链接
最新消息
30 Mail.ru . , .
AppsConf , .
, , , .
youtube- AppsConf 2018 — :)