Introduciendo programación reactiva en Spring

Hola Habr!

Esta semana esperamos un nuevo libro de Spring 5 de la imprenta:


Entre las características interesantes de Spring 5, la programación reactiva merece una mención especial, cuya implementación en este marco se describe brevemente en el artículo propuesto por Matt Raible. En el libro mencionado, los patrones reactivos se discuten en el capítulo 11.

Matt fue coautor de Josh Long, autor de otro gran libro sobre Java y Spring, " Java in the Cloud " , publicado el verano pasado.

La programación reactiva es su manera de construir sistemas que sean resistentes a altas cargas. El procesamiento de tráfico enorme ya no es un problema, ya que el servidor no bloquea y los procesos del cliente no tienen que esperar las respuestas. El cliente no puede observar directamente cómo se ejecuta el programa en el servidor y sincronizarse con él. Cuando la API encuentra dificultades para procesar solicitudes, aún debe dar respuestas razonables. No debe rechazar y descartar mensajes de manera incontrolada. Debe informar a los componentes superiores que está trabajando bajo carga para que puedan liberarlo parcialmente de esta carga. Esta técnica se llama contrapresión, un aspecto importante de la programación reactiva.

Somos coautores de este artículo con Josh Long . Josh es un campeón de Java, Spring Developer Advocate y, en general, un hombre global que trabaja en Pivotal. He estado trabajando con Spring durante mucho tiempo, pero fue Josh quien me mostró el Spring Boot, fue en la conferencia Devoxx en Bélgica. Desde entonces, nos hemos convertido en buenos amigos, somos aficionados a Java y escribimos aplicaciones geniales.

Programación reactiva o E / S, E / S, vamos a trabajar ...

La programación reactiva es un enfoque para crear software que utiliza activamente E / S asíncronas. La E / S asincrónica es una idea pequeña, cargada de grandes cambios en la programación. La idea en sí es simple: corregir la situación con una asignación ineficiente de recursos, liberando aquellos recursos que habrían estado inactivos sin nuestra intervención, esperando la finalización de E / S. La entrada / salida asincrónica invierte el enfoque habitual para el procesamiento de E / S: el cliente se libera y puede realizar otras tareas, esperando nuevas notificaciones.

Considere lo que es común entre la entrada / salida síncrona y asíncrona, y cuáles son las diferencias entre ellos.

Escribiremos un programa simple que lea datos de la fuente (específicamente, estamos hablando del enlace java.io.File ). Comencemos con una implementación que utiliza el viejo java.io.InputStream :

Ejemplo 1. Lectura sincrónica de datos de un archivo

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. Proporcionamos un archivo para leer con el archivo java.io.File habitual.
  2. Extraiga los resultados de la fuente línea por línea ...
  3. Escribí este código para tomar Consumer<BytesPayloadgt; llamado cuando llegan nuevos datos

Bastante simple, ¿qué dices? Ejecute este código y verá en la salida del registro (a la izquierda de cada línea), lo que indica que todas las acciones ocurren en un solo hilo.
Aquí extraemos bytes de nuestros datos tomados en la fuente (en este caso, estamos hablando de una subclase de java.io.FileInputStream heredada de java.io.InputStream ). ¿Qué hay de malo en este ejemplo? En este caso, usamos un InputStream que apunta a datos ubicados en nuestro sistema de archivos. Si el archivo está allí y el disco duro funciona, este código funcionará como se esperaba.

Pero, ¿qué sucede si leemos los datos no de File , sino de un socket de red, y utilizamos otra implementación de InputStream ? ¡Nada de qué preocuparse! Por supuesto, no habrá absolutamente nada de qué preocuparse si la velocidad de la red es infinitamente alta. Y si el canal de red entre este y el otro nodo nunca falla. Si se cumplen estas condiciones, el código funcionará perfectamente.

Pero, ¿qué sucede si la red se ralentiza o se establece? En este caso, quiero decir que aumentaremos el período hasta que la operación in.read(…) . De hecho, ¡es posible que no regrese en absoluto! Este es un problema si intentamos hacer algo más con la secuencia de la que estamos leyendo datos. Por supuesto, siempre puede crear otra secuencia y leer datos a través de ella. Esto puede hacerse hasta cierto punto, pero, al final, alcanzaremos el límite en el que simplemente agregar subprocesos para una mayor escala ya no será suficiente. No tendremos una verdadera competencia más allá del número de núcleos que hay en nuestra máquina. Callejón sin salida! En este caso, podemos aumentar el procesamiento de entrada / salida (la lectura se entiende aquí) solo debido a flujos adicionales, pero aquí tarde o temprano alcanzaremos el límite.

En este ejemplo, el trabajo principal es la lectura: casi nada sucede en otros frentes. Dependemos de las E / S. Considere cómo una solución asincrónica nos ayuda a superar parcialmente la monopolización de nuestros flujos.

Ejemplo 2. Lectura asíncrona de datos de un archivo

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. Esta vez adaptamos java.io.File , haciendo Java NIO java.nio.file.Path partir de él
  2. Al crear un Channel , en particular, especificamos el servicio java.util.concurrent.ExecutorService , que se utilizará para llamar al controlador CompletionHandler cuando aparezcan los datos necesarios.
  3. Comenzamos a leer pasando un enlace a CompletionHandler<Integer, ByteBuffer> (this)
  4. En la devolución de llamada, lea los bytes del ByteBuffer en la capacidad de byte[]
  5. Al igual que en el ejemplo Synchronous , los datos de byte[] se pasan al consumidor.

Haremos una reserva de inmediato: ¡este código resultó ser mucho más difícil! Hay tantas cosas que suceden aquí que su cabeza está girando de inmediato, sin embargo, permítanme señalar ... este código lee los datos del Java NIO Channel y luego los procesa en un hilo separado responsable de las devoluciones de llamada. Por lo tanto, la corriente en la que comenzó la lectura no está monopolizada. Volvemos casi instantáneamente después de llamar a .read(..) , y cuando, finalmente, tenemos los datos a nuestra disposición, se realiza una devolución de llamada, ya en otro hilo. Si hay un retraso entre las llamadas a .read() puede pasar a otros asuntos ejecutándolos en nuestro hilo. La duración de una operación de lectura asíncrona, desde el primer byte hasta el último, no es, en el mejor de los casos, mayor que la de una operación de lectura síncrona. Por lo general, una operación asincrónica no es significativamente más larga. Sin embargo, yendo a tales dificultades adicionales, podemos manejar más efectivamente nuestros flujos. Haga más trabajo, multiplexe las E / S en un grupo con un número finito de subprocesos.

Trabajo para una empresa de computación en la nube. ¡Nos gustaría que obtenga nuevas instancias de la aplicación para resolver problemas con el escalado horizontal! Por supuesto, aquí estoy un poco falso. La E / S asincrónica complica un poco las cosas, pero espero que este ejemplo ilustre cómo el código reactivo es tan útil: le permite procesar más solicitudes y hacer más trabajo en su hardware existente si el rendimiento depende en gran medida de las E / S. Si el rendimiento depende del uso del procesador (por ejemplo, estamos hablando de operaciones con números de Fibonacci, extracción de bitcoins o criptografía), entonces la programación reactiva no nos dará nada.

¡Actualmente, la mayoría de nosotros no usamos implementaciones de Channel o InputStream en nuestro trabajo diario! Tenemos que pensar en los problemas a nivel de abstracciones de nivel superior. Se trata de cosas como matrices, o más bien, la jerarquía java.util.Collection . La colección java.util.Collection muestra muy bien en un InputStream: ambas entidades suponen que puede operar con todos los datos a la vez, y casi al instante. Se espera que pueda terminar de leer la mayoría de InputStreams antes, en lugar de más tarde. Los tipos de recopilación se vuelven un poco incómodos al pasar a grandes cantidades de datos. ¿Qué sucede si se trata de algo potencialmente infinito (ilimitado), por ejemplo, sockets web o eventos del servidor? ¿Qué hacer si hay un retraso entre las grabaciones?

Necesitamos una mejor manera de describir este tipo de datos. Estamos hablando de eventos asincrónicos, tales que ocurrirán al final. Puede parecer que Future<T> o CompletableFuture<T> son muy adecuados para este propósito, pero describen solo una cosa que sucede al final. De hecho, Java no proporciona una metáfora adecuada para describir este tipo de datos. Los tipos Iterator y Stream de Java 8 pueden no estar relacionados, sin embargo, ambos están orientados a la extracción; usted mismo solicita la siguiente entrada, no el tipo debe enviar una devolución de llamada a su código. Se supone que si el procesamiento basado en inserción fuera compatible en este caso, lo que permitiría lograr mucho más a nivel de subproceso, entonces la API también proporcionaría control de subprocesamiento y programación. Iterator implementaciones de Iterator no dicen nada acerca de los subprocesos, y todos los subprocesos de Java 8 comparten el mismo grupo de bifurcación.

Si Iterator y Stream realmente admitieran el procesamiento push, entonces encontraríamos otro problema que realmente se intensifica precisamente en el contexto de E / S: ¡necesitaremos algún tipo de mecanismo de penetración inversa! Dado que el consumidor de datos se procesa de forma asíncrona, no tenemos idea de cuándo estarán los datos en la tubería y en qué cantidad. No sabemos cuántos datos deberán procesarse en la próxima devolución de llamada: ¡un byte o un terabyte!

Al extraer datos de un InputStream , lee tanta información como está listo para procesar, y nada más. En los ejemplos anteriores, leemos datos en el búfer byte[] de una longitud fija y conocida. En un contexto asincrónico, necesitamos alguna forma de decirle al proveedor cuántos datos estamos dispuestos a procesar.
Si señor. Aquí falta algo seguro.

Busca la metáfora que falta

En este caso, estamos buscando una metáfora que refleje bellamente la esencia de las E / S asincrónicas, que soporte dicho mecanismo para la transferencia inversa de datos y nos permita controlar el flujo de ejecución en sistemas distribuidos. En la programación reactiva, la capacidad de un cliente para indicar qué carga puede manejar se denomina "flujo inverso".

Ahora hay una serie de buenos proyectos: Vert.x, Akka Streams y RxJava, que admiten la programación reactiva. El equipo de Spring también ejecuta un proyecto llamado Reactor . Entre estos diversos estándares hay un campo general bastante amplio, asignado de facto al estándar de iniciativa Reactive Streams . La iniciativa Reactive Streams define cuatro tipos:

Interfaz del Publisher<T> ; produce valores que finalmente pueden llegar. Interfaz del Publisher<T> ; produce valores de tipo T para el Subscriber<T> .

Ejemplo 3. Secuencias reactivas: interfaz Publisher<T> .

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

El tipo de suscriptor se suscribe a Publisher<T> , recibiendo notificaciones de cualquier valor nuevo de tipo T través de su onNext(T) . Si se produce algún error, se onError(Throwable) su onError(Throwable) . Cuando el procesamiento se completa normalmente, se onComplete método onComplete del suscriptor.

Ejemplo 4. Secuencias de chorro: interfaz de Subscriber<T> .

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

Cuando el Subscriber conecta por primera vez al Publisher , recibe una Subscription en el Subscriber#onSubscribe . Suscripción La Subscription es quizás la parte más importante de toda la especificación; es ella quien proporciona el flujo de retorno. Un suscriptor utiliza el método de solicitud de Subscription#request para solicitar datos adicionales o el método de Subscription#cancel para detener el procesamiento.

Ejemplo 5. Secuencias reactivas: interfaz de Subscription<T> .

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

La especificación de flujo reactivo proporciona otro tipo útil, aunque obvio: el Processor<A,B> es solo una interfaz que hereda tanto el Subscriber<A> como el Publisher<B> .

Ejemplo 6. Secuencias de chorro: interfaz del Processor<T> .

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

Una especificación no se posiciona como una receta para implementaciones; de hecho, su propósito es definir tipos para soportar la interoperabilidad. El beneficio obvio de los tipos asociados con flujos reactivos es que, sin embargo, encontraron un lugar en la versión Java 9, además, semánticamente son "uno a uno" corresponden a interfaces de la clase java.util.concurrent.Flow , por ejemplo: java.util.concurrent.Flow.Publisher .

Conoce a Reactor

Los tipos de flujos reactivos por sí solos no son suficientes; Se necesitan implementaciones de orden superior para soportar operaciones como el filtrado y la transformación. Como tal, el proyecto Reactor es conveniente; se basa en la especificación de Reactive Streams y proporciona dos especializaciones de Publisher<T> .

Primero, Flux<T> es un Publisher que produce cero o más valores. El segundo, Mono<T> , es Publisher<T> , produciendo cero o un valor. Ambos publican valores y pueden manejarlos en consecuencia, sin embargo, sus capacidades son mucho más amplias que la especificación de Reactive Streams. Ambos proporcionan operadores que le permiten procesar flujos de valor. Los tipos de reactores componen bien: la salida de uno de ellos puede servir como entrada para el otro, y si un tipo necesita trabajar con otros flujos de datos, confían en instancias de Publisher<T> .

Tanto Mono<T> como Flux<T> implementan Publisher<T> ; Recomendamos que sus métodos acepten instancias de Publisher<T> pero devuelvan Flux<T> o Mono<T> ; Esto ayudará al cliente a distinguir qué tipo de datos recibe.

Supongamos que se le proporcionó el Publisher<T> y se le pidió que mostrara la interfaz de usuario para este Publisher<T> . ¿Debería mostrar una página con detalles para un registro, ya que puede obtener CompletableFuture<T> ? ¿O mostrar una página de resumen con una lista o cuadrícula donde todas las entradas se muestran página por página? Es dificil de decir.

A su vez, Flux<T> y Mono<T> muy específicos. Sabe que necesita mostrar una página de revisión si Flux<T> recibe Flux<T> , y una página con detalles para un registro (o no uno) cuando recibe Mono<T> .

Reactor es un proyecto de código abierto lanzado por Pivotal; Ahora se ha vuelto muy popular. Facebook lo usa en su motor a reacción para llamar a procedimientos remotos , y también lo usa en Rsocket , dirigido por el creador de RxJava, Ben Christensen. Salesforce lo usa en su implementación reactiva de gRPC . Reactor implementa los tipos Reactive Streams, por lo que puede interactuar con otras tecnologías que admiten estos tipos, por ejemplo, con RxJava 2 de Netflix, Akka Streams de Lightbend y con el proyecto Vert.x de la Fundación Eclipse. David Cairnock, director de RxJava 2, también colaboró ​​activamente con Pivotal para desarrollar Reactor, haciendo que el proyecto sea aún mejor. Además, por supuesto, está presente de una forma u otra en Spring Framework, comenzando con Spring Framework 4.0.

Programación reactiva con Spring WebFlux

A pesar de su utilidad, Reactor es solo la base. Nuestras aplicaciones deben comunicarse con fuentes de datos. Debe admitir autenticación y autorización. La primavera ofrece todo esto. Si Reactor nos da la metáfora faltante, entonces Spring nos ayuda a todos a hablar un idioma común.

Spring Framework 5.0 se lanzó en septiembre de 2017. Se basa en las especificaciones Reactor y Reactive Streams. Tiene un nuevo modelo de componentes y tiempo de ejecución reactivo llamado Spring WebFlux .

Spring WebFlux es independiente de la API de Servlet y no requiere que funcionen. Viene con adaptadores que le permiten usarlo en la parte superior del motor Servlet, si es necesario, pero esto no es necesario. También proporciona un tiempo de ejecución completamente nuevo basado en Netty llamado Spring WebFlux. Spring Framework 5, que trabaja con Java 8 y Java EE 7 y versiones posteriores, ahora sirve como base para gran parte del ecosistema de Spring, incluidos Spring Data Kay, Spring Security 5, Spring Boot 2 y Spring Cloud Finchley.

Source: https://habr.com/ru/post/es435972/


All Articles