Asynchroner Stream in C # 8

Die Async / Await-Funktionalität wurde in C # 5 eingeführt, um die Reaktionsfähigkeit der Benutzeroberfläche und den Webzugriff auf Ressourcen zu verbessern. Mit anderen Worten, asynchrone Methoden helfen Entwicklern, asynchrone Operationen auszuführen, die keine Threads blockieren und ein einzelnes skalares Ergebnis zurückgeben. Nach zahlreichen Versuchen von Microsoft, asynchrone Vorgänge zu vereinfachen, hat die Vorlage async / await dank eines einfachen Ansatzes bei Entwicklern einen guten Ruf erlangt.


Bestehende asynchrone Methoden sind erheblich eingeschränkt, da sie nur einen Wert zurückgeben sollten. Schauen wir uns eine async Task<int> DoAnythingAsync() -Methode an, die für eine solche Syntax üblich ist. Das Ergebnis seiner Arbeit ist eine Bedeutung. Aufgrund dieser Einschränkung können Sie diese Funktion nicht mit dem Schlüsselwort yield und der asynchronen IEnumerable<int> -Schnittstelle verwenden (um das Ergebnis einer asynchronen Aufzählung zurückzugeben).



Wenn Sie die async/await Funktion und die yield kombinieren, können Sie ein leistungsstarkes Programmiermodell verwenden, das als asynchrones Daten-Pull bezeichnet wird , oder eine Pull-basierte Aufzählungsaufzählung oder eine asynchrone Async-Sequenz , wie sie in F # genannt wird.


Die neue Möglichkeit, asynchrone Threads in C # 8 zu verwenden, hebt die mit der Rückgabe eines einzelnen Ergebnisses verbundene Einschränkung auf und ermöglicht der asynchronen Methode die Rückgabe mehrerer Werte. Diese Änderungen geben der asynchronen Vorlage mehr Flexibilität, und der Benutzer kann Daten von irgendwo (z. B. aus der Datenbank) mithilfe verzögerter asynchroner Sequenzen abrufen oder Daten von asynchronen Sequenzen in Teilen empfangen, sofern verfügbar.


Ein Beispiel:


 foreach await (var streamChunck in asyncStreams) { Console.WriteLine($“Received data count = {streamChunck.Count}”); } 

Ein anderer Ansatz zur Lösung von Problemen im Zusammenhang mit der asynchronen Programmierung ist die Verwendung von reaktiven Erweiterungen (Rx). Rx gewinnt unter Entwicklern an Bedeutung und diese Methode wird in vielen Programmiersprachen verwendet, beispielsweise Java (RxJava) und JavaScript (RxJS).


Rx basiert auf einem Push-Push-Modell (Tell Don't Ask-Prinzip), das auch als reaktive Programmierung bezeichnet wird. Das heißt, Im Gegensatz zu IEnumerable signalisiert der Datenprovider dem Verbraucher, dass ein neues Element in der Sequenz erscheint, wenn der Verbraucher das nächste Element im Rx-Modell anfordert. Daten werden im asynchronen Modus in die Warteschlange gestellt und vom Verbraucher zum Zeitpunkt des Empfangs verwendet.


In diesem Artikel werde ich ein Modell, das auf Push-Daten basiert (z. B. Rx), mit einem Modell vergleichen, das auf Pull-Daten basiert (z. B. IEnumerable), und zeigen, welche Szenarien für welches Modell am besten geeignet sind. Das gesamte Konzept und die Vorteile werden anhand verschiedener Beispiele und des Demo-Codes untersucht. Am Ende werde ich die Anwendung zeigen und sie anhand eines Codebeispiels demonstrieren.


Vergleich eines Modells basierend auf dem Pushing von Daten mit einem Modell basierend auf dem Pulling von Daten (Pull-)



Abb. -1- Vergleich eines Modells basierend auf Datenabruf mit einem Modell basierend auf Daten Pushing


Diese Beispiele basieren auf der Beziehung zwischen dem Datenanbieter und dem Verbraucher, wie in Abb. -1-. Ein Pull-basiertes Modell ist leicht zu verstehen. Darin fordert der Verbraucher Daten vom Lieferanten an und empfängt diese. Ein alternativer Ansatz ist ein Push-Push-Modell. Hier veröffentlicht der Anbieter die Daten in der Warteschlange und der Verbraucher muss sie abonnieren, um sie zu erhalten.


Das Data-Pull-Modell eignet sich für Fälle, in denen der Anbieter Daten schneller generiert, als der Verbraucher sie verwendet. Somit erhält der Verbraucher nur die erforderlichen Daten, wodurch Überlaufprobleme vermieden werden. Wenn der Verbraucher die Daten schneller verwendet als der Lieferant sie produziert, ist ein Modell geeignet, das auf dem Pushen der Daten basiert. In diesem Fall kann der Lieferant mehr Daten an den Verbraucher senden, damit keine unnötigen Verzögerungen auftreten.


Rx- und Akka-Streams (ein strömungsbasiertes Programmiermodell) verwenden die Gegendruckmethode, um den Durchfluss zu steuern. Um die oben beschriebenen Probleme des Lieferanten und des Empfängers zu lösen, verwendet das Verfahren sowohl Push- als auch Pulling-Daten.


Im folgenden Beispiel bezieht ein langsamer Verbraucher Daten von einem schnelleren Anbieter. Nachdem der Verbraucher das aktuelle Element verarbeitet hat, fragt er den Lieferanten nach dem nächsten und so weiter bis zum Ende der Sequenz.


Nutzungsmotivation und Basisinformationen


Beachten Sie den folgenden Code, um den gesamten Bedarf an asynchronen Threads zu verstehen.


 //       (count) static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; } //  : const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Fazit:


Wir können die Methode mithilfe der Yield-Anweisung verschieben, wie unten gezeigt.


 static IEnumerable<int> SumFromOneToCountYield(int count) { ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; } } 

Methodenaufruf


 const int count = 5; ConsoleExt.WriteLine("Sum with yield starting."); foreach (var i in SumFromOneToCountYield(count)) { ConsoleExt.WriteLine($"Yield sum: {i}"); } ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Fazit:


Wie im obigen Ausgabefenster gezeigt, wird das Ergebnis in Teilen und nicht in einem einzelnen Wert zurückgegeben. Die oben gezeigten zusammenfassenden Ergebnisse werden als verzögerte Auflistung bezeichnet. Das Problem ist jedoch immer noch nicht behoben: Summierungsmethoden blockieren den Code. Wenn Sie sich die Threads ansehen, können Sie sehen, dass im Haupt-Thread alles läuft.


Wenden wir das asynchrone Zauberwort auf die erste SumFromOneToCount-Methode an (ohne Ausbeute).


 static async Task<int> SumFromOneToCountAsync(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result; } 

Methodenaufruf


 const int count = 5; ConsoleExt.WriteLine("async example starting."); //      . ,  . ,        . var result = await SumFromOneToCountAsync(count); ConsoleExt.WriteLine("async Result: " + result); ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Fazit:


Großartig. Jetzt werden die Berechnungen in einem anderen Thread durchgeführt, aber das Problem mit dem Ergebnis besteht weiterhin. Das System gibt das Ergebnis mit einem einzelnen Wert zurück.
Stellen Sie sich vor, wir können verzögerte Aufzählungen (Yield-Anweisung) und asynchrone Methoden in einem imperativen Programmierstil kombinieren. Die Kombination wird als asynchrone Streams bezeichnet. Dies ist eine neue Funktion in C # 8. Sie eignet sich hervorragend zur Lösung von Problemen im Zusammenhang mit einem Programmiermodell, das auf Datenextraktion basiert, z. B. zum Herunterladen von Daten von einer Site oder zum Lesen von Datensätzen in einer Datei oder Datenbank auf moderne Weise.


Versuchen wir dies in der aktuellen Version von C #. Ich werde das asynchrone Schlüsselwort der SumFromOneToCountYield-Methode wie folgt hinzufügen:



Abb. -2- Fehler bei gleichzeitiger Verwendung von Yield und Async Keyword.


Wenn wir versuchen, SumFromOneToCountYield asynchron hinzuzufügen, tritt ein Fehler wie oben gezeigt auf.
Lass es uns anders versuchen. Wir können das Yield-Schlüsselwort entfernen und IEnumerable in der Aufgabe anwenden, wie unten gezeigt:


 static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result; } 

Methodenaufruf


 const int count = 5; ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!"); var scs = await SumFromOneToCountTaskIEnumerable(count); ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs) { //   ,  .     . ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}"); } ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Fazit:


Wie Sie dem Beispiel entnehmen können, wird alles im asynchronen Modus berechnet, das Problem bleibt jedoch weiterhin bestehen. Ergebnisse (alle Ergebnisse werden in einer Sammlung gesammelt) werden als einzelner Block zurückgegeben. Und das brauchen wir nicht. Wenn Sie sich erinnern, war es unser Ziel, den asynchronen Berechnungsmodus mit der Möglichkeit einer Verzögerung zu kombinieren.


Dazu müssen Sie eine externe Bibliothek verwenden, z. B. Ix (Teil von Rx) oder asynchrone Threads, die in C # dargestellt werden.


Kehren wir zu unserem Code zurück. Um asynchrones Verhalten zu demonstrieren, habe ich eine externe Bibliothek verwendet .


 static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence) { ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); //    Task.Delay(TimeSpan.FromSeconds(1)).Wait(); }); } static IEnumerable<int> ProduceAsyncSumSeqeunc(int count) { ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; //    Task.Delay(TimeSpan.FromSeconds(0,5)).Wait(); yield return sum; } } 

Methodenaufruf


 const int count = 5; ConsoleExt.WriteLine("Starting Async Streams Demo!"); //   .       . IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); //    ;      . var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); //   . ,    . consumingTask.Wait(); ConsoleExt.WriteLineAsync("Async Streams Demo Done!"); 

Fazit:


Schließlich sehen wir das gewünschte Verhalten. Sie können eine Aufzählungsschleife im asynchronen Modus ausführen.
Siehe Quellcode hier .


Asynchrones Datenabrufen am Beispiel der Client-Server-Architektur


Schauen wir uns dieses Konzept mit einem realistischeren Beispiel an. Alle Vorteile dieser Funktion lassen sich am besten im Kontext der Client-Server-Architektur erkennen.


Synchroner Aufruf bei Client-Server-Architektur


Wenn eine Anforderung an den Server gesendet wird, muss der Client warten (d. H. Blockiert werden), bis eine Antwort eintrifft, wie in Fig. 4 gezeigt. -3-.



Abb. -3- Synchrones Datenabrufen, während dessen der Client wartet, bis die Anforderungsverarbeitung abgeschlossen ist


Asynchrones Datenabrufen


In diesem Fall fordert der Client Daten an und fährt mit anderen Aufgaben fort. Sobald die Daten empfangen wurden, wird der Client die Arbeit fortsetzen.



Abb. -4- Asynchrones Datenabrufen, bei dem der Client andere Aufgaben ausführen kann, während Daten angefordert werden


Daten asynchron abrufen


In diesem Fall fordert der Client einen Teil der Daten an und führt weiterhin andere Aufgaben aus. Nach dem Empfang der Daten verarbeitet der Client diese und fordert den nächsten Teil usw. an, bis alle Daten empfangen wurden. Aus diesem Szenario entstand die Idee von asynchronen Threads. In Abb. -5- zeigt, wie der Client die empfangenen Daten verarbeiten oder andere Aufgaben ausführen kann.



Abb. -5- Abrufen von Daten als asynchrone Sequenz (asynchrone Streams). Der Client ist nicht blockiert.


Asynchrone Threads


Wie bei IEnumerable<T> und IEnumerator<T> gibt es zwei neue IAsyncEnumerable<T> und IAsyncEnumerator<T> -Schnittstellen, die wie IAsyncEnumerator<T> definiert sind:


 public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { Task<bool> MoveNextAsync(); T Current { get; } } //      public interface IAsyncDisposable { Task DiskposeAsync(); } 

In InfoQ hat Jonathan Allen dieses Thema richtig verstanden. Hier werde ich nicht auf Details eingehen, daher empfehle ich, seinen Artikel zu lesen .


Der Fokus liegt auf dem Rückgabewert von Task<bool> MoveNextAsync() (geändert von bool in Task<bool> , bool IEnumerator.MoveNext() ). Dank ihm werden alle Berechnungen sowie deren Iteration asynchron erfolgen. Der Verbraucher entscheidet, wann er den nächsten Wert erhält. Obwohl es sich um ein asynchrones Modell handelt, wird immer noch das Abrufen von Daten verwendet. Für die asynchrone Bereinigung von Ressourcen können Sie die IAsyncDisposable Schnittstelle verwenden. Weitere Informationen zu asynchronen Threads finden Sie hier .


Syntax


Die endgültige Syntax sollte ungefähr so ​​aussehen:


 foreach await (var dataChunk in asyncStreams) { //        yield    . } 

Aus dem obigen Beispiel geht hervor, dass wir anstelle der Berechnung eines einzelnen Werts theoretisch nacheinander eine Reihe von Werten berechnen können, während wir auf andere asynchrone Operationen warten.


Überarbeitetes Microsoft-Beispiel


Ich habe den Demo-Code von Microsoft neu geschrieben. Es kann vollständig von meinem GitHub-Repository heruntergeladen werden .


Das Beispiel basiert auf der Idee, einen großen Stream im Speicher (ein Array von 20.000 Bytes) zu erstellen und Elemente im asynchronen Modus nacheinander daraus zu extrahieren. Während jeder Iteration werden 8 KB aus dem Array gezogen.




In Schritt (1) wird ein großes Datenarray erstellt, das mit Dummy-Werten gefüllt ist. Dann wird während Schritt (2) eine Variable definiert, die als Prüfsumme bezeichnet wird. Diese Variable, die die Prüfsumme enthält, soll die Richtigkeit der Summe der Berechnungen überprüfen. Ein Array und eine Prüfsumme werden im Speicher erstellt und in Schritt (3) als Folge von Elementen zurückgegeben.


Schritt (4) beinhaltet die Anwendung der AsEnumarble Erweiterungsmethode (der passendere Name ist AsAsyncEnumarble), mit deren Hilfe ein asynchroner Stream von 8 KB simuliert werden kann (BufferSize = 8000 Elemente (6)).


Das Erben von IAsyncEnumerable ist normalerweise nicht erforderlich. In dem oben gezeigten Beispiel wird diese Operation ausgeführt, um den Demo-Code zu vereinfachen, wie in Schritt (5) gezeigt.


Schritt (7) beinhaltet die Verwendung des Schlüsselworts foreach , das 8 KB Datenblöcke aus einem asynchronen Datenstrom im Speicher extrahiert. Der Abrufvorgang erfolgt nacheinander: Wenn der Verbraucher (ein Teil des Codes, der das foreach ) bereit ist, die nächsten Daten zu empfangen, zieht er sie vom Anbieter (dem im Stream im Speicher enthaltenen Array). Wenn der Zyklus abgeschlossen ist, überprüft das Programm schließlich den Wert von 'c' für die Prüfsumme und zeigt, wenn sie übereinstimmen, die Meldung "Prüfsummen stimmen überein!" Gemäß Schritt (8) an.


Microsoft Demo-Ausgabefenster:



Fazit


Wir haben uns asynchrone Threads angesehen, die sich hervorragend zum asynchronen Abrufen von Daten und zum Schreiben von Code eignen, der im asynchronen Modus mehrere Werte generiert.
Mit diesem Modell können Sie das nächste Datenelement in einer Sequenz abfragen und eine Antwort erhalten. Es unterscheidet sich vom IObservable<T> Push-Modell, mit dem Werte unabhängig vom Status des Verbrauchers generiert werden. Mit asynchronen Streams können Sie vom Verbraucher kontrollierte asynchrone Datenquellen perfekt darstellen, wenn er selbst die Bereitschaft bestimmt, die nächsten Daten zu akzeptieren. Beispiele hierfür sind die Verwendung von Webanwendungen oder das Lesen von Datensätzen in einer Datenbank.


Ich habe gezeigt, wie eine Aufzählung im asynchronen Modus erstellt und mithilfe einer externen Bibliothek mit asynchroner Reihenfolge verwendet wird. Ich habe auch gezeigt, welche Vorteile diese Funktion beim Herunterladen von Inhalten aus dem Internet bietet. Schließlich haben wir uns die neue Syntax für asynchrone Threads sowie ein vollständiges Beispiel für deren Verwendung auf der Grundlage des Microsoft Build Demo-Codes angesehen ( 7. bis 9. Mai 2018 // Seattle, WA ).


Source: https://habr.com/ru/post/de462755/


All Articles