异步任务执行层架构

在社交网络的移动应用程序中,用户喜欢,发表评论,然后翻阅提要,开始播放视频并再次放置类似内容。 所有这些都是快速且几乎同时的。 如果应用程序的业务逻辑的实现被完全阻止,则用户将无法访问磁带,直到上载带有图章的记录之类的东西。 但是用户不会等待,因此,在大多数移动应用程序中,异步任务可以独立开始和结束。 用户可以同时执行多个任务,并且它们不会彼此阻塞。 一个异步任务启动并运行,而用户启动下一个任务。



在解释Stepan GoncharovAppsConf上的报告时我们将涉及异步:我们将深入研究移动应用程序的体系结构,讨论为什么我们需要为执行异步任务而分离一个单独的层,我们将分析需求和现有解决方案,我们将探讨利弊,并考虑这种方法的一种实现。 我们还将学习如何管理异步任务,为什么每个任务都有自己的ID,执行策略是什么以及它们如何帮助简化和加速整个应用程序的开发。


关于发言人:Stepan Goncharovstepango )在Grab工作-就像Uber,但在东南亚。 他从事Android开发已有9年以上。 自2014年以来对Kotlin感兴趣,自2016年以来-在产品中使用它。 由新加坡Kotlin用户小组组织。 这就是为什么所有代码​​示例都在Kotlin上的原因之一,而不是因为它很时尚。

我们将研究一种设计应用程序组件的方法。 对于那些想要向应用程序中添加新组件,方便地设计它们然后进行扩展的人来说,这是一个操作指南。 iOS开发人员可以使用iOS方法。 该方法也适用于其他平台。 自2014年以来,我一直对Kotlin感兴趣,因此所有示例均使用该语言。 但是不用担心-您可以用Swift,Objective-C和其他语言编写相同的东西。

让我们从Reactive Extensions的问题和缺点开始。 问题是其他异步原语的典型问题,所以我们说RX-牢记未来和承诺,一切都会以类似的方式进行。

RX问题


高进入门槛 。 RX非常复杂且庞大-它拥有270名操作员,要教导整个团队如何正确使用它们并不容易。 我们不会讨论这个问题-它不在报告的范围之内。

在RX中,您必须手动管理您的订阅,还必须监视应用程序的生命周期 。 如果您已经订阅了Single或Observable, 则无法将其与另一个SIngle进行比较 ,因为您将始终收到一个新对象,并且运行时将始终有不同的订阅。 在RX中,无法比较订阅和流

我们将尝试解决其中一些问题。 我们将解决每个问题一次,然后重用结果。

问题编号1:一次完成一项任务


开发中的一个常见问题是不必要的工作和多次重复执行相同的任务。 想象一下,我们有一个用于输入数据的表格和一个保存按钮。 按下时,将发送一个请求,但是如果在保存表单时单击几次,则会发送几个相同的请求。 我们提供了一个按钮来测试质量检查,他们在一秒钟内按下了40次-我们收到了40个请求,因为例如动画没有时间工作。

该如何解决呢? 每个开发人员都有自己喜欢的解决方法:一个将保持debounce ,另一个将阻止按钮,以防万一通过clickable = false 。 没有通用的方法,因此这些错误将在我们的应用程序中出现或消失。 只有当质量检查告诉我们时,我们才能解决问题:“哦,我点击这里,就坏了!”!

可扩展的解决方案?


为了避免这种情况,我们将包装RX或其他异步框架- 我们将ID添加到所有异步操作中 。 这个想法很简单-我们需要某种比较它们的方法,因为通常这种方法不在框架中。 我们可以完成任务,但是我们不知道它是否已经完成。

让我们将包装器称为“ Act”-其他名称已经被使用。 为此,请创建一个小的typealias和一个简单的interface ,其中只有一个字段:

 typealias Id = String interface Act { val id: Id } 

这很方便,并且稍微减少了代码量。 以后,如果String不喜欢它,我们将用其他东西代替它。 在这段小代码中,我们观察到一个有趣的事实。

接口可能包含属性。

对于来自Java的程序员来说,这是意外的。 通常他们在接口内添加getId()方法,但是从Kotlin的角度来看,这是错误的解决方案。

我们将如何设计?


一个小题外话。 在设计时,我坚持两个原则。 首先是将组件要求和实现分解为小部分 。 这允许对代码编写进行精细控制。 当您创建一个大型组件并尝试一次完成所有操作时,这很不好。 通常,此组件不起作用,您开始插入拐杖,因此我建议您以较小的受控步骤编写并享受它。 第二个原则是在每个步骤之后检查可操作性 ,然后再次重复该过程

为什么ID不够?


让我们回到问题所在。 我们迈出了第一步-添加了一个ID,并且一切都很简单-接口和字段。 这并没有给我们任何好处,因为该接口不包含任何实现并且不能单独工作,但是可以比较操作。

接下来,我们将添加一些组件,这些组件将使我们能够使用该接口,并了解到我们希望在不需要时再次执行某种请求。 我们要做的第一件事就是引入新的抽象

引入新的抽象:MapDisposable


选择在您的代码库中工作的开发人员熟悉的正确名称和抽象很重要。 由于我有关于RX的示例,因此我们将使用RX概念和类似于库开发人员使用的名称。 因此,我们可以轻松地向我们的同事解释他们的工作,原因以及工作方式。 要选择名称,请参见CompositeDiposable文档

让我们创建一个小的MapDisposable接口, 其中包含有关当前任务的信息 ,并 delete上调用dispose() 。 我不会给出实现,您可以在GitHub上看到所有源代码。

之所以这样称呼MapDisposable,是因为该组件将像Map一样工作,但是它将具有CompositeDiposable属性。

引入新的抽象:ActExecutor


下一个抽象组件是ActExecutor。 它启动还是不启动新任务,取决于MapDisposable并委托错误处理。 如何选择名称- 请参阅文档

用与JDK最接近的类推。 它有一个执行器,您可以在其中传递线程并执行某些操作。 在我看来,这是一个很酷的组件,它的设计很好,因此让我们以它为基础。

我们遵循简单的小步骤原则,创建ActExecutor及其简单接口。 这个名字本身说它是我们传递某些东西并开始做某事的组成部分。 ActExecutor有一种方法可以让我们通过Act并且以防万一,要处理错误,因为没有这些错误是没有办法的。

 interface ActExecutor { fun execute( act: Act, e: (Throwable) -> Unit = ::logError) } interface MapDisposable { fun contains(id: Id): Boolean fun add(id: Id, disposable: () -> T) fun remove(id: Id) } 

MapDisposable也受到限制:使用Map界面并复制containsaddremove方法。 add方法不同于Map:第二个参数是代表美观和方便的lambda。 方便之处在于我们可以同步lambda以防止意外的竞争情况 。 但是我们不会谈论这个,我们将继续讨论架构。

接口实现


我们已经声明了所有接口,并将尝试实现一些简单的方法。 采取CompletableActSingleAct

 class CompletableAct ( override val id: Id, override val completable: Completable ) : Act class SingleAct<T : Any>( override val id: Id, override val single: Single<T> ) : Act 

CompletableAct是Completable的包装。 在我们的例子中,它只包含一个ID-这就是我们所需要的。 SingleAct几乎相同。 我们也可以实现Maybe和Flowable,但只介绍前两个实现。

对于Single,我们指定了通用类型<T : Any> 。 作为Kotlin开发人员,我更喜欢使用这种方法。

尝试使用非null泛型。

现在我们有了一组接口,我们实现了一些逻辑来防止执行相同的请求。

 class ActExecutorImpl ( val map: MapDisposable ): ActExecutor { fun execute( act: Act, e: (Throwable) -> Unit ) = when { map.contains(act.id) -> { log("${act.id} - in progress") } else startExecution(act, e) log("${act.id} - Started") } } 

我们拿一张地图,检查其中是否有要求。 如果没有,我们就开始执行请求,并在运行时将其添加到Map中。 在执行任何结果(错误或成功)后,请从Map中删除该请求。

非常专心-没有同步,但是同步在GitHub的源代码中。

 fun startExecution(act: Act, e: (Throwable) -> Unit) { val removeFromMap = { mapDisposable.remove(act.id) } mapDisposable.add(act.id) { when (act) { is CompletableAct -> act.completable .doFinally(removeFromMap) .subscribe({}, e) is SingleAct<*> -> act.single .doFinally(removeFromMap) .subscribe({}, e) else -> throw IllegalArgumentException() } } 

使用lambda作为最后一个参数来提高代码的可读性。 它很美,您的同事将感谢您。

我们将使用更多的Kotlin芯片,并为Completable和Single添加扩展功能 。 使用它们,我们不必寻找工厂方法来创建CompletableAct和SingleAct-我们将通过扩展功能来创建它们。

 fun Completable.toAct(id: Id): Act = CompletableAct(id, this) fun <T: Any> Single<T>.toAct(id: Id): Act = SingleAct(id, this) 

扩展功能可以添加到任何类中。

结果


我们已经实现了几个组件和非常简单的逻辑。 现在,我们必须遵循的主要规则是不要强制手动订阅 。 当我们想执行某件事时,我们通过执行器给它。 和线程一样-没有人自己启动它们。

 fun act() = Completable.timer(2, SECONDS).toAct("Hello") executor.apply { execute(act()) execute(act()) execute(act()) } Hello - Act Started Hello - Act Duplicate Hello - Act Duplicate Hello - Act Finished 

我们曾经在团队中达成一致,现在始终可以保证我们的应用程序资源不会用于执行相同和不必要的请求。

第一个问题解决了。 现在,让我们扩展解决方案以使其具有灵活性。

问题编号2:要取消什么任务?


以及在有必要取消后续请求的情况下 ,我们可能需要取消先前的请求 。 例如,我们第一次编辑了有关用户的信息,并将其发送到服务器。 由于某种原因,调度花费了很长时间并且没有完成。 我们再次编辑了用户个人资料,并再次发送了相同的请求。 在这种情况下,为请求生成一个特殊的ID是没有意义的-来自第二次尝试的信息更加相关,并且先前的请求被取消

当前解决方案将不起作用,因为它将始终取消带有相关信息的请求的执行。 我们需要以某种方式扩展解决方案以解决问题并增加灵活性。 为此,了解我们所有人想要什么? 但是我们想了解要取消的任务,如何不复制粘贴以及如何调用它。

添加组件


我们称查询行为策略为它们创建两个接口: StrategyHolderStrategy 。 我们还创建2个对象,负责应用哪种策略。

 interface StrategyHolder { val strategy: Strategy } sealed class Strategy object KillMe : Strategy() object SaveMe : Strategy() 

我不使用枚举 -我更喜欢密封类 。 它们更轻巧,消耗更少的内存,并且更易于扩展。

密封类更易于扩展,编写起来更短。

更新现有组件


至此,一切都很简单。 我们有一个简单的界面,现在它将成为StrategyHolder的继承人。 由于这些是接口,因此继承没有问题。 在CompletableAct的实现中,我们将插入另一个override并在其中添加默认值,以确保所做的更改将与现有代码保持兼容。

 interface Act : StrategyHolder { val id: String } class CompletableAct( override val id: String, override val completable: Completable, override val strategy: Strategy = SaveMe ) : Act 

策略


我选择了SaveMe策略,这对我来说似乎很明显。 此策略仅取消以下请求-第一个请求将一直存在直到完成。

我们在实现方面做了一些工作。 我们有一个execute方法,现在我们在那里添加了策略检查。

  • 如果SaveMe策略与我们之前的策略相同,则没有任何变化。
  • 如果策略是KillMe ,则终止先前的请求并启动一个新请求。

 override fun execute(act: Act, e: (Throwable) -> Unit) = when { map.contains(act.id) -> when (act.strategy) { KillMe -> { map.remove(act.id) startExecution(act, e) } SaveMe -> log("${act.id} - Act duplicate") } else -> startExecution(act, e) } 

结果


通过编写最少的代码,我们能够轻松管理策略。 同时,我们的同事很高兴,我们可以做这样的事情。

 executor.apply { execute(Completable.timer(2, SECONDS) .toAct("Hello", KillMe)) execute(Completable.timer(2, SECONDS) .toAct("Hello", KillMe)) execute(Completable.timer(2, SECONDS) .toAct("Hello«, KillMe)) } Hello - Act Started Hello - Act Canceled Hello - Act Started Hello - Act Canceled Hello - Act Started Hello - Act Finished 

我们创建一个异步任务,通过该策略,并且每次启动新任务时,所有先前的任务(而不是下一个任务)都将被取消。

问题三:策略还不够


让我们继续讨论我在几个项目中遇到的一个有趣的问题。 我们将扩展解决方案,以处理更复杂的案件。 其中一种情况(与社交网络特别相关)是“喜欢/不喜欢” 。 有一个帖子,我们希望它喜欢,但是作为开发人员,我们不想阻止整个UI,而是以全屏方式显示对话框并加载,直到请求完成为止。 是的,用户将不满意。 我们要欺骗用户:他按下按钮,就好像已经发生了一样-精美的动画已经开始。 但是实际上,没有像-我们等到欺骗变成事实。 为了防止欺诈,我们必须为用户透明地处理不喜欢的行为。

正确处理此问题将很不错,以便用户获得所需的结果。 但是,作为开发人员,我们很难每次都处理不同的互斥请求

有太多问题。 如何理解查询相关? 如何存储这些连接? 如何处理复杂的脚本而不是复制粘贴? 如何命名新组件? 任务很复杂,我们已经实现的内容不适合该解决方案。

小组和小组策略


创建一个名为GroupStrategyHolder的简单接口。 这有点复杂-两个字段而不是一个。

 interface GroupStrategyHolder { val groupStrategy: GroupStrategy val groupKey: String } sealed class GroupStrategy object Default : GroupStrategy() object KillGroup : GroupStrategy() 

除了特定请求的策略外,我们还引入了一个新实体-一组请求。 该小组还将制定策略。 我们将仅考虑具有两种策略的最简单的选项: 默认 -当我们对查询不执行任何操作时的默认策略,以及KillGroup-杀死该组中的所有现有查询并启动一个新查询。

 interface Act : StrategyHolder, GroupStrategyHolder { val id: String } class CompletableAct( override val id: String, override val completable: Completable, override val strategy: Strategy = SaveMe, override val groupStrategy: GroupStrategy = Default override val groupKey: String = "" ) : Act 

我们重复我之前提到的步骤:我们使用界面,展开并将两个其他字段添加到CompletableAct和SingleAct。

更新实施


我们返回到Execute方法。 第三项任务比较复杂,但是解决方案却很简单:我们检查特定请求的组策略,如果是KillGroup,则杀死整个组并执行通常的逻辑。

 MapDisposable -> GroupDisposable ... override fun execute(act: Act, e: (Throwable) -> Unit) { if (act.groupStrategy == KillGroup) groupDisposable.removeGroup(act.groupKey) return when { groupDisposable.contains(act.groupKey, act.id) -> when (act.strategy) { KillMe -> { stop(act.groupKey, act.id) startExecution(act, e) } SaveMe -> log("${act.id} - Act duplicate") } else -> startExecution(act, e) } } 

这个问题很复杂,但是我们已经有了相当完善的基础架构-我们可以扩展它并解决问题。 如果您看看我们的结果,我们现在需要做什么?

结果


 fun act(id: String)= Completable.timer(2, SECONDS).toAct( id = id, groupStrategy = KillGroup, groupKey = "Like-Dislike-PostId-1234" ) executor.apply { execute(act(“Like”)) execute(act(“Dislike”)) execute(act(“Like”)) } Like - Act Started Like - Act Canceled Dislike - Act Started Dislike - Act Canceled Like - Act Started Like - Act Finished 

如果需要此类复杂的查询,则添加两个字段:groupStrategy和组ID。 组ID是一个特定的参数,因为为了支持许多并行的“喜欢” /“不喜欢”请求,您需要为属于同一对象的每对请求创建一个组。 在这种情况下,您可以命名组Like-Dislike-PostId并在其中添加帖子ID。 每次我们喜欢相邻的帖子时,我们都将确保上一个帖子和下一个帖子均能正常运行。

在我们的综合示例中,我们试图执行一个“喜欢-不喜欢-喜欢”序列。 当我们执行第一个动作,然后执行第二个动作时,前一个动作将被取消,而下一个喜欢的动作将取消之前的不喜欢的动作。 这就是我想要的。

在最后一个示例中,我们使用命名参数来创建Act。 这有助于降低代码的可读性,尤其是当有很多参数时。

为了便于阅读,请使用命名参数。

建筑学


让我们看看这个决定如何影响我们的架构。 在项目上,我经常看到View Model或Presenter承担很多责任,例如黑客,以某种方式喜欢/不喜欢来处理这种情况。 通常,视图模型中的所有这些逻辑都是:具有按钮锁定,LifeCycle处理程序,订阅的大量重复代码。



执行器现在正在执行的所有操作都曾经是Presenter或View Model。 如果体系结构成熟,则开发人员可以将此逻辑带到某种类型的交互器或用例中,但是该逻辑在多个位置重复。

在采用Executor之后,视图模型变得更简单,所有逻辑都从它们中隐藏了。 如果您曾经把它带给Presenter和交互者,那么您知道交互者和Presenter变得越来越容易。 总的来说,我很满意。



还有什么要补充的?


当前解决方案的另一个优点是它是可扩展的。 作为开发人员,他们每天都在移动应用程序上工作并与bug和大量并发请求作斗争,我们还要添加什么呢?

可能性


生命周期实现仍在幕后,但作为移动开发人员,我们所有人都始终在考虑这一点,并担心没有东西会流失。 我想保存和恢复应用程序重启请求。

通话链。 由于RX链的包装,可以对它们进行序列化,因为默认情况下RX不序列化。

很少有人知道他们的应用程序在特定时间点正在运行多少并发请求。 我不会说这对于中小型应用程序是一个大问题。 但是,对于在后台执行大量工作的大型应用程序,很高兴了解崩溃的原因和用户的抱怨。 如果没有其他基础结构,开发人员根本就没有信息来理解原因:原因可能是在UI中,或者可能是在后台存在大量不断的请求。 我们可以扩展解决方案并添加某种指标

让我们更详细地考虑各种可能性。

生命周期处理


 class ActExecutorImpl( lifecycle: Lifecycle ) : ActExecutor { inir { lifecycle.doOnDestroy { cancelAll() } } ... 

这是生命周期实施的一个示例。 在最简单的情况下-使用Destroy片段或用Activity取消,我们将生命周期处理程序传递给我们的Executor ,并且在发生onDestroy事件时,我们将删除所有请求 。 这是一个简单的解决方案,无需在视图模型中复制粘贴类似的代码。 LifeData做大致相同的事情。

保存/还原


由于有包装器,因此可以为Acts创建单独的类 ,在其中将包含用于创建异步任务的逻辑。 此外,我们可以将该名称保存到数据库中,并使用工厂方法或类似方法启动应用程序时从数据库中将其还原

同时,我们将获得脱机工作的机会,并且当出现Internet时,我们将重新启动已完成但有错误的请求。 在没有Internet或请求错误的情况下,我们将它们保存到数据库中,然后还原并再次执行。 如果您可以使用常规RX来执行此操作而无需其他包装器,请在评论中写下,这将很有趣。

通话链


我们也可以约束我们的行为 。 另一个扩展选项是运行查询链 。 例如,您有一个需要在服务器上创建的实体,而另一个依赖于第一个实体的实体必须在我们确定第一个请求成功完成时立即创建。 这也可以做到。 当然,这并不是那么简单,但是可以使用一个类来控制所有异步任务的启动。 使用裸机RX更加困难。

指标


有趣的是,平均在后台执行了多少个并行查询 。 有了指标,您可以了解用户抱怨嗜睡的原因。 , , .

, , , , - - 10% . , .

结论


— , . «» . , , , , .

, , . — - , , — . — . , . . .

. Kotlin, . , .

AppsConf 2018, AppsConf 2019 . 38 : , Android, UX, , - , , Kotlin.

, youtube- 22–23 .

Source: https://habr.com/ru/post/zh-CN437592/


All Articles