Die Grundlagen der reaktiven Programmierung mit RxJS. Teil 2. Bediener und Rohre



In einem früheren Artikel haben wir uns angesehen, was Streams sind und womit sie essen. Im neuen Teil werden wir lernen, welche Methoden RxJS zum Erstellen von Flows bereitstellt, was Operatoren, Pipes sind und wie mit ihnen gearbeitet wird.

Artikelserie "Grundlagen der reaktiven Programmierung mit RxJS":



RxJS verfügt über eine umfangreiche API . Die Dokumentation beschreibt mehr als hundert Methoden. Um sie ein wenig kennenzulernen, werden wir eine einfache Anwendung schreiben und in der Praxis sehen, wie der reaktive Code aussieht. Sie werden sehen, dass dieselben Aufgaben, die früher routinemäßig erschienen und viel Code schreiben mussten, eine elegante Lösung haben, wenn Sie sie durch das Prisma der Reaktivität betrachten. Bevor wir jedoch mit der Praxis beginnen, werden wir uns ansehen, wie Flows grafisch dargestellt werden können, und uns mit praktischen Methoden zum Erstellen und Verarbeiten von Flows vertraut machen.

Grafische Darstellung von Threads


Um deutlich zu machen, wie sich ein bestimmter Fluss verhält, verwende ich die im reaktiven Ansatz verwendete Notation. Erinnern Sie sich an unser Beispiel aus dem vorherigen Artikel:

const observable = new Observable((observer) => { observer.next(1); observer.next(2); observer.complete(); }); 

So sieht die grafische Darstellung aus:



Die Strömung wird normalerweise als gerade Linie dargestellt. Wenn der Stream einen Wert ausgibt, wird er in der Zeile als Kreis angezeigt. Eine gerade Linie im Display ist das Signal zum Beenden des Streams. Verwenden Sie das Symbol „ד, um den Fehler anzuzeigen.

 const observable = new Observable((observer) => { observer.error(); }); 



Eine Zeile wird gestreamt


In meiner Praxis musste ich selten meine eigenen Observable-Instanzen direkt erstellen. Die meisten Methoden zum Erstellen von Threads befinden sich bereits in RxJS. Um einen Stream mit den Werten 1 und 2 zu erstellen, reicht es aus, die of-Methode zu verwenden:

 const observable = of(1, 2); 

Die of-Methode akzeptiert eine beliebige Anzahl von Argumenten und gibt eine fertige Instanz des Observable zurück. Nach dem Abonnieren werden die empfangenen Werte ausgegeben und Folgendes ausgeführt:



Wenn Sie das Array als Stream darstellen möchten, können Sie die from-Methode verwenden. Die from-Methode als Argument erwartet jedes iterierbare Objekt (Array, String usw.) oder Versprechen und projiziert dieses Objekt auf den Stream. So sieht der aus der Zeichenfolge erhaltene Stream aus:

 const observable = from('abc'); 



Und so können Sie ein Versprechen in einen Stream einwickeln:

 const promise = new Promise((resolve, reject) => { resolve(1); }); const observable = from(promise); 



Hinweis: Oft werden Threads mit Versprechen verglichen. Tatsächlich haben sie nur eines gemeinsam - eine Push-Strategie zur Verbreitung von Veränderungen. Der Rest sind völlig andere Einheiten. Versprechen kann nicht mehrere Werte erzeugen. Es kann nur Auflösung oder Zurückweisung ausführen, d. H. habe nur zwei Zustände. Ein Stream kann mehrere Werte übertragen und wiederverwendet werden.

Erinnerst du dich an das Beispiel mit dem Intervall vom ersten Artikel ? Dieser Stream ist ein Timer, der die Zeit in Sekunden ab dem Zeitpunkt des Abonnements zählt.

 const timer = new Observable(observer => { let counter = 0; const intervalId = setInterval(() => { observer.next(counter++); }, 1000); return () => { clearInterval(intervalId); } }); 

So können Sie dasselbe in einer Zeile implementieren:

 const timer = interval(1000); 



Und schließlich eine Methode, mit der Sie einen Ereignisstrom für DOM-Elemente erstellen können:

 const observable = fromEvent(domElementRef, 'keyup'); 

Als Werte empfängt und sendet dieser Stream Keyup-Ereignisobjekte.

Rohre & Bediener


Pipe ist eine Observable-Klassenmethode, die in RxJS in Version 5.5 hinzugefügt wurde. Dank dessen können wir Operatorketten für die sequentielle Verarbeitung der im Stream empfangenen Werte erstellen. Pipe ist ein unidirektionaler Kanal, der die Bediener miteinander verbindet. Die Operatoren selbst sind normale Funktionen, die in RxJS beschrieben sind und Werte aus einem Stream verarbeiten.

Sie können beispielsweise den Wert konvertieren und weiter an den Stream übergeben, oder sie können als Filter fungieren und keine Werte überspringen, wenn sie die angegebene Bedingung nicht erfüllen.

Schauen wir uns die Operatoren in Aktion an. Multiplizieren Sie jeden Wert aus dem Stream mit 2 mit dem Kartenoperator:

 of(1,2,3).pipe( map(value => value * 2) ).subscribe({ next: console.log }); 

So sieht der Stream aus, bevor Sie den Kartenoperator anwenden:



Nach der Map-Anweisung:



Verwenden wir den Filteroperator. Diese Anweisung funktioniert genau wie die Filterfunktion in der Array-Klasse. Die Methode verwendet eine Funktion als erstes Argument, das eine Bedingung beschreibt. Wenn der Wert aus dem Stream die Bedingung erfüllt, wird er weitergegeben:

 of(1, 2, 3).pipe( //     filter(value => value % 2 !== 0), map(value = value * 2) ).subscribe({ next: console.log }); 

Und so wird das gesamte Schema unseres Streams aussehen:



Nach dem Filter:



Nach Karte:



Hinweis: pipe! == abonnieren. Die Pipe-Methode deklariert das Fließverhalten, abonniert jedoch nicht. Bis Sie die Subscribe-Methode aufrufen, funktioniert Ihr Stream nicht.

Wir schreiben eine Bewerbung


Nachdem wir herausgefunden haben, was Rohrleitungen und Bediener sind, können Sie mit dem Üben beginnen. Unsere Anwendung führt eine einfache Aufgabe aus: Anzeigen einer Liste offener Github-Repositorys anhand des eingegebenen Spitznamens des Besitzers.

Es wird nur wenige Anforderungen geben:

  • Führen Sie keine API-Anforderung aus, wenn die in die Eingabe eingegebene Zeichenfolge weniger als 3 Zeichen enthält.
  • Um die Anforderung für jedes vom Benutzer eingegebene Zeichen nicht zu erfüllen, muss vor dem Zugriff auf die API eine Verzögerung (Entprellen) von 700 Millisekunden festgelegt werden.

Um nach Repositorys zu suchen, verwenden wir die Github-API . Ich empfehle, die Beispiele selbst auf stackblitz auszuführen . Dort habe ich die fertige Implementierung angelegt. Links finden Sie am Ende des Artikels.

Beginnen wir mit dem HTML-Markup. Beschreiben wir die Eingabe- und ul-Elemente:

 <input type="text"> <ul></ul> 

In der Datei js oder ts erhalten wir dann mithilfe der Browser-API Links zu den aktuellen Elementen:

 const input = document.querySelector('input'); const ul = document.querySelector('ul'); 

Wir benötigen auch eine Methode, die eine Anforderung an die Github-API ausführt. Unten finden Sie den Code für die Funktion getUsersRepsFromAPI, die den Spitznamen des Benutzers akzeptiert und eine Ajax-Anforderung mithilfe von fetch ausführt. Dann gibt es ein Versprechen zurück und wandelt die erfolgreiche Antwort auf dem Weg in json um:

 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return fetch(url) .then(response => { if(response.ok) { return response.json(); } throw new Error(''); }); } 

Als nächstes schreiben wir eine Methode, die die Namen der Repositorys auflistet:

 const recordRepsToList = (reps) => { for (let i = 0; i < reps.length; i++) { //    ,    if (!ul.children[i]) { const newEl = document.createElement('li'); ul.appendChild(newEl); } //      const li = ul.children[i]; li.innerHTML = reps[i].name; } //    while (ul.children.length > reps.length) { ul.removeChild(ul.lastChild); } } 

Die Vorbereitungen sind abgeschlossen. Es ist Zeit, einen Blick auf RxJS in Aktion zu werfen. Wir müssen uns das Keyup-Ereignis unserer Eingabe anhören. Zunächst müssen wir verstehen, dass wir in einem reaktiven Ansatz mit Flüssen arbeiten. Glücklicherweise bietet RxJS bereits eine ähnliche Option. Denken Sie an die oben erwähnte fromEvent-Methode. Wir benutzen es:

 const keyUp = fromEvent(input, 'keyup'); keyUp.subscribe({ next: console.log }); 

Jetzt wird unsere Veranstaltung als Stream präsentiert. Wenn wir uns ansehen, was in der Konsole angezeigt wird, sehen wir ein Objekt vom Typ KeyboardEvent. Wir benötigen jedoch einen vom Benutzer eingegebenen Wert. Hier bieten sich die Pipe-Methode und der Kartenoperator an:

 fromEvent(input, 'keyup').pipe( map(event => event.target.value) ).subscribe({ next: console.log }); 

Wir fahren mit der Umsetzung der Anforderungen fort. Zunächst führen wir die Abfrage aus, wenn der eingegebene Wert mehr als zwei Zeichen enthält. Verwenden Sie dazu den Filteroperator:

 fromEvent(input, 'keyup').pipe( map(event => event.target.value), filter(value => value.length > 2) ) 

Wir haben uns mit der ersten Anforderung befasst. Wir fahren mit dem zweiten fort. Wir müssen Debounce implementieren. RxJS hat eine debounceTime-Anweisung. Dieser Operator nimmt als erstes Argument die Anzahl der Millisekunden an, in denen der Wert vor der Weitergabe gehalten wird. In diesem Fall setzt jeder neue Wert den Timer zurück. Somit erhalten wir am Ausgang den letzten Wert, nach dem 700 Millisekunden vergangen sind.

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(value => value.length > 2) ) 

So könnte unser Stream ohne debounceTime aussehen:



Und so sieht derselbe Stream aus, der durch diese Anweisung geleitet wird:



Mit debounceTime wird die API mit geringerer Wahrscheinlichkeit verwendet, wodurch Datenverkehr gespart und der Server entlastet wird.

Für zusätzliche Optimierung schlage ich vor, einen anderen Operator zu verwenden - uniqueUntilChanged. Diese Methode schützt uns vor Duplikaten. Es ist am besten, seine Arbeit anhand eines Beispiels zu zeigen:

 from('aaabccc').pipe( distinctUntilChanged() ) 

Ohne eindeutigeUntilChanged:



Mit uniqueUntilChanged:



Fügen Sie diese Anweisung unmittelbar nach der debounceTime-Anweisung hinzu. Daher werden wir nicht auf die API zugreifen, wenn der neue Wert aus irgendeinem Grund mit dem vorherigen übereinstimmt. Eine ähnliche Situation kann auftreten, wenn der Benutzer neue Zeichen eingegeben und diese dann wieder gelöscht hat. Da wir eine Verzögerung implementiert haben, fällt nur der letzte Wert in den Stream, auf den wir bereits eine Antwort haben.

Gehen Sie zum Server


Bereits jetzt können wir die Logik der Anfrage und die Verarbeitung der Antwort beschreiben. Wir können zwar nur mit Versprechen arbeiten. Daher beschreiben wir einen anderen Kartenoperator, der die Methode getUsersRepsFromAPI aufruft. Im Beobachter beschreiben wir die Verarbeitungslogik unseres Versprechens:

 /*  !     RxJS    promise,      */ fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), map(value => getUsersRepsFromAPI(value)) ).subscribe({ next: promise => promise.then(reps => recordRepsToList(reps)) }); 

Im Moment haben wir alles umgesetzt, was wir wollten. Unser Beispiel hat jedoch einen großen Nachteil: Es gibt keine Fehlerbehandlung. Unser Beobachter erhält nur ein Versprechen und hat keine Ahnung, dass etwas schief gehen könnte.

Natürlich können wir bei der nächsten Methode das Versprechen einhalten, aber aus diesem Grund wird unser Code immer mehr einer „Rückruf-Hölle“ ähneln. Wenn wir plötzlich eine weitere Anforderung ausführen müssen, erhöht sich die Komplexität des Codes.

Hinweis: Die Verwendung von Versprechen im RxJS-Code wird als Antipattern betrachtet. Versprechen hat viele Nachteile gegenüber beobachtbar. Es kann nicht rückgängig gemacht und nicht wiederverwendet werden. Wenn Sie eine Wahl haben, wählen Sie beobachtbar. Gleiches gilt für die toPromise-Methode der Observable-Klasse. Diese Methode wurde aus Gründen der Kompatibilität mit Bibliotheken implementiert, die nicht mit Streams arbeiten können.

Wir können die from-Methode verwenden, um ein Versprechen auf einen Stream zu projizieren. Diese Methode ist jedoch mit zusätzlichen Aufrufen der subscribe-Methode behaftet und führt auch zu Wachstum und Komplexität des Codes.

Dieses Problem kann mit dem Operator mergeMap gelöst werden:

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log }) 

Jetzt müssen wir keine Versprechen-Verarbeitungslogik schreiben. Die from-Methode hat einen Versprechen-Stream erstellt und vom mergeMap-Operator verarbeitet. Wenn das Versprechen erfolgreich erfüllt wird, wird die nächste Methode aufgerufen und unser Beobachter erhält das fertige Objekt. Wenn ein Fehler auftritt, wird die Fehlermethode aufgerufen und unser Beobachter gibt einen Fehler in der Konsole aus.

Der mergeMap-Operator unterscheidet sich geringfügig von den Operatoren, mit denen wir zuvor gearbeitet haben. Er gehört zu den sogenannten Observables höherer Ordnung , auf die ich im nächsten Artikel eingehen werde. Mit Blick auf die Zukunft werde ich jedoch sagen, dass die mergeMap-Methode selbst den Stream abonniert.

Fehlerbehandlung


Wenn unser Thread einen Fehler empfängt, wird er beendet. Und wenn wir versuchen, nach einem Fehler mit der Anwendung zu interagieren, erhalten wir keine Reaktion, da unser Thread abgeschlossen ist.

Hier hilft uns der Operator catchError. catchError wird nur ausgelöst, wenn im Stream ein Fehler auftritt. Sie können es abfangen, verarbeiten und den üblichen Wert an den Stream zurückgeben, was nicht zu seiner Fertigstellung führt.

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => from(getUsersRepsFromAPI(value))), catchError(err => of([])) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log }) 

Wir fangen den Fehler in catchError ab und geben stattdessen einen Stream mit einem leeren Array zurück. Wenn nun ein Fehler auftritt, löschen wir die Liste der Repositorys. Aber dann endet der Fluss wieder.

Die Sache ist, dass catchError unseren ursprünglichen Stream durch einen neuen ersetzt. Und dann hört unser Beobachter nur auf ihn. Wenn der of-Stream ein leeres Array ausgibt, wird die vollständige Methode aufgerufen.

Um unseren ursprünglichen Thread nicht zu ersetzen, rufen wir den catchError-Operator für den from-Thread innerhalb des mergeMap-Operators auf.

 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps), error: console.log }) 

Somit wird unser ursprünglicher Stream nichts bemerken. Anstelle eines Fehlers wird ein leeres Array angezeigt.

Fazit


Wir haben endlich angefangen zu üben und haben gesehen, wofür Rohrleitungen und Bediener sind. Wir haben uns angesehen, wie Sie mithilfe der von RxJS bereitgestellten Rich-API Code reduzieren können. Natürlich ist unsere Bewerbung noch nicht fertig. Im nächsten Teil werden wir analysieren, wie es möglich ist, eine andere in einem Thread zu verarbeiten und unsere http-Anfrage abzubrechen, um noch mehr Verkehr und Ressourcen unserer Bewerbung zu sparen. Und damit Sie den Unterschied sehen können, habe ich ein Beispiel ohne Verwendung von RxJS erstellt. Sie können es hier sehen . Unter diesem Link finden Sie den vollständigen Code der aktuellen Anwendung. Um die Schaltungen zu erzeugen, habe ich den RxJS-Visualizer verwendet .

Ich hoffe, dieser Artikel hat Ihnen geholfen, besser zu verstehen, wie RxJS funktioniert. Ich wünsche Ihnen viel Erfolg beim Studium!

Source: https://habr.com/ru/post/de444290/


All Articles