
In diesem Artikel werden wir untersuchen, wie es möglich ist, einen anderen in einem Thread zu verarbeiten, warum er benötigt wird und wie Operatoren höherer Ordnung (im Folgenden als HOO bezeichnet) uns dabei helfen.
Artikelserie "Grundlagen der reaktiven Programmierung mit RxJS":
Bei der Arbeit mit Threads tritt häufig die Situation auf, dass die Ergebnisse eines anderen als Wert auf einen Thread übertragen werden müssen. Beispielsweise möchten wir eine Ajax-Anforderung ausführen und ihre Antwort im aktuellen Thread verarbeiten oder mehrere parallele Anforderungen ausführen und Pooling implementieren. Ich denke, viele Menschen sind es gewohnt, solche Probleme mit einem Mechanismus wie Versprechen zu lösen. Aber ist es möglich, sie mit RxJS zu lösen? Natürlich und alles ist viel einfacher als Sie denken!
Hinweis : Um den theoretischen Teil des Artikels zu verstehen, müssen Sie nicht die vorherigen Artikel lesen, sondern nur wissen, was beobachtbar ist, Bediener und Rohre. Im praktischen Teil werden wir das Beispiel aus dem
zweiten Artikel verfeinern, den Sie
hier finden.
Das Problem
Stellen wir uns folgende Aufgabe vor: Wir müssen jede Sekunde herausfinden, ob auf den Server zugegriffen werden kann. Wie können wir es lösen?
Erstellen Sie zunächst einen Stream mit der Timer-Methode:
timer(0, 1000).subscribe({ next: console.log });
Die
Timer- Methode ist im Prinzip dem
Intervall sehr ähnlich. Im Gegensatz dazu können Sie das Thread-Start-Timeout festlegen, das vom ersten Parameter übertragen wird. Der zweite Parameter gibt das Intervall an, in dem ein neuer Wert generiert wird. Wenn der zweite Parameter nicht angegeben wird, generiert der Timer nur einen Wert und beendet den Stream.
Da Sie und ich keinen Server haben, empfehle ich, nur eine Funktion zu schreiben, die eine Anforderung an den Server emuliert:
const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) }
Was macht diese Methode? Es gibt einen Stream zurück, der mit der Timer-Methode erstellt wurde. Nach Ablauf einer Sekunde wird ein Wert ausgegeben und beendet. Da die Timer-Methode nur eine Zahl generiert, verwenden wir den mapTo-Operator, um sie durch die Zeichenfolge "success" zu ersetzen.
So sieht der von der makeRequest-Methode erstellte Stream aus:

Jetzt haben wir die Wahl: die makeRequest-Methode innerhalb des Streams aufzurufen oder diese Verantwortung dem Beobachter zuzuweisen?
Der erste Ansatz ist vorzuziehen, da wir in diesem Fall das volle Potenzial von RxJS mit seinen Betreibern nutzen und unseren Beobachter von unnötigen Pflichten entlasten können. Wir verwenden die Timer-Methode, um Anforderungen nach Intervallen auszuführen:
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log });
Wenn wir solchen Code ausführen, werden wir sehen, dass wir in console.log keine Nachricht mit dem Text "Erfolg" erhalten, sondern ein Objekt vom Typ Observable:

Die Antwort wird durchaus erwartet, da wir in der Karte den Stream zurückgeben. Damit ein Stream funktioniert, müssen Sie ihn abonnieren. Mal sehen, wie man es
nicht macht :
timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); });
Das Problem mit dem obigen Beispiel ist, dass wir ein Abonnement in einem Abonnement erhalten. Aber was ist, wenn wir mehr als eine Anfrage in einer Kette stellen möchten? Oder was ist, wenn wir uns irgendwann vom Fluss im Inneren abmelden müssen? In diesem Fall ähnelt unser Code immer mehr „Nudeln“. Um dieses Problem zu lösen, verfügt RxJS über spezielle Operatoren namens HOO.
Hoo
HOO ist eine spezielle Art von Anweisungen, die Streams als Werte akzeptieren. Ein solcher Operator ist die mergeAll-Methode.
Wenn ein Stream bei mergeAll ankommt, abonniert er ihn. Der Stream, den der Bediener abonniert hat, wird als intern bezeichnet. Der Stream, von dem der Bediener andere Flows als Werte empfängt, wird als extern bezeichnet.
Wenn ein interner Thread einen Wert generiert, überträgt mergeAll diesen Wert in den externen Thread. Auf diese Weise müssen wir nicht mehr manuell abonnieren. Wenn wir uns vom externen Flow abmelden, wird mergeAll den internen automatisch abbestellen.
Mal sehen, wie wir unser Beispiel mit mergeAll umschreiben können:
timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log });
Im obigen Beispiel wurde der externe Stream von der Timer-Anweisung erstellt. Die im Kartenoperator erstellten Flows sind intern. Jeder erstellte Thread fällt in die Anweisung mergeAll.

Die Kombination map + mergeAll wird sehr häufig verwendet, daher gibt es in RxJS eine mergeMap-Methode:
timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log });
Wenn ein externer Thread einen Wert generiert, ruft der mergeMap-Operator die an ihn übergebene Rückruffunktion auf, die einen neuen Thread generiert. Dann abonniert mergeMap den generierten Stream.

Die Besonderheit des Operators mergeAll / mergeMap besteht darin, dass ein anderer Stream ihn abonniert, wenn er darauf herunterkommt. Somit können wir in einem externen Stream Werte von mehreren internen gleichzeitig abrufen. Sehen wir uns das folgende Beispiel an:
timer(0, 1000)
So sieht der externe Stream ohne den Operator mergeMap aus:

Und so mit mergeMap:
timer(0, 1000).pipe( mergeMap(() => interval(1000)) )

Jede Sekunde erstellen wir einen neuen internen Thread und mergeMap abonniert ihn. Wir haben also viele interne Threads, die gleichzeitig arbeiten und deren Werte in die externen fallen:

Hinweis : Seien
Sie vorsichtig mit mergeMap. Jeder neue interne Thread funktioniert so lange, bis Sie sich vom externen Thread abmelden. Im obigen Beispiel wächst die Anzahl der internen Threads von Sekunde zu Sekunde. Am Ende kann es so viele Threads geben, dass der Computer die Last nicht bewältigen kann.
concatAll / concatMap
Die mergeMap-Methode eignet sich hervorragend, wenn Sie sich nicht für die Ausführungsreihenfolge interner Threads interessieren. Was ist, wenn Sie sie benötigen? Angenommen, wir möchten, dass die nächste Serveranforderung erst ausgeführt wird, nachdem eine Antwort von der vorherigen empfangen wurde.
Für solche Zwecke ist der HOO-Operator concatAll / concatMap geeignet. Dieser Operator, der den internen Thread abonniert hat, wartet, bis er beendet ist, und abonniert erst dann den nächsten.
Wenn während der Ausführung eines Threads ein neuer Thread zu diesem absteigt, wird er in die Warteschlange gestellt, bis der vorherige abgeschlossen ist.
Im obigen Beispiel erstellen wir zwei Threads mit der Timer-Methode. Aus Gründen der Übersichtlichkeit habe ich den mapTo-Operator verwendet, um verschiedene Werte anzuzeigen. Der erste Thread generiert 1, der zweite - 2. Ein externer Thread wird mit der of-Methode erstellt, bei der zwei der oben genannten Werte als Eingabe verwendet werden.
Die concatAll-Anweisung empfängt zuerst firstInnerObservable, abonniert sie und wartet auf ihren Abschluss. Erst nach Abschluss der ersten abonniert sie secondInnerObservable. So sieht der externe Stream aus:

Wenn wir concatAll durch mergeAll ersetzen, sieht der Stream folgendermaßen aus:
of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log });

switchAll / switchMap
Dieser Operator unterscheidet sich von den vorherigen darin, dass er beim Empfang eines neuen Streams den vorherigen sofort abmeldet und den neuen abonniert.
Nehmen Sie das obige Beispiel und ersetzen Sie concatAll durch switchAll. Sehen Sie, wie sich der externe Fluss verhält:
of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log });

Nur der Wert aus dem zweiten internen Stream wurde in den externen Stream übertragen. Dies liegt daran, dass switchMap vom ersten abgemeldet wurde, als es den zweiten Thread erhielt.
Wann wird das benötigt? Zum Beispiel bei der Implementierung einer Datensuche. Wenn die Antwort vom Server noch nicht eingetroffen ist und wir bereits eine neue Anfrage gesendet haben, ist es nicht sinnvoll, auf die vorherige zu warten.
Auspuff / AuspuffMap
Abgas ist das genaue Gegenteil der switchAll-Anweisung, und sein Verhalten ähnelt dem von concatAll. Diese Methode, die den Stream abonniert, wartet auf den Abschluss. Wenn ein neuer Stream darauf ankommt, wird er einfach verworfen.
of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log });

Im obigen Beispiel haben wir keine Zwei erhalten, da der Bediener in diesem Moment auf die Fertigstellung des ersten Threads wartete und einfach den zweiten Thread fallen ließ.
Ich denke, viele haben eine Frage, wann ein solches Verhalten erforderlich sein kann. Ein gutes Beispiel ist das Anmeldeformular. Es ist nicht sinnvoll, mehrere Anforderungen an den Server zu senden, bis die aktuelle abgeschlossen ist.
Wir schließen den Antrag ab
Wir erinnern uns an das
Beispiel aus dem
zweiten Artikel . Darin haben wir eine Suche auf GitHub implementiert und den mergeMap-Operator verwendet, um Anforderungen an den Server zu senden. Jetzt kennen wir die Funktionen dieses Operators. Ist er in unserem Fall wirklich geeignet?
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
Nehmen wir an, dass der GitHub-Server stark überlastet ist und die Verarbeitung unserer Antwort viel Zeit in Anspruch nimmt. Was könnte in diesem Fall möglicherweise schief gehen?
Angenommen, ein Benutzer hat einige Daten eingegeben, nicht auf eine Antwort gewartet und neue eingegeben. In diesem Fall senden wir die zweite Anfrage an den Server. Niemand garantiert jedoch, dass die Antwort auf die erste Anfrage früher kommt.
Da es dem mergeMap-Operator egal ist, in welcher Reihenfolge die internen Threads verarbeitet werden sollen, werden die tatsächlichen Daten gelöscht, wenn die erste Anforderung später als die zweite ausgeführt wird. Daher schlage ich vor, die mergeMap-Methode durch switchMap zu ersetzen:
fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
Wenn der Benutzer neue Daten eingibt, wird switchMap den vorherigen Stream abbestellen und den neuen abonnieren.
Es ist erwähnenswert, dass unsere http-Anfrage so lange hängen bleibt, bis der Server eine Antwort darauf gibt. Da wir uns jedoch vom internen Stream abgemeldet haben, fällt die Antwort nicht in den externen Stream.
Hinweis : Wenn Sie mit Angular arbeiten und HttpClient für die Arbeit mit http verwenden, können Sie sich keine Sorgen machen, die Anforderung selbst abzubrechen. HttpClient kann dies für Sie tun, wenn Sie sich abmelden.
Abbrechen http
Die Abruf-API kann die http-Anforderung mit dem
AbortController abbrechen . In Kombination mit dem switchMap-Operator spart diese Funktionalität Benutzerverkehr.
Lassen Sie uns unser Beispiel ein wenig umschreiben. Und erstellen Sie eine Methode, die den Abruf in Observable umschließt:
const createCancellableRequest = (url) => {
Ändern Sie auch die Methode getUsersRepsFromApi:
const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); }
Jetzt gibt die Methode nicht vielversprechend zurück, sondern beobachtbar. Daher entfernen wir den Wrapper in switchMap:
switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) )
Hinweis : In RxJS Version 6.5 wurde die
fromFetch-Anweisung hinzugefügt, die selbst die Abbruchmethode unter der Haube aufruft, sodass Sie kein eigenes „Fahrrad“ mehr schreiben müssen.
Das ist alles! Den gesamten Beispielcode finden Sie
hier .
Fazit
Heute haben wir uns angesehen, was HOO ist und einige sehr nützliche Operatoren aus dieser Kategorie. Natürlich waren diese weit von allen entfernt. Für detailliertere und detailliertere Informationen empfehle ich den Besuch der RxJS-
Dokumentation .
Im nächsten Artikel möchte ich untersuchen, was der Unterschied zwischen heißen und kalten Observablen ist.
Schließlich: Verwenden Sie das Abonnement nicht im Abonnement, da es HOO gibt!