Por que a reconstrução é necessária se o código fonte desta classe está aberto?
Apenas porque, sob o capô, existe um código altamente otimizado e de difícil leitura, cujo estudo dá pouco pedagogicamente.
Portanto, recriaremos a semântica das operações de acordo com suas especificações e escreveremos código funcionalmente equivalente, compreensível e legível, embora possa não ser o mais econômico em termos de consumo de memória e tempo do processador.
Vamos começar com um método relativamente simples:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture
Leia atentamente a especificação:
Returns a new CompletableFuture
Ou seja, um objeto do tipo CompletableFuture
ou sua subclasse é criado e retornado como resultado.
that is asynchronously completed by a task running in the given executor`
Além disso, é executada uma tarefa executada no Executor
'e.
Como sabemos, o Executor
aceita apenas objetos Runnable
.
Runnable é uma interface, e o primeiro objeto pode implementá-la - portanto, combinaremos duas funções em um objeto.
completed ... with the value obtained by calling the given Supplier.
Este Runnable
deve ligar para o Supplier
fornecido e, com o valor recebido, concluir o CompletableFuture
criado.
Supplier
é uma função sem parâmetros, portanto, a codificação de tudo é muito simples:
class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; }
O exemplo a seguir é um pouco mais complicado:
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage
Returns a new CompletionStage that... is executed using the supplied Executor
Aqui, somos oferecidos diretamente para organizar o objeto criado para emitir na forma de Runnable
.
... with this stage's result as the argument to the supplied function.
Mas isso já é mais interessante. A função passada para nós tem um parâmetro e o valor desse parâmetro é o valor que completa o atual CompletionStage
. No momento da chamada de thenApplyAsync
esse valor pode não ser conhecido, portanto, não podemos executar imediatamente a tarefa no Executor
. Em vez disso, precisamos negociar com o atual CompletionStage
,
para que, no momento de sua conclusão, transfira seu valor para a tarefa. Entre os vários métodos do CompletionStage
existe um que é exatamente adequado para esse fim, quando whenComplete
:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.
Ou seja, no objeto de tarefa recém-criado, é suficiente implementar a interface BiConsumer
para receber um argumento:
class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override
Este exemplo é muito importante para entender a natureza da programação assíncrona; portanto, mais uma vez, listamos suas principais etapas:
1) um procedimento assíncrono é criado:
CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);
2) ainda não está pronto para execução, por isso solicitamos ao provedor do argumento ausente que nos transmita esse argumento no futuro, chamando o método que enviamos:
this.whenComplete(task);
3) neste método, não apenas salvamos o argumento recebido, mas também executamos a tarefa para execução (consulte o método accept
()).
4) a execução da tarefa é reduzida ao cumprimento da função que nos foi dada e salvando o resultado.
Esse resultado pode ser exatamente o solicitado por outros procedimentos usando o método whenComplete
() aplicado ao nosso objeto recém-construído, para que possamos construir uma cadeia de procedimentos assíncronos de comprimento arbitrário. Mas essa cadeia será cumprida estritamente sequencialmente, sem nenhum paralelismo.
Mas como retratar um diagrama de cálculo mais complexo contendo ramos paralelos?
Para fazer isso, use o método thenCombineAsync
.
Se no exemplo anterior, iniciamos o procedimento assíncrono com um argumento, então neste - com dois.
Nesse caso, o cálculo dos dois argumentos pode ocorrer em paralelo.
ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.
Aqui, tudo é igual ao exemplo anterior com thenApplyAsync
, mas o parâmetro function já possui dois argumentos, e o CompletionStage<? extends U> other
Parameter é adicionado CompletionStage<? extends U> other
, que é o provedor assíncrono do segundo argumento.
Como garantimos que o segundo argumento seja processado?
Bem, em primeiro lugar, em vez de uma variável T arg
descreva duas: T arg1; U arg2;
T arg1; U arg2;
, em vez de um método public void accept(T argument, Throwable throwable)
descreve dois - accept1
e accept2
,
cada um dos quais trabalha com seu próprio argumento.
Ao mesmo tempo, nosso objeto em construção não implementa mais a BiConsumer<T,Throwable>
e não podemos mais escrever uma frase-chave para vincular os nós do gráfico de computação assíncrono
this.whenComplete(task);
Felizmente, um objeto de interface funcional pode ser representado por uma referência a um método, sem concluí-lo em uma classe separada:
this.whenComplete(task::accept1); other.whenComplete(task::accept2);
Ou seja, a corrente this
objeto fornece o primeiro argumento e o other
objeto no segundo.
Aqui estão apenas os códigos dos métodos que deverão ser alterados para que eles não iniciem a tarefa imediatamente após a chegada do argumento, mas também verifique o recebimento do segundo:
public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } }
O método accept2 é descrito de maneira semelhante.
Note que:
- métodos tornam-se sincronizados (trabalhamos com dados comuns)
- em caso de transmissão de erro, aguarde o segundo argumento não ser necessário.
- verificar se o argumento chega em comparação com
null
não é a melhor maneira, talvez você precise adicionar uma variável booleana para cada argumento.
Dessa forma, você pode fazer procedimentos assíncronos a partir de um número maior de argumentos que dois, mas o pensamento surge imediatamente - você ainda pode criar uma classe separada para os parâmetros, para não escrever seu próprio método para receber cada parâmetro, mas fazê-lo criando dinamicamente os parâmetros?
Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2);
Sim, você pode criar essa classe, mas mais sobre isso na próxima vez.
Um breve resumo do exposto:
- um programa assíncrono é uma rede de procedimentos assíncronos interconectados,
assim como um programa multithread é uma rede de threads de execução interconectados (threads).
Mas os meios de fluxos de comunicação e procedimentos assíncronos são fundamentalmente diferentes.
Os fluxos são conectados usando semáforos, filas de bloqueio e outros objetos semelhantes,
que bloqueia o fluxo do destinatário se as informações ainda não chegaram, mas o fluxo já está tentando extraí-lo usando a operação baseada em pull.
Procedimentos assíncronos - os destinatários simplesmente não entram em execução até que todas as informações necessárias estejam prontas.
Eles esperam passivamente até que os próprios fornecedores de informações passem por uma operação baseada em envio.
Por esse motivo, eles não gastam memória na pilha enquanto aguardam e, portanto, ocupam muito menos memória que os threads de execução.
- construir uma rede de procedimentos assíncronos é reduzido a criar objetos e vinculá-los, mais precisamente, a vincular seus subobjetos - parâmetros: o endereço do parâmetro de entrada do destinatário é transmitido ao parâmetro de saída do provedor de informações.
O conjunto de métodos CompletableFuture
faz exatamente isso e, em princípio, você pode ficar sem esses métodos criando objetos explicitamente, conforme mostrado nos exemplos acima.
Mas, para isso, é necessário ter classes semelhantes às descritas nesses exemplos.
Por alguma razão, os criadores do java.util.concurrent
optaram por não conceder aos usuários acesso a essas classes e os ocultaram nas profundezas do código CompletableFuture
.
Aqueles que desejam ter uma representação visual da rede assíncrona sendo criada podem reconstruir essas classes, continuando os exemplos acima. O código fonte dos exemplos está disponível no Github .