哈Ha! 我向您呈现Paulo Sato的
一篇文章的翻译,该
文章涉及在其Android项目中使用Kotlin Coroutines代替RxJava。
RxJava作为火箭筒,大多数应用程序甚至不使用其一半的火力。 本文将讨论如何用Kotlin协程(协程)替换它。
我已经使用RxJava几年了。 对于任何Android项目,这绝对是最好的库之一,今天仍然感到震惊,特别是如果您使用Java编程。 如果您使用Kotlin,那么我们可以说这个城市有一个新的警长。
大多数人只使用RxJava来控制线程并防止回调地狱(如果您不知道这是什么,请认为自己很幸运,
这就是原因 )。 事实是,我们必须牢记RxJava的真正功能是反应式编程和背压。 如果使用它控制异步请求,则使用火箭筒杀死蜘蛛。 她会做好自己的工作,但这太过分了。
RxJava的一个显着缺点是方法数量众多。 它非常庞大,并且倾向于在整个代码中传播。 在Kotlin中,您可以使用协程来实现以前使用RxJava创建的大多数行为。
但是……什么是协程?
Corutin是一种在线程中处理竞争任务的方法。 线程将一直工作到停止为止,并且上下文将针对每个协程而更改,而无需创建新线程。
Kotlin中的协程仍处于实验阶段,但它们包含在Kotlin 1.3中,因此我在下面使用它们编写了一个新的UseCase类(用于干净的体系结构)。 在此示例中,协程调用被封装在单个文件中。 因此,其他层将不依赖于正在执行的协程,从而提供了更分离的体系结构。
package com.psato.devcamp.interactor.usecase import android.util.Log import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.android.UI import kotlin.coroutines.experimental.CoroutineContext abstract class UseCase<T> { protected var parentJob: Job = Job() //var backgroundContext: CoroutineContext = IO var backgroundContext: CoroutineContext = CommonPool var foregroundContext: CoroutineContext = UI protected abstract suspend fun executeOnBackground(): T fun execute(onComplete: (T) -> Unit, onError: (Throwable) -> Unit) { parentJob.cancel() parentJob = Job() launch(foregroundContext, parent = parentJob) { try { val result = withContext(backgroundContext) { executeOnBackground() } onComplete.invoke(result) } catch (e: CancellationException) { Log.d("UseCase", "canceled by user") } catch (e: Exception) { onError(e) } } } protected suspend fun <X> background(context: CoroutineContext = backgroundContext, block: suspend () -> X): Deferred<X> { return async(context, parent = parentJob) { block.invoke() } } fun unsubscribe() { parentJob.cancel() } }
首先,我创建了一个父任务。 这是撤消UseCase类中创建的所有协程的关键。 当我们调用执行时,取消旧任务很重要,以确保我们没有错过一个协程(如果我们取消订阅此UseCase,也会发生这种情况)。
另外,我调用启动(UI)。 这意味着我要创建将在UI线程中执行的协程。 之后,我调用在CommonPool中创建异步的背景方法(这种方法实际上性能很差)。 反过来,异步将返回Deffered,然后,我将调用其wait方法。 他等待背景协程的完成,这将带来结果或错误。
这可以用来实现我们使用RxJava所做的大多数事情。 以下是一些示例。
地图
我下载了searchShow结果,并将其更改为返回第一个节目的名称。
RxJava代码:
public class SearchShows extends UseCase { private ShowRepository showRepository; private ResourceRepository resourceRepository; private String query; @Inject public SearchShows(ShowRepository showRepository, ResourceRepository resourceRepository) { this.showRepository = showRepository; this.resourceRepository = resourceRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<String> buildUseCaseObservable() { return showRepository.searchShow(query).map(showInfos -> { if (showInfos != null && !showInfos.isEmpty() && showInfos.get(0).getShow() != null) { return showInfos.get(0).getShow().getTitle(); } else { return resourceRepository.getNotFoundShow(); } }); } }
协程代码:
class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<String>() { var query: String? = null override suspend fun executeOnBackground(): String { query?.let { val showsInfo = showRepository.searchShow(it) val showName: String? = showsInfo?.getOrNull(0)?.show?.title return showName ?: resourceRepository.notFoundShow } return "" } }
邮编
Zip将从Observer中获取两个排放,并将它们放在一起成为一个新排放。 请注意,对于RxJava,您必须指定在每个Single中使用subscriptionOn并行进行调用。 我们希望同时获得它们并一起归还。
RxJava代码:
public class ShowDetail extends UseCase { private ShowRepository showRepository; private String id; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setId(String id) { this.id = id; } @Override protected Single<Show> buildUseCaseObservable() { Single<ShowDetail> singleDetail = showRepository.showDetail(id).subscribeOn(Schedulers.io()); Single<ShowBanner> singleBanner = showRepository.showBanner(id).subscribeOn(Schedulers.io()); return Single.zip(singleDetail, singleBanner, (detail, banner) -> new Show(detail,banner)); }
协程代码:
class SearchShows @Inject constructor(private val showRepository: ShowRepository, private val resourceRepository: ResourceRepository) : UseCase<Show>() { var id: String? = null override suspend fun executeOnBackground(): Show { id?.let { val showDetail = background{ showRepository.showDetail(it) } val showBanner = background{ showRepository.showBanner(it) } return Show(showDetail.await(), showBanner.await()) } return Show() } }
平面图
在这种情况下,我正在寻找具有查询字符串的节目,并且对于每个结果(限制为200个结果),我还会获得该节目的评分。 最后,我返回了具有相应评分的节目列表。
RxJava代码:
public class SearchShows extends UseCase { private ShowRepository showRepository; private String query; @Inject public SearchShows(ShowRepository showRepository) { this.showRepository = showRepository; } public void setQuery(String query) { this.query = query; } @Override protected Single<List<ShowResponse>> buildUseCaseObservable() { return showRepository.searchShow(query).flatMapPublisher( (Function<List<ShowInfo>, Flowable<ShowInfo>>) Flowable::fromIterable) .flatMapSingle((Function<ShowInfo, SingleSource<ShowResponse>>) showInfo -> showRepository.showRating(showInfo.getShow().getIds().getTrakt()) .map(rating -> new ShowResponse(showInfo.getShow().getTitle(), rating .getRating())).subscribeOn(Schedulers.io()), false, 4).toList(); } }
协程代码:
class SearchShows @Inject constructor(private val showRepository: ShowRepository) : UseCase<List<ShowResponse>>() { var query: String? = null override suspend fun executeOnBackground(): List<ShowResponse> { query?.let { query -> return showRepository.searchShow(query).map { background { val rating: Rating = showRepository.showRating(it.show!!.ids!!.trakt!!) ShowResponse(it.show.title!!, rating.rating) } }.map { it.await() } } return arrayListOf() } }
让我解释一下。 使用RxJava,我的存储库仅返回一个列表发射,因此我需要多个发射,每个ShowInfo发射一个。 为此,我调用了flatMapPublisher。 对于每个问题,我必须突出显示ShowResponse,最后将所有这些收集到一个列表中。
我们最终得到以下构造:List foreach→(ShowInfo→ShowRating→ShowResponse)→List。
使用协程,我为每个List元素创建了一个映射,以将其转换为List <Deffered>。
如您所见,通过同步调用,我们使用RxJava所做的大多数操作都更易于实现。 协程甚至可以处理flatMap,我相信它是RxJava中最复杂的功能之一。
众所周知,协程可以是轻量级的(
这里是一个示例),但是结果使我感到困惑。 在此示例中,RxJava在大约3.1秒内启动,而协同程序在CommonPool上运行大约需要5.8秒。
这些结果向我提出了一个问题,那就是其中可能存在不当之处。 后来,我找到了。 我使用了改进型Call,这阻止了流程。
有两种方法可以修复此错误,具体取决于您使用的Android Studio版本。 在Android Studio 3.1中,我们需要确保不阻塞后台线程。 为此,我使用了这个库:
实施'ru.gildor.coroutines:kotlin-coroutines-retrofit:0.12.0'
这段代码创建了改造Call函数的扩展以暂停流:
public suspend fun <T : Any> Call<T>.await(): T { return suspendCancellableCoroutine { continuation -> enqueue(object : Callback<T> { override fun onResponse(call: Call<T>?, response: Response<T?>) { if (response.isSuccessful) { val body = response.body() if (body == null) { continuation.resumeWithException( NullPointerException("Response body is null: $response") ) } else { continuation.resume(body) } } else { continuation.resumeWithException(HttpException(response)) } } override fun onFailure(call: Call<T>, t: Throwable) {
在Android Studio 3.2中,您可以将corutin库更新为版本0.25.0。 此版本具有CoroutineContext IO(您可以在UseCase类中看到相应的注释)。
在CommonPool上运行而没有阻塞的调用分别花费了2.3秒和IO和阻塞调用的2.4秒。

我希望本文能激发您使用Corutin(它是RxJava的一种更轻巧,更快的替代方法)的功能,并使您更容易理解正在编写异步运行的同步代码。