Programação reativa com JAX-RS

Olá pessoal!

O ano passado, o curso Java Enterprise Developer, foi lançado com sucesso e temos o último material sobre este tópico que queremos compartilhar com você, que discute o uso da abordagem assíncrona e da preparação para o desenvolvimento de aplicativos responsivos responsivos.

Vamos lá

A programação reativa a princípio soa como o nome de um paradigma emergente, mas na verdade se refere a um método de programação no qual uma abordagem orientada a eventos é usada para trabalhar com fluxos de dados assíncronos. Com base em dados constantemente atualizados, os sistemas reativos respondem a eles executando uma série de eventos.
A programação reativa segue o padrão de design “Observador”, que pode ser definido da seguinte forma: se um estado mudar em um objeto, todos os outros objetos são notificados e atualizados de acordo. Portanto, em vez de pesquisar eventos para alterações, os eventos são enviados de forma assíncrona para que os observadores possam processá-los. Neste exemplo, observadores são funções executadas quando o evento é despachado. E o fluxo de dados mencionado é o real observável.

Quase todas as linguagens e estruturas usam essa abordagem em seu ecossistema, e as versões mais recentes do Java não são exceção. Neste artigo, explicarei como a programação reativa pode ser aplicada usando a versão mais recente do JAX-RS na funcionalidade Java EE 8 e Java 8.



Manifesto a jato

O Jet Manifesto lista quatro aspectos fundamentais que um aplicativo precisa ser mais flexível, pouco acoplado e fácil de dimensionar e, portanto, capaz de ser reativo. Ele afirma que o aplicativo deve ser responsivo, flexível (e, portanto, escalável), resiliente e orientado a mensagens.

O objetivo subjacente é uma aplicação verdadeiramente responsiva. Suponha que haja um aplicativo no qual um encadeamento grande esteja envolvido no processamento de solicitações do usuário e, após concluir o trabalho, esse encadeamento envia respostas de volta aos solicitantes originais. Quando um aplicativo recebe mais solicitações do que pode lidar, esse encadeamento se torna um gargalo e o aplicativo perde sua capacidade de resposta anterior. Para manter a capacidade de resposta, o aplicativo deve ser escalável e resiliente. Sustentável pode ser considerado um aplicativo que possui funcionalidade para recuperação automática. Na experiência da maioria dos desenvolvedores, apenas a arquitetura orientada a mensagens permite que o aplicativo seja escalável, resiliente e responsivo.

A programação reativa foi introduzida no Java 8 e Java EE 8. O Java introduziu conceitos como CompletionStage e sua implementação do CompletableFuture , e o Java começou a usar esses recursos em especificações como a API do cliente reativo no JAX-RS.

API do cliente reativo JAX-RS 2.1

Vamos ver como a programação reativa pode ser usada em aplicativos Java EE 8. Para entender o processo, você precisa de algum conhecimento básico da API Java EE.

O JAX-RS 2.1 introduziu uma nova maneira de criar um cliente REST com suporte para programação reativa. A implementação padrão do invocador oferecida no JAX-RS é síncrona, o que significa que o cliente criado enviará uma chamada de bloqueio para o terminal do servidor. Um exemplo de implementação é apresentado na Listagem 1.

Listagem 1

 Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get(); 

A partir da versão 2.0, o JAX-RS fornece suporte para criar um invocador assíncrono na API do cliente com uma chamada simples para o método async() , como mostra a Listagem 2.

Listagem 2

 Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get(); 

O uso de um invocador assíncrono no cliente retorna uma instância Future do tipo javax.ws.rs.core.Response . Isso pode levar à consulta da resposta, com uma chamada para future.get() , ou ao registro de um retorno de chamada que será chamado quando uma resposta HTTP estiver disponível. Ambas as implementações são adequadas para programação assíncrona, mas as coisas geralmente ficam complicadas se você deseja agrupar retornos de chamada ou adicionar casos condicionais a esses mínimos de execução assíncronos.

O JAX-RS 2.1 fornece uma maneira reativa de superar esses problemas com a nova API do cliente reativo do JAX-RS para a construção de um cliente. É tão simples quanto chamar o método rx() durante a criação do cliente. Na Listagem 3, o método rx() retorna o invocador reativo que existe durante a execução do cliente e o cliente retorna uma resposta do tipo CompletionStage.rx() , que permite a transição do invocador síncrono para o invocador assíncrono com uma chamada simples.

Listagem 3

 CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get(); 

CompletionStage<> é uma nova interface introduzida no Java 8. Ela representa uma computação, que pode ser uma etapa dentro de uma computação maior, como o nome indica. Este é o único representante de reatividade do Java 8 a acessar o JAX-RS.
Depois de receber a instância de resposta, posso chamar AcceptAsync() , onde posso fornecer um trecho de código que será executado de forma assíncrona quando a resposta ficar disponível, conforme mostrado na Listagem 4.

Listagem 4

 response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class); //do stuff with t }); 

Incluindo reatividade em um terminal REST

A abordagem reativa não se limita ao lado do cliente no JAX-RS; também pode ser usado no lado do servidor. Por exemplo, primeiro vou criar um script simples onde posso solicitar uma lista de locais de um destino. Para cada posição, farei uma chamada separada com dados de localização em outro ponto para obter os valores de temperatura. A interação dos destinos será mostrada na Figura 1.


Figura 1. Interação entre pontos de destino

Primeiro, eu simplesmente defino o modelo de domínio e, em seguida, os serviços para cada modelo. A Listagem 5 mostra como a classe Forecast é definida, o que agrupa as classes Location e Temperature .

Listagem 5

 public class Temperature { private Double temperature; private String scale; // getters & setters } public class Location { String name; public Location() {} public Location(String name) { this.name = name; } // getters & setters } public class Forecast { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature( final Temperature temperature) { this.temperature = temperature; return this; } // getters } 

Para quebrar a lista de previsões, a classe ServiceResponse implementada na Listagem 6.

Listagem 6

 public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } // getters } 

LocationResource mostrado na Listagem 7 define três locais de amostra retornados com o caminho /location .

Listagem 7

 @Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } } 

TemperatureResource mostrado na Listagem 8 retorna um valor de temperatura gerado aleatoriamente entre 30 e 50 para um determinado local. Um atraso de 500 ms foi adicionado à implementação para simular uma leitura do sensor.

Listagem 8

 @Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } } 

Primeiro, mostrarei a implementação do ForecastResource síncrono (consulte a Listagem 9), que retorna todos os locais. Então, para cada posição, ele chama o serviço de temperatura para obter os valores em graus Celsius.

Listagem 9

 @Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } } 

Quando o destino da previsão é solicitado como /forecast , você obtém uma saída semelhante à mostrada na Listagem 10. Observe que o tempo de processamento da solicitação levou 1.533 ms, o que é lógico, pois uma solicitação síncrona de valores de temperatura de três locais diferentes chega a 1,5 ms

Listagem 10

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 } 

Até agora, tudo está indo conforme o planejado. É hora de introduzir a programação reativa no lado do servidor, onde as chamadas para cada local podem ser feitas em paralelo após o recebimento de todos os locais. Isso pode melhorar claramente o fluxo síncrono mostrado anteriormente. Isso é feito na Listagem 11, que mostra a definição da versão do serviço de previsão reativa.

Listagem 11

 @Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis(); //   (stage)    CompletionStage<List<Location>> locationCS = locationTarget.request() .rx() .get(new GenericType<List<Location>>() {}); //      , //  ,   , //    CompletionStage final CompletionStage<List<Forecast>> forecastCS = locationCS.thenCompose(locations -> { //      //   ompletionStage List<CompletionStage<Forecast>> forecastList = //      //     locations.stream().map(location -> { //     //      //    final CompletionStage<Temperature> tempCS = temperatureTarget .resolveTemplate("city", location.getName()) .request() .rx() .get(Temperature.class); //   CompletableFuture,   //    //      return CompletableFuture.completedFuture( new Forecast(location)) .thenCombine(tempCS, Forecast::setTemperature); }).collect(Collectors.toList()); //    CompletableFuture, //     completable future //  return CompletableFuture.allOf( forecastList.toArray( new CompletableFuture[forecastList.size()])) .thenApply(v -> forecastList.stream() .map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join) .collect(Collectors.toList())); }); //   ServiceResponse, //       //    . //   future    // forecastCS,    //      CompletableFuture.completedFuture( new ServiceResponse()) .thenCombine(forecastCS, ServiceResponse::forecasts) .whenCompleteAsync((response, throwable) -> { response.setProcessingTime( System.currentTimeMillis() - startTime); async.resume(response); }); } } 

A implementação reativa pode parecer complicada à primeira vista, mas após um estudo mais cuidadoso, você notará que é bastante simples. Na implementação do ForecastReactiveResource primeiro crio uma chamada de cliente para serviços de localização usando a API do cliente reativo JAX-RS. Como mencionei acima, este é um complemento para o Java EE 8 e ajuda a criar uma chamada reativa simplesmente usando o método rx() .

Agora, estou criando uma nova fase com base no local para reunir uma lista de previsões. Eles serão armazenados como uma lista de previsão em um grande estágio de conclusão chamado forecastCS . Por fim, criarei uma resposta de chamada de serviço usando apenas o forecastCS .

E agora, vamos coletar as previsões na forma de uma lista de estágios de conclusão definidos na variável forecastList . Para criar um estágio de conclusão para cada previsão, passo os dados por local e, em seguida, crio a variável tempCS , novamente usando a API do cliente reativo JAX-RS, que chama o serviço de temperatura com o nome da cidade. Aqui, uso o método resolveTemplate() para criar o cliente, e isso permite que eu passe o nome da cidade para o coletor como parâmetro.

Como etapa final do streaming, faço uma chamada para CompletableFuture.completedFuture() , passando a nova instância de Forecast como parâmetro. tempCS esse futuro com o estágio tempCS para que eu tenha um valor de temperatura para os locais monitorados.

O método CompletableFuture.allOf() na Listagem 11 converte a lista de estágios de conclusão em forecastCS . A execução desta etapa retorna uma instância futura grande e completa quando todos os objetos futuros completáveis ​​fornecidos estiverem completos.

A resposta do serviço é uma instância da classe ServiceResponse , portanto, crio um futuro concluído e, em seguida, combino o estágio de conclusão do forecastCS com uma lista de previsões e calculo o tempo de resposta do serviço.

Obviamente, a programação reativa força apenas o lado do servidor a executar de forma assíncrona; o lado do cliente será bloqueado até que o servidor envie uma resposta de volta ao solicitante. Para superar esse problema, os SSEs (Server Sent Events) podem ser usados ​​para enviar parcialmente uma resposta assim que disponível, para que os valores de temperatura de cada local sejam transmitidos ao cliente um por um. A saída do ForecastReactiveResource será semelhante à apresentada na Listagem 12. Como mostrado na saída, o tempo de processamento é de 515 ms, que é um tempo de execução ideal para obter valores de temperatura de um local.

Listagem 12

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 } 

Conclusão

Nos exemplos deste artigo, primeiro mostrei uma maneira síncrona de obter previsões usando serviços de localização e temperatura. Depois, fui para a abordagem reativa para que o processamento assíncrono seja executado entre as chamadas de serviço. Quando você usa a API do cliente reativo JAX-RS no Java EE 8, juntamente com as classes CompletionStage e CompletableFuture disponíveis no Java 8, o poder do processamento assíncrono diminui graças à programação reativa.

A programação reativa é mais do que apenas implementar um modelo assíncrono a partir de um modelo síncrono; também simplifica o trabalho com conceitos como o estágio de aninhamento. Quanto mais ele for usado, mais fácil será gerenciar scripts complexos em programação paralela.

O FIM

Obrigado pela atenção. Como sempre, estamos aguardando seus comentários e perguntas.

Source: https://habr.com/ru/post/pt424031/


All Articles