带有示例的异步编程:重构java.util.concurrent.CompletableFuture方法

如果此类的源代码是开放的,为什么必须进行重构?


仅仅是因为在引擎盖下有一个高度优化的,难以阅读的代码,对它的研究在教学上几乎没有。


因此,我们将根据操作的规范重新创建操作的语义,并编写功能等效,可理解和可读的代码,尽管就内存消耗和处理器时间而言,这可能不是最经济的。


让我们从一个相对简单的方法开始:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture 

仔细阅读规格:


 Returns a new CompletableFuture 

即,创建了CompletableFuture类型的对象或其子类,并将其作为结果返回。


 that is asynchronously completed by a task running in the given executor` 

此外,还会执行在Executor 'e上Executor的任务。
众所周知, Executor仅接受可运行对象。
Runnable是一个接口,第一个对象可以很好地实现它-因此我们将两个功能组合在一个对象中。


  completed ... with the value obtained by calling the given Supplier. 

Runnable应该调用给定的Supplier并使用接收到的值完成创建的CompletableFuture


Supplier是无参数函数,因此对它进行编码非常简单:


  class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; } 

以下示例更加复杂:


 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage 

Returns a new CompletionStage that... is executed using the supplied Executor


在这里,我们直接为您安排创建的对象以Runnable的形式发布。


... with this stage's result as the argument to the supplied function.


但这已经更加有趣了。 传递给我们的函数有一个参数,该参数的值是完成当前CompletionStage的值。 在调用thenApplyAsync此值可能未知,因此我们无法立即在Executor上运行任务。 相反,我们必须与当前的CompletionStage进行协商,
以便在完成时将其价值转移给任务。 在众多的CompletionStage方法中CompletionStage有一个完全适合此目的的方法whenComplete


 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. 

也就是说,在新创建的任务对象中,足以实现BiConsumer接口以接收参数:


  class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; } else { this.arg = argument; } executor.execute(this); } @Override public void run() { if (throwable == null) { try { U result = fn.apply(arg); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } else { super.completeExceptionally(throwable); } } } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor ) { CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; } } 

该示例对于理解异步编程的性质非常重要,因此我们再次列出其主要步骤:


1)创建一个异步过程:


  CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); 

2)它尚未准备好执行,因此我们要求缺少参数的提供者将来通过调用我们提交的方法将此参数传递给我们:


  this.whenComplete(task); 

3)在此方法中,我们不仅保存接收到的参数,还运行任务以执行(请参见accept ()方法)。


4)任务的执行减少到履行给我们的功能并保存结果。
可以通过应用到我们新构造的对象的whenComplete ()方法来使此结果与其他过程所要求的whenComplete ,以便我们可以构建whenComplete任意长度的异步过程。 但是,此链将严格按顺序完成,没有任何并行性。


但是,如何描绘一个包含并行分支的更复杂的计算图呢?
为此,请使用thenCombineAsync方法。


如果在上一个示例中,我们使用一个参数启动了异步过程,则在此参数中使用了两个参数。


在这种情况下,两个参数的计算可以并行进行。


 ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function. 

在这里,所有内容都与上一个使用thenApplyAsync示例thenApplyAsync ,但是function参数已经有两个参数,并且添加了CompletionStage<? extends U> other参数。 CompletionStage<? extends U> other ,这是第二个参数的异步提供程序。


我们如何确保第二个参数得到处理?


好吧,首先,代替两个变量T arg描述两个: T arg1; U arg2; T arg1; U arg2; ,而不是一个方法public void accept(T argument, Throwable throwable)描述了两个accept1accept2
每种方法都有其自己的论点。


同时,我们正在构造的对象不再实现BiConsumer<T,Throwable>并且我们不再可以编写用于链接异步计算图节点的关键字


  this.whenComplete(task); 

幸运的是,功能接口对象可以通过对方法的引用来表示,而不必在单独的类中结束它:


  this.whenComplete(task::accept1); other.whenComplete(task::accept2); 

也就是说,当前this对象提供第一个参数, other对象提供第二个参数。


这只是必须更改的方法的代码,这样它们就不会在参数到达时立即启动任务,而且还要检查第二个方法的接收:


  public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } } 

类似地描述accept2方法。


注意:


  • 方法变得同步(我们使用共同的数据)
  • 如果发生错误传输,则无需等待第二个参数。
  • 通过与null进行比较来检查参数是否到达并不是最好的方法,也许您需要为每个参数添加一个布尔变量。

这样,您可以从数量超过两个的参数中创建异步过程,但是想法很快就浮现了-您是否仍可以为参数创建一个单独的类,以便不编写自己的方法来接收每个参数,而是通过动态创建参数来做到这一点?


  Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2); 

是的,您可以创建这样的类,但下次再创建。


前述内容的简要概述:


  • 异步程序是由相互连接的异步过程组成的网络,
    就像多线程程序一样,它们是相互连接的执行线程(线程)的网络。

但是通信流的方式和异步过程根本不同。


流使用信号量,阻塞队列和其他类似对象进行连接,
如果信息尚未到达,则阻塞接收方流,但是该流已经在尝试使用基于拉取的操作将其提取。


异步过程-收件人仅在准备好所需的所有信息后才输入执行信息。
他们被动地等待,直到信息提供者自己通过基于推送的操作将其传递给他们。
因此,它们在等待时不会在堆栈上花费内存,因此占用的内存比执行线程少得多。


  • 建立异步过程网络的过程被简化为创建对象并将它们链接在一起,更确切地说,将其链接为它们的子对象-参数:将接收者输入参数的地址传输到信息提供者的输出参数。

这组CompletableFuture方法完全可以做到这一点,原则上,如上例所示,可以通过显式创建对象而无需使用这些方法。
但是为此,必须具有与这些示例中描述的类相似的类。
由于某些原因, java.util.concurrent的创建者选择不让用户访问这些类,而将其隐藏在CompletableFuture代码的深处。


想要直观地了解正在创建的异步网络的人,可以通过继续上面的示例来重建这些类。 示例的源代码可在Github上获得

Source: https://habr.com/ru/post/zh-CN418547/


All Articles