接收事件源参考



RxJava被用于大量的android应用程序中,但与此同时,除了Observable和Flowable之外,许多人不知道其他事件源。 他们忘了专门的类Single,Maybe和Completable,这些类通常能够使代码更加清晰。

在猫的下面,您会发现有关RxJava中存在的事件源的备忘单。

Completable实际上是Runnable的Rx模拟。 这是可以执行或无法执行的操作。 如果我们与Kotlin进行类比,那么从Rx的世界来看,这是很有趣的completable() 。 因此,要订阅它,您需要实现onComplete和onError。 不能从值(可观察到的#只是,...)创建它,因为它不是为此设计的。

单一 -反应式Callable,因为在这里可以返回操作结果。 继续与Kotlin进行比较,可以说Single是有趣的single():T {}。 因此,要订阅它,必须实现onSuccess(T)和onError。

也许 -Single和Completable之间的交叉,因为它支持一个值,无值和错误。 要与这些方法进行明确的比较会比较困难,但是我认为Maybe很有趣,也许():T? {},如果没有结果,则返回null。 很容易猜到,对于订阅,您需要定义onSuccess(T),onComplete和onError。
重要的是要注意onSuccess(T)和onComplete是互斥的。 即 如果拨打第一个电话,您将无法等待第二个电话。
由于其多功能性, 可观察是最常见的来源。 他知道根本不产生事件并产生许多事件的方法,因此在其他选项不合适的情况下可以始终使用他。 尽管如此,Observable有一个缺点-它不知道如何处理背压。 要订阅它,您需要onNext(T),onError和onComplete。

背压-当新事件到达的时间快于处理时间并开始在缓冲区中累积并溢出时的情况。 这可能会导致诸如OutOfMemoryError之类的麻烦。 可以在这里找到更多详细信息。

ConnectableObservable -Observable的加热版本。 在订阅时,所有数据源都开始发布事件流。 但不是这个家伙。 为此,ConnectableObservable等待呼叫连接。 这样做是为了使几个观察者可以查看一个事件流,而无需在每个订阅时都重新启动它。 为了说明,我将向您提供以下代码段:

val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe() 
在控制台中将是:
可观察fromCallable执行
可观察fromCallable执行
可观察fromCallable执行
可观察fromCallable执行

 val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.connect() 

并且在这种情况下:observable fromCallable执行

Flowable-一种为处理背压提供额外操作员的源。 当您需要处理超过10,000个快速发生的事件时,建议使用它而不是Observable。

后者可以创建一个ConnectableFlowable,从而打开与ConnectableObservable相同的可能性。

说到事件生成器,不能不提及主题和处理器。

主题 -既可以是源代码又可以是浏览器的类。 例如,这使您可以在各种控制器中使用它,这会将它作为Observable向外发出,而在内部作为Observer发出通知。 接下来,我们将研究此类的不同实现。

AsyncSubject / AsyncProcessor保留最后一个事件,直到线程正确完成为止,然后将其发送给订阅者。 如果发生错误,则不会转发任何事件。

图片

PublishSubject / PublishProcessor进一步转发进入其中的事件,直到终端信号到达为止。 流或错误结束后,它将返回适当的事件。

图片

BehaviorSubject / BehaviorProcessor的工作方式与PublishSubject / PublishProcessor相似,但是在订阅时,它将返回最后一个事件(如果有),并且如果Subject尚未转换到终端状态。

图片

ReplaySubject / ReplayProcessor-类固醇上的BehaviourSubject / BehaviorProcessor。 它返回的不是最后一个事件,而是灵魂所希望的。 如果您订阅了完整的ReplaySubject或ReplayProcessor,则将接收所有累积的数据。

图片
因此,转换到终端状态后,ReplaySubject.createWithSize(1)和BehaviourSubject.create()的工作方式有所不同。 在订阅期间,第一个将返回最后一个事件,第二个将不返回。 对于ReplayProcessor也是如此。
CompletableSubjectMaybeSubjectSingleSubject的工作方式与PublishSubject相似,仅设计用于Completable,Maybe和Single。

UnicastSubject / UnicastProcessor实际上是一个ReplaySubject,可确保它只有一个订阅者。 尝试重新订阅时将引发IllegalStateException。

图片

即 下一个片段

 val subject = UnicastSubject.create<String>(3) repeat(3) { subject.onNext(it.toString()) } subject.onComplete() subject.subscribe({ Log.d("RxLogs", it) }, { }, { Log.d("RxLogs", "complete") }) 

将输出到日志
0
1个
2
完成

除了一个小功能, MulticastProcessor的工作方式与PublishProcessor相似。 他知道如何处理传入流的背压。 MulticastProcessor允许您设置缓冲区的大小,在缓冲区中它将从上游预查询元素以供将来的订阅者使用。

在下图中,创建了一个具有2个元素存储的处理器,处理器立即从其源请求该元素。 因此,当第一个观察者订阅它时,它立即发出缓冲区的内容,该缓冲区立即被新事件填充。 终端事件发生后,MulticastProcessor清除其存储,新订户立即收到流的完成。

图片

Source: https://habr.com/ru/post/zh-CN459174/


All Articles