使用JAX-RS进行反应式编程

大家好!

去年, Java Enterprise Developer课程已经成功启动,并且我们想与您分享该主题的最新资料,其中讨论了异步方法的使用以及开发响应式响应应用程序的阶段。

走吧

起初,响应式编程听起来像新兴范式的名称,但实际上是指一种编程方法,其中使用面向事件的方法来处理异步数据流。 基于持续不断的数据,反应性系统通过执行一系列事件来响应它们。
响应式编程遵循“观察者”设计模式,该模式可以定义如下:如果一个对象的状态发生变化,则将通知所有其他对象并相应地对其进行更新。 因此,不是轮询事件的更改,而是异步推送事件,以便观察者可以对其进行处理。 在此示例中,观察者是在调度事件时执行的功能。 提到的数据流是实际可观察​​到的。

几乎所有语言和框架都在其生态系统中使用此方法,并且Java的最新版本也不例外。 在本文中,我将解释如何使用Java EE 8和Java 8功能中的最新版本的JAX-RS应用反应式编程。



喷气宣言

Jet宣言列出了四个基本方面,即应用程序需要更加灵活,松耦合,易于扩展并因此具有响应能力。 它指出,应用程序必须具有响应能力,灵活性(因此具有可伸缩性),弹性和消息驱动能力。

基本目标是真正响应的应用程序。 假设有一个应用程序,其中一个大线程参与处理用户请求,并且在完成工作之后,该线程将响应发送回原始请求者。 当应用程序收到的请求数超出其处理能力时,此线程将成为瓶颈,并且该应用程序将失去其以前的响应能力。 为了保持响应能力,应用程序必须是可伸缩的和可恢复的。 可持续性可以被视为具有自动恢复功能的应用程序。 根据大多数开发人员的经验,只有消息驱动的体系结构才能使应用程序具有可伸缩性,弹性和响应能力。

在Java 8和Java EE 8中引入了反应式编程。Java引入了诸如CompletionStage及其实现CompletableFuture概念,并且Java开始在规范中使用这些功能,例如JAX-RS中的Reactive Client API。

JAX-RS 2.1反应性客户端API

让我们看看如何在Java EE 8应用程序中使用反应式编程。要了解该过程,您需要一些Java EE API的基础知识。

JAX-RS 2.1引入了一种新的方法来创建支持响应式编程的REST客户端。 JAX-RS中提供的默认调用程序实现是同步的,这意味着创建的客户端将向服务器的端点发送阻塞调用。 清单1给出了一个示例实现。

清单1

 Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get(); 

从2.0版开始,JAX-RS提供对在客户端API上创建异步调用程序的支持,只需调用async()方法即可,如清单2所示。

清单2

 Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get(); 

在客户端上使用异步调用程序将返回类型为javax.ws.rs.core.ResponseFuture实例。 这可能导致通过调用future.get()来轮询响应,或者注册在HTTP响应可用时将调用的回调。 两种实现方式都适合异步编程,但是如果要将回调分组或向这些异步执行最小值添加条件大小写,事情通常会变得复杂。

JAX-RS 2.1通过用于构建客户端的新JAX-RS Reactive Client API提供了一种克服这些问题的反应方式。 就像在客户端构建期间调用rx()方法一样简单。 在清单3中, rx()方法返回在客户端执行期间存在的反应式调用程序,并且客户端返回类型为CompletionStage.rx()的响应,该响应允许通过简单调用从同步调用程序过渡到异步调用程序。

清单3

 CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get(); 

CompletionStage<>是Java 8中引入的新接口。顾名思义, CompletionStage<>表示一个计算,可以作为较大计算框架的一个步骤。 这是唯一一个使用JAX-RS的Java 8反应性代表。
接收到响应实例之后,我可以调用AcceptAsync() ,在其中提供一段代码,当响应可用时,它将异步执行,如清单4所示。

清单4

 response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class); //do stuff with t }); 

向端点REST添加反应性

反应性方法不仅限于JAX-RS中的客户端; 它也可以在服务器端使用。 例如,首先,我将创建一个简单的脚本,在其中可以请求一个目的地的位置列表。 对于每个位置,我将使用位置数据进行单独调用到另一个点以获取温度值。 目的地的交互将如图1所示。


图1.目标点之间的交互

首先,我只定义域模型,然后定义每个模型的服务。 清单5显示了如何定义Forecast类,该类包装了LocationTemperature类。

清单5

 public class Temperature { private Double temperature; private String scale; // getters & setters } public class Location { String name; public Location() {} public Location(String name) { this.name = name; } // getters & setters } public class Forecast { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature( final Temperature temperature) { this.temperature = temperature; return this; } // getters } 

为了包装预测列表,清单6中实现了ServiceResponse类。

清单6

 public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } // getters } 

清单7中所示LocationResource定义了三个用/location路径返回的示例位置。

清单7

 @Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } } 

清单8中显示TemperatureResource返回给定位置的30到50之间的随机生成的温度值。 已将500 ms的延迟添加到实现中,以模拟传感器读取。

清单8

 @Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } } 

首先,我将展示同步ForecastResource的实现(请参见清单9),该实现返回所有位置。 然后,对于每个位置,他致电温度服务部门以摄氏度为单位获取值。

清单9

 @Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } } 

当将预测目标请求为/forecast ,您将获得类似于清单10所示的输出。请注意,请求处理时间为1.533 ms,这是合乎逻辑的,因为来自三个不同位置的温度值同步请求总计为1.5。毫秒

清单10

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 } 

到目前为止,一切都按计划进行。 现在是时候在服务器端引入反应式编程了,可以在接收到所有位置之后并行调用每个位置。 这可以明显改善前面显示的同步流。 清单11中完成了该操作,其中显示了被动预测服务版本的定义。

清单11

 @Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis(); //   (stage)    CompletionStage<List<Location>> locationCS = locationTarget.request() .rx() .get(new GenericType<List<Location>>() {}); //      , //  ,   , //    CompletionStage final CompletionStage<List<Forecast>> forecastCS = locationCS.thenCompose(locations -> { //      //   ompletionStage List<CompletionStage<Forecast>> forecastList = //      //     locations.stream().map(location -> { //     //      //    final CompletionStage<Temperature> tempCS = temperatureTarget .resolveTemplate("city", location.getName()) .request() .rx() .get(Temperature.class); //   CompletableFuture,   //    //      return CompletableFuture.completedFuture( new Forecast(location)) .thenCombine(tempCS, Forecast::setTemperature); }).collect(Collectors.toList()); //    CompletableFuture, //     completable future //  return CompletableFuture.allOf( forecastList.toArray( new CompletableFuture[forecastList.size()])) .thenApply(v -> forecastList.stream() .map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join) .collect(Collectors.toList())); }); //   ServiceResponse, //       //    . //   future    // forecastCS,    //      CompletableFuture.completedFuture( new ServiceResponse()) .thenCombine(forecastCS, ServiceResponse::forecasts) .whenCompleteAsync((response, throwable) -> { response.setProcessingTime( System.currentTimeMillis() - startTime); async.resume(response); }); } } 

乍一看,响应式实现似乎很复杂,但是经过更仔细的研究,您会发现它很简单。 在ForecastReactiveResource的实现中ForecastReactiveResource我首先使用JAX-RS Reactive Client API创建对定位服务的客户端调用。 正如我上面提到的,这是Java EE 8的附加组件,它有助于使用rx()方法创建响应式调用。

现在,我将根据位置创建一个新阶段,以汇总预测列表。 它们将作为一个预测列表存储在一个称为forecastCS大型完成阶段中。 最终,我将仅使用forecastCS创建服务调用响应。

现在,让我们以变量forecastList定义的完成阶段列表的形式收集forecastList 。 为了为每个预测创建一个完成阶段,我按位置传递数据,然后再次使用JAX-RS Reactive Client API创建tempCS变量,该API调用城市名称的温度服务。 在这里,我使用resolveTemplate()方法构建客户端,这使我可以将城市名称作为参数传递给收集器。

作为流式传输的最后一步,我调用CompletableFuture.completedFuture() ,并将新的Forecast实例作为参数传递。 我将这一未来与tempCS阶段结合在一起,以便获得所监视位置的温度值。

清单11中的CompletableFuture.allOf()方法将完成阶段列表转换为forecastCS 。 当所有提供的可完成的将来对象完成时,执行此步骤将返回一个大型的可完成的将来实例。

服务响应是ServiceResponse类的实例,因此我创建了一个完成的Future,然后将forecastCS完成阶段与预测列表结合起来并计算服务响应时间。

当然,反应式编程会强制服务器端仅异步运行; 在服务器将响应发送回请求者之前,客户端将一直处于阻塞状态。 为了克服此问题,服务器发送事件(SSE)可用于在响应可用时立即部分发送响应,以便将每个位置的温度值一个一个地传输到客户端。 ForecastReactiveResource的输出将类似于清单12中的输出。如输出中所示,处理时间为515 ms,这是从一个位置获取温度值的理想运行时间。

清单12

 { "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 } 

结论

在本文的示例中,我首先展示了一种使用位置和温度服务获取预报的同步方法。 然后,我转到了反应性方法,以便在服务调用之间执行异步处理。 当您将Java EE 8中的JAX-RS Reactive Client API与Jav​​a 8中可用的CompletionStageCompletableFuture类一起使用时,由于响应式编程,异步处理的功能会松散。

响应式编程不仅仅是从同步模型中实现异步模型。 它还简化了嵌套阶段等概念的使用。 使用的次数越多,在并行编程中管理复杂的脚本就越容易。

结束

谢谢您的关注。 与往常一样,我们正在等待您的评论和问题。

Source: https://habr.com/ru/post/zh-CN424031/


All Articles