Olá Habr!
Nesta semana, esperamos um novo
livro da Primavera 5 da gráfica:
Entre os recursos interessantes do Spring 5, a programação reativa merece menção especial, cuja implementação nesta estrutura é descrita brevemente no artigo proposto por Matt Raible. No livro mencionado acima, os padrões reativos são discutidos no capítulo 11.
Matt foi co-autor de Josh Long, autor de outro grande livro sobre Java e Spring, "
Java in the Cloud "
, lançado no verão passado.
A programação reativa é o seu caminho para criar sistemas resistentes a altas cargas. O processamento de tráfego enorme não é mais um problema, pois o servidor não está bloqueando e os processos do cliente não precisam esperar pelas respostas. O cliente não pode observar diretamente como o programa é executado no servidor e sincronizado com ele. Quando a API acha difícil processar solicitações, ainda deve fornecer respostas razoáveis. Não deve recusar e descartar mensagens de maneira descontrolada. Ele deve informar aos componentes superiores que está trabalhando com carga para que eles possam liberá-lo parcialmente dessa carga. Essa técnica é chamada de contrapressão, um aspecto importante da programação reativa.
Nós co-escrevemos este artigo com
Josh Long . Josh é um campeão de Java, Spring Developer Advocate e geralmente um cara global que trabalha na Pivotal. Trabalho com a Spring há muito tempo, mas foi Josh quem me mostrou a bota de primavera, na conferência da Devoxx na Bélgica. Desde então, nos tornamos grandes amigos, gostamos de Java e escrevemos aplicativos interessantes.
Programação reativa ou E / S, E / S, vamos trabalhar ...A programação reativa é uma abordagem para a criação de software que usa ativamente E / S assíncrona. E / S assíncrona é uma pequena idéia, repleta de grandes mudanças na programação. A ideia em si é simples: corrigir a situação com uma alocação ineficiente de recursos, liberando os recursos que estariam ociosos sem nossa intervenção, aguardando a conclusão da E / S. A entrada / saída assíncrona inverte a abordagem usual do processamento de E / S: o cliente é liberado e pode executar outras tarefas, aguardando novas notificações.
Considere o que é comum entre entrada / saída síncrona e assíncrona e quais são as diferenças entre elas.
Escreveremos um programa simples que lê dados da fonte (especificamente, estamos falando do link
java.io.File
). Vamos começar com uma implementação que usa o bom e velho
java.io.InputStream
:
Exemplo 1. Lendo Sincronamente Dados de um Arquivo package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) {
- Fornecemos um arquivo para leitura com o
java.io.File
usual - Puxe os resultados da fonte, uma linha de cada vez ...
- Eu escrevi esse código para
Consumer<BytesPayloadgt;
chamado quando novos dados chegam
Simples o suficiente, o que você diz? Execute esse código e você verá na saída do log (à esquerda de cada linha), indicando que todas as ações ocorrem em um único encadeamento.
Aqui, extraímos bytes de nossos dados obtidos na fonte (neste caso, estamos falando de uma subclasse de
java.io.FileInputStream
herdada de
java.io.InputStream
). O que há de errado com este exemplo? Nesse caso, usamos um InputStream que aponta para os dados localizados em nosso sistema de arquivos. Se o arquivo estiver lá e o disco rígido estiver funcionando, esse código funcionará conforme o esperado.
Mas, o que acontece se lemos os dados não do
File
, mas de um soquete de rede e usamos outra implementação do
InputStream
? Nada para se preocupar! Obviamente, não haverá absolutamente nada com que se preocupar se a velocidade da rede for infinitamente alta. E se o canal de rede entre este e o outro nó nunca falhar. Se essas condições forem atendidas, o código funcionará perfeitamente.
Mas o que acontece se a rede diminuir ou diminuir? Nesse caso, quero dizer que aumentaremos o período até que a operação
in.read(…)
. De fato, ela pode não voltar! Isso é um problema se tentarmos fazer outra coisa com o fluxo do qual estamos lendo dados. Obviamente, você sempre pode criar outro fluxo e ler dados através dele. Isso pode ser feito até certo ponto, mas, no final, atingiremos o limite no qual a simples adição de threads para maior escala não será mais suficiente. Não teremos verdadeira concorrência além do número de núcleos existentes em nossa máquina. Beco sem saída! Nesse caso, podemos aumentar o processamento de entrada / saída (a leitura é feita aqui) apenas devido a fluxos adicionais, mas aqui, mais cedo ou mais tarde, atingiremos o limite.
Neste exemplo, o principal trabalho é a leitura - quase nada acontece em outras frentes. Nós dependemos de E / S. Considere como uma solução assíncrona nos ajuda a superar parcialmente a monopolização de nossos fluxos.
Exemplo 2. Lendo Dados Assincronamente de um Arquivo package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath();
- Desta vez, adaptamos o
java.io.File
, tornando o Java NIO java.nio.file.Path
partir dele - Ao criar um
Channel
, especificamos o serviço java.util.concurrent.ExecutorService
, que será usado para chamar o manipulador CompletionHandler
quando os dados necessários aparecerem. - Começamos a ler passando um link para
CompletionHandler<Integer, ByteBuffer> (this)
- No retorno de chamada, leia os bytes do
ByteBuffer
na capacidade de byte[]
- Assim como no exemplo
Synchronous
, os dados de byte[]
são transmitidos ao consumidor.
Faremos uma reserva imediatamente: esse código acabou sendo muito mais difícil! Há tantas coisas acontecendo aqui que sua cabeça está girando imediatamente, no entanto, deixe-me salientar ... esse código lê dados do
Java NIO Channel
e depois processa esses dados em um encadeamento separado, responsável pelos retornos de chamada. Portanto, o fluxo no qual a leitura começou não é monopolizado. Retornamos quase instantaneamente depois de chamar
.read(..)
e, quando finalmente temos os dados à nossa disposição, um retorno de chamada é feito - já em outro encadeamento. Se houver um atraso entre as chamadas para
.read()
você poderá passar para outros assuntos executando-os em nosso encadeamento. A duração de uma operação de leitura assíncrona, do primeiro byte ao último, não é melhor que a de uma operação de leitura síncrona. Normalmente, uma operação assíncrona é insignificante por mais tempo. No entanto, enfrentando dificuldades adicionais, podemos lidar com mais eficiência com nossos fluxos. Faça mais trabalho, E / S multiplex em um pool com um número finito de threads.
Eu trabalho para uma empresa de computação em nuvem. Gostaríamos que você obtivesse novas instâncias do aplicativo para resolver problemas com o dimensionamento horizontal! Claro, aqui estou um pouco falso. A E / S assíncrona complica um pouco a situação, mas espero que este exemplo ilustre como o código reativo é tão útil: permite processar mais solicitações e trabalhar mais com o hardware existente, se o desempenho depender muito da E / S. Se o desempenho depender do uso do processador (por exemplo, estamos falando de operações em números de Fibonacci, mineração de bitcoins ou criptografia), a programação reativa não nos dará nada.
Atualmente, a maioria de nós não usa implementações de
Channel
ou
InputStream
em nosso trabalho diário! Temos que pensar em problemas no nível das abstrações de nível superior. É sobre coisas como matrizes, ou melhor, a hierarquia
java.util.Collection
. A coleção
java.util.Collection
exibida muito bem em um InputStream: as duas entidades assumem que você pode operar todos os dados de uma vez e quase instantaneamente. Espera-se que você consiga concluir a leitura da maioria dos
InputStreams
mais cedo ou mais tarde. Os tipos de coleção ficam um pouco desconfortáveis ao mover para grandes quantidades de dados. E se você estiver lidando com algo potencialmente infinito (ilimitado) - por exemplo, soquetes da web ou eventos de servidor? O que fazer se houver um atraso entre as gravações?
Precisamos de uma maneira melhor de descrever esse tipo de dados. Estamos falando de eventos assíncronos, que ocorrerão no final. Pode parecer que
Future<T>
ou
CompletableFuture<T>
são adequados para esse propósito, mas descrevem apenas uma coisa que acontece no final. De fato, Java não fornece uma metáfora adequada para descrever esse tipo de dados. Os tipos
Iterator
e
Stream
do Java 8 podem não estar relacionados, no entanto, ambos são orientados a puxar; você mesmo solicita a próxima entrada, não o tipo deve enviar um retorno de chamada para o seu código. Supõe-se que, se o processamento baseado em envio fosse suportado nesse caso, o que permitiria obter muito mais no nível do encadeamento, a API também forneceria controle de encadeamento e programação.
Iterator
implementações do
Iterator
não dizem nada sobre o encadeamento, e todos os encadeamentos do Java 8 compartilham o mesmo conjunto de junções de bifurcação.
Se o
Iterator
e o
Stream
suportassem realmente o processamento push, encontraríamos outro problema, que é realmente exacerbado precisamente no contexto da E / S: precisamos de algum tipo de mecanismo de penetração traseira! Como o consumidor de dados é processado de forma assíncrona, não temos idéia de quando os dados estarão no pipeline e em que quantidade. Não sabemos quantos dados precisarão ser processados no próximo retorno de chamada: um byte ou um terabyte!
Ao extrair dados de um
InputStream
, você lê o máximo de informações que está pronto para processar e nada mais. Nos exemplos anteriores, lemos os dados no buffer de
byte[]
de um comprimento fixo e conhecido. Em um contexto assíncrono, precisamos de alguma maneira de informar ao provedor quantos dados estamos dispostos a processar.
Sim senhor Algo está faltando aqui, com certeza.
Procure a metáfora ausenteNesse caso, estamos procurando uma metáfora que reflita belamente a essência da E / S assíncrona, suporte a esse mecanismo de transferência reversa de dados e nos permita controlar o fluxo de execução em sistemas distribuídos. Na programação reativa, a capacidade de um cliente de sinalizar qual carga é capaz de suportar é chamada de "fluxo reverso".
Agora, existem vários bons projetos - Vert.x, Akka Streams e RxJava - que suportam programação reativa. A equipe do Spring também executa um projeto chamado
Reactor . Entre esses vários padrões, existe um campo geral bastante amplo, de fato alocado ao padrão da
iniciativa Reactive Streams . A iniciativa Reactive Streams define quatro tipos:
Interface do
Publisher<T>
; produz valores que podem finalmente chegar. Interface do
Publisher<T>
; produz valores do tipo
T
para o
Subscriber<T>
.
Exemplo 3. Fluxos reativos: interface do Publisher<T>
.
package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); }
O tipo de
Subscriber
assina o
Publisher<T>
, recebendo notificações de novos valores do tipo
T
por meio do método
onNext(T)
. Se ocorrer algum erro, seu método
onError(Throwable)
será
onError(Throwable)
. Quando o processamento é concluído normalmente, o método
onComplete
do assinante é chamado.
Exemplo 4. Fluxos de Jet: Interface do Subscriber<T>
. package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Quando o
Subscriber
conecta pela primeira vez ao
Publisher
, ele recebe uma
Subscription
no
Subscriber#onSubscribe
. Assinatura A
Subscription
é talvez a parte mais importante de toda a especificação; é ela quem fornece o fluxo de retorno. Um assinante de Assinante usa o método de solicitação de
Subscription#request
para solicitar dados adicionais ou o método de
Subscription#cancel
para interromper o processamento.
Exemplo 5. Fluxos Reativos: Interface de Subscription<T>
.
package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); }
A especificação do fluxo reativo fornece outro tipo útil, embora óbvio,:
Processor<A,B>
é apenas uma interface que herda o
Subscriber<A>
e o
Publisher<B>
.
Exemplo 6. Jet streams: interface do Processor<T>
.
package org.reactivestreams; public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Uma especificação não é posicionada como uma prescrição para implementações; na verdade, seu objetivo é definir tipos para suportar a interoperabilidade. O benefício óbvio dos tipos associados aos fluxos reativos é que, apesar disso, eles encontraram um lugar no release Java 9; além disso, semanticamente eles são "um para um", correspondem a interfaces da classe
java.util.concurrent.Flow
, por exemplo:
java.util.concurrent.Flow.Publisher
.
Conheça o ReatorTipos de fluxos reativos por si só não são suficientes; implementações de ordem superior são necessárias para dar suporte a operações como filtragem e transformação. Como tal, o projeto Reactor é conveniente; baseia-se na especificação de Reativos reativos e fornece duas especializações do
Publisher<T>
.
Primeiro, o
Flux<T>
é um
Publisher
que produz zero ou mais valores. O segundo,
Mono<T>
, é o
Publisher<T>
, produzindo zero ou um valor. Ambos publicam valores e podem lidar com eles de acordo, no entanto, seus recursos são muito mais amplos do que a especificação de Fluxos Reativos. Ambos fornecem operadores que permitem processar fluxos de valor. Os tipos de reator compõem bem - a saída de um deles pode servir como entrada para o outro e, se o tipo precisar trabalhar com outros fluxos de dados, eles dependem das instâncias do
Publisher<T>
.
O
Mono<T>
e o
Flux<T>
implementam o
Publisher<T>
; recomendamos que seus métodos aceitem instâncias do
Publisher<T>
mas retornem
Flux<T>
ou
Mono<T>
; isso ajudará o cliente a distinguir que tipo de dados ele recebe.
Suponha que você tenha recebido o
Publisher<T>
e solicitado a exibir a interface do usuário para este
Publisher<T>
. Devo então exibir uma página com detalhes para um registro, já que você pode obter o
CompletableFuture<T>
? Ou exibir uma página de visão geral com uma lista ou grade em que todas as entradas são exibidas página por página? Difícil dizer.
Por sua vez, o
Flux<T>
e o
Mono<T>
muito específicos. Você sabe que precisa exibir uma página de revisão se o
Flux<T>
recebido e uma página com detalhes para um (ou não um) registro ao receber
Mono<T>
.
Reator é um projeto de código aberto lançado pelo Pivotal; Agora ele se tornou muito popular. O Facebook o usa em seu
mecanismo a jato
para chamar procedimentos remotos , e também no
Rsocket , liderado pelo criador do RxJava, Ben Christensen. O Salesforce usa-o em sua
implementação reativa de gRPC . O Reactor implementa tipos de Fluxos Reativos, para que ele possa interagir com outras tecnologias que suportam esses tipos, por exemplo, com o
RxJava 2 da Netflix,
Akka Streams da Lightbend e com o projeto
Vert.x da Eclipse Foundation. David Cairnock, diretor do RxJava 2, também colaborou ativamente com o Pivotal para desenvolver o Reactor, tornando o projeto ainda melhor. Além disso, é claro, ele está presente de uma forma ou de outra no Spring Framework, começando com o Spring Framework 4.0.
Programação reativa com Spring WebFluxPor toda a sua utilidade, o Reator é apenas a base. Nossos aplicativos devem se comunicar com fontes de dados. Deve suportar autenticação e autorização. A primavera fornece tudo isso. Se o Reactor nos fornecer a metáfora que falta, então o Spring nos ajudará a falar um idioma comum.
O Spring Framework 5.0 foi lançado em setembro de 2017. Ele se baseia no Reactor e na especificação Reactive Streams. Ele tem um novo tempo de execução reativo e modelo de componente chamado
Spring WebFlux .
O Spring WebFlux é independente da API do Servlet e não exige que eles funcionem. Ele vem com adaptadores que permitem usá-lo na parte superior do mecanismo Servlet, se necessário, mas isso não é necessário. Ele também fornece um tempo de execução completamente novo baseado em Netty chamado Spring WebFlux. O Spring Framework 5, trabalhando com Java 8 e Java EE 7 e posterior, agora serve como base para grande parte do ecossistema Spring, incluindo Spring Data Kay, Spring Security 5, Spring Security 5, Spring Boot 2 e Spring Cloud Finchley.