在CompletableFuture中改进allOf和anyOf



你好 预期该课程将开始, “ Java Developer”为您准备了有用材料的翻译。


CompletableFuture有两种方法的设计使我感到惊讶:

  • CompletableFuture#allOf
  • CompletableFuture#anyOf


在本文中,我们将看到它们的问题以及如何使它们更方便。

CompletableFuture#allOf


让我们看一下方法签名:

 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { // ... } 


这里至少有两个有争议的问题:

  1. 该方法接受几个CompletableFuture对象,这些对象返回不同类型的对象。
  2. 方法返回CompletableFuture ,后者返回Void


另外,有些可能不喜欢可变数量的参数,因此让我们也看一下这一部分。

CompletableFuture<Void>通常用作完成操作的信号;但是,通过对API进行少量更改, 此方法既可以用作信号设备,也可以用作所有已完成操作的结果的载体 。 让我们尝试去做。

异步CompletableFuture#allOf


首先,让我们提出正确的签名。

可以假设大多数情况下都需要处理同类CompletableFuture列表并返回包含结果列表的CompletableFuture

 public static <T> CompletableFuture<List<T>> allOf( Collection<CompletableFuture<T>> futures) { // ... } 


原始方法的内幕很可能比您预期的要复杂:

 static CompletableFuture<Void> andTree( CompletableFuture<?>[] cfs, int lo, int hi) { CompletableFuture<Void> d = new CompletableFuture<Void>(); if (lo > hi) // empty d.result = NIL; else { CompletableFuture<?> a, b; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : andTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); if (!d.biRelay(a, b)) { BiRelay<?,?> c = new BiRelay<>(d, a, b); a.bipush(b, c); c.tryFire(SYNC); } } return d; } 


因此,我们将尝试重用原始方法中已经存在的内容,而不是从头开始创建它,就像将其用作完成信号设备一样,然后将void结果更改为将来的列表:

 CompletableFuture<List<CompletableFuture<T>>> i = futures.stream() .collect(collectingAndThen( toList(), l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0])) .thenApply(__ -> l))); 


到目前为止一切顺利。 我们设法得到
CompletableFuture<List<CompletableFuture<T> > > CompletableFuture<Void>代替CompletableFuture<Void> ,这已经很好了。 但是我们不需要结果列表,我们需要结果列表。

现在,我们可以简单地处理列表,并从中删除不需要的将来。 调用CompletableFuture#join方法是完全正常的,因为我们知道它们将永远不会被阻塞(此时,所有将来都已经完成):

 CompletableFuture<List<T>> result = intermediate .thenApply(list -> list.stream() .map(CompletableFuture::join) .collect(toList())); 


现在,让我们将所有这些结合到最终解决方案中:

 public static <T> CompletableFuture<List<T>> allOf( Collection<CompletableFuture<T>> futures) { return futures.stream() .collect(collectingAndThen( toList(), l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0])) .thenApply(__ -> l.stream() .map(CompletableFuture::join) .collect(Collectors.toList())))); } 


异步下降CompletableFuture#allOf


如果有异常,原始的CompletableFuture#allOf将等待所有剩余操作完成。

而且,如果我们想在发生异常时报告操作的完成,那么我们将不得不更改实现。

为此,请创建一个CompletableFuture的新实例,并在其中一项操作引发异常后手动终止该实例:

 CompletableFuture<List<T>> result = new CompletableFuture<>(); futures.forEach(f -> f .handle((__, ex) -> ex == null || result.completeExceptionally(ex))); 


...但是我们需要处理所有未来都将成功完成的情况。 使用改进的allOf()方法可以轻松完成此操作,然后只需手动终止将来:

 allOf(futures).thenAccept(result::complete); 


现在,我们可以将所有内容组合在一起以形成最终解决方案:

 public static <T> CompletableFuture<List<T>> allOfShortcircuiting(Collection<CompletableFuture<T>> futures) { CompletableFuture<List<T>> result = new CompletableFuture<>(); for (CompletableFuture<?> f : futures) { f.handle((__, ex) -> ex == null || result.completeExceptionally(ex)); } allOf(futures).thenAccept(result::complete); return result; } 


CompletableFuture#anyOf


让我们从方法签名开始:

 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { // ... } 


我们可以立即发现与上述方法相同的问题:

  1. 该方法接受几个CompletableFuture对象,其中包含不同类型的对象。
  2. 该方法返回一个CompletableFuture其中包含Object类型的Object


据我了解, CompletableFuture#allOf被设计为用作信号设备。 但是CompletableFuture#anyOf不遵循这种原理,返回的CompletableFuture<Object>更加令人困惑。

看下面的示例,其中我试图处理包含不同类型数据的CompletableFuture:

 CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1); CompletableFuture<String> f2 = CompletableFuture.completedFuture("2"); Integer result = CompletableFuture.anyOf(f1, f2) .thenApply(r -> { if (r instanceof Integer) { return (Integer) r; } else if (r instanceof String) { return Integer.valueOf((String) r); } throw new IllegalStateException("unexpected object type!"); }).join(); 


很不舒服,不是吗?

幸运的是,通过更改签名并引入直接类型转换,这很容易适应更常见的情况(等待将来包含相同类型的值中的一个)。

因此,通过我们的改进,我们可以重用现有方法并安全地带来结果:

 public static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) { return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0])) .thenApply(o -> (T) o); } public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... cfs) { return CompletableFuture.anyOf(cfs).thenApply(o -> (T) o); } 


源代码


源代码可以在Github上找到。

仅此而已。 在课程上见。

Source: https://habr.com/ru/post/zh-CN481804/


All Articles