大家好!
去年,
Java Enterprise Developer课程已经成功启动,并且我们想与您分享该主题的最新资料,其中讨论了异步方法的使用以及开发响应式响应应用程序的阶段。
走吧
起初,响应式编程听起来像新兴范式的名称,但实际上是指一种编程方法,其中使用面向事件的方法来处理异步数据流。 基于持续不断的数据,反应性系统通过执行一系列事件来响应它们。
响应式编程遵循“观察者”设计模式,该模式可以定义如下:如果一个对象的状态发生变化,则将通知所有其他对象并相应地对其进行更新。 因此,不是轮询事件的更改,而是异步推送事件,以便观察者可以对其进行处理。 在此示例中,观察者是在调度事件时执行的功能。 提到的数据流是实际可观察到的。
几乎所有语言和框架都在其生态系统中使用此方法,并且Java的最新版本也不例外。 在本文中,我将解释如何使用Java EE 8和Java 8功能中的最新版本的JAX-RS应用反应式编程。
喷气宣言Jet宣言列出了四个基本方面,即应用程序需要更加灵活,松耦合,易于扩展并因此具有响应能力。 它指出,应用程序必须具有响应能力,灵活性(因此具有可伸缩性),弹性和消息驱动能力。
基本目标是真正响应的应用程序。 假设有一个应用程序,其中一个大线程参与处理用户请求,并且在完成工作之后,该线程将响应发送回原始请求者。 当应用程序收到的请求数超出其处理能力时,此线程将成为瓶颈,并且该应用程序将失去其以前的响应能力。 为了保持响应能力,应用程序必须是可伸缩的和可恢复的。 可持续性可以被视为具有自动恢复功能的应用程序。 根据大多数开发人员的经验,只有消息驱动的体系结构才能使应用程序具有可伸缩性,弹性和响应能力。
在Java 8和Java EE 8中引入了反应式编程。Java引入了诸如
CompletionStage
及其实现
CompletableFuture
概念,并且Java开始在规范中使用这些功能,例如JAX-RS中的Reactive Client API。
JAX-RS 2.1反应性客户端API让我们看看如何在Java EE 8应用程序中使用反应式编程。要了解该过程,您需要一些Java EE API的基础知识。
JAX-RS 2.1引入了一种新的方法来创建支持响应式编程的REST客户端。 JAX-RS中提供的默认调用程序实现是同步的,这意味着创建的客户端将向服务器的端点发送阻塞调用。 清单1给出了一个示例实现。
清单1
Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get();
从2.0版开始,JAX-RS提供对在客户端API上创建异步调用程序的支持,只需调用
async()
方法即可,如清单2所示。
清单2
Future<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get();
在客户端上使用异步调用程序将返回类型为
javax.ws.rs.core.Response
的
Future
实例。 这可能导致通过调用
future.get()
来轮询响应,或者注册在HTTP响应可用时将调用的回调。 两种实现方式都适合异步编程,但是如果要将回调分组或向这些异步执行最小值添加条件大小写,事情通常会变得复杂。
JAX-RS 2.1通过用于构建客户端的新JAX-RS Reactive Client API提供了一种克服这些问题的反应方式。 就像在客户端构建期间调用
rx()
方法一样简单。 在清单3中,
rx()
方法返回在客户端执行期间存在的反应式调用程序,并且客户端返回类型为
CompletionStage.rx()
的响应,该响应允许通过简单调用从同步调用程序过渡到异步调用程序。
清单3
CompletionStage<Response> response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get();
CompletionStage<>
是Java 8中引入的新接口。顾名思义,
CompletionStage<>
表示一个计算,可以作为较大计算框架的一个步骤。 这是唯一一个使用JAX-RS的Java 8反应性代表。
接收到响应实例之后,我可以调用
AcceptAsync()
,在其中提供一段代码,当响应可用时,它将异步执行,如清单4所示。
清单4
response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class);
向端点REST添加反应性反应性方法不仅限于JAX-RS中的客户端; 它也可以在服务器端使用。 例如,首先,我将创建一个简单的脚本,在其中可以请求一个目的地的位置列表。 对于每个位置,我将使用位置数据进行单独调用到另一个点以获取温度值。 目的地的交互将如图1所示。
图1.目标点之间的交互首先,我只定义域模型,然后定义每个模型的服务。 清单5显示了如何定义
Forecast
类,该类包装了
Location
和
Temperature
类。
清单5
public class Temperature { private Double temperature; private String scale;
为了包装预测列表,清单6中实现了
ServiceResponse
类。
清单6
public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; }
清单7中所示
LocationResource
定义了三个用
/location
路径返回的示例位置。
清单7
@Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List<Location> locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity<List<Location>>(locations){}).build(); } }
清单8中显示
TemperatureResource
返回给定位置的30到50之间的随机生成的温度值。 已将500 ms的延迟添加到实现中,以模拟传感器读取。
清单8
@Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } }
首先,我将展示同步
ForecastResource
的实现(请参见清单9),该实现返回所有位置。 然后,对于每个位置,他致电温度服务部门以摄氏度为单位获取值。
清单9
@Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = locationTarget .request() .get(new GenericType<List<Location>>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add( new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } }
当将预测目标请求为
/forecast
,您将获得类似于清单10所示的输出。请注意,请求处理时间为1.533 ms,这是合乎逻辑的,因为来自三个不同位置的温度值同步请求总计为1.5。毫秒
清单10
{ "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 }
到目前为止,一切都按计划进行。 现在是时候在服务器端引入反应式编程了,可以在接收到所有位置之后并行调用每个位置。 这可以明显改善前面显示的同步流。 清单11中完成了该操作,其中显示了被动预测服务版本的定义。
清单11
@Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis();
乍一看,响应式实现似乎很复杂,但是经过更仔细的研究,您会发现它很简单。 在
ForecastReactiveResource
的实现中
ForecastReactiveResource
我首先使用JAX-RS Reactive Client API创建对定位服务的客户端调用。 正如我上面提到的,这是Java EE 8的附加组件,它有助于使用
rx()
方法创建响应式调用。
现在,我将根据位置创建一个新阶段,以汇总预测列表。 它们将作为一个预测列表存储在一个称为
forecastCS
大型完成阶段中。 最终,我将仅使用
forecastCS
创建服务调用响应。
现在,让我们以变量
forecastList
定义的完成阶段列表的形式收集
forecastList
。 为了为每个预测创建一个完成阶段,我按位置传递数据,然后再次使用JAX-RS Reactive Client API创建
tempCS
变量,该API调用城市名称的温度服务。 在这里,我使用
resolveTemplate()
方法构建客户端,这使我可以将城市名称作为参数传递给收集器。
作为流式传输的最后一步,我调用
CompletableFuture.completedFuture()
,并将新的
Forecast
实例作为参数传递。 我将这一未来与
tempCS
阶段结合在一起,以便获得所监视位置的温度值。
清单11中的
CompletableFuture.allOf()
方法将完成阶段列表转换为
forecastCS
。 当所有提供的可完成的将来对象完成时,执行此步骤将返回一个大型的可完成的将来实例。
服务响应是
ServiceResponse
类的实例,因此我创建了一个完成的Future,然后将
forecastCS
完成阶段与预测列表结合起来并计算服务响应时间。
当然,反应式编程会强制服务器端仅异步运行; 在服务器将响应发送回请求者之前,客户端将一直处于阻塞状态。 为了克服此问题,服务器发送事件(SSE)可用于在响应可用时立即部分发送响应,以便将每个位置的温度值一个一个地传输到客户端。
ForecastReactiveResource
的输出将类似于清单12中的输出。如输出中所示,处理时间为515 ms,这是从一个位置获取温度值的理想运行时间。
清单12
{ "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 }
结论在本文的示例中,我首先展示了一种使用位置和温度服务获取预报的同步方法。 然后,我转到了反应性方法,以便在服务调用之间执行异步处理。 当您将Java EE 8中的JAX-RS Reactive Client API与Java 8中可用的
CompletionStage
和
CompletableFuture
类一起使用时,由于响应式编程,异步处理的功能会松散。
响应式编程不仅仅是从同步模型中实现异步模型。 它还简化了嵌套阶段等概念的使用。 使用的次数越多,在并行编程中管理复杂的脚本就越容易。
结束
谢谢您的关注。 与往常一样,我们正在等待您的评论和问题。