Hallo Habr! Ich präsentiere Ihnen die Übersetzung des Artikels " 
Java 8 Stream Tutorial ".
Dieses Tutorial, das auf Codebeispielen basiert, bietet einen umfassenden Überblick über Streams in Java 8. Als ich die Stream-API zum ersten Mal einführte, war ich über den Namen verwirrt, da er sehr gut mit InputStream und OutputStream aus dem Paket java.io übereinstimmt. Threads in Java 8 sind jedoch etwas völlig anderes. 
Threads sind 
Monaden , die eine wichtige Rolle bei der Entwicklung der funktionalen Programmierung in Java spielen.
In der funktionalen Programmierung ist eine Monade eine Struktur, die eine Berechnung in Form einer Kette aufeinanderfolgender Schritte darstellt. Der Typ und die Struktur einer Monade bestimmen eine Operationskette, in unserem Fall eine Folge von Methoden mit integrierten Funktionen eines bestimmten Typs.
In diesem Handbuch erfahren Sie, wie Sie mit Streams arbeiten und wie Sie mit den verschiedenen in der Stream-API verfügbaren Methoden umgehen. Wir werden die Reihenfolge der Operationen analysieren und sehen, wie sich die Reihenfolge der Methoden in der Kette auf die Leistung auswirkt. 
flatMap Sie 
flatMap leistungsstarken Stream-API-Methoden wie 
reduce , 
collect und 
flatMap . Am Ende des Handbuchs werden wir auf die parallele Arbeit mit Threads achten.
Wenn Sie nicht frei sind, mit Lambda-Ausdrücken, funktionalen Schnittstellen und Referenzmethoden zu arbeiten, ist es hilfreich, sich mit 
meinem Leitfaden zu Innovationen in Java 8 ( 
Übersetzung in Habré) vertraut zu machen und danach wieder mit dem Studium von Flows zu beginnen.
Wie Threads funktionieren
Ein Stream stellt eine Folge von Elementen dar und bietet verschiedene Methoden zum Durchführen von Berechnungen für diese Elemente:
 List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println);  
Strömungsmethoden sind 
Intermediate (Intermediate) und 
Terminal (Terminal). Zwischenmethoden geben einen Stream zurück, wodurch viele dieser Methoden nacheinander aufgerufen werden können. Terminalmethoden geben entweder keinen Wert (void) oder ein Ergebnis eines anderen Typs als eines Streams zurück. Im obigen Beispiel sind die 
filter , 
map und 
sorted intermediär und 
forEach sind terminal. Eine vollständige Liste der verfügbaren Flussmethoden finden Sie in der 
Dokumentation . Eine solche Kette von Stream-Operationen ist auch als Operations-Pipeline bekannt.
Die meisten Methoden der Stream-API akzeptieren als Parameter Lambda-Ausdrücke, eine funktionale Schnittstelle, die das spezifische Verhalten der Methode beschreibt. Die meisten von ihnen müssen gleichzeitig störungsfrei und staatenlos sein. Was bedeutet das?
Eine Methode ist nicht störend, wenn sie die zugrunde liegenden Daten, die dem Stream zugrunde liegen, nicht ändert. Im obigen Beispiel ändern beispielsweise keine Lambda-Ausdrücke das Listenarray myList.
Eine Methode ist zustandslos, wenn die Reihenfolge angegeben ist, in der die Operation ausgeführt wird. Beispielsweise hängt kein einzelner Lambda-Ausdruck aus dem Beispiel von veränderlichen Variablen oder externen Raumzuständen ab, die sich zur Laufzeit ändern können.
Verschiedene Arten von Fäden
Streams können aus verschiedenen Quelldaten erstellt werden, hauptsächlich aus Sammlungen. Listen und Sets unterstützen die neuen Methoden 
stream() und 
parllelStream() zum Erstellen sequentieller und paralleler Streams. Parallele Threads können im Multithread-Modus (auf mehreren Threads) arbeiten und werden am Ende des Handbuchs erläutert. Berücksichtigen Sie in der Zwischenzeit sequenzielle Threads:
 Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println);  
Hier gibt der Aufruf der 
stream() -Methode in einer Liste ein normales Stream-Objekt zurück.
Um mit einem Stream zu arbeiten, ist es jedoch überhaupt nicht erforderlich, eine Sammlung zu erstellen:
 Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println);  
Verwenden 
Stream.of() einfach 
Stream.of() , um einen Stream aus mehreren Objektreferenzen zu erstellen.
Zusätzlich zu regulären Objekt-Streams verfügt Java 8 über spezielle Stream-Typen für die Arbeit mit primitiven Typen: int, long, double. Wie Sie vielleicht erraten haben, ist dies 
IntStream , 
LongStream , 
DoubleStream .
IntStream-Streams können reguläre for (;;) - 
IntStream.range() mithilfe von 
IntStream.range() ersetzen:
 IntStream.range(1, 4) .forEach(System.out::println);  
Alle diese Streams für die Arbeit mit primitiven Typen funktionieren wie normale Streams von Objekten, mit Ausnahme der folgenden:
Manchmal ist es nützlich, einen Strom von Objekten in einen Strom von Grundelementen umzuwandeln oder umgekehrt. Zu diesem Zweck unterstützen Objektflüsse spezielle Methoden: 
mapToInt() , 
mapToLong() , 
mapToDouble() :
 Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println);  
Ströme von 
mapToObj() können durch Aufrufen von 
mapToObj() in Ströme von Objekten 
mapToObj() :
 IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println);  
Im folgenden Beispiel wird ein Strom von Gleitkommazahlen einem Strom von Ganzzahlen und dann einem Strom von Objekten zugeordnet:
 Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println);  
Ausführungsreihenfolge
Nachdem wir gelernt haben, wie man verschiedene Streams erstellt und wie man damit arbeitet, werden wir tiefer eintauchen und überlegen, wie Streaming-Vorgänge unter der Haube aussehen.
Ein wichtiges Merkmal von Zwischenmethoden ist ihre 
Faulheit . In diesem Beispiel gibt es keine Terminalmethode:
 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }); 
Wenn dieser Code ausgeführt wird, wird nichts an die Konsole ausgegeben. Und das alles, weil Zwischenmethoden nur ausgeführt werden, wenn es eine Terminalmethode gibt. Erweitern wir das Beispiel, indem wir die 
forEach Terminalmethode hinzufügen:
 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s)); 
Die Ausführung dieses Codefragments führt zur Ausgabe des folgenden Ergebnisses an die Konsole:
 filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c 
Die Reihenfolge, in der die Ergebnisse angeordnet sind, kann überraschen. Man kann naiv erwarten, dass die Methoden „horizontal“ ausgeführt werden: nacheinander für alle Elemente des Streams. Stattdessen bewegt sich das Element jedoch „vertikal“ entlang der Kette. Zuerst durchläuft die erste Zeile von „d2“ die 
filter , dann 
forEach und erst dann, nachdem das erste Element die gesamte Methodenkette durchlaufen hat, beginnt das nächste Element mit der Verarbeitung.
Aufgrund dieses Verhaltens können Sie die tatsächliche Anzahl von Vorgängen reduzieren:
 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); });  
Die 
anyMatch Methode gibt 
true zurück , sobald das Prädikat auf das eingehende Element angewendet wird. In diesem Fall ist dies das zweite Element der Sequenz - "A2". Dementsprechend wird 
map aufgrund der "vertikalen" Ausführung der Thread-Kette nur zweimal aufgerufen. Anstatt alle Elemente des Streams anzuzeigen, wird die 
map so oft wie möglich aufgerufen.
Warum Sequenz wichtig ist
Das folgende Beispiel besteht aus zwei Zwischenmethoden 
map und 
filter und einer Terminalmethode für 
forEach . Überlegen Sie, wie diese Methoden ausgeführt werden:
 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s));  
Es ist leicht zu erraten, dass sowohl die 
map als auch die 
filter zur Laufzeit fünfmal aufgerufen werden - einmal für jedes Element der Quellensammlung, während 
forEach nur einmal aufgerufen wird - für das Element, das den Filter bestanden hat.
Sie können die Anzahl der Operationen erheblich reduzieren, indem Sie die Reihenfolge der Methodenaufrufe ändern, indem Sie zuerst den 
filter platzieren:
 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));  
Jetzt wird die Karte nur noch einmal aufgerufen. Bei einer großen Anzahl von Eingabeelementen werden wir eine spürbare Steigerung der Produktivität beobachten. Beachten Sie dies beim Erstellen komplexer Methodenketten.
Wir erweitern das obige Beispiel, indem wir eine zusätzliche Sortieroperation hinzufügen - die sortierte Methode:
 Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); 
Das Sortieren ist eine spezielle Art von Zwischenoperation. Dies ist die sogenannte Stateful-Operation, da zum Sortieren einer Sammlung deren Status während der gesamten Operation berücksichtigt werden muss.
Als Ergebnis der Ausführung dieses Codes erhalten wir die folgende Ausgabe an die Konsole:
 sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2 
Zunächst wird die gesamte Sammlung sortiert. Mit anderen Worten, die 
sorted Methode wird horizontal ausgeführt. In diesem Fall wird die 
sorted für mehrere Kombinationen der Elemente in der eingehenden Sammlung achtmal aufgerufen.
Wir optimieren erneut die Ausführung dieses Codes, indem wir die Reihenfolge der Methodenaufrufe in der Kette ändern:
 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));  
In diesem Beispiel wird 
sorted nicht aufgerufen. 
filter reduziert die Eingabesammlung auf ein Element. Bei großen Eingabedaten wird die Leistung erheblich verbessert.
Streams wiederverwenden
In Java 8 können Threads nicht wiederverwendet werden. Nach dem Aufrufen einer beliebigen Terminalmethode wird der Thread beendet:
 Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true);  
Das Aufrufen von 
noneMatch nach 
anyMatch in einem Thread führt zu der folgenden Ausnahme:
 java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) 
Um diese Einschränkung zu überwinden, sollte für jede Terminalmethode ein neuer Thread erstellt werden.
Sie können beispielsweise einen 
Lieferanten für einen neuen Thread-Konstruktor erstellen, in dem alle Zwischenmethoden installiert werden:
 Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true);  
Jeder Aufruf der 
get Methode erstellt einen neuen Thread, in dem Sie die gewünschte Terminalmethode sicher aufrufen können.
Fortgeschrittene Methoden
Threads unterstützen eine Vielzahl unterschiedlicher Methoden. Wir haben uns bereits mit den wichtigsten Methoden vertraut gemacht. Informationen zum Rest finden Sie in der 
Dokumentation . Und jetzt tauchen Sie noch tiefer in komplexere Methoden ein: 
collect , 
flatMap und 
reduce .
Die meisten Codebeispiele in diesem Abschnitt beziehen sich auf das folgende Codefragment, um die Funktionsweise zu demonstrieren:
 class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); 
Sammeln
Collect sehr nützliche Terminalmethode, mit der Stream-Elemente in ein Ergebnis eines anderen Typs konvertiert werden, z. B. List, Set oder Map.
Collect akzeptiert einen 
Collector , der vier verschiedene Methoden enthält: einen Lieferanten. Akku, Kombinierer, Finisher. Auf den ersten Blick sieht dies sehr kompliziert aus, aber Java 8 unterstützt verschiedene integrierte Kollektoren über die 
Collectors Klasse, in der die am häufigsten verwendeten Methoden implementiert sind.
Beliebter Fall:
 List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered);  
Wie Sie sehen können, ist das Erstellen einer Liste aus Stream-Elementen sehr einfach. Benötigen Sie keine Liste, aber viel? Verwenden Sie 
Collectors.toSet() .
Im folgenden Beispiel werden Personen nach Alter gruppiert:
 Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));  
Sammler sind unglaublich vielfältig. Sie können auch die Elemente der Sammlung aggregieren, z. B. das Durchschnittsalter bestimmen:
 Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge);  
Um umfassendere Statistiken zu erhalten, verwenden wir einen Zusammenfassungskollektor, der ein spezielles Objekt mit Informationen zurückgibt: Minimal-, Maximal- und Durchschnittswerte, die Summe der Werte und die Anzahl der Elemente:
 IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary);  
Im folgenden Beispiel werden alle Namen in einer Zeile zusammengefasst:
 String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase);  
Der Verbindungskollektor akzeptiert ein Trennzeichen sowie ein optionales Präfix und Suffix.
Um die Elemente eines Streams in eine Anzeige umzuwandeln, müssen Sie festlegen, wie Schlüssel und Werte angezeigt werden sollen. Denken Sie daran, dass die Schlüssel in der Zuordnung eindeutig sein müssen. Andernfalls erhalten wir eine 
IllegalStateException . Sie können optional eine Zusammenführungsfunktion hinzufügen, um die Ausnahme zu umgehen:
 Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map);  
So haben wir einige der leistungsstärksten eingebauten Sammler kennengelernt. Lassen Sie uns versuchen, Ihre eigenen zu bauen. Wir möchten alle Elemente des Streams in eine einzelne Zeile konvertieren, die aus Großbuchstaben besteht, die durch einen vertikalen Balken | getrennt sind. Erstellen Sie dazu mit 
Collector.of() einen neuen 
Collector.of() . Wir benötigen die vier Komponenten unseres Kollektors: Lieferant, Batterie, Stecker, Finisher.
 Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "),  
Da Strings in Java unveränderlich sind, benötigen wir eine 
StringJoiner wie 
StringJoiner , mit der der Collector einen String für uns erstellen kann. Im ersten Schritt erstellt der Anbieter einen 
StringJoiner mit einem zugewiesenen Trennzeichen. Mit der Batterie wird jeder Name zu 
StringJoiner .
Der Connector weiß, wie zwei 
StringJoiner zu einem verbunden werden. Und am Ende erstellt der Finisher den gewünschten String aus 
StringJoiner s.
Flatmap
Daher haben wir gelernt, wie Stream-Objekte mithilfe der 
map in andere 
map werden. 
Map ist eine Art eingeschränkte Methode, da jedes Objekt nur einem anderen Objekt zugeordnet werden kann. Was aber, wenn Sie ein Objekt vielen anderen zuordnen oder gar nicht anzeigen möchten? Hier hilft die 
flatMap Methode. 
FlatMap verwandelt jedes Stream-Objekt in einen Stream anderer Objekte. Der Inhalt dieser Threads wird dann in den zurückgegebenen Stream der 
flatMap Methode 
flatMap .
Um 
flatMap in Aktion zu betrachten, 
flatMap wir eine geeignete 
flatMap für ein Beispiel:
 class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } 
Lassen Sie uns einige Objekte erstellen:
 List<Foo> foos = new ArrayList<>();  
Jetzt haben wir eine Liste von drei 
foo , von denen jeder drei 
Balken enthält.
FlatMap akzeptiert eine Funktion, die einen Strom von Objekten zurückgeben soll. Um auf die 
Balkenobjekte jedes 
Foo zugreifen zu können, müssen wir nur die entsprechende Funktion finden:
 foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name));  
Wir haben also erfolgreich einen Stream von drei 
foo- Objekten in einen Stream von 9 
Balkenobjekten umgewandelt .
Schließlich kann der gesamte obige Code auf eine einfache Pipeline von Operationen reduziert werden:
 IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); 
FlatMap auch in der in Java 8 eingeführten 
Optional Klasse verfügbar. 
FlatMap aus der 
Optional Klasse gibt ein optionales Objekt einer anderen Klasse zurück. Dies kann verwendet werden, um eine Reihe von 
null zu vermeiden.
Stellen Sie sich eine hierarchische Struktur wie diese vor:
 class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } 
Um die verschachtelte Zeichenfolge 
foo von einem externen Objekt 
NullPointException , müssen Sie mehrere 
NullPointException hinzufügen, um eine 
NullPointException zu vermeiden:
 Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } 
Dasselbe kann mit der flatMap der optionalen Klasse erreicht werden:
 Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); 
Jeder Aufruf von 
flatMap gibt einen 
Optional Wrapper für das gewünschte Objekt zurück, falls vorhanden, oder für 
null wenn das Objekt fehlt.
Reduzieren
Die Vereinfachungsoperation kombiniert alle Elemente eines Streams zu einem einzigen Ergebnis. Java 8 unterstützt drei verschiedene Arten von Reduktionsmethoden.
Der erste reduziert den Fluss von Elementen auf ein einzelnes Flusselement. Wir verwenden diese Methode, um das Element mit dem größten Alter zu bestimmen:
 persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println);  
Die 
reduce übernimmt eine Akkumulationsfunktion mit einem 
Binäroperator (BinaryOperator). Hier ist 
reduce eine 
Bi-Funktion (BiFunction), bei der beide Argumente zum selben Typ gehören. In unserem Fall zum Typ 
Person . Eine Bi-Funktion ist fast die gleiche wie eine 
, benötigt jedoch zwei Argumente. In unserem Beispiel vergleicht die Funktion das Alter von zwei Personen und gibt ein Element mit einem höheren Alter zurück.
Die nächste Form der 
reduce verwendet sowohl einen Anfangswert als auch eine Batterie mit einem binären Operator. Mit dieser Methode können Sie ein neues Element erstellen. Wir haben - 
Person mit einem Namen und Alter, bestehend aus der Addition aller Namen und der Summe der gelebten Jahre:
 Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age);  
Die dritte 
reduce verwendet drei Parameter: den Anfangswert, den Akkumulator mit einer Bi-Funktion und eine Kombinationsfunktion wie ein binärer Operator. Da der Anfangswert des Typs nicht auf den Typ Person beschränkt ist, können Sie mithilfe der Reduzierung die Gesamtzahl der gelebten Jahre jeder Person bestimmen:
 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum);  
Wie Sie sehen können, haben wir das Ergebnis 76 erhalten, aber was passiert wirklich unter der Haube?
Wir erweitern das obige Codefragment mit der Ausgabe des Textes für das Debug:
 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });  
Wie Sie sehen können, führt die Akkumulationsfunktion die gesamte Arbeit aus. Es wird zuerst mit einem Anfangswert von 0 und der ersten Person max aufgerufen. In den nächsten drei Schritten erhöht sich die Summe vom letzten Schritt bis zum Erreichen eines Gesamtalters von 76 Jahren ständig um das Alter der Person.
Und was dann? Wird der Kombinierer nie gerufen? Betrachten Sie die parallele Ausführung dieses Threads:
 Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });  
Bei paralleler Ausführung erhalten wir eine völlig andere Konsolenausgabe. Jetzt wird der Kombinierer wirklich gerufen.
Da die Batterie parallel aufgerufen wurde, musste der Kombinierer die separat gespeicherten Werte zusammenfassen.Im nächsten Kapitel werden wir die parallele Ausführung von Threads genauer untersuchen.Parallele Fäden
Threads können parallel ausgeführt werden, um die Leistung bei der Verarbeitung einer großen Anzahl eingehender Elemente zu verbessern. Parallele Threads verwenden das übliche, ForkJoinPooldas durch einen Aufruf der statischen Methode verfügbar ist ForkJoinPool.commonPool(). Die Größe des Haupt-Thread-Pools kann 5 Ausführungsthreads erreichen - die genaue Anzahl hängt von der Anzahl der verfügbaren physischen Prozessorkerne ab. ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism());  
Auf meinem Computer wird standardmäßig ein regulärer Thread-Pool mit Parallelisierung in 3 Threads initialisiert. Dieser Wert kann durch Einstellen des folgenden JVM-Parameters erhöht oder verringert werden: -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 
Sammlungen unterstützen eine Methode parallelStream()zum Erstellen paralleler Datenströme. Sie können auch eine Zwischenmethode aufrufen parallel(), um einen seriellen Stream in einen parallelen zu verwandeln.Um das Verhalten eines Threads bei der parallelen Ausführung zu verstehen, werden im folgenden Beispiel Informationen zu jedem aktuellen Thread (Thread) gedruckt an System.out: Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 
Betrachten Sie die Schlussfolgerungen mit den Debug-Einträgen, um besser zu verstehen, welcher Thread zum Ausführen bestimmter Stream-Methoden verwendet wird: filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] 
Wie Sie sehen können, werden bei der parallelen Ausführung des Datenstroms alle verfügbaren Threads des aktuellen verwendet ForkJoinPool. Die Ausgabesequenz kann unterschiedlich sein, da die Ausführungssequenz für jeden bestimmten Thread nicht definiert ist.Erweitern wir das Beispiel durch Hinzufügen einer Methode sort: Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 
Auf den ersten Blick mag das Ergebnis seltsam erscheinen: filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] 
Es scheint , als ob sortkonsequent und nur in dem Fluss des geführte Hauptes . Wenn der Stream parallel unter der Haube der Methode sortaus der Stream-API ausgeführt wird Arrays, ist die in Java 8 hinzugefügte Klassensortierungsmethode ausgeblendet Arrays.parallelSort(). Wie in der Dokumentation angegeben, bestimmt diese Methode basierend auf der Länge der eingehenden Sammlung, wie sie parallel oder nacheinander sortiert wird:Wenn die Länge eines bestimmten Arrays kleiner als das minimale „Korn“ ist, wird die Sortierung durch Ausführen der Methode Arrays.sort durchgeführt.
Kehren wir zum Beispiel mit der Methode reduceaus dem vorherigen Kapitel zurück. Wir haben bereits herausgefunden, dass die Vereinheitlichungsfunktion nur aufgerufen wird, wenn mit dem Thread parallel gearbeitet wird. Überlegen Sie, welche Threads betroffen sind: List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); 
Die Konsolenausgabe zeigt, dass beide Funktionen: Akkumulieren und Kombinieren parallel ausgeführt werden, wobei alle möglichen Abläufe verwendet werden: accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2] 
Es kann argumentiert werden, dass die parallele Ausführung des Flusses zu einer signifikanten Steigerung der Effizienz beim Arbeiten mit großen Mengen eingehender Elemente beiträgt. Es ist jedoch zu beachten, dass einige Methoden bei der parallelen Ausführung zusätzliche Berechnungen (Kombinationsoperationen) erfordern, die bei der sequentiellen Ausführung nicht erforderlich sind.Darüber hinaus wird für die parallele Ausführung des Threads derselbe ForkJoinPoolverwendet, der in der JVM so häufig verwendet wird. Die Verwendung langsamer Blockierungsmethoden des Flusses kann sich daher negativ auf die Leistung des gesamten Programms auswirken, da Threads blockiert werden, die für die Verarbeitung in anderen Aufgaben verwendet werden.Das ist alles
Mein Tutorial zur Verwendung von Threads in Java 8 ist beendet. Weitere Informationen zum Arbeiten mit Streams finden Sie in der Dokumentation . Wenn Sie tiefer gehen und mehr über die Mechanismen erfahren möchten, die Threads zugrunde liegen, könnten Sie einen Artikel von Martin Fowler Collection Pipelines lesen .Wenn Sie sich auch für JavaScript interessieren, sollten Sie sich Stream.js ansehen - die JavaScript-Implementierung der Java 8 Streams-API. Vielleicht möchten Sie auch meine Artikel über das Java 8-Tutorial ( russische Übersetzung auf Habré) und das Java 8 Nashorn-Tutorial lesen .Ich hoffe, dieser Leitfaden war nützlich und interessant für Sie und Sie haben den Lesevorgang genossen. Der vollständige Code wird auf GitHub gespeichert . Fühlen Sie sich frei, einen Zweig im Repository zu erstellen.