Comment puis-je imprimer un flux continu de messages de Twitter avec quelques lignes de code en ajoutant des donnĂ©es mĂ©tĂ©orologiques aux endroits oĂč vivent leurs auteurs? Et comment pouvez-vous limiter la vitesse des demandes au fournisseur mĂ©tĂ©o pour qu'il ne nous liste pas?
Aujourd'hui, nous vous expliquerons comment procĂ©der, mais nous dĂ©couvrirons tout d'abord la technologie Akka Streams, qui rend le travail avec les flux de donnĂ©es en temps rĂ©el aussi simple que les programmeurs travaillant avec des expressions LINQ sans nĂ©cessiter la mise en Ćuvre d'acteurs individuels ou d'interfaces Reactive Streams .
L'article est basé sur une transcription du
rapport de Vagif Abilov de notre conférence de décembre DotNext 2017 Moscou.
Je m'appelle Vagif, je travaille pour la société norvégienne Miles. Aujourd'hui, nous allons parler de la bibliothÚque Akka Streams.
Akka et Reactive Streams sont l'intersection d'ensembles assez étroits, et on pourrait avoir l'impression que c'est une niche telle que vous devez avoir une grande connaissance pour entrer, mais juste le contraire. Et cet article est destiné à montrer qu'en utilisant Akka Streams, vous pouvez éviter la programmation de bas niveau qui est requise lors de l'utilisation de Reactive Streams et Akka.NET. Pour l'avenir, je peux dire immédiatement: si au tout début de notre projet, sur lequel nous utilisons Akka, nous connaissions l'existence d'Akka Streams, nous écririons beaucoup différemment, nous économiserions à la fois du temps et du code.
"Peut-ĂȘtre que le pire que vous puissiez faire est d'amener les personnes qui ne souffrent pas Ă prendre votre aspirine."
Max Kreminski
«Portes fermĂ©es, maux de tĂȘte et besoins intellectuels»
Avant d'entrer dans les dĂ©tails techniques, un peu sur ce que votre chemin vers Akka Streams peut s'avĂ©rer ĂȘtre et ce qui peut vous y conduire. Un jour, je suis tombĂ© sur le blog de Max Kreminski, oĂč il a posĂ© une telle question philosophique aux programmeurs: comment ou pourquoi il est impossible pour un programmeur d'expliquer ce que sont les monades. Il l'a expliquĂ© de cette façon: trĂšs souvent, les gens vont immĂ©diatement aux dĂ©tails techniques, expliquant Ă quel point la programmation est magnifiquement fonctionnelle et Ă quel point il y a du sens dans la monade, sans prendre la peine de se demander pourquoi le programmeur pourrait en avoir besoin. Dessiner une analogie, c'est comme essayer de vendre de l'aspirine sans se soucier de savoir si votre patient souffre.
En utilisant cette analogie, je voudrais poser la question suivante: si Akka Streams est de l'aspirine, quelle devrait ĂȘtre la douleur qui vous y conduira?
Flux de données
Parlons d'abord des flux de donnĂ©es. Le flux peut ĂȘtre assez simple, linĂ©aire.
Ici, nous avons un certain consommateur de donnĂ©es (un lapin dans la vidĂ©o). Il consomme des donnĂ©es Ă une vitesse qui lui convient. Il s'agit de l'interaction idĂ©ale du consommateur avec le flux: elle Ă©tablit la bande passante et les donnĂ©es y circulent tranquillement. Ce flux de donnĂ©es simple peut ĂȘtre infini ou se terminer.
Mais le flux peut ĂȘtre plus complexe. Si vous plantez plusieurs lapins cĂŽte Ă cĂŽte, nous aurons dĂ©jĂ une parallĂ©lisation des flux. Ce que Reactive Streams essaie de rĂ©soudre, c'est prĂ©cisĂ©ment comment nous pouvons communiquer avec les flux Ă un niveau plus conceptuel, c'est-Ă -dire, que nous parlions simplement d'une sorte de mesure de capteur de tempĂ©rature, oĂč les mesures linĂ©aires entrent en jeu. , ou nous avons des mesures en continu de milliers de capteurs de tempĂ©rature entrant dans le systĂšme via des files d'attente RabbitMQ et stockĂ©s dans les journaux systĂšme. Tout ce qui prĂ©cĂšde peut ĂȘtre considĂ©rĂ© comme un seul flux composite. Si vous allez encore plus loin, la gestion automatisĂ©e de la production (par exemple, par une boutique en ligne) peut Ă©galement ĂȘtre rĂ©duite Ă un flux de donnĂ©es, et ce serait formidable si nous pouvions parler de la planification d'un tel flux, quelle que soit sa complexitĂ©.

Pour les projets modernes, le support des threads n'est pas trÚs bon. Si je me souviens bien, Aaron Stannard, dont vous voyez le tweet dans l'image, voulait obtenir un flux d'un fichier de plusieurs gigaoctets contenant CSV, c'est-à -dire texte, et il s'est avéré qu'il n'y a rien que vous pouvez simplement prendre et utiliser immédiatement, sans un tas d'actions supplémentaires. Mais il n'a tout simplement pas pu obtenir un flux de valeurs CSV, ce qui l'a attristé. Il y a peu de solutions (à l'exception de certaines zones spéciales), beaucoup est réalisé par les anciennes méthodes, lorsque nous ouvrons tout cela, commençons la lecture, la mise en mémoire tampon, dans le pire des cas, nous obtenons quelque chose comme le bloc-notes, qui dit que le fichier est trop gros.
à un niveau conceptuel élevé, nous sommes tous engagés dans le traitement des flux de données, et Akka Streams vous aidera si:
- Vous connaissez Akka, mais vous voulez vous épargner les détails associés à l'écriture du code d'acteur et à sa coordination;
- Vous connaissez les flux rĂ©actifs et souhaitez utiliser une implĂ©mentation prĂȘte Ă l'emploi de leurs spĂ©cifications;
- Les éléments de bloc Akka Streams pour les étapes conviennent à la modélisation de votre processus;
- Vous souhaitez profiter de la contre-pression (contre-pression) d'Akka Streams pour gérer et affiner dynamiquement les étapes de débit de votre flux de travail.
Des acteurs aux Akka Streams

La premiÚre façon est celle des acteurs aux Akka Streams, à ma façon.
La photo montre pourquoi nous avons commencĂ© Ă utiliser le modĂšle d'acteur. Nous Ă©tions Ă©puisĂ©s par le contrĂŽle manuel des flux, l'Ă©tat partagĂ©, c'est tout. Tous ceux qui ont travaillĂ© avec de grands systĂšmes, avec plusieurs threads, comprennent combien cela prend du temps et combien il est facile de s'y tromper, ce qui peut ĂȘtre fatal pour l'ensemble du processus. Cela nous a conduit au modĂšle des acteurs. Nous ne regrettons pas le choix fait, mais, bien sĂ»r, lorsque vous commencez Ă travailler et Ă programmer davantage, ce n'est pas que l'enthousiasme initial cĂšde la place Ă autre chose, mais vous commencez Ă rĂ©aliser que quelque chose pourrait ĂȘtre fait encore plus efficacement.
«Par défaut, les destinataires de leurs messages sont entrés dans le code des acteurs. Si je crée un acteur A qui envoie un message à l'acteur B et que vous souhaitez remplacer le destinataire par l'acteur C, dans le cas général, cela ne fonctionnera pas pour vous. »
Noel Welch (underscore.io)
Les acteurs critiqués pour ne pas avoir composé. L'un des premiers à écrire à ce sujet sur son blog a été Noel Welch, l'un des développeurs d'Underscore. Il a remarqué que le systÚme d'acteurs ressemble à ceci:

Si vous n'utilisez aucun élément supplémentaire, comme une injection de dépendance, l'adresse de son destinataire est cousue dans l'acteur.

Quand ils commencent à s'envoyer des messages, vous définissez tout cela à l'avance, en programmant les acteurs. Et sans astuces supplémentaires, un tel systÚme rigide est obtenu.
Un des développeurs d'Akka, Roland Kuhn, a
expliquĂ© ce que l'on entend gĂ©nĂ©ralement par mauvaise disposition. La mĂ©thode acteur est basĂ©e sur la mĂ©thode tell, c'est-Ă -dire les messages unidirectionnels: elle est de type void, c'est-Ă -dire qu'elle ne renvoie rien (ou unitĂ©, selon la langue). Il est donc impossible de construire une description du processus Ă partir d'une chaĂźne d'acteurs. Alors tu as envoyĂ© dire, alors quoi? ArrĂȘter Nous sommes devenus nuls. Vous pouvez le comparer, par exemple, avec des expressions LINQ, oĂč chaque Ă©lĂ©ment de l'expression renvoie IQueryable, IEnumerable, et tout cela peut ĂȘtre facilement compilĂ©. Les acteurs ne donnent pas une telle opportunitĂ©. Dans le mĂȘme temps, Roland Kuhn s'est opposĂ© au fait qu'ils, disent-ils, ne composent pas en principe, affirmant qu'en fait ils sont compilĂ©s d'autres maniĂšres, dans le mĂȘme sens oĂč la sociĂ©tĂ© humaine se prĂȘte Ă l'agencement. Cela ressemble Ă un argument philosophique, mais si vous y rĂ©flĂ©chissez, l'analogie est logique - oui, les acteurs s'envoient des messages unidirectionnels, mais nous communiquons Ă©galement les uns avec les autres, en prononçant des messages unidirectionnels, mais en mĂȘme temps, nous interagissons assez efficacement, c'est-Ă -dire que nous crĂ©ons des systĂšmes complexes. NĂ©anmoins, une telle critique des acteurs existe.
public class SampleActor : ReceiveActor { public SampleActor() { Idle(); } protected override void PreStart() { } private void Idle() { Receive<Job>(job => ); } private void Working() { Receive<Cancel>(job => ); } }
De plus, l'implĂ©mentation de l'acteur nĂ©cessite au moins d'Ă©crire une classe si vous travaillez en C #, ou des fonctions si vous travaillez en F #. Dans l'exemple ci-dessus - code passe-partout, que vous devez de toute façon Ă©crire. Bien qu'il ne soit pas trĂšs volumineux, c'est un certain nombre de lignes que vous devrez toujours Ă©crire Ă ce bas niveau. Presque tout le code qui est prĂ©sent ici est une sorte de cĂ©rĂ©monie. Ce qui se passe lorsqu'un acteur reçoit directement un message ne s'affiche pas du tout ici. Et tout cela doit ĂȘtre Ă©crit. Bien sĂ»r, ce n'est pas beaucoup, mais c'est la preuve que nous travaillons avec des acteurs de bas niveau, crĂ©ant de telles mĂ©thodes de vide.
Et si nous pouvions passer à un niveau différent et supérieur, nous poser des questions sur la modélisation de notre processus, qui comprend le traitement de données provenant de diverses sources qui sont mélangées, converties et transférées?
var results = db.Companies .Join(db.People, c => c.CompanyID, p => p.PersonID, (c, p) => new { c, p }) .Where(z => zcCreated >= fromDate) .OrderByDescending(z => zcCreated) .Select(z => zp) .ToList();
Un analogue de cette approche peut ĂȘtre ce que nous avons tous l'habitude de travailler avec LINQ depuis dix ans. Nous ne nous demandons pas comment fonctionne Join. Nous savons qu'il existe un tel fournisseur LINQ qui fera tout cela pour nous, et nous sommes intĂ©ressĂ©s Ă un niveau supĂ©rieur pour rĂ©pondre Ă la demande. Et nous pouvons gĂ©nĂ©ralement mĂ©langer les bases de donnĂ©es ici, nous pouvons envoyer des demandes distributives. Et si vous pouviez dĂ©crire le processus de cette façon?
HttpGet pageUrl |> fun s -> Regex.Replace(s, "[^A-Za-z']", " ") |> fun s -> Regex.Split(s, " +") |> Set.ofArray |> Set.filter (fun word -> not (Spellcheck word)) |> Set.iter (fun word -> printfn " %s" word)
(Source)Ou, par exemple, des transformations fonctionnelles. Ce que beaucoup de gens aiment à propos de la programmation fonctionnelle, c'est que vous pouvez transmettre des données à travers une série de transformations, et vous obtenez un code compact assez clair, quelle que soit la langue dans laquelle vous l'écrivez. C'est assez facile à lire. Le code de l'image est spécialement écrit en F #, mais en général, probablement, tout le monde comprend ce qui se passe ici.
val in = Source(1 to 10) val out = Sink.ignore val bcast = builder.add(Broadcast[Int](2)) val merge = builder.add(Merge[Int](2)) val f1,f2,f3,f4 = Flow[Int].map(_ + 10) source ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> sink bcast ~> f4 ~> merge ~>
(Source)Et alors? Dans l'exemple ci-dessus, nous avons une source de données source, qui se compose d'entiers de 1 à 10. C'est ce qu'on appelle le DSL graphique (langage spécifique au domaine). Les éléments de la langue du domaine dans l'exemple ci-dessus sont des flÚches unidirectionnelles - ce sont des opérateurs supplémentaires définis par des outils de langue qui montrent graphiquement la direction du flux. Nous passons Source à travers une série de transformations - pour faciliter la démonstration, elles ajoutent toutes un dix au nombre. Vient ensuite la diffusion: nous multiplions les canaux, c'est-à -dire que chaque numéro entre dans deux canaux. Ensuite, nous ajoutons à nouveau 10, mélangons nos flux de données, obtenons un nouveau flux, en ajoutons également 10, et tout cela va à notre flux de données, dans lequel rien ne se passe. Il s'agit du vrai code écrit en Scala, qui fait partie d'Akka Streams, implémenté dans ce langage. C'est-à -dire que vous spécifiez les phases de la transformation de vos données, indiquez quoi en faire, spécifiez la source, le stock, certains points de contrÎle, puis formez un tel graphique à l'aide du DSL graphique. C'est tout le code pour un seul programme. Quelques lignes de code montrent ce qui se passe dans le processus.
Oublions comment Ă©crire le code de dĂ©finition pour les acteurs individuels et apprenons plutĂŽt les primitives de mise en page de haut niveau qui crĂ©eront et connecteront les acteurs requis en eux-mĂȘmes. Lorsque nous exĂ©cutons un tel graphique, le systĂšme qui fournit Akka Streams crĂ©era lui-mĂȘme l'acteur requis, y enverra toutes ces donnĂ©es, les traitera comme il se doit et les transmettra finalement au destinataire final.
var runnable = Source .From(Enumerable.Range(1, 1000)) .Via(Flow.Create<int>().Select(x => x * 2) .To(Sink.ForEach<int>(x => Console.Write(x.ToString));
L'exemple ci-dessus montre à quoi cela pourrait ressembler en C #. La maniÚre la plus simple: nous avons une source de données - ce sont des nombres de 1 à 1000 (comme vous pouvez le voir, dans Akka Streams, tout IEnumerable peut devenir une source de flux de données, ce qui est trÚs pratique). Nous faisons un calcul simple, disons, nous multiplions par deux, puis sur le flux de données tout cela est affiché à l'écran.
var graph = GraphDsl.Create(builder => { var bcast = builder.Add(new Broadcast<int>(2)); var merge = builder.Add(new Merge<int, int>(2)); var count = Flow.FromFunction(new Func<int, int>(x => 1)); var sum = Flow.Create<int>().Sum((x, y) => x + y); builder.From(bcast.Out(0)).To(merge.In(0)); builder.From(bcast.Out(1)).Via(count).Via(sum).To(merge.In(1)); return new FlowShape<int, int>(bcast.In, merge.Out); });
Ce qui est illustrĂ© dans l'exemple ci-dessus est appelĂ© «DSL graphique en C #». En fait, il n'y a pas de graphiques ici, c'est un port avec Scala, mais en C # il n'y a aucun moyen de dĂ©finir les opĂ©rateurs de cette façon, donc cela semble un peu plus encombrant, mais toujours assez compact pour comprendre ce qui se passe ici. Donc, nous crĂ©ons un certain graphique (il existe diffĂ©rents types de graphiques, ici on l'appelle FlowShape) Ă partir de diffĂ©rents composants, oĂč il y a une source de donnĂ©es et il y a des transformations. Nous envoyons des donnĂ©es Ă un canal dans lequel nous gĂ©nĂ©rons le comptage, c'est-Ă -dire le nombre d'Ă©lĂ©ments de donnĂ©es Ă transmettre, et dans l'autre, nous gĂ©nĂ©rons la somme, puis nous mĂ©langons le tout. Ensuite, nous verrons des exemples plus intĂ©ressants que le simple traitement de nombres entiers.
C'est le premier chemin qui peut vous conduire Ă Akka Streams, si vous avez de l'expĂ©rience avec un modĂšle d'acteur et que vous avez pensĂ© Ă Ă©crire manuellement chacun, mĂȘme l'acteur le plus simple. La deuxiĂšme façon dont Akka Streams arrive Ă travers Reactive Streams.
Des flux réactifs aux flux Akka
Qu'est-ce que les
flux réactifs ? Il s'agit d'une initiative conjointe visant à développer une norme pour le traitement asynchrone des flux de données. Il définit l'ensemble minimal d'interfaces, de méthodes et de protocoles qui décrivent les opérations et entités nécessaires pour atteindre l'objectif - traitement asynchrone des données en temps réel avec une contre-pression non bloquante (contre-pression). Il permet diverses implémentations utilisant différents langages de programmation.
Reactive Streams vous permet de traiter un nombre potentiellement illimité d'éléments dans une séquence et de transférer des éléments de maniÚre asynchrone entre les composants avec une contre-pression non bloquante.
La liste des initiateurs de la création de Reactive Streams est assez impressionnante: voici Netflix, Oracle et Twitter.
La spécification est trÚs simple pour rendre l'implémentation dans différentes langues et plates-formes aussi accessible que possible. Les principaux composants de l'API Reactive Streams:
- Ăditeur
- Abonné
- Abonnement
- Processeur
Essentiellement, cette spécification n'implique pas que vous commencerez manuellement à implémenter ces interfaces. Il est entendu que certains développeurs de bibliothÚques le feront pour vous. Et Akka Streams est l'une des implémentations de cette spécification.
public interface IPublisher<out T> { void Subscribe(ISubscriber<T> subscriber); } public interface ISubscriber<in T> { void OnSubscribe(ISubscription subscription); void OnNext(T element); void OnError(Exception cause); void OnComplete(); }
Les interfaces, comme vous pouvez le voir dans l'exemple, sont vraiment trÚs simples: par exemple, Publisher ne contient qu'une seule méthode - «s'abonner». L'abonné, l'abonné, ne contient que quelques réactions à l'événement.
public interface ISubscription { void Request(long n); void Cancel(); } public interface IProcessor<in T1, out T2> : ISubscriber<T1>, IPublisher<T2> { }
Enfin, l'abonnement contient deux méthodes - «démarrer» et «refuser». Le processeur ne définit aucune nouvelle méthode, il associe un éditeur et un abonné.
Qu'est-ce qui distingue les flux rĂ©actifs des autres implĂ©mentations de flux? Reactive Streams combine des modĂšles push et pull. Pour le support, il s'agit du scĂ©nario de performances le plus efficace. Supposons que vous ayez un abonnĂ© lent aux donnĂ©es. Dans ce cas, pousser pour lui peut ĂȘtre fatal: si vous lui envoyez une Ă©norme quantitĂ© de donnĂ©es, il ne pourra pas les traiter. Il est prĂ©fĂ©rable d'utiliser pull pour que l'abonnĂ© rĂ©cupĂšre lui-mĂȘme les donnĂ©es de l'Ă©diteur. Mais si l'Ă©diteur est lent, il s'avĂšre que l'abonnĂ© est bloquĂ© tout le temps, attendant tout le temps. Une solution intermĂ©diaire peut ĂȘtre la configuration: nous avons un fichier de configuration dans lequel nous dĂ©terminons lequel est le plus rapide. Et si leurs vitesses changent?
Ainsi, l'implémentation la plus élégante est celle dans laquelle nous pouvons changer dynamiquement les modÚles push et pull.
(Source (Apache Flink))Le diagramme montre comment cela peut se produire. Cette dĂ©mo utilise Apache Flink. Yellow est un Ă©diteur, producteur de donnĂ©es, il Ă©tait fixĂ© Ă environ 50% de ses capacitĂ©s. L'abonnĂ© essaie de choisir la meilleure stratĂ©gie - elle se rĂ©vĂšle ĂȘtre push. Ensuite, nous avons rĂ©initialisĂ© l'abonnĂ© Ă une vitesse d'environ 20%, et il passe pour tirer. Ensuite, nous allons Ă 100%, revenons Ă nouveau Ă 20%, au modĂšle de traction, etc. Tout cela se produit dans la dynamique, vous n'avez pas besoin d'arrĂȘter le service, entrez quelque chose dans la configuration. Ceci est une illustration du fonctionnement de la contre-pression dans Akka Streams.
Principes d'Akka Streams
Bien sûr, Akka Streams ne gagnerait pas en popularité s'il n'y avait pas de blocs intégrés trÚs faciles à utiliser. Il y en a beaucoup. Ils sont divisés en trois groupes principaux:
- Source de données (Source) - étape de traitement avec une sortie.
- Le récepteur est une étape de traitement à entrée unique.
- Checkpoint (Flow) - étape de traitement avec une entrée et une sortie. Des transformations fonctionnelles ont lieu ici, et pas nécessairement dans la mémoire: il peut s'agir, par exemple, d'un appel à un service web, à certains éléments de parallélisme, multi-thread.
De ces trois types, des graphiques peuvent ĂȘtre formĂ©s. Ce sont dĂ©jĂ des Ă©tapes de traitement plus complexes, qui sont construites Ă partir de sources, de drains et de points de contrĂŽle. Mais tous les graphiques ne peuvent pas ĂȘtre exĂ©cutĂ©s: s'il y a des trous dedans, c'est-Ă -dire des entrĂ©es et des sorties ouvertes, alors ce graphique n'est pas exĂ©cutĂ©.
Un graphe est un graphe exĂ©cutable, s'il est fermĂ©, c'est-Ă -dire qu'il y a une sortie pour chaque entrĂ©e: si les donnĂ©es sont entrĂ©es, elles doivent ĂȘtre allĂ©es quelque part.

Akka Streams a des sources intĂ©grĂ©es: dans l'image, vous voyez combien d'entre elles. Leurs noms sont Ă peu prĂšs un Ă un et reflĂštent ce que Scala ou la JVM a, Ă l'exception de certaines sources utiles spĂ©cifiques Ă .NET. Les deux premiers (FromEnumerator et From) sont parmi les plus importants: toute numĂ©rotation, tout ienumerable peut ĂȘtre transformĂ© en source de flux.

Il existe des drains intégrés: certains d'entre eux ressemblent aux méthodes LINQ, par exemple, First, Last, FirstOrDefault. Bien sûr, tout ce que vous obtenez, vous pouvez le vider dans des fichiers, dans des flux, non pas dans des flux Akka, mais dans des flux .NET. Et encore une fois, si vous avez des acteurs dans votre systÚme, vous pouvez les utiliser à la fois en entrée et en sortie du systÚme, c'est-à -dire, si vous le souhaitez, les intégrer dans votre systÚme fini.

Et il existe un grand nombre de points de contrĂŽle intĂ©grĂ©s, qui rappellent peut-ĂȘtre encore plus LINQ, car ici il y a Select, SelectMany et GroupBy, c'est-Ă -dire tout ce que nous avons l'habitude de travailler avec LINQ.
Par exemple, Select in Scala est appelé SelectAsync: il est suffisamment puissant car il prend le niveau de parallélisme comme l'un des arguments. Autrement dit, vous pouvez indiquer que, par exemple, Select envoie des données à un service Web en parallÚle sur dix threads, puis elles sont toutes collectées et transmises. En fait, vous déterminez le degré de mise à l'échelle du point de contrÎle avec une ligne de code.
Une dĂ©claration de flux est son plan d'exĂ©cution, c'est-Ă -dire qu'un graphique, mĂȘme un graphique d'exĂ©cution, ne peut pas ĂȘtre exĂ©cutĂ© comme ça - il doit ĂȘtre matĂ©rialisĂ©. Il doit y avoir un systĂšme instanciĂ©, un systĂšme d'acteur, vous devez lui donner un flux, ce plan d'exĂ©cution, puis il sera exĂ©cutĂ©. De plus, au moment de l'exĂ©cution, il est hautement optimisĂ©, tout comme lorsque vous envoyez une expression LINQ Ă une base de donnĂ©es: un fournisseur peut optimiser votre SQL pour une sortie de donnĂ©es plus efficace, en remplaçant essentiellement la commande de requĂȘte par une autre. MĂȘme chose avec Akka Streams: Ă partir de la version 2.0, vous pouvez dĂ©finir un certain nombre de points de contrĂŽle, et le systĂšme comprendra que certains d'entre eux peuvent ĂȘtre combinĂ©s afin qu'ils soient exĂ©cutĂ©s par un seul acteur (fusion d'opĂ©rateur). Les points de contrĂŽle, en rĂšgle gĂ©nĂ©rale, conservent l'ordre des Ă©lĂ©ments de traitement.
var results = db.Companies .Join(db.People, c => c.CompanyID, p => p.PersonID, (c, p) => new { c, p }) .Where(z => zcCreated >= fromDate) .OrderByDescending(z => zcCreated) .Select(z => zp) .ToList();
La matĂ©rialisation du flux peut ĂȘtre comparĂ©e au dernier Ă©lĂ©ment ToList de l'expression LINQ dans l'exemple ci-dessus. Si nous n'Ă©crivons pas ToList, nous obtenons alors une expression LINQ non matĂ©rialisĂ©e qui n'entraĂźnera pas le transfert des donnĂ©es vers le serveur SQL ou Oracle, car la plupart des fournisseurs LINQ prennent en charge l'exĂ©cution dite de requĂȘte diffĂ©rĂ©e (exĂ©cution de requĂȘte retardĂ©e), t c'est-Ă -dire que la demande n'est exĂ©cutĂ©e que lorsqu'une commande est donnĂ©e pour donner un rĂ©sultat. Selon ce qui est demandĂ© - une liste ou le premier rĂ©sultat - l'Ă©quipe la plus efficace sera formĂ©e. Lorsque nous disons ToList, nous demandons ainsi au fournisseur LINQ de nous donner le rĂ©sultat final.
var runnable = Source .From(Enumerable.Range(1, 1000)) .Via(Flow.Create<int>().Select(x => x * 2) .To(Sink.ForEach<int>(x => Console.Write(x.ToString));
Akka Streams fonctionne de la mĂȘme maniĂšre. Dans l'image est notre graphique lancĂ©, qui se compose d'une source de points de contrĂŽle et de ruissellement, et nous voulons maintenant l'exĂ©cuter.
var runnable = Source .From(Enumerable.Range(1, 1000)) .Via(Flow.Create<int>().Select(x => x * 2) .To(Sink.ForEach<int>(x => Console.Write(x.ToString)); var system = ActorSystem.Create("MyActorSystem"); using (var materializer = ActorMaterializer.Create(system)) { await runnable.Run(materializer); }
Pour que cela se produise, nous devons crĂ©er un systĂšme d'acteurs, en lui il y a un matĂ©rialisant, lui passer notre graphique, et il l'exĂ©cutera. Si nous le recrĂ©ons, il l'exĂ©cutera Ă nouveau et d'autres rĂ©sultats pourront ĂȘtre obtenus.
En plus de la matérialisation du flux, en parlant de la partie matérielle d'Akka Streams, il convient de mentionner les valeurs matérialisées.
var output = new List<int>(); var source1 = Source.From(Enumerable.Range(1, 1000)); var sink1 = Sink.ForEach<int>(output.Add); IRunnableGraph<NotUsed> runnable1 = source1.To(sink1); var source2 = Source.From(Enumerable.Range(1, 1000)); var sink2 = Sink.Sum<int>((x,y) => x + y); IRunnableGraph<Task<int>> runnable2 = source2.ToMaterialized(sink2, Keep.Right);
Lorsque nous avons un flux qui va de la source aux points de contrĂŽle vers le drain, alors si nous ne demandons pas de valeurs intermĂ©diaires, elles ne sont pas disponibles, car elles seront exĂ©cutĂ©es de la maniĂšre la plus efficace. C'est comme une boĂźte noire. Mais il peut ĂȘtre intĂ©ressant pour nous de retirer certaines valeurs intermĂ©diaires, car Ă chaque point Ă gauche, certaines valeurs entrent, d'autres valeurs sortent Ă droite, et vous pouvez spĂ©cifier un graphique pour indiquer ce qui vous intĂ©resse. Dans l'exemple ci-dessus, un graphique de dĂ©part dans lequel NotUsed est indiquĂ©, c'est-Ă -dire qu'aucune valeur matĂ©rialisĂ©e ne nous intĂ©resse. Ci-dessous, nous le crĂ©ons avec l'indication que sur le cĂŽtĂ© droit du ruissellement, c'est-Ă -dire, une fois toutes les transformations terminĂ©es, nous devons donner des valeurs matĂ©rialisĂ©es. Nous obtenons la tĂąche graphique - une tĂąche, Ă la fin de laquelle nous obtenons un int, c'est-Ă -dire ce qui se passe Ă la fin de ce graphique. Vous pouvez indiquer dans chaque paragraphe que vous avez besoin d'une sorte de valeurs matĂ©rialisĂ©es, tout cela sera progressivement collectĂ©.
Pour transférer des données dans les flux Akka Streams ou pour les retirer de là , bien sûr, une sorte d'interaction avec le monde extérieur est nécessaire. Les étages source intégrés contiennent une large gamme de flux de données réactifs:
- Source.FromEnumerator et Source.From vous permettent de transférer des données à partir de n'importe quelle source qui implémente IEnumerable;
- Déplier et DéplierAsync génÚrent les résultats des calculs de fonction à condition qu'il renvoie des valeurs non nulles;
- FromInputStream transforme un flux;
- FromFile analyse le contenu du fichier dans le flux réactif;
- ActorPublisher convertit les messages des acteurs.
Comme je l'ai dĂ©jĂ dit, pour les dĂ©veloppeurs .NET, il est trĂšs productif d'utiliser Enumerator ou IEnumerable, mais parfois c'est trop primitif, trop inefficace pour accĂ©der aux donnĂ©es. Des sources plus complexes contenant une grande quantitĂ© de donnĂ©es nĂ©cessitent des connecteurs spĂ©ciaux. Ces connecteurs sont Ă©crits. Il existe un projet open source Alpakka, qui est apparu Ă l'origine dans Scala et est maintenant dans .NET. De plus, Akka a des acteurs dits persistants et ils ont leurs propres flux qui peuvent ĂȘtre utilisĂ©s (par exemple, Akka Persistence Query forme le flux de contenu du Akka Event Journal).

Si vous travaillez avec Scala, le moyen le plus simple est pour vous: il existe un grand nombre de connecteurs et vous trouverez sûrement quelque chose à votre goût. Pour information, Kafka est la soi-disant Reactive Kafka, pas Kafka Streams. Pour autant que je sache, Kafka Streams ne supporte pas la contre-pression. Reactive Kafka est une implémentation de flux de Kafka qui prend en charge Reactive Streams.

La liste des connecteurs Alpakka .NET est plus modeste, mais elle est réapprovisionnée et il y a un élément de concurrence. Il y a un tweet de demi-an de David Fowler de Microsoft, qui a déclaré que SignalR peut désormais échanger des données avec Reactive Extensions, et l'un des développeurs d'Akka a répondu qu'il se trouvait en fait dans Akka Streams depuis un certain temps. Akka prend en charge divers services de Microsoft Azure. CSV est le résultat de la frustration d'Aaron Stannard lorsqu'il a découvert qu'il n'y avait pas de bon flux pour CSV: maintenant Akka a son propre flux pour CSV XML. Il y a AMQP (en réalité, RabbitMQ), il est en cours de développement, mais est disponible pour utilisation, ça marche. Kafka est également en cours de développement. Cette liste continuera de s'étendre.
Quelques mots sur les alternatives, car si vous travaillez avec des flux de donnĂ©es, Akka Streams n'est bien sĂ»r pas le seul moyen de gĂ©rer ces flux. TrĂšs probablement, dans votre projet, le choix de la façon d'implĂ©menter les threads dĂ©pendra de nombreux autres facteurs qui peuvent devenir essentiels. Par exemple, si vous travaillez beaucoup avec Microsoft Azure et OrlĂ©ans est organiquement intĂ©grĂ© aux besoins de votre projet avec leur prise en charge des acteurs virtuels, ou, comme ils les appellent, grains, alors ils ont leur propre implĂ©mentation qui ne rĂ©pond pas Ă la spĂ©cification Reactive Streams - Orleans Streams, qui il sera le plus proche de vous et il est logique que vous y prĂȘtiez attention. Si vous travaillez beaucoup avec TPL, il y a TPL DataFlow - cela peut ĂȘtre l'analogie la plus proche des flux Akka: il existe Ă©galement des primitives pour lier les flux de donnĂ©es, ainsi que des outils de mise en mĂ©moire tampon et de limitation de la bande passante (BoundedCapacity, MaxMessagePerTask). Si les idĂ©es du modĂšle d'acteur sont proches de vous, alors Akka Streams est un moyen de rĂ©soudre ce problĂšme et de gagner beaucoup de temps sans avoir Ă Ă©crire chaque acteur manuellement.
Exemple d'implémentation: flux du journal des événements
Regardons quelques exemples d'implĂ©mentation. â , . Akka Streams, , - , .

. : 15 23 , 7 . â . Kibana Dashboard.
Kibana Elasticsearch , Elasticsearch , , , , . , , , . . . (event journal) Akka, Microsoft SQL Server. , .
CREATE TABLE EventJournal ( Ordering BIGINT IDENTITY(1,1) PRIMARY KEY NOT NULL, PersistenceID NVARCHAR(255) NOT NULL, SequenceNr BIGINT NOT NULL, Timestamp BIGINT NOT NULL, IsDeleted BIT NOT NULL, Manifest NVARCHAR(500) NOT NULL, Payload VARBINARY(MAX) NOT NULL, Tags NVARCHAR(100) NULL CONSTRAINT QU_EventJournal UNIQUE (PersistenceID, SequenceNr) )
, , , , SQL Server, eventstore Akka, eventJournal. eventstore.

, . , , , , - : , . , . . . - . , . , . , Akka persistence query.

, , .
(persistence queries):
- AllPersistencelds
- CurrentPersistencelds
- EventsByPersistenceld
- CurrentEventsByPersistenceld
- EventsByTag
- CurrentEventsByTag
, , , Current â , . â . EventsByTag.
let system = mailbox.Context.System let queries = PersistenceQuery.Get(system) .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier) let mat = ActorMaterializer.Create(system) let offset = getCurrentOffset client config let ks = KillSwitches.Shared "persistence-elastic" let task = queries.EventsByTag(PersistenceUtils.anyEventTag, offset) .Select(fun e -> ElasticTypes.EventEnvelope.FromAkka e) .GroupedWithin(config.BatchSize, config.BatchTimeout) .Via(ks.Flow()) .RunForeach((fun batch -> processItems client batch), mat) .ContinueWith(handleStreamError mailbox, TaskContinuationOptions.OnlyOnFaulted) |> Async.AwaitTaskVoid
, . F#, C# . EventsByTag, Akka Streams, , Elasticsearch. . . - , , , â . .
. , , , , Twitter , â , , , . , Akka Streams.
:
Akka Scala, Akka.NET, , , , , . . - .
Tweetinvi â , Twitter, . Reactive Streams, . . , , , , - Akka, , .

, , . . Broadcast-. , , . : , , , , .
GitHub-,
AkkaStreamsDemo . (
).
Commençons par un simple. Twitter: Program.cs
var useCachedTweets = false
, Twitter, , . RunnableGraph.
public static IRunnableGraph<IActorRef> CreateRunnableGraph() { var tweetSource = Source.ActorRef<ITweet>(100, OverflowStrategy.DropHead); var formatFlow = Flow.Create<ITweet>().Select(Utils.FormatTweet); var writeSink = Sink.ForEach<string>(Console.WriteLine); return tweetSource.Via(formatFlow).To(writeSink); }
(
)
, . , , ( ) .
StartTweetStream â Tweetinvi.
public static void StartTweetStream(IActorRef actor) { var stream = Stream.CreateSampleStream(); stream.TweetReceived += (_, arg) => { arg.Tweet.Text = arg.Tweet.Text.Replace("\r", " ").Replace("\n", " "); var json = JsonConvert.SerializeObject(arg.Tweet); File.AppendAllText("tweets.txt", $"{json}\r\n"); actor.Tell(arg.Tweet); }; stream.StartStream(); }
(
)
CreateSampleStream , . , , , : « ». IEnumerable, .
TweetEnumerator : , Current, MoveNext, Reset, Dispose, . , . , . .
useCachedTweets true, . CashedTweets â , 50000 , , , . , , . â . , .
TweetsWithBroadcast:
var graph = GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast<ITweet>(2)); var merge = b.Add(new Merge<string>(2)); b.From(broadcast.Out(0)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.CreatedBy)) .Via(formatUser) .To(merge.In(0)); b.From(broadcast.Out(1)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.Coordinates)) .Via(formatCoordinates) .To(merge.In(1)); return new FlowShape<ITweet, string>(broadcast.In, merge.Out); });
(
)
Scala, , DSL. Broadcast â out(0), out(1) â CreatedBy, , . .
â . .
var graph = GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast<ITweet>(2)); var merge = b.Add(new Merge<string>(2)); b.From(broadcast.Out(0)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.CreatedBy) .Throttle(10, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)) .Via(formatUser) .To(merge.In(0)); b.From(broadcast.Out(1)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.Coordinates) .Buffer(10, OverflowStrategy.DropNew) .Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)) .Via(formatCoordinates) .To(merge.In(1)); return new FlowShape<ITweet, string>(broadcast.In, merge.Out); });}
(
)
10 , 10. , , , . , , Akka Streams Reactive Streams: . , , , , - . , , , . , . , , . Buffer(10, OverFlowStrategy.DropHead). , . 10 , . , , - , â - , , , , . . . , .
var graph = GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast<ITweet>(2)); var merge = b.Add(new Merge<string>(2)); b.From(broadcast.Out(0)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.CreatedBy) .Throttle(10, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)) .Via(formatUser) .To(merge.In(0)); b.From(broadcast.Out(1)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.Coordinates) .Buffer(10, OverflowStrategy.DropNew) .Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)) .Via(Flow.Create<ICoordinates>().SelectAsync(5, Utils.GetWeatherAsync)) .Via(formatTemperature) .To(merge.In(1)); return new FlowShape<ITweet, string>(broadcast.In, merge.Out); });
(
)
, SelectAsync, . , , 5: , 5 , , . , , .
public static async Task<decimal> GetWeatherAsync(ICoordinates coordinates) { var httpClient = new HttpClient(); var requestUrl = $"http://api.met.no/weatherapi/locationforecast/1.9/?lat={coordinates.Latitude};lon={coordinates.Latitude}"; var result = await httpClient.GetStringAsync(requestUrl); var doc = XDocument.Parse(result); var temp = doc.Root.Descendants("temperature").First().Attribute("value").Value; return decimal.Parse(temp); }
(
)
. -, , - , HttpClient , XML, , .
,
, , . 10 10 , , .
, â , . , Akka Streams, , . , , .
, , , Akka Streams, . , , Akka Streams, C# , , , , .

Akka Streams , ? DotNext 2017 Moscow
Azure Functions. - , deployment, , ( - , , ), . , , , . , , Akka Streams, .. , . .
Akka Streams , , , , , . , , , , , . Akka Streams â , , .
, Akka Streams, «Akka Stream Rap».
, .
This is the Akka Stream.
This is the Source that feeds the Akka Stream.
This is the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Streams.
This is the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Throttle that speeds down the TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Bidiflow that turns back the Throttle that speeds down the TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the source that feeds the Akka Streams.
This is the Sink that is filled from the Bidiflow that turns back the Throttle that speeds down the TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
. â 22-23
DotNext 2018 Moscow , .
( ).