Présentation de la programmation réactive au printemps

Bonjour, Habr!

Cette semaine, nous attendons un nouveau livre Spring 5 de l'imprimerie:


Parmi les fonctionnalitĂ©s intĂ©ressantes de Spring 5, la programmation rĂ©active mĂ©rite une mention spĂ©ciale, dont la mise en Ɠuvre dans ce cadre est briĂšvement dĂ©crite par l'article proposĂ© par Matt Raible. Dans le livre susmentionnĂ©, les modĂšles rĂ©actifs sont examinĂ©s au chapitre 11.

Matt a été co-écrit par Josh Long, auteur d'un autre grand livre sur Java et Spring, " Java in the Cloud ", sorti l'été dernier.

La programmation rĂ©active est votre façon de construire des systĂšmes rĂ©sistants aux charges Ă©levĂ©es. Le traitement d'un trafic important n'est plus un problĂšme, car le serveur n'est pas bloquant et les processus clients n'ont pas Ă  attendre les rĂ©ponses. Le client ne peut pas observer directement comment le programme s'exĂ©cute sur le serveur et se synchroniser avec lui. Lorsque l'API a du mal Ă  traiter les demandes, elle doit quand mĂȘme donner des rĂ©ponses raisonnables. Ne doit pas refuser et rejeter les messages de maniĂšre incontrĂŽlĂ©e. Il doit informer les composants supĂ©rieurs qu'il fonctionne sous charge afin qu'ils puissent le libĂ©rer partiellement de cette charge. Cette technique est appelĂ©e contre-pression, un aspect important de la programmation rĂ©active.

Nous avons co-écrit cet article avec Josh Long . Josh est un champion Java, Spring Developer Advocate, et généralement un gars mondial travaillant chez Pivotal. Je travaille avec Spring depuis longtemps, mais c'est Josh qui m'a montré le Spring Boot, c'était lors de la conférence Devoxx en Belgique. Depuis, nous sommes devenus de bons amis, nous aimons Java et écrivons des applications sympas.

Programmation réactive ou E / S, E / S, on se met au travail ...

La programmation rĂ©active est une approche de crĂ©ation de logiciels qui utilise activement les E / S asynchrones. Les E / S asynchrones sont une petite idĂ©e, chargĂ©e de grands changements dans la programmation. L'idĂ©e elle-mĂȘme est simple: corriger la situation avec l'allocation inefficace des ressources, libĂ©rer les ressources qui auraient Ă©tĂ© inactives sans notre intervention, attendre la fin des E / S. L'entrĂ©e / sortie asynchrone inverse l'approche habituelle du traitement des E / S: le client est libĂ©rĂ© et peut effectuer d'autres tĂąches en attendant de nouvelles notifications.

Considérez ce qui est commun entre les entrées / sorties synchrones et asynchrones, et quelles sont les différences entre elles.

Nous allons écrire un programme simple qui lit les données de la source (en particulier, nous parlons du lien java.io.File ). Commençons par une implémentation qui utilise le bon vieux java.io.InputStream :

Exemple 1. Lecture synchrone de données d'un fichier

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. Nous fournissons un fichier Ă  lire avec le fichier java.io.File habituel
  2. Tirez les résultats de la source une ligne à la fois ...
  3. J'ai écrit ce code pour prendre Consumer<BytesPayloadgt; appelé lorsque de nouvelles données arrivent

Assez simple, que dites-vous? Exécutez ce code et vous verrez dans la sortie du journal (à gauche de chaque ligne), indiquant que toutes les actions se produisent dans un seul thread.
Ici, nous extrayons des octets de nos données prises dans la source (dans ce cas, nous parlons d'une sous-classe de java.io.FileInputStream héritée de java.io.InputStream ). Quel est le problÚme avec cet exemple? Dans ce cas, nous utilisons un InputStream qui pointe vers des données situées sur notre systÚme de fichiers. Si le fichier est là et que le disque dur fonctionne, alors ce code fonctionnera comme prévu.

Mais que se passe-t-il si nous lisons les donnĂ©es non pas depuis File , mais depuis une prise rĂ©seau, et utilisons une autre implĂ©mentation de InputStream ? Rien Ă  craindre! Bien sĂ»r, il n'y aura absolument rien Ă  craindre si la vitesse du rĂ©seau est infiniment Ă©levĂ©e. Et si le canal rĂ©seau entre celui-ci et l'autre nƓud ne tombe jamais en panne. Si ces conditions sont remplies, le code fonctionnera parfaitement.

Mais que se passe-t-il si le rĂ©seau ralentit ou s'Ă©tire? Dans ce cas, je veux dire que nous allons augmenter la pĂ©riode jusqu'Ă  ce que l'opĂ©ration in.read(
) . En fait, elle ne reviendra peut-ĂȘtre pas du tout! C'est un problĂšme si nous essayons de faire autre chose avec le flux Ă  partir duquel nous lisons des donnĂ©es. Bien sĂ»r, vous pouvez toujours crĂ©er un autre flux et lire des donnĂ©es Ă  travers celui-ci. Cela peut ĂȘtre fait jusqu'Ă  un certain point, mais, Ă  la fin, nous atteindrons la limite Ă  laquelle l'ajout de threads pour une mise Ă  l'Ă©chelle supplĂ©mentaire ne sera plus suffisant. Nous n'aurons pas de vĂ©ritable concurrence au-delĂ  du nombre de cƓurs prĂ©sents sur notre machine. Impasse! Dans ce cas, nous pouvons augmenter le traitement d'entrĂ©e / sortie (la lecture est ici signifiĂ©e) uniquement en raison de flux supplĂ©mentaires, mais ici nous atteindrons tĂŽt ou tard la limite.

Dans cet exemple, le travail principal est la lecture - presque rien ne se passe sur les autres fronts. Nous dépendons des E / S. Considérez comment une solution asynchrone nous aide à surmonter partiellement la monopolisation de nos flux.

Exemple 2. Lecture asynchrone de données d'un fichier

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. Cette fois, nous adaptons java.io.File , faisant de Java NIO java.nio.file.Path partir de celui-ci
  2. Lors de la création d'un Channel , nous java.util.concurrent.ExecutorService en particulier le service java.util.concurrent.ExecutorService , qui sera utilisé pour appeler le gestionnaire CompletionHandler lorsque les données nécessaires apparaissent
  3. Nous commençons la lecture en passant un lien vers CompletionHandler<Integer, ByteBuffer> (this)
  4. Dans le rappel, lisez les octets du ByteBuffer dans la capacité en byte[]
  5. Tout comme dans l'exemple Synchronous , les données d' byte[] sont transmises au consommateur.

Nous ferons une rĂ©servation tout de suite: ce code s’est avĂ©rĂ© beaucoup plus difficile! Il y a tellement de choses qui se passent ici que votre tĂȘte tourne tout de suite, cependant, permettez-moi de souligner ... ce code lit les donnĂ©es du Java NIO Channel , puis traite ces donnĂ©es dans un thread sĂ©parĂ© responsable des rappels. Ainsi, le flux dans lequel la lecture a commencĂ© n'est pas monopolisĂ©. Nous revenons presque instantanĂ©ment aprĂšs avoir appelĂ© .read(..) , et quand, enfin, nous avons les donnĂ©es Ă  notre disposition, un rappel est effectuĂ© - dĂ©jĂ  dans un autre thread. S'il y a un dĂ©lai entre les appels Ă  .read() vous pouvez passer Ă  d'autres sujets en les exĂ©cutant dans notre thread. La durĂ©e d'une opĂ©ration de lecture asynchrone, du premier au dernier, n'est au mieux pas plus longue que celle d'une opĂ©ration de lecture synchrone. Habituellement, une opĂ©ration asynchrone n'est pas significativement plus longue. Cependant, face Ă  ces difficultĂ©s supplĂ©mentaires, nous pouvons gĂ©rer plus efficacement nos flux. Faites plus de travail, multiplexez les E / S dans un pool avec un nombre fini de threads.

Je travaille pour une entreprise de cloud computing. Nous aimerions que vous obteniez de nouvelles instances de l'application pour rĂ©soudre les problĂšmes de mise Ă  l'Ă©chelle horizontale! Bien sĂ»r, ici, je suis un peu hypocrite. Les E / S asynchrones compliquent un peu les choses, mais j'espĂšre que cet exemple illustre Ă  quel point le code rĂ©actif est si utile: il vous permet de traiter plus de requĂȘtes et de travailler plus sur votre matĂ©riel existant si les performances dĂ©pendent fortement des E / S. Si les performances dĂ©pendent de l'utilisation du processeur (disons, nous parlons d'opĂ©rations sur les nombres de Fibonacci, d'extraction de bitcoins ou de cryptographie), alors la programmation rĂ©active ne nous donnera rien.

Actuellement, la plupart d'entre nous n'utilisons pas les implémentations Channel ou InputStream dans notre travail quotidien! Nous devons penser aux problÚmes au niveau des abstractions de niveau supérieur. Il s'agit de choses comme les tableaux, ou plutÎt, la hiérarchie java.util.Collection . La collection java.util.Collection s'affiche trÚs bien sur un InputStream: les deux entités supposent que vous pouvez opérer sur toutes les données à la fois, et presque instantanément. Il est prévu que vous pourrez terminer la lecture de la plupart des InputStreams plus tÎt que tard. Les types de collections deviennent un peu inconfortables lors du passage à de plus grandes quantités de données. Et si vous avez affaire à quelque chose de potentiellement infini (illimité) - par exemple, des sockets Web ou des événements de serveur? Que faire en cas de retard entre les enregistrements?

Nous avons besoin d'une meilleure façon de dĂ©crire ce type de donnĂ©es. Nous parlons d'Ă©vĂ©nements asynchrones, tels qui se produiront Ă  la fin. Il peut sembler que Future<T> ou CompletableFuture<T> sont bien adaptĂ©s Ă  cette fin, mais ils dĂ©crivent juste une chose qui se produit Ă  la fin. En fait, Java ne fournit pas de mĂ©taphore appropriĂ©e pour dĂ©crire ce type de donnĂ©es. Les types Iterator et Stream de Java 8 peuvent ne pas ĂȘtre liĂ©s, cependant, les deux sont orientĂ©s vers l'extraction; vous demandez vous-mĂȘme l'entrĂ©e suivante, et non le type devrait envoyer un rappel Ă  votre code. Il est supposĂ© que si le traitement par push Ă©tait pris en charge dans ce cas, ce qui permettrait de rĂ©aliser bien plus au niveau du thread, l'API fournirait Ă©galement un contrĂŽle de threading et de planification. Iterator implĂ©mentations d' Iterator ne disent rien sur le thread, et tous les threads Java 8 partagent le mĂȘme pool de jointures de fourches.

Si Iterator et Stream Iterator vraiment en charge le traitement push, nous rencontrerions alors un autre problĂšme qui dĂ©gĂ©nĂšre prĂ©cisĂ©ment dans le contexte des E / S: nous aurons besoin d'une sorte de mĂ©canisme de rĂ©tro-pĂ©nĂ©tration! Étant donnĂ© que le consommateur de donnĂ©es est traitĂ© de maniĂšre asynchrone, nous n'avons aucune idĂ©e du moment oĂč les donnĂ©es seront dans le pipeline et en quelle quantitĂ©. Nous ne savons pas combien de donnĂ©es devront ĂȘtre traitĂ©es lors du prochain rappel: un octet ou un tĂ©raoctet!

En tirant des donnĂ©es d'un InputStream , vous lisez autant d'informations que vous ĂȘtes prĂȘt Ă  les traiter, et pas plus. Dans les exemples prĂ©cĂ©dents, nous lisons les donnĂ©es dans le tampon d' byte[] d'une longueur fixe et connue. Dans un contexte asynchrone, nous avons besoin d'un moyen de dire au fournisseur la quantitĂ© de donnĂ©es que nous sommes prĂȘts Ă  traiter.
Oui, monsieur. Il manque certainement quelque chose ici.

Rechercher la métaphore manquante

Dans ce cas, nous recherchons une métaphore qui reflÚte magnifiquement l'essence des E / S asynchrones, prend en charge un tel mécanisme pour le transfert inverse des données et nous permet de contrÎler le flux d'exécution dans les systÚmes distribués. Dans la programmation réactive, la capacité d'un client à signaler quelle charge il est capable de gérer est appelée «flux inverse».

Il existe maintenant un certain nombre de bons projets - Vert.x, Akka Streams et RxJava - prenant en charge la programmation réactive. L'équipe Spring gÚre également un projet appelé Reactor . Entre ces différentes normes, il existe un champ général assez large, attribué de facto à la norme de l' initiative Reactive Streams . L'initiative Reactive Streams définit quatre types:

Interface de l' Publisher<T> ; produit des valeurs qui peuvent finalement arriver. Interface de l' Publisher<T> ; produit des valeurs de type T pour l' Subscriber<T> .

Exemple 3. Flux réactifs: interface Publisher<T> .

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

Le type Subscriber s'abonne à Publisher<T> , recevant des notifications de toute nouvelle valeur de type T via sa onNext(T) . Si des erreurs se produisent, sa onError(Throwable) est onError(Throwable) . Lorsque le traitement s'est terminé normalement, la méthode onComplete l'abonné est appelée.

Exemple 4. Jet streams: interface Subscriber<T> .

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

Lorsque l' Subscriber connecte pour la premiĂšre fois Ă  Publisher , il reçoit un Subscription dans la Subscriber#onSubscribe . Abonnement L' Subscription est peut-ĂȘtre la partie la plus importante de toute la spĂ©cification; c'est elle qui assure le retour. Un abonnĂ© abonnĂ© utilise la mĂ©thode de demande d' Subscription#request pour demander des donnĂ©es supplĂ©mentaires ou la mĂ©thode d' Subscription#cancel pour arrĂȘter le traitement.

Exemple 5. Flux réactifs: interface d' Subscription<T> .

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

La spécification de flux réactif fournit un autre type utile, bien qu'évident,: le Processor<A,B> n'est qu'une interface qui hérite à la fois de l' Subscriber<A> et de l' Publisher<B> .

Exemple 6. Jet streams: Processor<T> .

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

Une spécification n'est pas positionnée comme une prescription pour les implémentations; en fait, son objectif est de définir des types pour prendre en charge l'interopérabilité. L'avantage évident des types associés aux flux réactifs est qu'ils ont néanmoins trouvé une place dans la version Java 9, d'ailleurs, sémantiquement ils sont «one to one» correspondent aux interfaces de la classe java.util.concurrent.Flow , par exemple: java.util.concurrent.Flow.Publisher .

Rencontrez Reactor

Les types de flux réactifs ne suffisent pas à eux seuls; des implémentations d'ordre supérieur sont nécessaires pour prendre en charge des opérations telles que le filtrage et la transformation. En tant que tel, le projet Reactor est pratique; il s'appuie sur la spécification Reactive Streams et propose deux spécialisations Publisher<T> .

Tout d'abord, Flux<T> est un Publisher qui produit zéro ou plusieurs valeurs. Le second, Mono<T> , est Publisher<T> , produisant zéro ou une valeur. Tous deux publient des valeurs et peuvent les gérer en conséquence, cependant, leurs capacités sont beaucoup plus étendues que la spécification Reactive Streams. Les deux fournissent des opérateurs qui vous permettent de traiter des flux de valeur. Les types de réacteurs se composent bien - la sortie de l'un d'eux peut servir d'entrée pour l'autre, et si le type doit fonctionner avec d'autres flux de données, ils s'appuient sur des instances Publisher<T> .

Mono<T> et Flux<T> implémentent Publisher<T> ; nous recommandons que vos méthodes acceptent les instances Publisher<T> mais retournent Flux<T> ou Mono<T> ; cela aidera le client à distinguer le type de données qu'il reçoit.

Supposons qu'on vous ait donnĂ© Publisher<T> et qu'on vous ait demandĂ© d'afficher l'interface utilisateur de cet Publisher<T> . Dois-je ensuite afficher une page avec les dĂ©tails d'un enregistrement, car vous pouvez obtenir CompletableFuture<T> ? Ou afficher une page de prĂ©sentation avec une liste ou une grille oĂč toutes les entrĂ©es sont affichĂ©es page par page? C'est difficile Ă  dire.

À leur tour, Flux<T> et Mono<T> trĂšs spĂ©cifiques. Vous savez que vous devez afficher une page de rĂ©vision si Flux<T> reçu, et une page avec des dĂ©tails pour un (ou pas un seul) enregistrement lorsque vous recevez Mono<T> .

Reactor est un projet open source lancé par Pivotal; Maintenant, il est devenu trÚs populaire. Facebook l'utilise dans son moteur à réaction pour appeler des procédures à distance , et l'utilise également dans Rsocket , dirigé par Ben Christensen, le créateur de RxJava. Salesforce l'utilise dans son implémentation gRPC réactive . Reactor implémente les types Reactive Streams, afin qu'il puisse interagir avec d'autres technologies qui prennent en charge ces types, par exemple, avec RxJava 2 de Netflix, Akka Streams de Lightbend et avec le projet Vert.x de la Fondation Eclipse. David Cairnock, directeur de RxJava 2, a également collaboré activement avec Pivotal pour développer Reactor, améliorant encore le projet. De plus, bien sûr, il est présent sous une forme ou une autre dans Spring Framework, à commencer par Spring Framework 4.0.

Programmation réactive avec Spring WebFlux

Malgré toute son utilité, Reactor n'est que la base. Nos applications doivent communiquer avec des sources de données. Doit prendre en charge l'authentification et l'autorisation. Le printemps fournit tout cela. Si Reactor nous donne la métaphore manquante, alors Spring nous aide tous à parler un langage commun.

Spring Framework 5.0 a été publié en septembre 2017. Il s'appuie sur Reactor et la spécification Reactive Streams. Il dispose d'un nouveau modÚle d'exécution réactif et de composants appelé Spring WebFlux .

Spring WebFlux est indépendant de l'API Servlet et ne nécessite pas leur fonctionnement. Il est livré avec des adaptateurs qui vous permettent de l'utiliser au-dessus du moteur Servlet si nécessaire, mais ce n'est pas nécessaire. Il fournit également un tout nouveau runtime basé sur Netty appelé Spring WebFlux. Spring Framework 5, fonctionnant avec Java 8 et Java EE 7 et versions ultérieures, sert désormais de base à une grande partie de l'écosystÚme Spring, notamment Spring Data Kay, Spring Security 5, Spring Boot 2 et Spring Cloud Finchley.

Source: https://habr.com/ru/post/fr435972/


All Articles