Flux asynchrone en C # 8

La fonctionnalité Async / Await a été introduite dans C # 5 pour améliorer la réactivité de l'interface utilisateur et l'accès Web aux ressources. En d'autres termes, les méthodes asynchrones aident les développeurs à effectuer des opérations asynchrones qui ne bloquent pas les threads et renvoient un résultat scalaire unique. Après de nombreuses tentatives de Microsoft pour simplifier les opérations asynchrones, le modèle async / attente a acquis une bonne réputation auprès des développeurs grâce à une approche simple.


Les méthodes asynchrones existantes sont considérablement limitées en ce qu'elles ne doivent renvoyer qu'une seule valeur. Examinons une méthode async Task<int> DoAnythingAsync() qui est courante pour une telle syntaxe. Le résultat de son travail est une signification unique. En raison de cette limitation, vous ne pouvez pas utiliser cette fonction avec le mot clé yield et l'interface asynchrone IEnumerable<int> (pour renvoyer le résultat d'une énumération asynchrone).



Si vous combinez la fonction async/await et l' yield , vous pouvez utiliser un puissant modèle de programmation appelé extraction de données asynchrone , ou énumération basée sur l'extraction, ou une séquence asynchrone asynchrone , comme on l'appelle en F #.


La nouvelle possibilité d'utiliser des threads asynchrones en C # 8 supprime la limitation associée au renvoi d'un seul résultat et permet à la méthode asynchrone de renvoyer plusieurs valeurs. Ces modifications donneront au modèle asynchrone plus de flexibilité et l'utilisateur pourra récupérer des données de quelque part (par exemple, de la base de données) en utilisant des séquences asynchrones retardées ou recevoir des données de séquences asynchrones en parties selon les disponibilités.


Un exemple:


 foreach await (var streamChunck in asyncStreams) { Console.WriteLine($“Received data count = {streamChunck.Count}”); } 

Une autre approche pour résoudre les problèmes liés à la programmation asynchrone consiste à utiliser des extensions réactives (Rx). Rx gagne en importance parmi les développeurs et cette méthode est utilisée dans de nombreux langages de programmation, par exemple Java (RxJava) et JavaScript (RxJS).


Rx est basé sur un modèle push push (principe Tell Don't Ask), également connu sous le nom de programmation réactive. C'est-à-dire contrairement à IEnumerable, lorsque le consommateur demande l'élément suivant, dans le modèle Rx, le fournisseur de données signale au consommateur qu'un nouvel élément apparaît dans la séquence. Les données sont poussées dans la file d'attente en mode asynchrone et le consommateur les utilise au moment de la réception.


Dans cet article, je vais comparer un modèle basé sur la transmission de données (telles que Rx) avec un modèle basé sur l'extraction de données (telles que IEnumerable), et montrer également quels scénarios conviennent le mieux à quel modèle. L'ensemble du concept et des avantages est examiné avec une variété d'exemples et de code de démonstration. À la fin, je vais montrer l'application et la démontrer avec un exemple de code.


Comparaison d'un modèle basé sur la transmission de données avec un modèle basé sur la collecte de données (pull-)



Fig. -1- Comparaison d'un modèle basé sur l'extraction de données avec un modèle basé sur l'extraction de données


Ces exemples sont basés sur la relation entre le fournisseur de données et le consommateur, comme le montre la Fig. -1-. Un modèle basé sur l'extraction est facile à comprendre. Dans ce document, le consommateur demande et reçoit des données du fournisseur. Une approche alternative est un modèle push push. Ici, le fournisseur publie les données dans la file d'attente et le consommateur doit s'y abonner pour les recevoir.


Le modèle d'extraction de données convient aux cas où le fournisseur génère des données plus rapidement que le consommateur ne les utilise. Ainsi, le consommateur ne reçoit que les données nécessaires, ce qui évite les problèmes de débordement. Si le consommateur utilise les données plus rapidement que le fournisseur ne les produit, un modèle basé sur la transmission des données convient. Dans ce cas, le fournisseur peut envoyer plus de données au consommateur afin qu'il n'y ait pas de retards inutiles.


Rx et Akka Streams (un modèle de programmation basé sur le flux) utilisent la méthode de la contre-pression pour contrôler le flux. Pour résoudre les problèmes du fournisseur et du destinataire décrits ci-dessus, la méthode utilise à la fois la transmission et l'extraction de données.


Dans l'exemple ci-dessous, un consommateur lent extrait des données d'un fournisseur plus rapide. Une fois que le consommateur a traité l'élément en cours, il demandera au fournisseur le suivant et ainsi de suite jusqu'à la fin de la séquence.


Motivation à utiliser et informations de base


Pour comprendre tout le besoin de threads asynchrones, considérez le code suivant.


 //       (count) static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; } //  : const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Conclusion:


Nous pouvons différer la méthode en utilisant la déclaration de rendement, comme indiqué ci-dessous.


 static IEnumerable<int> SumFromOneToCountYield(int count) { ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; } } 

Appel de méthode


 const int count = 5; ConsoleExt.WriteLine("Sum with yield starting."); foreach (var i in SumFromOneToCountYield(count)) { ConsoleExt.WriteLine($"Yield sum: {i}"); } ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Conclusion:


Comme indiqué dans la fenêtre de sortie ci-dessus, le résultat est renvoyé en plusieurs parties, et non en une seule valeur. Les résultats résumés ci-dessus sont connus sous le nom d'inscription différée. Cependant, le problème n'est toujours pas résolu: les méthodes de sommation bloquent le code. Si vous regardez les threads, vous pouvez voir que tout fonctionne dans le thread principal.


Appliquons le mot magique asynchrone à la première méthode SumFromOneToCount (sans rendement).


 static async Task<int> SumFromOneToCountAsync(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result; } 

Appel de méthode


 const int count = 5; ConsoleExt.WriteLine("async example starting."); //      . ,  . ,        . var result = await SumFromOneToCountAsync(count); ConsoleExt.WriteLine("async Result: " + result); ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Conclusion:


Super. Maintenant, les calculs sont effectués dans un thread différent, mais le problème avec le résultat existe toujours. Le système renvoie le résultat avec une seule valeur.
Imaginez que nous pouvons combiner des énumérations différées (déclaration de rendement) et des méthodes asynchrones dans un style de programmation impératif. La combinaison est appelée flux asynchrones et il s'agit d'une nouvelle fonctionnalité en C # 8. Elle est idéale pour résoudre les problèmes associés à un modèle de programmation basé sur l'extraction de données, par exemple, télécharger des données à partir d'un site ou lire des enregistrements dans un fichier ou une base de données de manière moderne.


Essayons de le faire dans la version actuelle de C #. J'ajouterai le mot clé async à la méthode SumFromOneToCountYield comme suit:



Fig. -2- Erreur lors de l'utilisation simultanée du mot-clé yield et async.


Lorsque nous essayons d'ajouter asynchrone à SumFromOneToCountYield, une erreur se produit comme indiqué ci-dessus.
Essayons différemment. Nous pouvons supprimer le mot clé yield et appliquer IEnumerable dans la tâche, comme indiqué ci-dessous:


 static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result; } 

Appel de méthode


 const int count = 5; ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!"); var scs = await SumFromOneToCountTaskIEnumerable(count); ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs) { //   ,  .     . ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}"); } ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine); 

Conclusion:


Comme vous pouvez le voir dans l'exemple, tout est calculé en mode asynchrone, mais le problème persiste. Les résultats (tous les résultats sont collectés dans une collection) sont renvoyés en un seul bloc. Et ce n'est pas ce dont nous avons besoin. Si vous vous souvenez, notre objectif était de combiner le mode de calcul asynchrone avec la possibilité de retard.


Pour ce faire, vous devez utiliser une bibliothèque externe, par exemple, Ix (partie de Rx) ou des threads asynchrones, présentés en C #.


Revenons à notre code. Pour démontrer un comportement asynchrone, j'ai utilisé une bibliothèque externe .


 static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence) { ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); //    Task.Delay(TimeSpan.FromSeconds(1)).Wait(); }); } static IEnumerable<int> ProduceAsyncSumSeqeunc(int count) { ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; //    Task.Delay(TimeSpan.FromSeconds(0,5)).Wait(); yield return sum; } } 

Appel de méthode


 const int count = 5; ConsoleExt.WriteLine("Starting Async Streams Demo!"); //   .       . IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); //    ;      . var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); //   . ,    . consumingTask.Wait(); ConsoleExt.WriteLineAsync("Async Streams Demo Done!"); 

Conclusion:


Enfin, nous voyons le comportement souhaité. Vous pouvez exécuter une boucle d'énumération en mode asynchrone.
Voir le code source ici .


Extraction de données asynchrones en utilisant l'architecture client-serveur comme exemple


Regardons ce concept avec un exemple plus réaliste. Tous les avantages de cette fonctionnalité sont mieux visibles dans le contexte de l'architecture client-serveur.


Appel synchrone en cas d'architecture client-serveur


Lors de l'envoi d'une demande au serveur, le client est obligé d'attendre (c'est-à-dire qu'il est bloqué) jusqu'à ce qu'une réponse arrive, comme le montre la Fig. -3-.



Fig. -3- Extraction de données synchrone, pendant laquelle le client attend la fin du traitement de la demande


Extraction de données asynchrones


Dans ce cas, le client demande des données et passe à d'autres tâches. Une fois les données reçues, le client continuera de faire le travail.



Fig. -4- Extraction de données asynchrones pendant laquelle le client peut effectuer d'autres tâches pendant la demande de données


Extraire des données de manière asynchrone


Dans ce cas, le client demande une partie des données et continue d'effectuer d'autres tâches. Ensuite, après avoir reçu les données, le client les traite et demande la partie suivante, et ainsi de suite, jusqu'à ce que toutes les données soient reçues. C'est à partir de ce scénario qu'est née l'idée de threads asynchrones. Dans la fig. -5- montre comment le client peut traiter les données reçues ou effectuer d'autres tâches.



Fig. -5- Extraction de données sous forme de séquence asynchrone (flux asynchrones). Le client n'est pas bloqué.


Threads asynchrones


Comme IEnumerable<T> et IEnumerator<T> il existe deux nouvelles IAsyncEnumerable<T> et IAsyncEnumerator<T> , qui sont définies comme indiqué ci-dessous:


 public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { Task<bool> MoveNextAsync(); T Current { get; } } //      public interface IAsyncDisposable { Task DiskposeAsync(); } 

Dans InfoQ, Jonathan Allen a bien compris ce sujet. Ici, je n'entrerai pas dans les détails, je recommande donc de lire son article .


L'accent est mis sur la valeur de retour de Task<bool> MoveNextAsync() (changé de bool en Task<bool> , bool IEnumerator.MoveNext() ). Grâce à lui, tous les calculs, ainsi que leur itération, se feront de manière asynchrone. Le consommateur décide quand obtenir la prochaine valeur. Bien qu'il s'agisse d'un modèle asynchrone, il utilise toujours l'extraction de données. Pour le nettoyage asynchrone des ressources, vous pouvez utiliser l'interface IAsyncDisposable . Plus d'informations sur les threads asynchrones peuvent être trouvées ici .


Syntaxe


La syntaxe finale devrait ressembler à ceci:


 foreach await (var dataChunk in asyncStreams) { //        yield    . } 

D'après l'exemple ci-dessus, il est clair qu'au lieu de calculer une seule valeur, nous pouvons théoriquement calculer séquentiellement un ensemble de valeurs, en attendant d'autres opérations asynchrones.


Exemple Microsoft repensé


J'ai réécrit le code de démonstration de Microsoft. Il peut être téléchargé entièrement à partir de mon référentiel GitHub .


L'exemple est basé sur l'idée de créer un grand flux en mémoire (un tableau de 20 000 octets) et d'en extraire séquentiellement des éléments en mode asynchrone. À chaque itération, 8 Ko sont extraits de la baie.




À l'étape (1), un grand tableau de données est créé, rempli de valeurs fictives. Ensuite, lors de l'étape (2), une variable appelée somme de contrôle est définie. Cette variable contenant la somme de contrôle est destinée à vérifier l'exactitude de la somme des calculs. Un tableau et une somme de contrôle sont créés en mémoire et renvoyés sous forme de séquence d'éléments à l'étape (3).


L'étape (4) implique l'application de la AsEnumarble extension AsEnumarble (le nom le plus approprié est AsAsyncEnumarble), qui aide à simuler un flux asynchrone de 8 Ko (BufferSize = 8000 éléments (6))


Il n'est généralement pas nécessaire d'hériter de IAsyncEnumerable, mais dans l'exemple illustré ci-dessus, cette opération est effectuée pour simplifier le code de démonstration, comme indiqué à l'étape (5).


L'étape (7) implique l'utilisation du mot clé foreach , qui extrait des blocs de 8 Ko de données d'un flux asynchrone en mémoire. Le processus d'extraction se produit de manière séquentielle: lorsque le consommateur (une partie du code contenant le foreach ) est prêt à recevoir la prochaine donnée, il les extrait du fournisseur (le tableau contenu dans le flux en mémoire). Enfin, lorsque le cycle est terminé, le programme vérifiera la valeur de «c» pour la somme de contrôle et s'ils correspondent, il affichera le message «Les sommes de contrôle correspondent!», Selon l'étape (8).


Fenêtre de sortie de démonstration de Microsoft:



Conclusion


Nous avons examiné les threads asynchrones, qui sont parfaits pour extraire des données de manière asynchrone et écrire du code qui génère plusieurs valeurs en mode asynchrone.
En utilisant ce modèle, vous pouvez interroger l'élément de données suivant dans une séquence et obtenir une réponse. Il diffère du modèle push de données IObservable<T> , en utilisant les valeurs qui sont générées quel que soit l'état du consommateur. Les flux asynchrones vous permettent de représenter parfaitement les sources de données asynchrones contrôlées par le consommateur lorsqu'il détermine lui-même la volonté d'accepter la prochaine donnée. Les exemples incluent l'utilisation d'applications Web ou la lecture d'enregistrements dans une base de données.


J'ai montré comment créer une énumération en mode asynchrone et l'utiliser à l'aide d'une bibliothèque externe avec séquence asynchrone. J'ai également montré les avantages de cette fonctionnalité lors du téléchargement de contenu à partir d'Internet. Enfin, nous avons examiné la nouvelle syntaxe des threads asynchrones, ainsi qu'un exemple complet de son utilisation basé sur le code de démonstration de Microsoft Build ( 7 au 9 mai 2018 // Seattle, WA )


Source: https://habr.com/ru/post/fr462755/


All Articles