Programación reactiva con JAX-RS

Hola a todos!

El año pasado , se lanzó con éxito el curso Java Enterprise Developer, y tenemos el último material sobre este tema que queremos compartir con usted, que analiza el uso del enfoque asincrónico y la puesta en escena para desarrollar aplicaciones receptivas receptivas.

Vamos

La programación reactiva al principio suena como el nombre de un paradigma emergente, pero en realidad se refiere a un método de programación en el que se utiliza un enfoque orientado a eventos para trabajar con flujos de datos asincrónicos. Basado en datos constantemente actuales, los sistemas reactivos responden a ellos realizando una serie de eventos.
La programación reactiva sigue el patrón de diseño "Observador", que se puede definir de la siguiente manera: si un estado cambia en un objeto, todos los demás objetos se notifican y actualizan en consecuencia. Por lo tanto, en lugar de sondear eventos para cambios, los eventos se envían de forma asíncrona para que los observadores puedan procesarlos. En este ejemplo, los observadores son funciones que se ejecutan cuando se distribuye el evento. Y el flujo de datos mencionado es el observable real.

Casi todos los lenguajes y marcos utilizan este enfoque en su ecosistema, y ​​las últimas versiones de Java no son una excepción. En este artículo, explicaré cómo se puede aplicar la programación reactiva utilizando la última versión de JAX-RS en la funcionalidad Java EE 8 y Java 8.



Manifiesto de chorro

El Manifiesto de Jet enumera cuatro aspectos fundamentales que una aplicación necesita para ser más flexible, poco acoplada y fácil de escalar y, por lo tanto, capaz de ser reactiva. Establece que la aplicación debe ser receptiva, flexible (y por lo tanto escalable), resistente e impulsada por mensajes.

El objetivo subyacente es una aplicación verdaderamente receptiva. Supongamos que hay una aplicación en la que un hilo grande se dedica a procesar las solicitudes de los usuarios, y después de completar el trabajo, este hilo envía respuestas a los solicitantes originales. Cuando una aplicación recibe más solicitudes de las que puede manejar, este hilo se convierte en un cuello de botella y la aplicación pierde su capacidad de respuesta anterior. Para mantener la capacidad de respuesta, la aplicación debe ser escalable y resistente. Sostenible puede considerarse una aplicación que tiene funcionalidad para la recuperación automática. En la experiencia de la mayoría de los desarrolladores, solo la arquitectura basada en mensajes permite que la aplicación sea escalable, resistente y receptiva.

La programación reactiva se introdujo en Java 8 y Java EE 8. Java introdujo conceptos como CompletionStage y su implementación de CompletableFuture , y Java comenzó a usar estas características en especificaciones como la API Reactive Client en JAX-RS.

API de cliente reactivo JAX-RS 2.1

Veamos cómo se puede usar la programación reactiva en aplicaciones Java EE 8. Para comprender el proceso, necesita algunos conocimientos básicos de la API Java EE.

JAX-RS 2.1 introdujo una nueva forma de crear un cliente REST con soporte para programación reactiva. La implementación de invocador predeterminada que se ofrece en JAX-RS es síncrona, lo que significa que el cliente creado enviará una llamada de bloqueo al punto final del servidor. Un ejemplo de implementación se presenta en el Listado 1.

Listado 1

 Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get(); 

A partir de la versión 2.0, JAX-RS proporciona soporte para crear un invocador asíncrono en la API del cliente con una simple llamada al método async() , como se muestra en el Listado 2.

Listado 2

 Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get(); 

El uso de un invocador asíncrono en el cliente devuelve una instancia Future del tipo javax.ws.rs.core.Response . Esto puede conducir a sondear la respuesta, con una llamada a future.get() , o registrar una devolución de llamada que se llamará cuando haya una respuesta HTTP disponible. Ambas implementaciones son adecuadas para la programación asincrónica, pero las cosas generalmente se complican si desea agrupar devoluciones de llamada o agregar casos condicionales a estos mínimos de ejecución asincrónica.

JAX-RS 2.1 proporciona una forma reactiva de superar estos problemas con la nueva API JAX-RS Reactive Client para construir un cliente. Es tan simple como llamar al método rx() durante la compilación del cliente. En el Listado 3, el método rx() devuelve el invocador reactivo que existe durante la ejecución del cliente, y el cliente devuelve una respuesta de tipo CompletionStage.rx() , que permite la transición de un invocador síncrono a un invocador asíncrono con una simple llamada.

Listado 3

 CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get(); 

CompletionStage<> es una nueva interfaz introducida en Java 8. Representa un cálculo, que puede ser un paso en el marco de un cálculo más grande, como su nombre lo indica. Este es el único representante de reactividad de Java 8 que alcanza el JAX-RS.
Después de recibir la instancia de respuesta, puedo llamar a AcceptAsync() , donde puedo proporcionar un fragmento de código que se ejecutará de forma asincrónica cuando la respuesta esté disponible, como se muestra en el Listado 4.

Listado 4

 response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class); //do stuff with t }); 

Agregar reactividad a un punto final REST

El enfoque reactivo no se limita al lado del cliente en JAX-RS; También se puede utilizar en el lado del servidor. Por ejemplo, primero crearé un script simple donde pueda solicitar una lista de ubicaciones de un destino. Para cada posición, haré una llamada separada con datos de ubicación a otro punto para obtener los valores de temperatura. La interacción de los destinos será como se muestra en la Figura 1.


Figura 1. Interacción entre puntos de destino

Primero, simplemente defino el modelo de dominio y luego los servicios para cada modelo. El Listado 5 muestra cómo se define la clase Forecast , que envuelve las clases Location y Temperature .

Listado 5

 public class Temperature { private Double temperature; private String scale; // getters & setters } public class Location { String name; public Location() {} public Location(String name) { this.name = name; } // getters & setters } public class Forecast { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature( final Temperature temperature) { this.temperature = temperature; return this; } // getters } 

Para ajustar la lista de pronósticos, la clase ServiceResponse implementa en el Listado 6.

Listado 6

 public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } // getters } 

LocationResource muestra en el Listado 7 define tres ubicaciones de muestra devueltas con la ruta /location .

Listado 7

 @Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } } 

TemperatureResource muestra en el Listado 8 devuelve un valor de temperatura generado aleatoriamente entre 30 y 50 para una ubicación determinada. Se ha agregado un retraso de 500 ms a la implementación para simular una lectura del sensor.

Listado 8

 @Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } } 

Primero, mostraré la implementación de ForecastResource síncrono (ver Listado 9), que devuelve todas las ubicaciones. Luego, para cada posición, llama al servicio de temperatura para obtener los valores en grados Celsius.

Listado 9

 @Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } } 

Cuando el destino de pronóstico se solicita como /forecast , obtendrá una salida similar a la que se muestra en el Listado 10. Tenga en cuenta que el tiempo de procesamiento de la solicitud tomó 1.533 ms, lo cual es lógico, ya que una solicitud síncrona de valores de temperatura de tres ubicaciones diferentes suma 1.5 ms

Listado 10

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 } 

Hasta ahora, todo va de acuerdo al plan. Es hora de introducir la programación reactiva en el lado del servidor, donde las llamadas a cada ubicación se pueden hacer en paralelo después de recibir todas las ubicaciones. Esto puede mejorar claramente la secuencia síncrona mostrada anteriormente. Esto se realiza en el Listado 11, que muestra la definición de la versión del servicio de pronóstico reactivo.

Listado 11

 @Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis(); //   (stage)    CompletionStage<List<Location>> locationCS = locationTarget.request() .rx() .get(new GenericType<List<Location>>() {}); //      , //  ,   , //    CompletionStage final CompletionStage<List<Forecast>> forecastCS = locationCS.thenCompose(locations -> { //      //   ompletionStage List<CompletionStage<Forecast>> forecastList = //      //     locations.stream().map(location -> { //     //      //    final CompletionStage<Temperature> tempCS = temperatureTarget .resolveTemplate("city", location.getName()) .request() .rx() .get(Temperature.class); //   CompletableFuture,   //    //      return CompletableFuture.completedFuture( new Forecast(location)) .thenCombine(tempCS, Forecast::setTemperature); }).collect(Collectors.toList()); //    CompletableFuture, //     completable future //  return CompletableFuture.allOf( forecastList.toArray( new CompletableFuture[forecastList.size()])) .thenApply(v -> forecastList.stream() .map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join) .collect(Collectors.toList())); }); //   ServiceResponse, //       //    . //   future    // forecastCS,    //      CompletableFuture.completedFuture( new ServiceResponse()) .thenCombine(forecastCS, ServiceResponse::forecasts) .whenCompleteAsync((response, throwable) -> { response.setProcessingTime( System.currentTimeMillis() - startTime); async.resume(response); }); } } 

La implementación reactiva puede parecer complicada a primera vista, pero después de un estudio más cuidadoso, notará que es bastante simple. En la implementación de ForecastReactiveResource primero creo una llamada de cliente a los servicios de ubicación utilizando la API JAX-RS Reactive Client. Como mencioné anteriormente, este es un complemento para Java EE 8, y ayuda a crear una llamada reactiva simplemente usando el método rx() .

Ahora estoy creando una nueva fase basada en la ubicación para armar una lista de pronósticos. Se almacenarán como una lista de pronóstico en una gran etapa de finalización llamada forecastCS . Finalmente, crearé una respuesta de llamada de servicio usando solo forecastCS .

Y ahora, recopilemos los pronósticos en forma de una lista de etapas de finalización definidas en la variable forecastList . Para crear una etapa de finalización para cada pronóstico, paso los datos por ubicación y luego creo la variable tempCS , nuevamente usando la API JAX-RS Reactive Client, que llama al servicio de temperatura con el nombre de la ciudad. Aquí, uso el método resolveTemplate() para construir el cliente, y esto me permite pasar el nombre de la ciudad al recopilador como parámetro.

Como paso final en la transmisión, realizo una llamada a CompletableFuture.completedFuture() , pasando la nueva instancia de Forecast como parámetro. Combino este futuro con la etapa tempCS para tener un valor de temperatura para las ubicaciones monitoreadas.

El método CompletableFuture.allOf() en el Listado 11 convierte la lista de etapas de finalización en forecastCS . La realización de este paso devuelve una gran instancia futura completable cuando todos los objetos futuros completables provistos están completos.

La respuesta del servicio es una instancia de la clase ServiceResponse , por lo que creo un futuro completado y luego combino la etapa de finalización del ForecastCS con una lista de pronósticos y calculo el tiempo de respuesta del servicio.

Por supuesto, la programación reactiva obliga solo al lado del servidor a ejecutarse de forma asíncrona; el lado del cliente se bloqueará hasta que el servidor envíe una respuesta al solicitante. Para superar este problema, los eventos enviados por el servidor (SSE) se pueden usar para enviar parcialmente una respuesta tan pronto como esté disponible, de modo que los valores de temperatura para cada ubicación se transmitan al cliente uno por uno. La salida de ForecastReactiveResource será similar a la presentada en el Listado 12. Como se muestra en la salida, el tiempo de procesamiento es de 515 ms, que es un tiempo de ejecución ideal para obtener valores de temperatura desde una ubicación.

Listado 12

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 } 

Conclusión

En los ejemplos de este artículo, primero mostré una forma sincrónica de obtener pronósticos utilizando los servicios de ubicación y temperatura. Luego, pasé al enfoque reactivo para que el procesamiento asincrónico se realice entre llamadas de servicio. Cuando utiliza la API JAX-RS Reactive Client en Java EE 8 junto con las clases CompletionStage y CompletableFuture disponibles en Java 8, la potencia del procesamiento asincrónico se desata gracias a la programación reactiva.

La programación reactiva es más que simplemente implementar un modelo asincrónico a partir de uno síncrono; También simplifica el trabajo con conceptos como la etapa de anidamiento. Cuanto más se use, más fácil será administrar scripts complejos en programación paralela.

El fin

Gracias por su atencion Como siempre, esperamos sus comentarios y preguntas.

Source: https://habr.com/ru/post/es424031/


All Articles