
RxJava wird in einer großen Anzahl von Android-Anwendungen verwendet, aber gleichzeitig kennen viele andere Ereignisquellen nicht, außer Observable und möglicherweise Flowable. Sie vergessen die speziellen Klassen Single, Maybe und Completable, die dem Code häufig mehr Klarheit verleihen können.
Unter der Katze finden Sie einen Spickzettel zu den Ereignisquellen in RxJava.
Completable ist eigentlich ein Rx-Analogon von Runnable. Es ist eine Operation, die ausgeführt werden kann oder nicht. Wenn wir eine Analogie mit Kotlin ziehen, dann macht dies
Spaß () aus der Welt von Rx. Dementsprechend müssen Sie onComplete und onError implementieren, um es zu abonnieren. Es kann nicht aus dem Wert (Observable # just, ...) erstellt werden, da es nicht dafür ausgelegt ist.
Single - reactive Callable, da hier das Ergebnis der Operation zurückgegeben werden kann. Wenn wir den Vergleich mit Kotlin fortsetzen, können wir sagen, dass Single Spaß macht single (): T {}. Um es zu abonnieren, müssen Sie onSuccess (T) und onError implementieren.
Vielleicht - eine Kreuzung zwischen Single und Completable, weil es einen Wert unterstützt, keine Werte und Fehler. Es ist schwieriger, eine eindeutige Parallele zu den Methoden zu ziehen, aber ich denke, vielleicht macht es vielleicht Spaß (): T? {}, die null zurückgibt, wenn kein Ergebnis vorliegt. Es ist leicht zu erraten, dass Sie für ein Abonnement onSuccess (T), onComplete und onError definieren müssen.
Es ist wichtig zu beachten, dass sich onSuccess (T) und onComplete gegenseitig ausschließen. Das heißt, Wenn Sie den ersten anrufen, können Sie nicht auf den zweiten warten.
Observable ist aufgrund seiner Vielseitigkeit die häufigste Quelle. Er weiß, wie man Ereignisse überhaupt nicht produziert und viele davon generiert, sodass er immer dann verwendet werden kann, wenn andere Optionen nicht geeignet sind. Trotzdem hat Observable einen Nachteil - es weiß nicht, wie es mit Gegendruck umgehen soll. Um es zu abonnieren, benötigen Sie onNext (T), onError und onComplete.
Gegendruck - eine Situation, in der neue Ereignisse viel schneller eintreffen als Zeit für die Verarbeitung haben und sich im Puffer ansammeln und überlaufen. Dies kann zu Problemen wie OutOfMemoryError führen. Weitere Details finden Sie hier .
ConnectableObservable - eine beheizte Version von Observable. Alle Datenquellen beginnen zum Zeitpunkt des Abonnements mit der Ausgabe ihres Ereignisstroms. Aber nicht dieser Typ. Zu diesem Zweck wartet ConnectableObservable auf die Verbindung eines Anrufs. Dies geschieht, damit mehrere Beobachter einen Ereignisstrom überprüfen können, ohne ihn bei jedem Abonnement neu zu starten. Zur Veranschaulichung gebe ich Ihnen den folgenden Ausschnitt:
val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe()
In der Konsole wird sein:
beobachtbar vonCallable ausgeführt
beobachtbar vonCallable ausgeführt
beobachtbar vonCallable ausgeführt
beobachtbar vonCallable ausgeführt
val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.connect()
Und in diesem Fall: beobachtbar von Callable ausgeführt
Flowable - eine Quelle, die zusätzliche Bediener für die Verarbeitung des Gegendrucks bereitstellt. Wenn Sie mehr als 10.000 Ereignisse verarbeiten müssen, die schnell nacheinander auftreten, wird empfohlen, sie anstelle von Observable zu verwenden.
Letzteres kann ein ConnectableFlowable erstellen und die gleichen Möglichkeiten wie ConnectableObservable eröffnen.
Wenn man über Ereignisgeneratoren spricht, kann man nur Betreff und Prozessor erwähnen.
Betreff - eine Klasse, die sowohl eine Quelle als auch ein Browser sein kann. Auf diese Weise können Sie es beispielsweise in verschiedenen Arten von Controllern verwenden, die es als Observable nach außen und als Observer nach innen benachrichtigen. Als nächstes werden wir verschiedene Implementierungen dieser Klasse durchgehen.
AsyncSubject / AsyncProcessor hält das letzte Ereignis, bis der Thread korrekt beendet wurde, und sendet es dann an Abonnenten. Wenn ein Fehler auftritt, werden keine Ereignisse weitergeleitet.
PublishSubject / PublishProcessor leitet eingehende Ereignisse weiter, bis ein Terminalsignal eintrifft. Nach dem Ende des Streams oder Fehlers werden die entsprechenden Ereignisse zurückgegeben.
BehaviorSubject / BehaviorProcessor funktioniert ähnlich wie PublishSubject / PublishProcessor, gibt jedoch beim Abonnement das letzte Ereignis zurück, falls vorhanden, und wenn Subject nicht in den Terminalstatus übergegangen ist.
ReplaySubject / ReplayProcessor - BehaviourSubject / BehaviorProcessor für Steroide. Es gibt nicht ein letztes Ereignis zurück, sondern so viel, wie die Seele wünscht. Wenn Sie ein abgeschlossenes ReplaySubject oder einen ReplayProcessor abonnieren, werden alle gesammelten Daten empfangen.

Daher funktionieren ReplaySubject.createWithSize (1) und BehaviourSubject.create () nach dem Übergang in den Terminalstatus unterschiedlich. Während des Abonnements gibt das erste das letzte Ereignis zurück und das zweite nicht. Dies gilt auch für ReplayProcessor.
CompletableSubject ,
MaybeSubject und
SingleSubject funktionieren ähnlich wie PublishSubject und sind nur für die Verwendung mit Completable, Maybe und Single vorgesehen.
UnicastSubject / UnicastProcessor ist eigentlich ein ReplaySubject, das sicherstellt, dass es nur einen Abonnenten hat. Beim erneuten Abonnieren wird eine IllegalStateException ausgelöst.

Das heißt, nächster Ausschnitt
val subject = UnicastSubject.create<String>(3) repeat(3) { subject.onNext(it.toString()) } subject.onComplete() subject.subscribe({ Log.d("RxLogs", it) }, { }, { Log.d("RxLogs", "complete") })
wird in das Protokoll ausgegeben
0
1
2
vollständig
MulticastProcessor funktioniert ähnlich wie PublishProcessor, mit Ausnahme einer kleinen Funktion. Er weiß, wie man mit Gegendruck für den eingehenden Strom umgeht. Mit MulticastProcessor können Sie die Größe des Puffers festlegen, bei dem Elemente vom Upstream für zukünftige Abonnenten vorab abgefragt werden.
In der folgenden Abbildung wird ein Prozessor mit Speicher für 2 Elemente erstellt, den er sofort von seiner Quelle anfordert. Wenn der erste Beobachter es abonniert, gibt er daher sofort den Inhalt des Puffers aus, der sofort mit neuen Ereignissen gefüllt wird. Nach dem Terminalereignis löscht MulticastProcessor seinen Speicher und neue Abonnenten erhalten sofort die Fertigstellung des Streams.
