Référence de source d'événement Rx



RxJava est utilisé dans un grand nombre d'applications Android, mais en même temps, beaucoup ne connaissent pas d'autres sources d'événements, à l'exception d'Observable et peut-être de Flowable. Ils oublient les classes spécialisées Single, Maybe et Completable, qui sont souvent capables d'ajouter plus de clarté au code.

Sous le chat, vous trouverez une feuille de triche sur les sources d'événements qui existent dans RxJava.

Completable est en fait un analogue Rx de Runnable. C'est une opération qui peut être effectuée ou non. Si nous tirons une analogie avec Kotlin, alors c'est amusant complétable () du monde de Rx. En conséquence, pour vous y abonner, vous devez implémenter onComplete et onError. Il ne peut pas être créé à partir de la valeur (Observable # juste, ...) car il n'est pas conçu pour cela.

Callable réactif, car ici il est possible de renvoyer le résultat de l'opération. Poursuivant la comparaison avec Kotlin, nous pouvons dire que Single est fun single (): T {}. Ainsi, pour vous y abonner, vous devez implémenter onSuccess (T) et onError.

Peut - être - un croisement entre Single et Completable, car il prend en charge une valeur, aucune valeur et erreur. Il est plus difficile de faire un parallèle sans ambiguïté avec les méthodes, mais je pense que Peut-être est amusant peut-être (): T? {}, qui renvoie null lorsqu'il n'y a aucun résultat. Il est facile de deviner que pour un abonnement, vous devez définir onSuccess (T), onComplete et onError.
Il est important de noter que onSuccess (T) et onComplete s'excluent mutuellement. C'est-à-dire en cas d'appel du premier, vous ne pouvez pas attendre le second.
Observable est la source la plus courante, en raison de sa polyvalence. Il sait comment ne pas produire d'événements du tout, et en générer beaucoup, de sorte qu'il peut toujours être utilisé lorsque d'autres options ne conviennent pas. Malgré cela, Observable a un inconvénient - il ne sait pas comment gérer la contre-pression. Pour vous y abonner, vous avez besoin de onNext (T), onError et onComplete.

Contre-pression - une situation où de nouveaux événements arrivent beaucoup plus rapidement qu'ils n'ont le temps de traiter et commencent à s'accumuler dans le tampon, le débordant. Cela peut entraîner des problèmes comme OutOfMemoryError. Plus de détails peuvent être trouvés ici .

ConnectableObservable - une version chauffée d'Observable. Toutes les sources de données commencent à émettre leur flux d'événements au moment de l'abonnement. Mais pas ce type. Pour ce faire, ConnectableObservable attend un appel pour se connecter. Cette opération est effectuée afin que plusieurs observateurs puissent consulter un flux d'événements sans le redémarrer à chaque abonnement. Pour illustrer, je vais vous donner l'extrait suivant:

val observable = Observable.fromCallable { Log.d("RxLogs", "observable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe() 
Dans la console sera:
observable depuis Appelable exécuté
observable depuis Appelable exécuté
observable depuis Appelable exécuté
observable depuis Appelable exécuté

 val connectedObservable = Observable.fromCallable { Log.d("RxLogs", "connectedObservable fromCallable executed") Thread.sleep(1000) }.subscribeOn(Schedulers.computation()) .publish() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.subscribe() connectedObservable.connect() 

Et dans ce cas: observable depuis Callable exécuté

Flowable - une source qui fournit des opérateurs supplémentaires pour le traitement de la contre-pression. Lorsque vous devez gérer plus de 10 000 événements qui se produisent rapidement l'un après l'autre, il est recommandé de l'utiliser au lieu d'Observable.

Ce dernier peut créer un ConnectableFlowable, ouvrant les mêmes possibilités que ConnectableObservable.

En parlant de générateurs d'événements, on ne peut que mentionner le sujet et le processeur.

Subject - une classe qui peut être à la fois une source et un navigateur. Cela vous permet de l'utiliser, par exemple, dans différents types de contrôleurs, ce qui le donnera à l'extérieur en tant qu'observable et à l'intérieur en tant qu'observateur. Ensuite, nous allons passer par différentes implémentations de cette classe.

AsyncSubject / AsyncProcessor conserve le dernier événement jusqu'à ce que le thread se termine correctement, puis l'envoie aux abonnés. Si une erreur se produit, aucun événement ne sera transmis.

image

PublishSubject / PublishProcessor transfère les événements qui y entrent jusqu'à ce qu'un signal de terminal arrive. Après la fin du flux ou de l'erreur, il renvoie les événements appropriés.

image

BehaviorSubject / BehaviorProcessor fonctionne de manière similaire à PublishSubject / PublishProcessor, mais lors de la souscription, il renvoie le dernier événement, le cas échéant, et si Subject n'a pas effectué la transition vers l'état terminal.

image

ReplaySubject / ReplayProcessor - BehaviourSubject / BehaviorProcessor sur les stéroïdes. Il ne renvoie pas un dernier événement, mais autant que l'âme le désire. Si vous vous abonnez à un ReplaySubject ou un ReplayProcessor terminé, toutes les données accumulées seront reçues.

image
Ainsi, ReplaySubject.createWithSize (1) et BehaviourSubject.create () fonctionnent différemment après la transition vers l'état terminal. Pendant l'abonnement, le premier retournera le dernier événement, et le second ne le fera pas. C'est également vrai pour ReplayProcessor.
CompletableSubject , MaybeSubject et SingleSubject fonctionnent de manière similaire à PublishSubject, uniquement conçus pour être utilisés avec Completable, Maybe et Single, respectivement.

UnicastSubject / UnicastProcessor est en fait un ReplaySubject qui garantit qu'il n'a qu'un seul abonné. Il lève une exception IllegalStateException lors d'une tentative de réabonnement.

image

C'est-à-dire extrait suivant

 val subject = UnicastSubject.create<String>(3) repeat(3) { subject.onNext(it.toString()) } subject.onComplete() subject.subscribe({ Log.d("RxLogs", it) }, { }, { Log.d("RxLogs", "complete") }) 

affichera dans le journal
0
1
2
complet

MulticastProcessor fonctionne de manière similaire à PublishProcessor, à l'exception d'une petite fonctionnalité. Il sait comment gérer la contre-pression pour le flux entrant. MulticastProcessor vous permet de définir la taille du tampon auquel il pré-interrogera les éléments en amont pour les futurs abonnés.

Dans le schéma ci-dessous, un processeur est créé avec stockage pour 2 éléments, qu'il demande immédiatement à sa source. Par conséquent, lorsque le premier observateur y souscrit, il émet immédiatement le contenu du tampon, qui est instantanément rempli de nouveaux événements. Après l'événement terminal, MulticastProcessor efface son stockage et les nouveaux abonnés reçoivent immédiatement l'achèvement du flux.

image

Source: https://habr.com/ru/post/fr459174/


All Articles