我如何在项目中用协程替换RxJava,以及为什么您可能也应该这样做

哈Ha! 我向您呈现Paulo Sato的一篇文章的翻译,该文章涉及在其Android项目中使用Kotlin Coroutines代替RxJava。

RxJava作为火箭筒,大多数应用程序甚至不使用其一半的火力。 本文将讨论如何用Kotlin协程(协程)替换它。

我已经使用RxJava几年了。 对于任何Android项目,这绝对是最好的库之一,今天仍然感到震惊,特别是如果您使用Java编程。 如果您使用Kotlin,那么我们可以说这个城市有一个新的警长。

大多数人只使用RxJava来控制线程并防止回调地狱(如果您不知道这是什么,请认为自己很幸运, 这就是原因 )。 事实是,我们必须牢记RxJava的真正功能是反应式编程和背压。 如果使用它控制异步请求,则使用火箭筒杀死蜘蛛。 她会做好自己的工作,但这太过分了。

RxJava的一个显着缺点是方法数量众多。 它非常庞大,并且倾向于在整个代码中传播。 在Kotlin中,您可以使用协程来实现以前使用RxJava创建的大多数行为。

但是……什么是协程?

Corutin是一种在线程中处理竞争任务的方法。 线程将一直工作到停止为止,并且上下文将针对每个协程而更改,而无需创建新线程。
Kotlin中的协程仍处于实验阶段,但它们包含在Kotlin 1.3中,因此我在下面使用它们编写了一个新的UseCase类(用于干净的体系结构)。 在此示例中,协程调用被封装在单个文件中。 因此,其他层将不依赖于正在执行的协程,从而提供了更分离的体系结构。

/** * (C) Copyright 2018 Paulo Vitor Sato Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package com.psato.devcamp.interactor.usecase import android.util.Log import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.android.UI import kotlin.coroutines.experimental.CoroutineContext /** * Abstract class for a Use Case (Interactor in terms of Clean Architecture). * This interface represents a execution unit for different use cases (this means any use case * in the application should implement this contract). * <p> * By convention each UseCase implementation will return the result using a coroutine * that will execute its job in a background thread and will post the result in the UI thread. */ abstract class UseCase<T> { protected var parentJob: Job = Job() //var backgroundContext: CoroutineContext = IO var backgroundContext: CoroutineContext = CommonPool var foregroundContext: CoroutineContext = UI protected abstract suspend fun executeOnBackground(): T fun execute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) { parentJob.cancel() parentJob = Job() launch(foregroundContext, parent = parentJob) { try { val result = withContext(backgroundContext) { executeOnBackground() } onComplete.invoke(result) } catch (e: CancellationException) { Log.d("UseCase", "canceled by user") } catch (e: Exception) { onError(e) } } } protected suspend fun <X> background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> { return async(context, parent = parentJob) { block.invoke() } } fun unsubscribe() { parentJob.cancel() } } 

首先,我创建了一个父任务。 这是撤消UseCase类中创建的所有协程的关键。 当我们调用执行时,取消旧任务很重要,以确保我们没有错过一个协程(如果我们取消订阅此UseCase,也会发生这种情况)。

另外,我调用启动(UI)。 这意味着我要创建将在UI线程中执行的协程。 之后,我调用在CommonPool中创建异步的背景方法(这种方法实际上性能很差)。 反过来,异步将返回Deffered,然后,我将调用其wait方法。 他等待背景协程的完成,这将带来结果或错误。

这可以用来实现我们使用RxJava所做的大多数事情。 以下是一些示例。

地图


我下载了searchShow结果,并将其更改为返回第一个节目的名称。
RxJava代码:
 public class SearchShows extends UseCase { private ShowRepository showRepository; private ResourceRepository resourceRepository; private String query; @Inject public SearchShows(ShowRepository showRepository, ResourceRepository resourceRepository) { this.showRepository = showRepository; this.resourceRepository = resourceRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<String> buildUseCaseObservable() { return showRepository.searchShow(query).map(showInfos -> { if (showInfos != null && !showInfos.isEmpty() && showInfos.get(0).getShow() != null) { return showInfos.get(0).getShow().getTitle(); } else { return resourceRepository.getNotFoundShow(); } }); } } 

协程代码:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<String>() { var query: String? = null override suspend fun executeOnBackground(): String { query?.let { val showsInfo = showRepository.searchShow(it) val showName: String? = showsInfo?.getOrNull(0)?.show?.title return showName ?: resourceRepository.notFoundShow } return "" } } 

邮编


Zip将从Observer中获取两个排放,并将它们放在一起成为一个新排放。 请注意,对于RxJava,您必须指定在每个Single中使用subscriptionOn并行进行调用。 我们希望同时获得它们并一起归还。

RxJava代码:

 public class ShowDetail extends UseCase { private ShowRepository showRepository; private String id; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setId(String id) { this.id = id; } @Override protected Single<Show> buildUseCaseObservable() { Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io()); Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io()); return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner)); } 

协程代码:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<Show>() { var id: String? = null override suspend fun executeOnBackground(): Show { id?.let { val showDetail = background{ showRepository.showDetail(it) } val showBanner = background{ showRepository.showBanner(it) } return Show(showDetail.await(), showBanner.await()) } return Show() } } 

平面图


在这种情况下,我正在寻找具有查询字符串的节目,并且对于每个结果(限制为200个结果),我还会获得该节目的评分。 最后,我返回了具有相应评分的节目列表。

RxJava代码:

 public class SearchShows extends UseCase { private ShowRepository showRepository; private String query; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<List<ShowResponse>> buildUseCaseObservable() { return showRepository.searchShow(query).flatMapPublisher( (Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable) .flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>) showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt()) .map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating .getRating())).subscribeOn(Schedulers.io()), false, 4).toList(); } } 

协程代码:

 class SearchShows @Inject constructor(private val showRepository: ShowRepository) : UseCase<List<ShowResponse>>() { var query: String? = null override suspend fun executeOnBackground(): List<ShowResponse> { query?.let { query -> return showRepository.searchShow(query).map { background { val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!) ShowResponse(it.show.title!!, rating.rating) } }.map { it.await() } } return arrayListOf() } } 

让我解释一下。 使用RxJava,我的存储库仅返回一个列表发射,因此我需要多个发射,每个ShowInfo发射一个。 为此,我调用了flatMapPublisher。 对于每个问题,我必须突出显示ShowResponse,最后将所有这些收集到一个列表中。

我们最终得到以下构造:List foreach→(ShowInfo→ShowRating→ShowResponse)→List。

使用协程,我为每个List元素创建了一个映射,以将其转换为List <Deffered>。

如您所见,通过同步调用,我们使用RxJava所做的大多数操作都更易于实现。 协程甚至可以处理flatMap,我相信它是RxJava中最复杂的功能之一。

众所周知,协程可以是轻量级的( 这里是一个示例),但是结果使我感到困惑。 在此示例中,RxJava在大约3.1秒内启动,而协同程序在CommonPool上运行大约需要5.8秒。

这些结果向我提出了一个问题,那就是其中可能存在不当之处。 后来,我找到了。 我使用了改进型Call,这阻止了流程。

有两种方法可以修复此错误,具体取决于您使用的Android Studio版本。 在Android Studio 3.1中,我们需要确保不阻塞后台线程。 为此,我使用了这个库:
实施'ru.gildor.coroutines:kotlin-coroutines-retrofit:0.12.0'

这段代码创建了改造Call函数的扩展以暂停流:

 public suspend fun <T : Any> Call<T>.await(): T { return suspendCancellableCoroutine { continuation -> enqueue(object : Callback<T> { override fun onResponse(call: Call<T>?, response: Response<T?>) { if (response.isSuccessful) { val body = response.body() if (body == null) { continuation.resumeWithException( NullPointerException("Response body is null: $response") ) } else { continuation.resume(body) } } else { continuation.resumeWithException(HttpException(response)) } } override fun onFailure(call: Call<T>, t: Throwable) { // Don't bother with resuming the continuation if it is already cancelled. if (continuation.isCancelled) return continuation.resumeWithException(t) } }) registerOnCompletion(continuation) } } 

在Android Studio 3.2中,您可以将corutin库更新为版本0.25.0。 此版本具有CoroutineContext IO(您可以在UseCase类中看到相应的注释)。

在CommonPool上运行而没有阻塞的调用分别花费了2.3秒和IO和阻塞调用的2.4秒。

图片

我希望本文能激发您使用Corutin(它是RxJava的一种更轻巧,更快的替代方法)的功能,并使您更容易理解正在编写异步运行的同步代码。

Source: https://habr.com/ru/post/zh-CN421739/


All Articles