Les bases de la programmation réactive à l'aide de RxJS. Partie 3. Observables d'ordre supérieur



Dans cet article, nous verrons comment il est possible d'en traiter un autre dans un même fil, pourquoi il est nécessaire et comment les opérateurs observables d'ordre supérieur (ci-après dénommés HOO) nous aideront à cet égard.

Série d'articles "Fondamentaux de la programmation réactive utilisant RxJS":



Lorsque vous travaillez avec des threads, une situation survient souvent lorsqu'il est nécessaire de transférer les résultats d'un autre sur un thread en tant que valeur. Par exemple, nous voulons exécuter une requête ajax et traiter sa réponse dans le thread actuel, ou exécuter plusieurs requêtes parallèles, implémenter le pooling. Je pense que beaucoup de gens sont habitués à résoudre de tels problèmes en utilisant un mécanisme tel que la promesse. Mais est-il possible de les résoudre en utilisant RxJS? Bien sûr, et tout est beaucoup plus facile que vous ne le pensez!

Remarque : pour comprendre la partie théorique de l'article, il n'est pas nécessaire de lire les articles précédents, il suffit de savoir ce que l'on peut observer, les opérateurs et les tuyaux. Dans la partie pratique, nous affinerons l'exemple du deuxième article , que vous pouvez trouver ici .

Le problème


Imaginons la tâche suivante: nous devons savoir à chaque seconde si le serveur est accessible. Comment pouvons-nous le résoudre?

Créez d'abord un flux à l'aide de la méthode du minuteur:

timer(0, 1000).subscribe({ next: console.log }); 

La méthode de la minuterie est très similaire en principe à l' intervalle . Mais contrairement à cela, il vous permet de définir le délai de démarrage du thread, qui est transmis par le premier paramètre. Le deuxième paramètre indique l'intervalle pendant lequel une nouvelle valeur sera générée. Si le deuxième paramètre n'est pas spécifié, le temporisateur ne générera qu'une seule valeur et terminera le flux.

Comme vous et moi n'avons pas de serveur, je vous suggère d'écrire simplement une fonction qui émule une demande au serveur:

 const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) } 

Que fait cette méthode? Il renvoie un flux créé à l'aide de la méthode timer, qui émet une valeur après une seconde et se termine. Étant donné que la méthode timer ne génère qu'un nombre, nous utilisons l'opérateur mapTo pour le remplacer par la chaîne «success».

Voici à quoi ressemble le flux créé par la méthode makeRequest:



Nous avons maintenant le choix: appeler la méthode makeRequest à l'intérieur du flux ou assigner cette responsabilité à l'observateur?

La première approche est préférable, car dans ce cas, nous pourrons utiliser tout le potentiel de RxJS avec ses opérateurs et décharger notre observateur de tâches inutiles. Nous utilisons la méthode timer pour exécuter les requêtes par intervalle:

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log }); 

Lorsque nous exécutons un tel code, nous verrons que dans console.log, nous n'obtenons pas un message avec le texte «success», mais un objet de type Observable:



La réponse est tout à fait attendue, car dans la carte, nous renvoyons le flux. Pour qu'un flux fonctionne, vous devez vous y abonner. Eh bien, voyons comment ne pas le faire :

 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); }); 

Le problème avec l'exemple ci-dessus est que nous obtenons un abonnement dans un abonnement. Mais que se passe-t-il si nous voulons faire plus d'une demande dans une chaîne? Ou si, à un moment donné, nous devons nous désinscrire du flux interne? Dans ce cas, notre code ressemblera de plus en plus à des «nouilles». Pour résoudre ce problème, RxJS a des opérateurs spéciaux appelés HOO.

Hoo


HOO est un type spécial d'instructions qui acceptent les flux comme valeurs. Un tel opérateur est la méthode mergeAll.

Lorsqu'un flux arrive à mergeAll, il y souscrit. Le flux auquel l'opérateur s'est abonné est appelé interne. Le flux à partir duquel l'opérateur reçoit d'autres flux sous forme de valeurs est appelé externe.

Lorsqu'un thread interne génère une valeur, mergeAll pousse cette valeur dans le thread externe. Ainsi, on se débarrasse de la nécessité de s'abonner manuellement. Si nous nous désabonnons du flux externe, alors mergeAll se désabonnera automatiquement du flux interne.

Voyons comment réécrire notre exemple avec mergeAll:

 timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log }); 

Dans l'exemple ci-dessus, le flux externe a été créé par l'instruction timer. Et les flux créés dans l'opérateur de carte sont internes. Chaque thread créé tombe dans l'instruction mergeAll.



La combinaison map + mergeAll est utilisée très souvent, donc dans RxJS il y a une méthode mergeMap:

 timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log }); 

Lorsqu'un thread externe génère une valeur, l'opérateur mergeMap appelle la fonction de rappel qui lui est transmise, ce qui génère un nouveau thread. MergeMap s'abonne ensuite au flux généré.



La particularité de l'opérateur mergeAll / mergeMap est que si un autre flux lui revient, il y souscrit également. Ainsi, dans un flux externe, nous pouvons obtenir des valeurs de plusieurs internes à la fois. Voyons l'exemple suivant:

  timer(0, 1000) 

Voici à quoi ressemblera le flux externe sans l'opérateur mergeMap:



Et donc avec mergeMap:

 timer(0, 1000).pipe( mergeMap(() => interval(1000)) ) 



Chaque seconde, nous créons un nouveau thread interne et mergeMap s'y abonne. Ainsi, nous avons de nombreux threads internes travaillant simultanément, dont les valeurs tombent dans l'externe:





Remarque : soyez prudent en utilisant mergeMap, chaque nouveau thread interne fonctionnera jusqu'à ce que vous vous désabonniez du externe. Dans l'exemple ci-dessus, le nombre de threads internes augmente chaque seconde, au final, il peut y avoir tellement de threads que l'ordinateur ne peut pas faire face à la charge.

concatAll / concatMap


La méthode mergeMap est idéale lorsque vous ne vous souciez pas de l'ordre d'exécution des threads internes, mais qu'en est-il si vous en avez besoin? Supposons que nous voulons que la prochaine requête de serveur soit exécutée uniquement après avoir reçu une réponse de la précédente?

À ces fins, l'opérateur HOO concatAll / concatMap convient. Cet opérateur, ayant souscrit au thread interne, attend jusqu'à ce qu'il se termine, puis seulement s'abonne au suivant.

Si pendant l'exécution d'un thread, un nouveau descend, il est placé dans la file d'attente jusqu'à ce que le précédent soit terminé.

 // ,  1     const firstInnerObservable = timer(1000).pipe( mapTo(1) ); // ,  2     const secondInnerObservable = timer(500).pipe( mapTo(2) ); of( firstInnerObservable, secondInnerObservable ).pipe( concatAll() ).subscribe({ next: console.log }); 

Dans l'exemple ci-dessus, nous créons deux threads en utilisant la méthode timer. Pour plus de clarté, j'ai utilisé l'opérateur mapTo pour afficher différentes valeurs. Le premier thread générera 1, le second - 2. Un thread externe est créé en utilisant la méthode of, qui prend en entrée deux des observables ci-dessus.

L'instruction concatAll reçoit d'abord firstInnerObservable, s'y abonne et attend qu'elle se termine, et seulement après l'achèvement du premier s'abonne à secondInnerObservable. Voici à quoi ressemblera le flux externe:



Si nous remplaçons concatAll par mergeAll, le flux ressemblera à ceci:

 of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log }); 



switchAll / switchMap


Cet opérateur diffère des précédents en ce que lorsqu'il reçoit un nouveau flux, il se désabonne immédiatement du précédent et s'abonne au nouveau.

Prenez l'exemple ci-dessus et remplacez concatAll par switchAll, et voyez comment le flux externe se comporte:

 of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log }); 



Seule la valeur du deuxième flux interne est entrée dans le flux externe. C'est parce que switchMap s'est désabonné du premier lorsqu'il a reçu le deuxième thread.

Quand est-ce nécessaire? Par exemple, lors de la mise en œuvre d'une recherche de données. Si la réponse du serveur n'est pas encore arrivée et que nous avons déjà envoyé une nouvelle demande, il n'est pas logique d'attendre la précédente.

exhaust / exhaustMap


exhaust est exactement l'opposé de l'instruction switchAll, et son comportement est similaire à concatAll. Cette méthode, en vous abonnant au flux, attend qu'elle se termine. Si un nouveau flux lui revient, il est simplement rejeté.

 of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log }); 



Dans l'exemple ci-dessus, nous n'avons pas obtenu de double, car à ce moment, l'opérateur attendait l'achèvement du premier thread, et a simplement laissé tomber le second.

Je pense que beaucoup ont une question, quand un tel comportement peut-il être nécessaire? Un bon exemple est le formulaire de connexion. Cela n'a aucun sens d'envoyer plusieurs requêtes au serveur tant que la requête en cours n'est pas terminée.

Nous finalisons la candidature


Nous rappelons l' exemple du deuxième article . Dans ce document, nous avons implémenté une recherche sur GitHub et utilisé l'opérateur mergeMap pour envoyer des demandes au serveur. Maintenant que nous connaissons les caractéristiques de cet opérateur, est-il vraiment adapté dans notre cas?

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Supposons que le serveur GitHub soit fortement surchargé, alors le traitement de notre réponse prendra beaucoup de temps. Qu'est-ce qui pourrait mal tourner dans ce cas?

Supposons qu'un utilisateur saisisse certaines données, n'attende pas de réponse et en saisisse de nouvelles. Dans ce cas, nous enverrons la deuxième demande au serveur. Cependant, personne ne garantit que la réponse à la première demande viendra plus tôt.

Étant donné que l'opérateur mergeMap ne se soucie pas de l'ordre dans lequel traiter les threads internes, dans le cas où la première demande est exécutée après la seconde, nous effacerons les données réelles. Par conséquent, je propose de remplacer la méthode mergeMap par switchMap:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) }) 

Maintenant, si l'utilisateur entre de nouvelles données, switchMap se désabonnera du flux précédent et s'abonnera au nouveau.

Il convient de noter que notre requête http continuera de se bloquer jusqu'à ce que le serveur y réponde. Mais, puisque nous nous sommes désabonnés du flux interne, la réponse ne tombera pas dans le flux externe.

Remarque : si vous travaillez avec Angular et utilisez HttpClient pour travailler avec http, vous ne pouvez pas vous soucier d'annuler la demande elle-même. HttpClient peut le faire pour vous lors de la désinscription.

Annuler http


L'API de récupération a la possibilité d'annuler la demande http à l'aide d' AbortController . Lorsqu'elle est combinée avec l'opérateur switchMap, cette fonctionnalité permettra d'économiser du trafic utilisateur.

Réécrivons un peu notre exemple. Et créez une méthode qui encapsulera l'appel de récupération dans observable:

 const createCancellableRequest = (url) => { //      const controller = new AbortController(); const signal = controller.signal; return new Observable(observer => { fetch(url, { signal }) .then(response => { if (response.ok) { return response.json(); } throw new Error(''); }) //     .then(result => observer.next(result)) //   .then(() => observer.complete()) //   ,     .catch(error => observer.error(error)); // ,    return () => { //   controller.abort(); }; }); }; 

Modifiez également la méthode getUsersRepsFromApi:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); } 

Maintenant, la méthode retourne pas promis, mais observable. Par conséquent, nous supprimons le wrapper de switchMap:

 switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) ) 

Remarque : dans RxJS version 6.5, ils ont ajouté l' instruction fromFetch , qui elle-même appelle la méthode d' abandon sous le capot, de sorte que vous n'avez plus besoin d'écrire votre propre «vélo».

C'est tout! Tous les exemples de code peuvent être trouvés ici .

Conclusion


Aujourd'hui, nous avons examiné ce qu'est HOO et certains opérateurs très utiles de cette catégorie. Bien sûr, ceux-ci étaient loin d'être tous. Pour des informations plus détaillées et détaillées, je vous recommande de consulter la documentation RxJS.

Dans le prochain article, je prévois d'examiner quelle est la différence entre les observables chauds et froids.

Enfin: n'utilisez pas l'abonnement dans l'abonnement, car il y a HOO!

Source: https://habr.com/ru/post/fr450050/


All Articles