Benjamin Winterberg Stream API Guide Übersetzung

Hallo Habr! Ich präsentiere Ihnen die Übersetzung des Artikels " Java 8 Stream Tutorial ".

Dieses Tutorial, das auf Codebeispielen basiert, bietet einen umfassenden Überblick über Streams in Java 8. Als ich die Stream-API zum ersten Mal einführte, war ich über den Namen verwirrt, da er sehr gut mit InputStream und OutputStream aus dem Paket java.io übereinstimmt. Threads in Java 8 sind jedoch etwas völlig anderes. Threads sind Monaden , die eine wichtige Rolle bei der Entwicklung der funktionalen Programmierung in Java spielen.
In der funktionalen Programmierung ist eine Monade eine Struktur, die eine Berechnung in Form einer Kette aufeinanderfolgender Schritte darstellt. Der Typ und die Struktur einer Monade bestimmen eine Operationskette, in unserem Fall eine Folge von Methoden mit integrierten Funktionen eines bestimmten Typs.
In diesem Handbuch erfahren Sie, wie Sie mit Streams arbeiten und wie Sie mit den verschiedenen in der Stream-API verfügbaren Methoden umgehen. Wir werden die Reihenfolge der Operationen analysieren und sehen, wie sich die Reihenfolge der Methoden in der Kette auf die Leistung auswirkt. flatMap Sie flatMap leistungsstarken Stream-API-Methoden wie reduce , collect und flatMap . Am Ende des Handbuchs werden wir auf die parallele Arbeit mit Threads achten.

Wenn Sie nicht frei sind, mit Lambda-Ausdrücken, funktionalen Schnittstellen und Referenzmethoden zu arbeiten, ist es hilfreich, sich mit meinem Leitfaden zu Innovationen in Java 8 ( Übersetzung in Habré) vertraut zu machen und danach wieder mit dem Studium von Flows zu beginnen.

Wie Threads funktionieren


Ein Stream stellt eine Folge von Elementen dar und bietet verschiedene Methoden zum Durchführen von Berechnungen für diese Elemente:

 List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); // C1 // C2 

Strömungsmethoden sind Intermediate (Intermediate) und Terminal (Terminal). Zwischenmethoden geben einen Stream zurück, wodurch viele dieser Methoden nacheinander aufgerufen werden können. Terminalmethoden geben entweder keinen Wert (void) oder ein Ergebnis eines anderen Typs als eines Streams zurück. Im obigen Beispiel sind die filter , map und sorted intermediär und forEach sind terminal. Eine vollständige Liste der verfügbaren Flussmethoden finden Sie in der Dokumentation . Eine solche Kette von Stream-Operationen ist auch als Operations-Pipeline bekannt.

Die meisten Methoden der Stream-API akzeptieren als Parameter Lambda-Ausdrücke, eine funktionale Schnittstelle, die das spezifische Verhalten der Methode beschreibt. Die meisten von ihnen müssen gleichzeitig störungsfrei und staatenlos sein. Was bedeutet das?

Eine Methode ist nicht störend, wenn sie die zugrunde liegenden Daten, die dem Stream zugrunde liegen, nicht ändert. Im obigen Beispiel ändern beispielsweise keine Lambda-Ausdrücke das Listenarray myList.

Eine Methode ist zustandslos, wenn die Reihenfolge angegeben ist, in der die Operation ausgeführt wird. Beispielsweise hängt kein einzelner Lambda-Ausdruck aus dem Beispiel von veränderlichen Variablen oder externen Raumzuständen ab, die sich zur Laufzeit ändern können.

Verschiedene Arten von Fäden


Streams können aus verschiedenen Quelldaten erstellt werden, hauptsächlich aus Sammlungen. Listen und Sets unterstützen die neuen Methoden stream() und parllelStream() zum Erstellen sequentieller und paralleler Streams. Parallele Threads können im Multithread-Modus (auf mehreren Threads) arbeiten und werden am Ende des Handbuchs erläutert. Berücksichtigen Sie in der Zwischenzeit sequenzielle Threads:

 Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1 

Hier gibt der Aufruf der stream() -Methode in einer Liste ein normales Stream-Objekt zurück.
Um mit einem Stream zu arbeiten, ist es jedoch überhaupt nicht erforderlich, eine Sammlung zu erstellen:

 Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1 

Verwenden Stream.of() einfach Stream.of() , um einen Stream aus mehreren Objektreferenzen zu erstellen.

Zusätzlich zu regulären Objekt-Streams verfügt Java 8 über spezielle Stream-Typen für die Arbeit mit primitiven Typen: int, long, double. Wie Sie vielleicht erraten haben, ist dies IntStream , LongStream , DoubleStream .

IntStream-Streams können reguläre for (;;) - IntStream.range() mithilfe von IntStream.range() ersetzen:

 IntStream.range(1, 4) .forEach(System.out::println); // 1 // 2 // 3 

Alle diese Streams für die Arbeit mit primitiven Typen funktionieren wie normale Streams von Objekten, mit Ausnahme der folgenden:

  • Primitive Streams verwenden spezielle Lambda-Ausdrücke. Zum Beispiel IntFunction anstelle von Function oder IntPredicate anstelle von Predicate.
  • Primitive Streams unterstützen zusätzliche Terminalmethoden: sum() und average()

     Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0 


Manchmal ist es nützlich, einen Strom von Objekten in einen Strom von Grundelementen umzuwandeln oder umgekehrt. Zu diesem Zweck unterstützen Objektflüsse spezielle Methoden: mapToInt() , mapToLong() , mapToDouble() :

 Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); // 3 

Ströme von mapToObj() können durch Aufrufen von mapToObj() in Ströme von Objekten mapToObj() :

 IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

Im folgenden Beispiel wird ein Strom von Gleitkommazahlen einem Strom von Ganzzahlen und dann einem Strom von Objekten zugeordnet:

 Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

Ausführungsreihenfolge


Nachdem wir gelernt haben, wie man verschiedene Streams erstellt und wie man damit arbeitet, werden wir tiefer eintauchen und überlegen, wie Streaming-Vorgänge unter der Haube aussehen.

Ein wichtiges Merkmal von Zwischenmethoden ist ihre Faulheit . In diesem Beispiel gibt es keine Terminalmethode:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }); 

Wenn dieser Code ausgeführt wird, wird nichts an die Konsole ausgegeben. Und das alles, weil Zwischenmethoden nur ausgeführt werden, wenn es eine Terminalmethode gibt. Erweitern wir das Beispiel, indem wir die forEach Terminalmethode hinzufügen:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s)); 

Die Ausführung dieses Codefragments führt zur Ausgabe des folgenden Ergebnisses an die Konsole:

 filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c 

Die Reihenfolge, in der die Ergebnisse angeordnet sind, kann überraschen. Man kann naiv erwarten, dass die Methoden „horizontal“ ausgeführt werden: nacheinander für alle Elemente des Streams. Stattdessen bewegt sich das Element jedoch „vertikal“ entlang der Kette. Zuerst durchläuft die erste Zeile von „d2“ die filter , dann forEach und erst dann, nachdem das erste Element die gesamte Methodenkette durchlaufen hat, beginnt das nächste Element mit der Verarbeitung.

Aufgrund dieses Verhaltens können Sie die tatsächliche Anzahl von Vorgängen reduzieren:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // map: d2 // anyMatch: D2 // map: a2 // anyMatch: A2 

Die anyMatch Methode gibt true zurück , sobald das Prädikat auf das eingehende Element angewendet wird. In diesem Fall ist dies das zweite Element der Sequenz - "A2". Dementsprechend wird map aufgrund der "vertikalen" Ausführung der Thread-Kette nur zweimal aufgerufen. Anstatt alle Elemente des Streams anzuzeigen, wird die map so oft wie möglich aufgerufen.

Warum Sequenz wichtig ist


Das folgende Beispiel besteht aus zwei Zwischenmethoden map und filter und einer Terminalmethode für forEach . Überlegen Sie, wie diese Methoden ausgeführt werden:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // map: d2 // filter: D2 // map: a2 // filter: A2 // forEach: A2 // map: b1 // filter: B1 // map: b3 // filter: B3 // map: c // filter: C 

Es ist leicht zu erraten, dass sowohl die map als auch die filter zur Laufzeit fünfmal aufgerufen werden - einmal für jedes Element der Quellensammlung, während forEach nur einmal aufgerufen wird - für das Element, das den Filter bestanden hat.

Sie können die Anzahl der Operationen erheblich reduzieren, indem Sie die Reihenfolge der Methodenaufrufe ändern, indem Sie zuerst den filter platzieren:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // map: a2 // forEach: A2 // filter: b1 // filter: b3 // filter: c 

Jetzt wird die Karte nur noch einmal aufgerufen. Bei einer großen Anzahl von Eingabeelementen werden wir eine spürbare Steigerung der Produktivität beobachten. Beachten Sie dies beim Erstellen komplexer Methodenketten.

Wir erweitern das obige Beispiel, indem wir eine zusätzliche Sortieroperation hinzufügen - die sortierte Methode:

 Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); 

Das Sortieren ist eine spezielle Art von Zwischenoperation. Dies ist die sogenannte Stateful-Operation, da zum Sortieren einer Sammlung deren Status während der gesamten Operation berücksichtigt werden muss.

Als Ergebnis der Ausführung dieses Codes erhalten wir die folgende Ausgabe an die Konsole:

 sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2 

Zunächst wird die gesamte Sammlung sortiert. Mit anderen Worten, die sorted Methode wird horizontal ausgeführt. In diesem Fall wird die sorted für mehrere Kombinationen der Elemente in der eingehenden Sammlung achtmal aufgerufen.

Wir optimieren erneut die Ausführung dieses Codes, indem wir die Reihenfolge der Methodenaufrufe in der Kette ändern:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // filter: b1 // filter: b3 // filter: c // map: a2 // forEach: A2 

In diesem Beispiel wird sorted nicht aufgerufen. filter reduziert die Eingabesammlung auf ein Element. Bei großen Eingabedaten wird die Leistung erheblich verbessert.

Streams wiederverwenden


In Java 8 können Threads nicht wiederverwendet werden. Nach dem Aufrufen einer beliebigen Terminalmethode wird der Thread beendet:

 Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception 

Das Aufrufen von noneMatch nach anyMatch in einem Thread führt zu der folgenden Ausnahme:

 java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) 

Um diese Einschränkung zu überwinden, sollte für jede Terminalmethode ein neuer Thread erstellt werden.

Sie können beispielsweise einen Lieferanten für einen neuen Thread-Konstruktor erstellen, in dem alle Zwischenmethoden installiert werden:

 Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok 

Jeder Aufruf der get Methode erstellt einen neuen Thread, in dem Sie die gewünschte Terminalmethode sicher aufrufen können.

Fortgeschrittene Methoden


Threads unterstützen eine Vielzahl unterschiedlicher Methoden. Wir haben uns bereits mit den wichtigsten Methoden vertraut gemacht. Informationen zum Rest finden Sie in der Dokumentation . Und jetzt tauchen Sie noch tiefer in komplexere Methoden ein: collect , flatMap und reduce .

Die meisten Codebeispiele in diesem Abschnitt beziehen sich auf das folgende Codefragment, um die Funktionsweise zu demonstrieren:

 class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); 

Sammeln


Collect sehr nützliche Terminalmethode, mit der Stream-Elemente in ein Ergebnis eines anderen Typs konvertiert werden, z. B. List, Set oder Map.

Collect akzeptiert einen Collector , der vier verschiedene Methoden enthält: einen Lieferanten. Akku, Kombinierer, Finisher. Auf den ersten Blick sieht dies sehr kompliziert aus, aber Java 8 unterstützt verschiedene integrierte Kollektoren über die Collectors Klasse, in der die am häufigsten verwendeten Methoden implementiert sind.

Beliebter Fall:

 List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered); // [Peter, Pamela] 

Wie Sie sehen können, ist das Erstellen einer Liste aus Stream-Elementen sehr einfach. Benötigen Sie keine Liste, aber viel? Verwenden Sie Collectors.toSet() .

Im folgenden Beispiel werden Personen nach Alter gruppiert:

 Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 18: [Max] // age 23: [Peter, Pamela] // age 12: [David] 

Sammler sind unglaublich vielfältig. Sie können auch die Elemente der Sammlung aggregieren, z. B. das Durchschnittsalter bestimmen:

 Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.0 

Um umfassendere Statistiken zu erhalten, verwenden wir einen Zusammenfassungskollektor, der ein spezielles Objekt mit Informationen zurückgibt: Minimal-, Maximal- und Durchschnittswerte, die Summe der Werte und die Anzahl der Elemente:

 IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23} 

Im folgenden Beispiel werden alle Namen in einer Zeile zusammengefasst:

 String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase); // In Germany Max and Peter and Pamela are of legal age. 

Der Verbindungskollektor akzeptiert ein Trennzeichen sowie ein optionales Präfix und Suffix.

Um die Elemente eines Streams in eine Anzeige umzuwandeln, müssen Sie festlegen, wie Schlüssel und Werte angezeigt werden sollen. Denken Sie daran, dass die Schlüssel in der Zuordnung eindeutig sein müssen. Andernfalls erhalten wir eine IllegalStateException . Sie können optional eine Zusammenführungsfunktion hinzufügen, um die Ausnahme zu umgehen:

 Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {18=Max, 23=Peter;Pamela, 12=David} 

So haben wir einige der leistungsstärksten eingebauten Sammler kennengelernt. Lassen Sie uns versuchen, Ihre eigenen zu bauen. Wir möchten alle Elemente des Streams in eine einzelne Zeile konvertieren, die aus Großbuchstaben besteht, die durch einen vertikalen Balken | getrennt sind. Erstellen Sie dazu mit Collector.of() einen neuen Collector.of() . Wir benötigen die vier Komponenten unseres Kollektors: Lieferant, Batterie, Stecker, Finisher.

 Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // MAX | PETER | PAMELA | DAVID 

Da Strings in Java unveränderlich sind, benötigen wir eine StringJoiner wie StringJoiner , mit der der Collector einen String für uns erstellen kann. Im ersten Schritt erstellt der Anbieter einen StringJoiner mit einem zugewiesenen Trennzeichen. Mit der Batterie wird jeder Name zu StringJoiner .

Der Connector weiß, wie zwei StringJoiner zu einem verbunden werden. Und am Ende erstellt der Finisher den gewünschten String aus StringJoiner s.

Flatmap


Daher haben wir gelernt, wie Stream-Objekte mithilfe der map in andere map werden. Map ist eine Art eingeschränkte Methode, da jedes Objekt nur einem anderen Objekt zugeordnet werden kann. Was aber, wenn Sie ein Objekt vielen anderen zuordnen oder gar nicht anzeigen möchten? Hier hilft die flatMap Methode. FlatMap verwandelt jedes Stream-Objekt in einen Stream anderer Objekte. Der Inhalt dieser Threads wird dann in den zurückgegebenen Stream der flatMap Methode flatMap .

Um flatMap in Aktion zu betrachten, flatMap wir eine geeignete flatMap für ein Beispiel:

 class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } 

Lassen Sie uns einige Objekte erstellen:

 List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name)))); 

Jetzt haben wir eine Liste von drei foo , von denen jeder drei Balken enthält.

FlatMap akzeptiert eine Funktion, die einen Strom von Objekten zurückgeben soll. Um auf die Balkenobjekte jedes Foo zugreifen zu können, müssen wir nur die entsprechende Funktion finden:

 foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3 

Wir haben also erfolgreich einen Stream von drei foo- Objekten in einen Stream von 9 Balkenobjekten umgewandelt .

Schließlich kann der gesamte obige Code auf eine einfache Pipeline von Operationen reduziert werden:

 IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); 

FlatMap auch in der in Java 8 eingeführten Optional Klasse verfügbar. FlatMap aus der Optional Klasse gibt ein optionales Objekt einer anderen Klasse zurück. Dies kann verwendet werden, um eine Reihe von null zu vermeiden.

Stellen Sie sich eine hierarchische Struktur wie diese vor:

 class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } 

Um die verschachtelte Zeichenfolge foo von einem externen Objekt NullPointException , müssen Sie mehrere NullPointException hinzufügen, um eine NullPointException zu vermeiden:

 Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } 

Dasselbe kann mit der flatMap der optionalen Klasse erreicht werden:

 Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); 

Jeder Aufruf von flatMap gibt einen Optional Wrapper für das gewünschte Objekt zurück, falls vorhanden, oder für null wenn das Objekt fehlt.

Reduzieren


Die Vereinfachungsoperation kombiniert alle Elemente eines Streams zu einem einzigen Ergebnis. Java 8 unterstützt drei verschiedene Arten von Reduktionsmethoden.

Der erste reduziert den Fluss von Elementen auf ein einzelnes Flusselement. Wir verwenden diese Methode, um das Element mit dem größten Alter zu bestimmen:

 persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Pamela 

Die reduce übernimmt eine Akkumulationsfunktion mit einem Binäroperator (BinaryOperator). Hier ist reduce eine Bi-Funktion (BiFunction), bei der beide Argumente zum selben Typ gehören. In unserem Fall zum Typ Person . Eine Bi-Funktion ist fast die gleiche wie eine , benötigt jedoch zwei Argumente. In unserem Beispiel vergleicht die Funktion das Alter von zwei Personen und gibt ein Element mit einem höheren Alter zurück.

Die nächste Form der reduce verwendet sowohl einen Anfangswert als auch eine Batterie mit einem binären Operator. Mit dieser Methode können Sie ein neues Element erstellen. Wir haben - Person mit einem Namen und Alter, bestehend aus der Addition aller Namen und der Summe der gelebten Jahre:

 Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=MaxPeterPamelaDavid; age=76 

Die dritte reduce verwendet drei Parameter: den Anfangswert, den Akkumulator mit einer Bi-Funktion und eine Kombinationsfunktion wie ein binärer Operator. Da der Anfangswert des Typs nicht auf den Typ Person beschränkt ist, können Sie mithilfe der Reduzierung die Gesamtzahl der gelebten Jahre jeder Person bestimmen:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 76 

Wie Sie sehen können, haben wir das Ergebnis 76 erhalten, aber was passiert wirklich unter der Haube?

Wir erweitern das obige Codefragment mit der Ausgabe des Textes für das Debug:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David 

Wie Sie sehen können, führt die Akkumulationsfunktion die gesamte Arbeit aus. Es wird zuerst mit einem Anfangswert von 0 und der ersten Person max aufgerufen. In den nächsten drei Schritten erhöht sich die Summe vom letzten Schritt bis zum Erreichen eines Gesamtalters von 76 Jahren ständig um das Alter der Person.

Und was dann? Wird der Kombinierer nie gerufen? Betrachten Sie die parallele Ausführung dieses Threads:

 Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Pamela // accumulator: sum=0; person=David // accumulator: sum=0; person=Max // accumulator: sum=0; person=Peter // combiner: sum1=18; sum2=23 // combiner: sum1=23; sum2=12 // combiner: sum1=41; sum2=35 

Bei paralleler Ausführung erhalten wir eine völlig andere Konsolenausgabe. Jetzt wird der Kombinierer wirklich gerufen.Da die Batterie parallel aufgerufen wurde, musste der Kombinierer die separat gespeicherten Werte zusammenfassen.

Im nächsten Kapitel werden wir die parallele Ausführung von Threads genauer untersuchen.

Parallele Fäden


Threads können parallel ausgeführt werden, um die Leistung bei der Verarbeitung einer großen Anzahl eingehender Elemente zu verbessern. Parallele Threads verwenden das übliche, ForkJoinPooldas durch einen Aufruf der statischen Methode verfügbar ist ForkJoinPool.commonPool(). Die Größe des Haupt-Thread-Pools kann 5 Ausführungsthreads erreichen - die genaue Anzahl hängt von der Anzahl der verfügbaren physischen Prozessorkerne ab.

 ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 

Auf meinem Computer wird standardmäßig ein regulärer Thread-Pool mit Parallelisierung in 3 Threads initialisiert. Dieser Wert kann durch Einstellen des folgenden JVM-Parameters erhöht oder verringert werden:

 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 

Sammlungen unterstützen eine Methode parallelStream()zum Erstellen paralleler Datenströme. Sie können auch eine Zwischenmethode aufrufen parallel(), um einen seriellen Stream in einen parallelen zu verwandeln.

Um das Verhalten eines Threads bei der parallelen Ausführung zu verstehen, werden im folgenden Beispiel Informationen zu jedem aktuellen Thread (Thread) gedruckt an System.out:

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

Betrachten Sie die Schlussfolgerungen mit den Debug-Einträgen, um besser zu verstehen, welcher Thread zum Ausführen bestimmter Stream-Methoden verwendet wird:

 filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] 

Wie Sie sehen können, werden bei der parallelen Ausführung des Datenstroms alle verfügbaren Threads des aktuellen verwendet ForkJoinPool. Die Ausgabesequenz kann unterschiedlich sein, da die Ausführungssequenz für jeden bestimmten Thread nicht definiert ist.

Erweitern wir das Beispiel durch Hinzufügen einer Methode sort:

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

Auf den ersten Blick mag das Ergebnis seltsam erscheinen:

 filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] 

Es scheint , als ob sortkonsequent und nur in dem Fluss des geführte Hauptes . Wenn der Stream parallel unter der Haube der Methode sortaus der Stream-API ausgeführt wird Arrays, ist die in Java 8 hinzugefügte Klassensortierungsmethode ausgeblendet Arrays.parallelSort(). Wie in der Dokumentation angegeben, bestimmt diese Methode basierend auf der Länge der eingehenden Sammlung, wie sie parallel oder nacheinander sortiert wird:
Wenn die Länge eines bestimmten Arrays kleiner als das minimale „Korn“ ist, wird die Sortierung durch Ausführen der Methode Arrays.sort durchgeführt.
Kehren wir zum Beispiel mit der Methode reduceaus dem vorherigen Kapitel zurück. Wir haben bereits herausgefunden, dass die Vereinheitlichungsfunktion nur aufgerufen wird, wenn mit dem Thread parallel gearbeitet wird. Überlegen Sie, welche Threads betroffen sind:

 List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); 

Die Konsolenausgabe zeigt, dass beide Funktionen: Akkumulieren und Kombinieren parallel ausgeführt werden, wobei alle möglichen Abläufe verwendet werden:

 accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2] 

Es kann argumentiert werden, dass die parallele Ausführung des Flusses zu einer signifikanten Steigerung der Effizienz beim Arbeiten mit großen Mengen eingehender Elemente beiträgt. Es ist jedoch zu beachten, dass einige Methoden bei der parallelen Ausführung zusätzliche Berechnungen (Kombinationsoperationen) erfordern, die bei der sequentiellen Ausführung nicht erforderlich sind.

Darüber hinaus wird für die parallele Ausführung des Threads derselbe ForkJoinPoolverwendet, der in der JVM so häufig verwendet wird. Die Verwendung langsamer Blockierungsmethoden des Flusses kann sich daher negativ auf die Leistung des gesamten Programms auswirken, da Threads blockiert werden, die für die Verarbeitung in anderen Aufgaben verwendet werden.

Das ist alles


Mein Tutorial zur Verwendung von Threads in Java 8 ist beendet. Weitere Informationen zum Arbeiten mit Streams finden Sie in der Dokumentation . Wenn Sie tiefer gehen und mehr über die Mechanismen erfahren möchten, die Threads zugrunde liegen, könnten Sie einen Artikel von Martin Fowler Collection Pipelines lesen .

Wenn Sie sich auch für JavaScript interessieren, sollten Sie sich Stream.js ansehen - die JavaScript-Implementierung der Java 8 Streams-API. Vielleicht möchten Sie auch meine Artikel über das Java 8-Tutorial ( russische Übersetzung auf Habré) und das Java 8 Nashorn-Tutorial lesen .

Ich hoffe, dieser Leitfaden war nützlich und interessant für Sie und Sie haben den Lesevorgang genossen. Der vollständige Code wird auf GitHub gespeichert . Fühlen Sie sich frei, einen Zweig im Repository zu erstellen.

Source: https://habr.com/ru/post/de437038/


All Articles