Bonjour, Habr!
Cette semaine, nous attendons un nouveau
livre Spring 5 de l'imprimerie:
Parmi les fonctionnalitĂ©s intĂ©ressantes de Spring 5, la programmation rĂ©active mĂ©rite une mention spĂ©ciale, dont la mise en Ćuvre dans ce cadre est briĂšvement dĂ©crite par l'article proposĂ© par Matt Raible. Dans le livre susmentionnĂ©, les modĂšles rĂ©actifs sont examinĂ©s au chapitre 11.
Matt a été co-écrit par Josh Long, auteur d'un autre grand livre sur Java et Spring, "
Java in the Cloud ", sorti l'été dernier.
La programmation rĂ©active est votre façon de construire des systĂšmes rĂ©sistants aux charges Ă©levĂ©es. Le traitement d'un trafic important n'est plus un problĂšme, car le serveur n'est pas bloquant et les processus clients n'ont pas Ă attendre les rĂ©ponses. Le client ne peut pas observer directement comment le programme s'exĂ©cute sur le serveur et se synchroniser avec lui. Lorsque l'API a du mal Ă traiter les demandes, elle doit quand mĂȘme donner des rĂ©ponses raisonnables. Ne doit pas refuser et rejeter les messages de maniĂšre incontrĂŽlĂ©e. Il doit informer les composants supĂ©rieurs qu'il fonctionne sous charge afin qu'ils puissent le libĂ©rer partiellement de cette charge. Cette technique est appelĂ©e contre-pression, un aspect important de la programmation rĂ©active.
Nous avons co-écrit cet article avec
Josh Long . Josh est un champion Java, Spring Developer Advocate, et généralement un gars mondial travaillant chez Pivotal. Je travaille avec Spring depuis longtemps, mais c'est Josh qui m'a montré le Spring Boot, c'était lors de la conférence Devoxx en Belgique. Depuis, nous sommes devenus de bons amis, nous aimons Java et écrivons des applications sympas.
Programmation rĂ©active ou E / S, E / S, on se met au travail ...La programmation rĂ©active est une approche de crĂ©ation de logiciels qui utilise activement les E / S asynchrones. Les E / S asynchrones sont une petite idĂ©e, chargĂ©e de grands changements dans la programmation. L'idĂ©e elle-mĂȘme est simple: corriger la situation avec l'allocation inefficace des ressources, libĂ©rer les ressources qui auraient Ă©tĂ© inactives sans notre intervention, attendre la fin des E / S. L'entrĂ©e / sortie asynchrone inverse l'approche habituelle du traitement des E / S: le client est libĂ©rĂ© et peut effectuer d'autres tĂąches en attendant de nouvelles notifications.
Considérez ce qui est commun entre les entrées / sorties synchrones et asynchrones, et quelles sont les différences entre elles.
Nous allons écrire un programme simple qui lit les données de la source (en particulier, nous parlons du lien
java.io.File
). Commençons par une implémentation qui utilise le bon vieux
java.io.InputStream
:
Exemple 1. Lecture synchrone de données d'un fichier package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) {
- Nous fournissons un fichier Ă lire avec le fichier
java.io.File
habituel - Tirez les résultats de la source une ligne à la fois ...
- J'ai écrit ce code pour prendre
Consumer<BytesPayloadgt;
appelé lorsque de nouvelles données arrivent
Assez simple, que dites-vous? Exécutez ce code et vous verrez dans la sortie du journal (à gauche de chaque ligne), indiquant que toutes les actions se produisent dans un seul thread.
Ici, nous extrayons des octets de nos données prises dans la source (dans ce cas, nous parlons d'une sous-classe de
java.io.FileInputStream
héritée de
java.io.InputStream
). Quel est le problÚme avec cet exemple? Dans ce cas, nous utilisons un InputStream qui pointe vers des données situées sur notre systÚme de fichiers. Si le fichier est là et que le disque dur fonctionne, alors ce code fonctionnera comme prévu.
Mais que se passe-t-il si nous lisons les données non pas depuis
File
, mais depuis une prise réseau, et utilisons une autre implémentation de
InputStream
? Rien Ă craindre! Bien sĂ»r, il n'y aura absolument rien Ă craindre si la vitesse du rĂ©seau est infiniment Ă©levĂ©e. Et si le canal rĂ©seau entre celui-ci et l'autre nĆud ne tombe jamais en panne. Si ces conditions sont remplies, le code fonctionnera parfaitement.
Mais que se passe-t-il si le réseau ralentit ou s'étire? Dans ce cas, je veux dire que nous allons augmenter la période jusqu'à ce que l'opération
in.read(âŠ)
. En fait, elle ne reviendra peut-ĂȘtre pas du tout! C'est un problĂšme si nous essayons de faire autre chose avec le flux Ă partir duquel nous lisons des donnĂ©es. Bien sĂ»r, vous pouvez toujours crĂ©er un autre flux et lire des donnĂ©es Ă travers celui-ci. Cela peut ĂȘtre fait jusqu'Ă un certain point, mais, Ă la fin, nous atteindrons la limite Ă laquelle l'ajout de threads pour une mise Ă l'Ă©chelle supplĂ©mentaire ne sera plus suffisant. Nous n'aurons pas de vĂ©ritable concurrence au-delĂ du nombre de cĆurs prĂ©sents sur notre machine. Impasse! Dans ce cas, nous pouvons augmenter le traitement d'entrĂ©e / sortie (la lecture est ici signifiĂ©e) uniquement en raison de flux supplĂ©mentaires, mais ici nous atteindrons tĂŽt ou tard la limite.
Dans cet exemple, le travail principal est la lecture - presque rien ne se passe sur les autres fronts. Nous dépendons des E / S. Considérez comment une solution asynchrone nous aide à surmonter partiellement la monopolisation de nos flux.
Exemple 2. Lecture asynchrone de données d'un fichier package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath();
- Cette fois, nous adaptons
java.io.File
, faisant de Java NIO java.nio.file.Path
partir de celui-ci - Lors de la création d'un
Channel
, nous java.util.concurrent.ExecutorService
en particulier le service java.util.concurrent.ExecutorService
, qui sera utilisé pour appeler le gestionnaire CompletionHandler
lorsque les données nécessaires apparaissent - Nous commençons la lecture en passant un lien vers
CompletionHandler<Integer, ByteBuffer> (this)
- Dans le rappel, lisez les octets du
ByteBuffer
dans la capacité en byte[]
- Tout comme dans l'exemple
Synchronous
, les données d' byte[]
sont transmises au consommateur.
Nous ferons une rĂ©servation tout de suite: ce code sâest avĂ©rĂ© beaucoup plus difficile! Il y a tellement de choses qui se passent ici que votre tĂȘte tourne tout de suite, cependant, permettez-moi de souligner ... ce code lit les donnĂ©es du
Java NIO Channel
, puis traite ces données dans un thread séparé responsable des rappels. Ainsi, le flux dans lequel la lecture a commencé n'est pas monopolisé. Nous revenons presque instantanément aprÚs avoir appelé
.read(..)
, et quand, enfin, nous avons les donnĂ©es Ă notre disposition, un rappel est effectuĂ© - dĂ©jĂ dans un autre thread. S'il y a un dĂ©lai entre les appels Ă
.read()
vous pouvez passer à d'autres sujets en les exécutant dans notre thread. La durée d'une opération de lecture asynchrone, du premier au dernier, n'est au mieux pas plus longue que celle d'une opération de lecture synchrone. Habituellement, une opération asynchrone n'est pas significativement plus longue. Cependant, face à ces difficultés supplémentaires, nous pouvons gérer plus efficacement nos flux. Faites plus de travail, multiplexez les E / S dans un pool avec un nombre fini de threads.
Je travaille pour une entreprise de cloud computing. Nous aimerions que vous obteniez de nouvelles instances de l'application pour rĂ©soudre les problĂšmes de mise Ă l'Ă©chelle horizontale! Bien sĂ»r, ici, je suis un peu hypocrite. Les E / S asynchrones compliquent un peu les choses, mais j'espĂšre que cet exemple illustre Ă quel point le code rĂ©actif est si utile: il vous permet de traiter plus de requĂȘtes et de travailler plus sur votre matĂ©riel existant si les performances dĂ©pendent fortement des E / S. Si les performances dĂ©pendent de l'utilisation du processeur (disons, nous parlons d'opĂ©rations sur les nombres de Fibonacci, d'extraction de bitcoins ou de cryptographie), alors la programmation rĂ©active ne nous donnera rien.
Actuellement, la plupart d'entre nous n'utilisons pas les implémentations
Channel
ou
InputStream
dans notre travail quotidien! Nous devons penser aux problÚmes au niveau des abstractions de niveau supérieur. Il s'agit de choses comme les tableaux, ou plutÎt, la hiérarchie
java.util.Collection
. La collection
java.util.Collection
s'affiche trÚs bien sur un InputStream: les deux entités supposent que vous pouvez opérer sur toutes les données à la fois, et presque instantanément. Il est prévu que vous pourrez terminer la lecture de la plupart des
InputStreams
plus tÎt que tard. Les types de collections deviennent un peu inconfortables lors du passage à de plus grandes quantités de données. Et si vous avez affaire à quelque chose de potentiellement infini (illimité) - par exemple, des sockets Web ou des événements de serveur? Que faire en cas de retard entre les enregistrements?
Nous avons besoin d'une meilleure façon de décrire ce type de données. Nous parlons d'événements asynchrones, tels qui se produiront à la fin. Il peut sembler que
Future<T>
ou
CompletableFuture<T>
sont bien adaptés à cette fin, mais ils décrivent juste une chose qui se produit à la fin. En fait, Java ne fournit pas de métaphore appropriée pour décrire ce type de données. Les types
Iterator
et
Stream
de Java 8 peuvent ne pas ĂȘtre liĂ©s, cependant, les deux sont orientĂ©s vers l'extraction; vous demandez vous-mĂȘme l'entrĂ©e suivante, et non le type devrait envoyer un rappel Ă votre code. Il est supposĂ© que si le traitement par push Ă©tait pris en charge dans ce cas, ce qui permettrait de rĂ©aliser bien plus au niveau du thread, l'API fournirait Ă©galement un contrĂŽle de threading et de planification.
Iterator
implémentations d'
Iterator
ne disent rien sur le thread, et tous les threads Java 8 partagent le mĂȘme pool de jointures de fourches.
Si
Iterator
et
Stream
Iterator
vraiment en charge le traitement push, nous rencontrerions alors un autre problĂšme qui dĂ©gĂ©nĂšre prĂ©cisĂ©ment dans le contexte des E / S: nous aurons besoin d'une sorte de mĂ©canisme de rĂ©tro-pĂ©nĂ©tration! Ătant donnĂ© que le consommateur de donnĂ©es est traitĂ© de maniĂšre asynchrone, nous n'avons aucune idĂ©e du moment oĂč les donnĂ©es seront dans le pipeline et en quelle quantitĂ©. Nous ne savons pas combien de donnĂ©es devront ĂȘtre traitĂ©es lors du prochain rappel: un octet ou un tĂ©raoctet!
En tirant des données d'un
InputStream
, vous lisez autant d'informations que vous ĂȘtes prĂȘt Ă les traiter, et pas plus. Dans les exemples prĂ©cĂ©dents, nous lisons les donnĂ©es dans le tampon d'
byte[]
d'une longueur fixe et connue. Dans un contexte asynchrone, nous avons besoin d'un moyen de dire au fournisseur la quantitĂ© de donnĂ©es que nous sommes prĂȘts Ă traiter.
Oui, monsieur. Il manque certainement quelque chose ici.
Rechercher la métaphore manquanteDans ce cas, nous recherchons une métaphore qui reflÚte magnifiquement l'essence des E / S asynchrones, prend en charge un tel mécanisme pour le transfert inverse des données et nous permet de contrÎler le flux d'exécution dans les systÚmes distribués. Dans la programmation réactive, la capacité d'un client à signaler quelle charge il est capable de gérer est appelée «flux inverse».
Il existe maintenant un certain nombre de bons projets - Vert.x, Akka Streams et RxJava - prenant en charge la programmation réactive. L'équipe Spring gÚre également un projet appelé
Reactor . Entre ces différentes normes, il existe un champ général assez large, attribué de facto à la norme de l'
initiative Reactive Streams . L'initiative Reactive Streams définit quatre types:
Interface de l'
Publisher<T>
; produit des valeurs qui peuvent finalement arriver. Interface de l'
Publisher<T>
; produit des valeurs de type
T
pour l'
Subscriber<T>
.
Exemple 3. Flux réactifs: interface Publisher<T>
.
package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); }
Le type
Subscriber
s'abonne Ă
Publisher<T>
, recevant des notifications de toute nouvelle valeur de type
T
via sa
onNext(T)
. Si des erreurs se produisent, sa
onError(Throwable)
est
onError(Throwable)
. Lorsque le traitement s'est terminé normalement, la méthode
onComplete
l'abonné est appelée.
Exemple 4. Jet streams: interface Subscriber<T>
. package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Lorsque l'
Subscriber
connecte pour la premiĂšre fois Ă
Publisher
, il reçoit un
Subscription
dans la
Subscriber#onSubscribe
. Abonnement L'
Subscription
est peut-ĂȘtre la partie la plus importante de toute la spĂ©cification; c'est elle qui assure le retour. Un abonnĂ© abonnĂ© utilise la mĂ©thode de demande d'
Subscription#request
pour demander des données supplémentaires ou la méthode d'
Subscription#cancel
pour arrĂȘter le traitement.
Exemple 5. Flux réactifs: interface d' Subscription<T>
.
package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); }
La spécification de flux réactif fournit un autre type utile, bien qu'évident,: le
Processor<A,B>
n'est qu'une interface qui hérite à la fois de l'
Subscriber<A>
et de l'
Publisher<B>
.
Exemple 6. Jet streams: Processor<T>
.
package org.reactivestreams; public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Une spécification n'est pas positionnée comme une prescription pour les implémentations; en fait, son objectif est de définir des types pour prendre en charge l'interopérabilité. L'avantage évident des types associés aux flux réactifs est qu'ils ont néanmoins trouvé une place dans la version Java 9, d'ailleurs, sémantiquement ils sont «one to one» correspondent aux interfaces de la classe
java.util.concurrent.Flow
, par exemple:
java.util.concurrent.Flow.Publisher
.
Rencontrez ReactorLes types de flux réactifs ne suffisent pas à eux seuls; des implémentations d'ordre supérieur sont nécessaires pour prendre en charge des opérations telles que le filtrage et la transformation. En tant que tel, le projet Reactor est pratique; il s'appuie sur la spécification Reactive Streams et propose deux spécialisations
Publisher<T>
.
Tout d'abord,
Flux<T>
est un
Publisher
qui produit zéro ou plusieurs valeurs. Le second,
Mono<T>
, est
Publisher<T>
, produisant zéro ou une valeur. Tous deux publient des valeurs et peuvent les gérer en conséquence, cependant, leurs capacités sont beaucoup plus étendues que la spécification Reactive Streams. Les deux fournissent des opérateurs qui vous permettent de traiter des flux de valeur. Les types de réacteurs se composent bien - la sortie de l'un d'eux peut servir d'entrée pour l'autre, et si le type doit fonctionner avec d'autres flux de données, ils s'appuient sur des instances
Publisher<T>
.
Mono<T>
et
Flux<T>
implémentent
Publisher<T>
; nous recommandons que vos méthodes acceptent les instances
Publisher<T>
mais retournent
Flux<T>
ou
Mono<T>
; cela aidera le client à distinguer le type de données qu'il reçoit.
Supposons qu'on vous ait donné
Publisher<T>
et qu'on vous ait demandé d'afficher l'interface utilisateur de cet
Publisher<T>
. Dois-je ensuite afficher une page avec les détails d'un enregistrement, car vous pouvez obtenir
CompletableFuture<T>
? Ou afficher une page de prĂ©sentation avec une liste ou une grille oĂč toutes les entrĂ©es sont affichĂ©es page par page? C'est difficile Ă dire.
Ă leur tour,
Flux<T>
et
Mono<T>
trÚs spécifiques. Vous savez que vous devez afficher une page de révision si
Flux<T>
reçu, et une page avec des détails pour un (ou pas un seul) enregistrement lorsque vous recevez
Mono<T>
.
Reactor est un projet open source lancé par Pivotal; Maintenant, il est devenu trÚs populaire. Facebook l'utilise dans son
moteur à réaction
pour appeler des procédures à distance , et l'utilise également dans
Rsocket , dirigé par Ben Christensen, le créateur de RxJava. Salesforce l'utilise dans son
implémentation gRPC réactive . Reactor implémente les types Reactive Streams, afin qu'il puisse interagir avec d'autres technologies qui prennent en charge ces types, par exemple, avec
RxJava 2 de Netflix,
Akka Streams de Lightbend et avec le projet
Vert.x de la Fondation Eclipse. David Cairnock, directeur de RxJava 2, a également collaboré activement avec Pivotal pour développer Reactor, améliorant encore le projet. De plus, bien sûr, il est présent sous une forme ou une autre dans Spring Framework, à commencer par Spring Framework 4.0.
Programmation réactive avec Spring WebFluxMalgré toute son utilité, Reactor n'est que la base. Nos applications doivent communiquer avec des sources de données. Doit prendre en charge l'authentification et l'autorisation. Le printemps fournit tout cela. Si Reactor nous donne la métaphore manquante, alors Spring nous aide tous à parler un langage commun.
Spring Framework 5.0 a été publié en septembre 2017. Il s'appuie sur Reactor et la spécification Reactive Streams. Il dispose d'un nouveau modÚle d'exécution réactif et de composants appelé
Spring WebFlux .
Spring WebFlux est indépendant de l'API Servlet et ne nécessite pas leur fonctionnement. Il est livré avec des adaptateurs qui vous permettent de l'utiliser au-dessus du moteur Servlet si nécessaire, mais ce n'est pas nécessaire. Il fournit également un tout nouveau runtime basé sur Netty appelé Spring WebFlux. Spring Framework 5, fonctionnant avec Java 8 et Java EE 7 et versions ultérieures, sert désormais de base à une grande partie de l'écosystÚme Spring, notamment Spring Data Kay, Spring Security 5, Spring Boot 2 et Spring Cloud Finchley.