在Spring中引入反应式编程

哈Ha!

本周,我们期望印刷厂出版一本新的Spring 5 书籍


在Spring 5的有趣特性中,应该特别提及反应式编程,Matt Raible的拟议文章简要描述了在该框架中的实现。 在上述书籍中,第11章讨论了反应模式。

马特(Matt)由乔什·朗(Josh Long)合着,乔什·朗(Josh Long)是去年夏天发行的另一本关于Java和Spring的伟大著作《 云中的Java 》的作者。

反应性编程是构建耐高负荷系统的方法。 处理大量流量不再是问题,因为服务器处于非阻塞状态,并且客户端进程不必等待响应。 客户端无法直接观察程序如何在服务器上运行并与其同步。 当API发现难以处理请求时,它仍应给出合理的响应。 不应以不受控制的方式拒绝和丢弃消息。 它必须告知较高的组件它正在负载下工作,以便它们可以部分地摆脱负载。 这种技术称为反压,这是反应式编程的重要方面。

我们与Josh Long共同撰写了这篇文章。 Josh是Java冠军,Spring Developer Advocate,并且通常是在Pivotal工作的全球人员。 我已经与Spring合作了很长时间,但是Josh向我展示了Spring Boot,那是在比利时的Devoxx会议上。 从那时起,我们成为好朋友,我们喜欢Java并编写了出色的应用程序。

反应式编程或I / O,I / O,我们开始工作...

响应式编程是一种创建主动使用异步I / O的软件的方法。 异步I / O是一个很小的主意,充满了编程方面的巨大变化。 这个想法本身很简单:使用效率低下的资源分配来纠正这种情况,释放那些无需我们干预就可以闲置的资源,等待I / O完成。 异步输入/输出颠倒了常规的I / O处理方法:客户端被释放,可以执行其他任务,等待新的通知。

考虑一下同步和异步输入/输出之间的共同点,以及它们之间的区别是什么。

我们将编写一个简单的程序,该程序从源中读取数据(特别是我们在谈论java.io.File链接)。 让我们从使用良好的旧java.io.InputStream的实现开始:

例子1.从文件同步读取数据

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. 我们提供了一个用于读取的文件,该文件具有通常的java.io.File
  2. 一次从源中提取结果一行...
  3. 我编写了这段代码以使用Consumer<BytesPayloadgt; 新数据到达时调用

很简单,你怎么说? 运行此代码,您将在日志输出中(每行左侧)看到该代码,指示所有操作均在单个线程中发生。
在这里,我们从源中获取的数据中提取字节(在本例中,我们所讨论的是从java.io.InputStream继承的java.io.InputStream的子类)。 这个例子有什么问题? 在这种情况下,我们使用一个InputStream指向位于文件系统上的数据。 如果文件存在,并且硬盘驱动器正在运行,则此代码将按预期工作。

但是,如果我们不是从File而是从网络套接字读取数据,而是使用InputStream另一种实现,将会发生什么? 不用担心! 当然,如果网络的速度无限高,绝对没有什么可担心的。 并且如果此节点与另一个节点之间的网络通道永远不会失败。 如果满足这些条件,则代码将完美运行。

但是,如果网络变慢或瘫痪怎么办? 在这种情况下,我的意思是我们将增加周期,直到操作in.read(…)为止。 实际上,她可能根本不回来! 如果我们尝试对要从中读取数据的流执行其他操作,那么这将是一个问题。 当然,您始终可以创建另一个流并通过该流读取数据。 可以做到这一点,但最终,我们将达到极限,仅增加线程以进一步扩展就不再足够了。 除了我们机器上的内核数量之外,我们将没有真正的竞争。 死胡同! 在这种情况下,仅由于额外的流量,我们才可以增加输入/输出处理(此处是指读数),但在此我们迟早会达到极限。

在此示例中,主要工作是阅读-在其他方面几乎没有任何反应。 我们依赖于I / O。 考虑一下异步解决方案如何帮助我们部分克服流程的垄断。

例子2.从文件异步读取数据

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. 这次我们改编java.io.File ,从中制作Java NIO java.nio.file.Path
  2. 创建Channel ,尤其要指定java.util.concurrent.ExecutorService服务,该服务将在必要数据出现时用于调用CompletionHandler处理程序。
  3. 我们通过传递指向CompletionHandler<Integer, ByteBuffer> (this)的链接开始阅读CompletionHandler<Integer, ByteBuffer> (this)
  4. 在回调中,将字节ByteBufferbyte[]读取为byte[]容量byte[]
  5. 就像在Synchronous示例中一样, byte[]数据被传递给使用者。

我们将立即进行预订:事实证明,这段代码要难得多! 这里发生了很多事情,您的头脑马上就转动了,但是,让我指出...此代码从Java NIO Channel读取数据,然后在负责回调的单独线程中处理此数据。 因此,开始读取的流不会被独占。 调用.read(..)之后,我们几乎立即返回,最后,当我们拥有可用的数据时,就会进行回调-已经在另一个线程中。 如果对.read()调用之间存在延迟.read()可以在线程中执行其他操作来处理其他问题。 从第一个字节到最后一个字节的异步读取操作的持续时间最多不超过同步读取操作的持续时间。 通常,异步操作的时间可以忽略不计。 但是,面对这些额外的困难,我们可以更有效地处理流程。 做更多的工作,在有限数量的线程池中复用I / O。

我在一家云计算公司工作。 我们希望您获得该应用程序的新实例,以解决水平缩放问题! 当然,我这里有点不屑一顾。 异步I / O使事情变得有些复杂,但是我希望该示例说明反应式代码是如此有用:如果性能高度依赖于I / O,它允许您处理更多请求并在现有硬件上执行更多工作。 如果性能取决于处理器的使用(例如,我们正在谈论斐波那契数字的运算,挖掘比特币或加密技术),那么反应式编程将不会给我们任何帮助。

当前,我们大多数人在日常工作中都不使用ChannelInputStream实现! 我们必须在更高级别的抽象级别上考虑问题。 它与数组之类的东西有关,或者与java.util.Collection层次结构有关。 java.util.Collection集合在InputStream上显示得非常好:两个实体都假定您可以一次并几乎立即操作所有数据。 预期您将能够较早而不是以后完成对大多数InputStreams读取。 当移至大量数据时,收集类型变得有些不舒服。 如果您要处理潜在的无限(无限)事件,例如Web套接字或服务器事件,该怎么办? 如果两次录音之间有延迟怎么办?

我们需要一种更好的方式来描述此类数据。 我们正在谈论异步事件,这些事件最终会发生。 似乎Future<T>CompletableFuture<T>很适合此目的,但它们仅描述了最后发生的一件事。 实际上,Java没有提供合适的隐喻来描述这种数据。 Java 8中的IteratorStream类型可能都不相关,但是,两者都是针对拉的。 您自己请求下一个条目,而不是类型应向您的代码发送回调。 假定如果在这种情况下支持基于推的处理,这将允许在线程级别实现更多功能,则API还将提供线程和调度控制。 Iterator实现对线程一无所知,并且所有Java 8线程共享相同的fork-join池。

如果IteratorStream确实支持推送处理,那么我们将遇到另一个在I / O上下文中确实升级的问题:我们将需要某种反向渗透机制! 由于数据使用者是异步处理的,因此我们不知道何时将数据放入管道中以及数量。 我们不知道在下一个回调中需要处理多少数据:一个字节或一个TB!

InputStream提取数据,您所读取的信息已准备就绪,可以处理的数量就更多了。 在前面的示例中,我们将数据读取到固定长度和已知长度的byte[]缓冲区中。 在异步上下文中,我们需要某种方式来告知提供者我们愿意处理多少数据。
是的,长官 这里肯定缺少一些东西。

搜索丢失的隐喻

在这种情况下,我们正在寻找一个隐喻,该隐喻可以完美地反映异步I / O的本质,支持这种用于数据反向传输的机制,并允许我们控制分布式系统中的执行流程。 在反应式编程中,客户端发信号通知其能够处理的负载的能力称为“反向流”。

现在有许多好的项目-Vert.x,Akka Streams和RxJava-支持反应式编程。 Spring团队还运行一个名为Reactor的项目。 在这些不同的标准之间,存在一个相当广泛的通用领域,实际上分配给了Reactive Streams主动标准。 反应流计划定义了四种类型:

Publisher<T>界面Publisher<T> 产生可能最终到达的价值。 Publisher<T>界面Publisher<T>Subscriber<T>产生类型T值。

示例3.反应性流: Publisher<T>接口

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

Subscriber类型订阅Publisher<T> ,并通过其onNext(T)方法接收类型T的任何新值的通知。 如果发生任何错误,则onError(Throwable)onError(Throwable)方法。 当处理正常完成时,将调用订户的onComplete方法。

示例4. Jet流: Subscriber<T>接口。

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

Subscriber首次连接到Publisher ,它会通过Subscriber#onSubscribe接收Subscription 。 订阅Subscription可能是整个规范中最重要的部分。 是她提供了回流。 订户订户使用Subscription#request方法请求其他数据,或使用Subscription#cancel方法停止处理。

示例5.反应性流: Subscription<T>接口

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

反应性流规范提供了另一种有用的(尽管很明显)类型: Processor<A,B>只是一个继承了Subscriber<A>Publisher<B>

示例6. Jet流: Processor<T>接口

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

规范不作为实现的处方;实际上,其目的是定义支持互操作性的类型。 与响应流相关联的类型的明显好处是,它们仍然在Java 9发行版中找到了位置,而且在语义上它们是“一对一”的,对应于java.util.concurrent.Flow类的接口,例如: java.util.concurrent.Flow.Publisher

认识反应堆

仅反应流的类型是不够的。 需要更高阶的实现来支持诸如过滤和转换之类的操作。 这样,Reactor项目很方便。 它建立在Reactive Streams规范的基础上,并提供了两个Publisher<T>专长。

首先, Flux<T>是产生零个或多个值的发布者。 第二个, Mono<T> ,是Publisher<T> ,产生零或一个值。 它们都可以发布值并可以相应地处理它们,但是它们的功能比Reactive Streams规范要广泛得多。 两者都提供了允许您处理价值流的运算符。 Reactor类型组成良好-其中一个的输出可以用作另一个的输入,并且如果一个类型需要与其他数据流一起使用,则它们依赖Publisher<T>实例。

Mono<T>Flux<T>实现Publisher<T> ; 我们建议您的方法接受Publisher<T>实例,但返回Flux<T>Mono<T> ; 这将帮助客户区分他接收的数据类型。

假设您被授予Publisher<T>并要求显示该Publisher<T>的用户界面。 然后,由于您可以获得CompletableFuture<T> ,因此我是否应该显示一个包含详细记录的页面? 还是显示带有列表或网格的概览页面,其中所有条目都逐页显示? 很难说。

反过来, Flux<T>Mono<T>非常具体。 您知道,如果收到Flux<T>需要显示一个评论页面,而当收到Mono<T>时,则需要显示一个包含一个(或非单个)记录的详细信息的页面。

Reactor是Pivotal启动的一个开源项目; 现在他变得非常受欢迎。 Facebook在其喷气引擎中使用它来调用远程过程 ,并在RxJava创造者Ben Christensen领导的Rsocket中使用它。 Salesforce在其反应性gRPC实现中使用它。 Reactor实现了Reactive Streams类型,因此它可以与支持这些类型的其他技术进行交互,例如,与Netflix的RxJava 2Lightbend的 Akka Streams以及Eclipse Foundation的Vert.x项目。 RxJava 2的主管David Cairnock也积极与Pivotal合作开发Reactor,从而使该项目变得更好。 另外,当然,从Spring Framework 4.0开始,它在Spring Framework中以一种或另一种形式出现。

使用Spring WebFlux进行反应式编程

尽管有其所有用途,Reactor只是基础。 我们的应用程序必须与数据源通信。 必须支持身份验证和授权。 Spring提供了所有这些。 如果Reactor给我们提供了缺失的隐喻,那么Spring可以帮助我们所有人说一种共同的语言。

Spring Framework 5.0于2017年9月发布。它基于Reactor和Reactive Streams规范。 它具有一个称为Spring WebFlux的新的反应式运行时和组件模型。

Spring WebFlux独立于Servlet API,不需要它们工作。 它带有适配器,如有必要,可让您在Servlet引擎上使用它,但这不是必需的。 它还提供了一个全新的基于Netty的运行时,称为Spring WebFlux。 与Java 8和Java EE 7及更高版本一起使用的Spring Framework 5现在成为许多Spring生态系统的基础,包括Spring Data Kay,Spring Security 5,Spring Boot 2和Spring Cloud Finchley。

Source: https://habr.com/ru/post/zh-CN435972/


All Articles