Programación asincrónica con ejemplos: reconstrucción de métodos java.util.concurrent.CompletableFuture

¿Por qué es necesaria la reconstrucción si el código fuente de esta clase está abierto?


Aunque solo sea porque debajo del capó hay un código altamente optimizado y difícil de leer, cuyo estudio da poco pedagógicamente.


Por lo tanto, recrearemos la semántica de las operaciones de acuerdo con sus especificaciones y escribiremos código funcionalmente entendible, comprensible y legible, aunque puede que no sea el más económico en términos de consumo de memoria y tiempo de procesador.


Comencemos con un método relativamente simple:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture 

Lea la especificación cuidadosamente:


 Returns a new CompletableFuture 

Es decir, un objeto de tipo CompletableFuture o su subclase se crea y se devuelve como resultado.


 that is asynchronously completed by a task running in the given executor` 

Además, se ejecuta una tarea que se ejecuta en el Executor 'e.
Como sabemos, Executor acepta solo objetos Runnable .
Runnable es una interfaz, y el primer objeto puede implementarla, por lo que combinaremos dos funciones en un solo objeto.


  completed ... with the value obtained by calling the given Supplier. 

Este Runnable debe llamar al Supplier dado y con el valor recibido completar el CompletableFuture creado.


Supplier es una función sin parámetros, por lo que codificarlo todo es muy simple:


  class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; } 

El siguiente ejemplo es un poco más complicado:


 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage 

Returns a new CompletionStage that... is executed using the supplied Executor


Aquí se nos ofrece directamente organizar el objeto creado para emitirlo en forma de Runnable .


... with this stage's result as the argument to the supplied function.


Pero esto ya es más interesante. La función que se nos pasa tiene un parámetro, y el valor de este parámetro es el valor que completa el CompletionStage actual. En el momento de llamar thenApplyAsync este valor no se conozca, por lo que no podemos ejecutar la tarea de inmediato en Executor . En cambio, debemos negociar con el CompletionStage actual,
para que en el momento de su finalización, transfiera su valor a la tarea. Entre los numerosos métodos de CompletionStage hay uno que es exactamente adecuado para este propósito, cuando se whenComplete :


 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. 

Es decir, en el objeto de tarea recién creado, es suficiente implementar la interfaz BiConsumer para recibir un argumento:


  class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; } else { this.arg = argument; } executor.execute(this); } @Override public void run() { if (throwable == null) { try { U result = fn.apply(arg); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } else { super.completeExceptionally(throwable); } } } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor ) { CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; } } 

Este ejemplo es muy importante para comprender la naturaleza de la programación asincrónica, por lo que una vez más enumeramos sus pasos principales:


1) se crea un procedimiento asincrónico:


  CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); 

2) aún no está listo para la ejecución, por lo que le pedimos al proveedor del argumento faltante que nos pase este argumento en el futuro, llamando al método que enviamos:


  this.whenComplete(task); 

3) en este método no solo guardamos el argumento recibido, sino que también ejecutamos la tarea para su ejecución (consulte el método accept ()).


4) la ejecución de la tarea se reduce al cumplimiento de la función que se nos ha asignado y se guarda el resultado.
Este resultado puede ser tal como lo solicitan otros procedimientos que utilizan el método whenComplete () aplicado a nuestro objeto recién construido, para que podamos construir una cadena de procedimientos asincrónicos de longitud arbitraria. Pero esta cadena se cumplirá estrictamente secuencialmente, sin ningún paralelismo.


Pero, ¿cómo representar un diagrama de cálculo más complejo que contenga ramas paralelas?
Para hacer esto, use el método thenCombineAsync .


Si en el ejemplo anterior comenzamos el procedimiento asincrónico con un argumento, entonces en este, con dos.


En este caso, el cálculo de ambos argumentos puede ocurrir en paralelo.


 ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function. 

Aquí, todo es igual que en el ejemplo anterior con thenApplyAsync , pero el parámetro de función ya tiene dos argumentos, y se agrega el parámetro CompletionStage<? extends U> other CompletionStage<? extends U> other , que es el proveedor asincrónico del segundo argumento.


¿Cómo nos aseguramos de que se procese el segundo argumento?


Bueno, en primer lugar, en lugar de una variable T arg describa dos: T arg1; U arg2; T arg1; U arg2; , un método de void accept(T argument, Throwable throwable) público en lugar de uno void accept(T argument, Throwable throwable) accept1 accept2 void accept(T argument, Throwable throwable) describe dos: accept1 y accept2 ,
cada uno de los cuales trabaja con su propio argumento.


Al mismo tiempo, nuestro objeto en construcción ya no implementa la BiConsumer<T,Throwable> y ya no podemos escribir una oración clave para vincular los nodos del gráfico de cálculo asíncrono


  this.whenComplete(task); 

Afortunadamente, un objeto de interfaz funcional puede representarse mediante una referencia a un método, sin concluirlo en una clase separada:


  this.whenComplete(task::accept1); other.whenComplete(task::accept2); 

Es decir, la corriente this objeto proporciona el primer argumento, y el other objeto el segundo.


Estos son solo los códigos de los métodos que deberán cambiarse para que no comiencen la tarea inmediatamente después de la llegada de su argumento, sino que también verifiquen la recepción del segundo:


  public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } } 

El método accept2 se describe de manera similar.


Tenga en cuenta que:


  • los métodos se sincronizan (trabajamos con datos comunes)
  • en caso de transmisión de error, espere que el segundo argumento no sea necesario.
  • comprobar que el argumento llega en comparación con null no es la mejor manera, tal vez necesite agregar una variable booleana para cada argumento.

De esta manera, puede realizar procedimientos asincrónicos a partir de una mayor cantidad de argumentos que dos, pero el pensamiento surge de inmediato: ¿aún puede crear una clase separada para los parámetros, para no escribir su propio método para recibir cada parámetro, pero hacerlo creando dinámicamente los parámetros?


  Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2); 

Sí, puedes crear tal clase, pero más sobre eso la próxima vez.


Un breve resumen de lo anterior:


  • Un programa asincrónico es una red de procedimientos asincrónicos interconectados,
    Al igual que un programa multiproceso es una red de hilos de ejecución interconectados (hilos).

Pero los medios de los flujos de comunicación y los procedimientos asincrónicos son fundamentalmente diferentes.


Las secuencias se conectan mediante semáforos, colas de bloqueo y otros objetos similares,
que bloquean la secuencia del destinatario si la información aún no ha llegado, pero la secuencia ya está tratando de extraerla mediante la operación basada en extracción.


Procedimientos asincrónicos: los destinatarios simplemente no ingresan la ejecución hasta que toda la información que necesitan esté lista.
Esperan pasivamente hasta que los propios proveedores de información lo pasen a través de una operación de inserción.
Debido a esto, no gastan memoria en la pila mientras esperan y, por lo tanto, ocupan mucha menos memoria que los hilos de ejecución.


  • construir una red de procedimientos asincrónicos se reduce a crear objetos y vincularlos, más precisamente, a vincular sus subobjetos - parámetros: la dirección del parámetro de entrada del destinatario se transmite al parámetro de salida del proveedor de información.

El conjunto de métodos CompletableFuture hace exactamente eso y, en principio, puede prescindir de estos métodos creando objetos explícitamente, como se muestra en los ejemplos anteriores.
Pero para esto es necesario tener clases similares a las que se describieron en estos ejemplos.
Por alguna razón, los creadores de java.util.concurrent optaron por no dar a los usuarios acceso a estas clases y las ocultaron en las profundidades del código CompletableFuture .


Aquellos que quieran tener una representación visual de la red asincrónica que se está creando, pueden reconstruir estas clases al continuar con los ejemplos anteriores. El código fuente de los ejemplos está disponible en Github .

Source: https://habr.com/ru/post/es418547/


All Articles