本杰明·温特贝格Stream API指南翻译

哈Ha! 我向您提供文章“ Java 8 Stream Tutorial ”的翻译。

本教程基于代码示例,对Java 8中的流进行了全面的概述。当我首次引入Stream API时,我对这个名称感到困惑,因为它与java.io包中的InputStream和OutputStream非常一致。 但是,Java 8中的线程完全不同。 线程是monad ,它们在Java函数编程的开发中起着重要作用。
在函数式编程中,monad是一种结构,以一系列连续步骤的形式表示计算。 monad的类型和结构决定了操作链,在我们的例子中,是具有给定类型的内置函数的方法序列。
本教程将教您如何使用流,并展示如何处理Stream API中可用的各种方法。 我们将分析操作顺序,并查看链中方法的顺序如何影响性能。 flatMap强大的Stream API方法,例如reducecollectflatMap 。 在本手册的最后,我们将注意流的并行工作。

如果您不愿意随意使用lambda表达式,函数接口和引用方法,那么熟悉我的Java 8创新指南 (Habré的翻译 )对您很有用,然后再回到研究流程中。

线程如何工作


流代表一系列元素,并提供了用于对这些元素执行计算的各种方法:

 List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); // C1 // C2 

流的方法是中间 (中间)和终端 (终端)。 中间方法返回一个流,该流允许顺序调用这些方法中的许多方法。 终端方法要么不返回值(无效),要么不返回流以外的类型的结果。 在上面的示例中, filtermapsorted是中间sortedforEach是终端方法。 有关可用流程方法的完整列表,请参见文档 。 这样的流操作链也称为操作流水线。

Stream API中的大多数方法都将lambda表达式作为参数接受,lambda表达式是一个描述该方法特定行为的功能接口。 它们中的大多数必须同时是无干扰的和无状态的。 这是什么意思?

如果方法不修改流基础的基础数据,则该方法是无干扰的。 例如,在上面的示例中,没有lambda表达式修改列表数组myList。

如果指定了执行操作的顺序,则该方法是无状态的。 例如,示例中没有一个lambda表达式依赖于可变变量或可能在运行时发生变化的外部空间状态。

不同种类的线


可以从各种源数据(主要是集合)创建流。 列表和集合支持新的stream()parllelStream()方法,用于创建顺序流和并行流。 并行线程能够在多线程模式下工作(在多个线程上),将在本手册的最后进行讨论。 同时,请考虑顺序线程:

 Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1 

在这里,在列表上调用stream()方法将返回一个普通的流对象。
但是,要使用流,根本不需要创建集合:

 Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1 

只需使用Stream.of()从多个对象引用创建流。

除了常规对象流之外,Java 8还具有特殊类型的流,用于处理基本类型:int,long,double。 您可能会猜到,这是IntStreamLongStreamDoubleStream

IntStream流可以使用IntStream.range()替换常规的(;;) IntStream.range()

 IntStream.range(1, 4) .forEach(System.out::println); // 1 // 2 // 3 

所有与原始类型一起使用的流都与常规的对象流一样工作,除了以下各项:

  • 基本流使用特殊的lambda表达式。 例如,使用IntFunction代替Function或IntPredicate代替Predicate。
  • 基本流支持其他终端方法: sum()average()

     Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0 


有时将对象流转换为图元流,反之亦然。 为此,对象流支持特殊方法: mapToInt()mapToLong()mapToDouble()

 Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); // 3 

可以通过调用mapToObj()语流转换为对象流:

 IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

在以下示例中,将浮点数流映射到整数流,然后再映射到对象流:

 Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

执行顺序


现在,我们已经学习了如何创建各种流以及如何使用它们,我们将更深入地研究流式操作的外观。

中间方法的一个重要特征是它们的惰性 。 在此示例中,没有终端方法:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }); 

当执行这段代码时,什么都不会输出到控制台。 所有这些都是因为只有在有终端方法的情况下才执行中间方法。 让我们通过添加forEach终端方法来扩展示例:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s)); 

该代码段的执行将导致以下结果输出到控制台:

 filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c 

结果的排列顺序可能会令人惊讶。 可以天真地希望这些方法将“水平”执行:对于流的所有元素,一个接一个地执行。 但是,相反,元素“垂直”沿着链移动。 首先,“ d2”的第一行通过filter方法,然后通过forEach然后只有在第一个元素通过整个方法链之后,才开始处理下一个元素。

鉴于此行为,您可以减少实际的操作数:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // map: d2 // anyMatch: D2 // map: a2 // anyMatch: A2 

一旦将谓词应用于传入元素, anyMatch方法将返回true 。 在这种情况下,这是序列的第二个元素-“ A2”。 因此,由于线程链的“垂直”执行, map将仅被调用两次。 因此,将不显示流的所有元素,而是尽可能少地调用map

为什么顺序很重要


下面的示例包含两个中间方法mapfilter和一个terminalEach方法。 考虑如何执行这些方法:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // map: d2 // filter: D2 // map: a2 // filter: A2 // forEach: A2 // map: b1 // filter: B1 // map: b3 // filter: B3 // map: c // filter: C 

不难猜测, mapfilter方法在运行时都被调用了5次-对于源集合的每个元素一次,而forEach仅被调用一次-对于通过筛选器的元素。

通过将filter放在首位,可以通过更改方法调用的顺序来显着减少操作数:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // map: a2 // forEach: A2 // filter: b1 // filter: b3 // filter: c 

现在地图仅被调用一次。 使用大量输入元素,我们将观察到生产率的显着提高。 组成复杂的方法链时,请记住这一点。

我们通过添加其他排序操作-sorted方法来扩展上述示例:

 Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); 

排序是一种特殊的中间操作。 这就是所谓的有状态操作,因为对集合进行排序时,在整个操作过程中都必须考虑其状态。

作为执行此代码的结果,我们将以下输出输出到控制台:

 sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2 

首先,对整个集合进行排序。 换句话说, sorted方法是水平运行的。 在这种情况下,对于传入集合中元素的几种组合, sorted被调用8次。

再次,我们通过更改链中方法调用的顺序来优化此代码的执行:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // filter: b1 // filter: b3 // filter: c // map: a2 // forEach: A2 

在此示例中,根本不调用sortedfilter将输入集合减少为一个元素。 对于大量输入数据,性能将大为受益。

重用流


在Java 8中,无法重用线程。 调用任何终端方法后,线程终止:

 Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception 

在一个线程中的noneMatch之后调用noneMatch导致以下异常:

 java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) 

为了克服此限制,应为每个终端方法创建一个新线程。

例如,您可以为新的线程构造函数创建一个供应商 ,其中将安装所有中间方法:

 Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok 

每次调用get方法都会创建一个新线程,您可以在其中安全地调用所需的终端方法。

进阶方法


线程支持大量不同的方法。 我们已经熟悉了最重要的方法。 要使自己熟悉其他内容,请参阅文档 。 现在,我们将更深入地研究更复杂的方法: collectflatMapreduce

本节中的大多数代码示例都参考以下代码片段来演示操作:

 class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); 

收集


Collect非常有用的终端方法,用于将流元素转换为其他类型的结果,例如列表,集合或映射。

Collect接受包含四个不同方法的Collector :供应商。 累加器,组合器,装订器。 乍一看,这看起来很复杂,但是Java 8通过Collectors类支持各种内置的收集Collectors ,在其中实现了最常用的方法。

热门案例:

 List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered); // [Peter, Pamela] 

如您所见,从流项目创建列表非常简单。 不需要列表,但很多吗? 使用Collectors.toSet()

在以下示例中,按年龄分组人员:

 Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 18: [Max] // age 23: [Peter, Pamela] // age 12: [David] 

收藏家的多样性异常丰富。 您还可以汇总集合的元素,例如,确定平均年龄:

 Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.0 

为了获得更全面的统计信息,我们使用汇总收集器,该收集器返回一个特殊对象,该对象具有以下信息:最小值,最大值和平均值,值的总和和元素数:

 IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23} 

以下示例将所有名称合并在一行中:

 String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase); // In Germany Max and Peter and Pamela are of legal age. 

连接收集器接受分隔符以及可选的前缀和后缀。

要将流的元素转换为显示,必须确定键和值的显示方式。 请记住,映射中的键必须唯一。 否则,我们将收到一个IllegalStateException 。 您可以选择添加合并功能来绕过异常:

 Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {18=Max, 23=Peter;Pamela, 12=David} 

因此,我们熟悉了一些最强大的内置收集器。 让我们尝试建立自己的。 我们希望将流的所有元素转换为单行,该行由用竖线|分隔的大写名称组成。 为此,请使用Collector.of()创建一个新的收集Collector.of() 。 我们需要收集器的四个组件:供应商,电池,连接器,整理器。

 Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // MAX | PETER | PAMELA | DAVID 

由于Java中的字符串是不可变的,因此我们需要一个类似于StringJoiner的帮助程序类, StringJoiner允许收集器为我们构建字符串。 第一步,提供程序构造一个带有分配的分隔符的StringJoiner 。 Battery用于将每个名称添加到StringJoiner

连接器知道如何将两个StringJoiner连接到一个。 最后,整理器从StringJoiner构造所需的字符串。

平面图


因此,我们学习了如何使用map方法将流对象转换为其他类型的对象。 Map是一种受限制的方法,因为每个对象只能映射到另一个对象。 但是,如果您要将一个对象映射到许多其他对象,或者根本不显示它,该怎么办? 这是flatMap方法提供帮助的地方。 FlatMap将每个流对象转换为其他对象流。 然后,将这些线程的内容打包到flatMap方法的返回流中。

为了查看实际使用的flatMap ,让我们为示例构建一个合适的类型层次结构:

 class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } 

让我们创建一些对象:

 List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name)))); 

现在我们有了三个foo的列表,每个列表包含三个bar

FlatMap接受应返回对象流的函数。 因此,为了访问每个foobar对象,我们只需要找到适当的函数:

 foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3 

因此,我们已经成功地将三个foo对象的流变成了9个bar对象的流。

最后,上述所有代码都可以简化为简单的操作流程:

 IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); 

Java 8中引入的Optional类中也提供FlatMap类中的FlatMap返回另一个类的可选对象。 这可以用来避免一堆null检查。

想象这样一个层次结构:

 class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } 

要从外部对象获取嵌套的字符串foo ,您需要添加多个null检查以避免NullPointException

 Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } 

使用Optional类的flatMap可以实现相同的目的:

 Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); 

每次对flatMap调用都会为所需对象(如果存在)返回一个Optional包装器;如果缺少该对象,则返回null

减少


简化操作将流的所有元素组合为单个结果。 Java 8支持三种不同类型的reduce方法。

第一种将元件的流量减少到单个流量元件。 我们使用这种方法来确定年龄最大的元素:

 persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Pamela 

reduce方法采用带二进制运算符 (BinaryOperator)的累加函数。 这里的reduce是一个双功能 (BiFunction),其中两个参数都属于同一类型。 在我们的例子中,键入Person 。 双函数与函数几乎相同,但是它带有2个参数。 在我们的示例中,该函数比较两个人的年龄并返回年龄更大的元素。

reduce方法的下一种形式同时使用初始值和带有二进制运算符的电池。 此方法可用于创建新项目。 我们拥有-具有姓名和年龄的人,包括所有姓名的加上和居住年限的总和:

 Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=MaxPeterPamelaDavid; age=76 

第三种reduce方法采用三个参数:初始值,具有双功能的累加器和组合功能(例如二进制运算符)。 由于类型的初始值不限于类型Person,因此您可以使用减少来确定每个人的总生存年限:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 76 

如您所见,我们得到了结果76,但实际上发生了什么?

我们使用调试文本的输出扩展以上代码片段:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David 

如您所见,累加功能执行所有工作。 首先使用初始值0和第一人称Max进行调用。 在接下来的三个步骤中,从最后一个步骤直到他的总年龄达到76岁,总和会随着年龄的增长而不断增加。

那接下来呢? 从未调用过合并器吗? 考虑此线程的并行执行:

 Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Pamela // accumulator: sum=0; person=David // accumulator: sum=0; person=Max // accumulator: sum=0; person=Peter // combiner: sum1=18; sum2=23 // combiner: sum1=23; sum2=12 // combiner: sum1=41; sum2=35 

通过并行执行,我们获得了完全不同的控制台输出。 现在,组合器实际上已被调用。由于电池是并联的,因此组合器必须汇总单独存储的值。

在下一章中,我们将更详细地研究线程的并行执行。

平行螺纹


处理大量传入元素时,线程可以并行运行以提高性能。并行线程ForkJoinPool通过调用static方法使用通常可用的方法ForkJoinPool.commonPool()主线程池的大小可以达到5个执行线程-确切的数量取决于可用物理处理器内核的数量。

 ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 

在我的计算机上,默认情况下将常规线程池初始化为并行化为3个线程。可以通过设置以下JVM参数来增加或减少该值:

 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 

集合支持parallelStream()创建并行数据流的方法。您也可以调用中间方法,parallel()以将串行流转换为并行流。

为了了解并行执行中线程的行为,以下示例将有关每个当前线程(线程)的信息打印到System.out

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

考虑调试条目的结论,以更好地了解哪个线程用于执行特定的流方法:

 filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] 

如您所见,在并行执行数据流时,将使用当前线程的所有可用线程ForkJoinPool输出顺序可能会有所不同,因为未定义每个特定线程的执行顺序。

让我们通过添加方法来扩展示例sort

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

乍一看,结果可能看起来很奇怪:

 filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] 

它似乎是sort顺序执行的,并且仅在线程中执行实际上,当在sortStream API 的方法之下并行执行流时Arrays,将隐藏Java 8中添加的类排序方法Arrays.parallelSort()如文档中所述,此方法根据传入集合的长度确定如何并行或顺序地对其进行排序:
如果特定数组的长度小于最小“粒度”,则通过执行Arrays.sort方法执行排序。
让我们使用reduce上一章中的方法返回示例我们已经发现只有在并行处理线程时才调用统一函数。考虑涉及哪些线程:

 List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); 

控制台输出显示,使用所有可能的流程并行执行两个功能:累加和合并:

 accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2] 

可以说,在处理大量传入元素时,并行执行流程有助于显着提高效率。但是,应记住,并行执行的某些方法需要额外的计算(组合操作),而顺序执行则不需要。

另外,对于线程的并行执行,ForkJoinPool使用了在JVM中广泛使用的同一线程因此,由于阻塞了用于其他任务中处理的线程,因此使用慢速流阻塞方法可能会对整个程序的性能产生负面影响。

就这样


我有关在Java 8中使用线程的教程已结束。有关使用流的更详细的研究,请参考文档。如果您想更深入地了解线程基础的机制,您可能会对阅读Martin Fowler的Collection Pipelines文章感兴趣

如果您也对JavaScript感兴趣,则可能需要看一下Stream.js -Java 8 Streams API的JavaScript实现。您可能还想阅读有关Java 8 Tutorial俄语翻译为Habré)和Java 8 Nashorn Tutorial的文章

, , . GitHub . , .

Source: https://habr.com/ru/post/zh-CN437038/


All Articles