Bonjour à tous!
L'année dernière, le cours
Java Enterprise Developer a été lancé avec succès et nous avons le dernier matériel sur ce sujet que nous voulons partager avec vous, qui discute de l'utilisation de l'approche asynchrone et de la mise en scène pour développer des applications réactives réactives.
Allons-y.
La programmation réactive sonne d'abord comme le nom d'un paradigme émergent, mais se réfère en fait à une méthode de programmation dans laquelle une approche orientée événement est utilisée pour travailler avec des flux de données asynchrones. Basés sur des données constamment à jour, les systèmes réactifs y répondent en effectuant une série d'événements.
La programmation réactive suit le modèle de conception «Observateur», qui peut être défini comme suit: si un état change dans un objet, tous les autres objets sont notifiés et mis à jour en conséquence. Par conséquent, au lieu d'interroger les événements pour les modifications, les événements sont poussés de manière asynchrone afin que les observateurs puissent les traiter. Dans cet exemple, les observateurs sont des fonctions qui sont exécutées lorsque l'événement est distribué. Et le flux de données mentionné est le véritable observable.
Presque tous les langages et frameworks utilisent cette approche dans leur écosystème, et les dernières versions de Java ne font pas exception. Dans cet article, je vais expliquer comment la programmation réactive peut être appliquée à l'aide de la dernière version de JAX-RS dans les fonctionnalités Java EE 8 et Java 8.
Jet ManifestoLe
Jet Manifesto énumère quatre aspects fondamentaux dont une application a besoin pour être plus flexible, couplée de manière lâche et facile à évoluer, et donc capable d'être réactive. Il stipule que l'application doit être réactive, flexible (et donc évolutive), résiliente et axée sur les messages.
L'objectif sous-jacent est une application vraiment réactive. Supposons qu'il existe une application dans laquelle un gros thread est engagé dans le traitement des demandes des utilisateurs, et après avoir terminé le travail, ce thread renvoie les réponses aux demandeurs d'origine. Lorsqu'une application reçoit plus de demandes qu'elle n'en peut gérer, ce thread devient un goulot d'étranglement et l'application perd son ancienne réactivité. Pour maintenir la réactivité, l'application doit être évolutive et résiliente. Durable peut être considéré comme une application dotée de fonctionnalités d'auto-récupération. D'après l'expérience de la plupart des développeurs, seule une architecture basée sur les messages permet à l'application d'être évolutive, résiliente et réactive.
La programmation réactive a été introduite dans Java 8 et Java EE 8. Java a introduit des concepts tels que
CompletionStage
et son implémentation de
CompletableFuture
, et Java a commencé à utiliser ces fonctionnalités dans des spécifications telles que l'API Reactive Client dans JAX-RS.
API client réactive JAX-RS 2.1Voyons comment la programmation réactive peut être utilisée dans les applications Java EE 8. Pour comprendre le processus, vous avez besoin d'une connaissance de base de l'API Java EE.
JAX-RS 2.1 a introduit une nouvelle façon de créer un client REST avec prise en charge de la programmation réactive. L'implémentation d'invocateur par défaut proposée dans JAX-RS est synchrone, ce qui signifie que le client créé enverra un appel de blocage au point de terminaison du serveur. Un exemple d'implémentation est présenté dans le Listing 1.
Listing 1
Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get();
À partir de la version 2.0, JAX-RS prend en charge la création d'un appelant asynchrone sur l'API client avec un simple appel à la méthode
async()
, comme indiqué dans le listing 2.
Listing 2
Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get();
L'utilisation d'un invocateur asynchrone sur le client renvoie une instance
Future
de type
javax.ws.rs.core.Response
. Cela peut conduire à interroger la réponse, avec un appel à
future.get()
, ou à enregistrer un rappel qui sera appelé lorsqu'une réponse HTTP sera disponible. Les deux implémentations conviennent à la programmation asynchrone, mais les choses se compliquent généralement si vous souhaitez regrouper des rappels ou ajouter des cas conditionnels à ces minima d'exécution asynchrones.
JAX-RS 2.1 fournit un moyen réactif de surmonter ces problèmes avec la nouvelle API client réactive JAX-RS pour la construction d'un client. C'est aussi simple que d'appeler la méthode
rx()
lors de la génération du client. Dans le listing 3, la méthode
rx()
renvoie l'invocateur réactif qui existe pendant l'exécution du client, et le client renvoie une réponse de type
CompletionStage.rx()
, qui permet la transition de l'invocateur synchrone à l'appelant asynchrone avec un simple appel.
Listing 3
CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get();
CompletionStage<>
est une nouvelle interface introduite dans Java 8. Elle représente un calcul, qui peut être une étape dans le cadre d'un calcul plus large, comme son nom l'indique. Il s'agit du seul représentant de réactivité Java 8 à avoir frappé le JAX-RS.
Après avoir reçu l'instance de réponse, je peux appeler
AcceptAsync()
, où je peux fournir un morceau de code qui s'exécutera de manière asynchrone lorsque la réponse sera disponible, comme indiqué dans le Listing 4.
Listing 4
response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class);
Ajout de réactivité à un point de terminaison RESTL'approche réactive ne se limite pas au côté client dans JAX-RS; il peut également être utilisé côté serveur. Pour un exemple, je vais d'abord créer un script simple où je peux demander une liste d'emplacements d'une destination. Pour chaque position, je ferai un appel séparé avec les données de localisation à un autre point pour obtenir les valeurs de température. L'interaction des destinations sera comme indiqué dans la figure 1.
Figure 1. Interaction entre les points de destinationTout d'abord, je définis simplement le modèle de domaine, puis les services pour chaque modèle. Le listing 5 montre comment la classe
Forecast
est définie, ce qui englobe les classes
Location
et
Temperature
.
Listing 5
public class Temperature { private Double temperature; private String scale;
Pour boucler la liste des prévisions, la classe
ServiceResponse
implémentée dans le Listing 6.
Listing 6
public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; }
LocationResource
indiquée dans le Listing 7 définit trois exemples d'emplacements renvoyés avec le chemin
/location
.
Listing 7
@Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } }
TemperatureResource
indiqué dans le Listing 8 renvoie une valeur de température générée aléatoirement entre 30 et 50 pour un emplacement donné. Un délai de 500 ms a été ajouté à l'implémentation pour simuler une lecture de capteur.
Listing 8
@Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } }
Tout d'abord, je vais montrer l'implémentation du
ForecastResource
synchrone (voir Listing 9), qui retourne tous les emplacements. Ensuite, pour chaque position, il appelle le service de température pour obtenir les valeurs en degrés Celsius.
Listing 9
@Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } }
Lorsque la destination de prévision est demandée en tant que
/forecast
, vous obtiendrez une sortie similaire à celle illustrée dans le listing 10. Notez que le temps de traitement de la demande a pris 1,533 ms, ce qui est logique, car une demande synchrone de valeurs de température à partir de trois emplacements différents s'élève à 1,5 ms
Listing 10
{ "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 }
Jusqu'à présent, tout se déroule comme prévu. Il est temps d'introduire une programmation réactive côté serveur, où les appels à chaque emplacement peuvent être effectués en parallèle après avoir reçu tous les emplacements. Cela peut clairement améliorer le flux synchrone présenté précédemment. Cela se fait dans le Listing 11, qui montre la définition de la version du service de prévision réactif.
Listing 11
@Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis();
Une mise en œuvre réactive peut sembler compliquée à première vue, mais après une étude plus approfondie, vous remarquerez qu'elle est assez simple. Dans l'implémentation de
ForecastReactiveResource
je crée d'abord un appel client aux services de localisation à l'aide de l'API JAX-RS Reactive Client. Comme je l'ai mentionné ci-dessus, il s'agit d'un module complémentaire pour Java EE 8, et il permet de créer un appel réactif simplement en utilisant la méthode
rx()
.
Maintenant, je crée une nouvelle phase basée sur l'emplacement pour dresser une liste de prévisions. Ils seront stockés sous forme de liste de prévisions dans une grande étape d'achèvement appelée
forecastCS
. En fin de compte, je vais créer une réponse aux appels de service en utilisant uniquement
forecastCS
.
Et maintenant, collectons les prévisions sous la forme d'une liste d'étapes d'achèvement définies dans la variable
forecastList
. Pour créer une étape d'achèvement pour chaque prévision, je passe les données par emplacement, puis
tempCS
crée la variable
tempCS
, à nouveau à l'aide de l'API JAX-RS Reactive Client, qui appelle le service de température avec le nom de la ville. Ici, j'utilise la méthode
resolveTemplate()
pour construire le client, et cela me permet de passer le nom de la ville au collecteur en tant que paramètre.
Comme dernière étape du streaming, j'appelle
CompletableFuture.completedFuture()
, en passant la nouvelle instance de
Forecast
tant que paramètre. Je combine cet avenir avec l'étape
tempCS
pour avoir une valeur de température pour les emplacements surveillés.
La méthode
CompletableFuture.allOf()
du Listing 11 convertit la liste des étapes d'achèvement en
forecastCS
. L'exécution de cette étape renvoie une grande instance future complétable lorsque tous les objets futurs complétables fournis sont terminés.
La réponse de service est une instance de la classe
ServiceResponse
, donc je crée un futur terminé, puis je combine l'étape d'achèvement de la
forecastCS
CS avec une liste de prévisions et calcule le temps de réponse du service.
Bien entendu, la programmation réactive force uniquement le côté serveur à s'exécuter de manière asynchrone; le côté client sera bloqué jusqu'à ce que le serveur renvoie une réponse au demandeur. Pour surmonter ce problème, les événements envoyés par le serveur (SSE) peuvent être utilisés pour envoyer partiellement une réponse dès qu'elle est disponible afin que les valeurs de température pour chaque emplacement soient transmises au client une par une. La sortie de
ForecastReactiveResource
sera similaire à celle présentée dans le listing 12. Comme indiqué dans la sortie, le temps de traitement est de 515 ms, ce qui est un temps d'exécution idéal pour obtenir des valeurs de température à partir d'un emplacement.
Listing 12
{ "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 }
ConclusionDans les exemples de cet article, j'ai d'abord montré une manière synchrone d'obtenir des prévisions à l'aide des services de localisation et de température. Ensuite, je suis passé à une approche réactive afin que le traitement asynchrone soit effectué entre les appels de service. Lorsque vous utilisez l'API JAX-RS Reactive Client dans Java EE 8 avec les classes
CompletionStage
et
CompletableFuture
disponibles dans Java 8, la puissance du traitement asynchrone se déchaîne grâce à la programmation réactive.
La programmation réactive est plus que l'implémentation d'un modèle asynchrone à partir d'un modèle synchrone; il simplifie également l'utilisation de concepts tels que l'étape d'imbrication. Plus il est utilisé, plus il sera facile de gérer des scripts complexes en programmation parallèle.
LA FIN
Merci de votre attention. Comme toujours, nous attendons vos commentaires et questions.