Hallo Habr! Ich präsentiere Ihnen die Übersetzung des Artikels "
Java 8 Stream Tutorial ".
Dieses Tutorial, das auf Codebeispielen basiert, bietet einen umfassenden Überblick über Streams in Java 8. Als ich die Stream-API zum ersten Mal einführte, war ich über den Namen verwirrt, da er sehr gut mit InputStream und OutputStream aus dem Paket java.io übereinstimmt. Threads in Java 8 sind jedoch etwas völlig anderes.
Threads sind
Monaden , die eine wichtige Rolle bei der Entwicklung der funktionalen Programmierung in Java spielen.
In der funktionalen Programmierung ist eine Monade eine Struktur, die eine Berechnung in Form einer Kette aufeinanderfolgender Schritte darstellt. Der Typ und die Struktur einer Monade bestimmen eine Operationskette, in unserem Fall eine Folge von Methoden mit integrierten Funktionen eines bestimmten Typs.
In diesem Handbuch erfahren Sie, wie Sie mit Streams arbeiten und wie Sie mit den verschiedenen in der Stream-API verfügbaren Methoden umgehen. Wir werden die Reihenfolge der Operationen analysieren und sehen, wie sich die Reihenfolge der Methoden in der Kette auf die Leistung auswirkt.
flatMap
Sie
flatMap
leistungsstarken Stream-API-Methoden wie
reduce
,
collect
und
flatMap
. Am Ende des Handbuchs werden wir auf die parallele Arbeit mit Threads achten.
Wenn Sie nicht frei sind, mit Lambda-Ausdrücken, funktionalen Schnittstellen und Referenzmethoden zu arbeiten, ist es hilfreich, sich mit
meinem Leitfaden zu Innovationen in Java 8 (
Übersetzung in Habré) vertraut zu machen und danach wieder mit dem Studium von Flows zu beginnen.
Wie Threads funktionieren
Ein Stream stellt eine Folge von Elementen dar und bietet verschiedene Methoden zum Durchführen von Berechnungen für diese Elemente:
List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println);
Strömungsmethoden sind
Intermediate (Intermediate) und
Terminal (Terminal). Zwischenmethoden geben einen Stream zurück, wodurch viele dieser Methoden nacheinander aufgerufen werden können. Terminalmethoden geben entweder keinen Wert (void) oder ein Ergebnis eines anderen Typs als eines Streams zurück. Im obigen Beispiel sind die
filter
,
map
und
sorted
intermediär und
forEach
sind terminal. Eine vollständige Liste der verfügbaren Flussmethoden finden Sie in der
Dokumentation . Eine solche Kette von Stream-Operationen ist auch als Operations-Pipeline bekannt.
Die meisten Methoden der Stream-API akzeptieren als Parameter Lambda-Ausdrücke, eine funktionale Schnittstelle, die das spezifische Verhalten der Methode beschreibt. Die meisten von ihnen müssen gleichzeitig störungsfrei und staatenlos sein. Was bedeutet das?
Eine Methode ist nicht störend, wenn sie die zugrunde liegenden Daten, die dem Stream zugrunde liegen, nicht ändert. Im obigen Beispiel ändern beispielsweise keine Lambda-Ausdrücke das Listenarray myList.
Eine Methode ist zustandslos, wenn die Reihenfolge angegeben ist, in der die Operation ausgeführt wird. Beispielsweise hängt kein einzelner Lambda-Ausdruck aus dem Beispiel von veränderlichen Variablen oder externen Raumzuständen ab, die sich zur Laufzeit ändern können.
Verschiedene Arten von Fäden
Streams können aus verschiedenen Quelldaten erstellt werden, hauptsächlich aus Sammlungen. Listen und Sets unterstützen die neuen Methoden
stream()
und
parllelStream()
zum Erstellen sequentieller und paralleler Streams. Parallele Threads können im Multithread-Modus (auf mehreren Threads) arbeiten und werden am Ende des Handbuchs erläutert. Berücksichtigen Sie in der Zwischenzeit sequenzielle Threads:
Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println);
Hier gibt der Aufruf der
stream()
-Methode in einer Liste ein normales Stream-Objekt zurück.
Um mit einem Stream zu arbeiten, ist es jedoch überhaupt nicht erforderlich, eine Sammlung zu erstellen:
Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println);
Verwenden
Stream.of()
einfach
Stream.of()
, um einen Stream aus mehreren Objektreferenzen zu erstellen.
Zusätzlich zu regulären Objekt-Streams verfügt Java 8 über spezielle Stream-Typen für die Arbeit mit primitiven Typen: int, long, double. Wie Sie vielleicht erraten haben, ist dies
IntStream
,
LongStream
,
DoubleStream
.
IntStream-Streams können reguläre for (;;) -
IntStream.range()
mithilfe von
IntStream.range()
ersetzen:
IntStream.range(1, 4) .forEach(System.out::println);
Alle diese Streams für die Arbeit mit primitiven Typen funktionieren wie normale Streams von Objekten, mit Ausnahme der folgenden:
Manchmal ist es nützlich, einen Strom von Objekten in einen Strom von Grundelementen umzuwandeln oder umgekehrt. Zu diesem Zweck unterstützen Objektflüsse spezielle Methoden:
mapToInt()
,
mapToLong()
,
mapToDouble()
:
Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println);
Ströme von
mapToObj()
können durch Aufrufen von
mapToObj()
in Ströme von Objekten
mapToObj()
:
IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println);
Im folgenden Beispiel wird ein Strom von Gleitkommazahlen einem Strom von Ganzzahlen und dann einem Strom von Objekten zugeordnet:
Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println);
Ausführungsreihenfolge
Nachdem wir gelernt haben, wie man verschiedene Streams erstellt und wie man damit arbeitet, werden wir tiefer eintauchen und überlegen, wie Streaming-Vorgänge unter der Haube aussehen.
Ein wichtiges Merkmal von Zwischenmethoden ist ihre
Faulheit . In diesem Beispiel gibt es keine Terminalmethode:
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; });
Wenn dieser Code ausgeführt wird, wird nichts an die Konsole ausgegeben. Und das alles, weil Zwischenmethoden nur ausgeführt werden, wenn es eine Terminalmethode gibt. Erweitern wir das Beispiel, indem wir die
forEach
Terminalmethode hinzufügen:
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s));
Die Ausführung dieses Codefragments führt zur Ausgabe des folgenden Ergebnisses an die Konsole:
filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c
Die Reihenfolge, in der die Ergebnisse angeordnet sind, kann überraschen. Man kann naiv erwarten, dass die Methoden „horizontal“ ausgeführt werden: nacheinander für alle Elemente des Streams. Stattdessen bewegt sich das Element jedoch „vertikal“ entlang der Kette. Zuerst durchläuft die erste Zeile von „d2“ die
filter
, dann
forEach
und erst dann, nachdem das erste Element die gesamte Methodenkette durchlaufen hat, beginnt das nächste Element mit der Verarbeitung.
Aufgrund dieses Verhaltens können Sie die tatsächliche Anzahl von Vorgängen reduzieren:
Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); });
Die
anyMatch
Methode gibt
true zurück , sobald das Prädikat auf das eingehende Element angewendet wird. In diesem Fall ist dies das zweite Element der Sequenz - "A2". Dementsprechend wird
map
aufgrund der "vertikalen" Ausführung der Thread-Kette nur zweimal aufgerufen. Anstatt alle Elemente des Streams anzuzeigen, wird die
map
so oft wie möglich aufgerufen.
Warum Sequenz wichtig ist
Das folgende Beispiel besteht aus zwei Zwischenmethoden
map
und
filter
und einer Terminalmethode für
forEach
. Überlegen Sie, wie diese Methoden ausgeführt werden:
Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s));
Es ist leicht zu erraten, dass sowohl die
map
als auch die
filter
zur Laufzeit fünfmal aufgerufen werden - einmal für jedes Element der Quellensammlung, während
forEach
nur einmal aufgerufen wird - für das Element, das den Filter bestanden hat.
Sie können die Anzahl der Operationen erheblich reduzieren, indem Sie die Reihenfolge der Methodenaufrufe ändern, indem Sie zuerst den
filter
platzieren:
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));
Jetzt wird die Karte nur noch einmal aufgerufen. Bei einer großen Anzahl von Eingabeelementen werden wir eine spürbare Steigerung der Produktivität beobachten. Beachten Sie dies beim Erstellen komplexer Methodenketten.
Wir erweitern das obige Beispiel, indem wir eine zusätzliche Sortieroperation hinzufügen - die sortierte Methode:
Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));
Das Sortieren ist eine spezielle Art von Zwischenoperation. Dies ist die sogenannte Stateful-Operation, da zum Sortieren einer Sammlung deren Status während der gesamten Operation berücksichtigt werden muss.
Als Ergebnis der Ausführung dieses Codes erhalten wir die folgende Ausgabe an die Konsole:
sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2
Zunächst wird die gesamte Sammlung sortiert. Mit anderen Worten, die
sorted
Methode wird horizontal ausgeführt. In diesem Fall wird die
sorted
für mehrere Kombinationen der Elemente in der eingehenden Sammlung achtmal aufgerufen.
Wir optimieren erneut die Ausführung dieses Codes, indem wir die Reihenfolge der Methodenaufrufe in der Kette ändern:
Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));
In diesem Beispiel wird
sorted
nicht aufgerufen.
filter
reduziert die Eingabesammlung auf ein Element. Bei großen Eingabedaten wird die Leistung erheblich verbessert.
Streams wiederverwenden
In Java 8 können Threads nicht wiederverwendet werden. Nach dem Aufrufen einer beliebigen Terminalmethode wird der Thread beendet:
Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true);
Das Aufrufen von
noneMatch
nach
anyMatch
in einem Thread führt zu der folgenden Ausnahme:
java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)
Um diese Einschränkung zu überwinden, sollte für jede Terminalmethode ein neuer Thread erstellt werden.
Sie können beispielsweise einen
Lieferanten für einen neuen Thread-Konstruktor erstellen, in dem alle Zwischenmethoden installiert werden:
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true);
Jeder Aufruf der
get
Methode erstellt einen neuen Thread, in dem Sie die gewünschte Terminalmethode sicher aufrufen können.
Fortgeschrittene Methoden
Threads unterstützen eine Vielzahl unterschiedlicher Methoden. Wir haben uns bereits mit den wichtigsten Methoden vertraut gemacht. Informationen zum Rest finden Sie in der
Dokumentation . Und jetzt tauchen Sie noch tiefer in komplexere Methoden ein:
collect
,
flatMap
und
reduce
.
Die meisten Codebeispiele in diesem Abschnitt beziehen sich auf das folgende Codefragment, um die Funktionsweise zu demonstrieren:
class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12));
Sammeln
Collect
sehr nützliche Terminalmethode, mit der Stream-Elemente in ein Ergebnis eines anderen Typs konvertiert werden, z. B. List, Set oder Map.
Collect
akzeptiert einen
Collector
, der vier verschiedene Methoden enthält: einen Lieferanten. Akku, Kombinierer, Finisher. Auf den ersten Blick sieht dies sehr kompliziert aus, aber Java 8 unterstützt verschiedene integrierte Kollektoren über die
Collectors
Klasse, in der die am häufigsten verwendeten Methoden implementiert sind.
Beliebter Fall:
List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered);
Wie Sie sehen können, ist das Erstellen einer Liste aus Stream-Elementen sehr einfach. Benötigen Sie keine Liste, aber viel? Verwenden Sie
Collectors.toSet()
.
Im folgenden Beispiel werden Personen nach Alter gruppiert:
Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
Sammler sind unglaublich vielfältig. Sie können auch die Elemente der Sammlung aggregieren, z. B. das Durchschnittsalter bestimmen:
Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge);
Um umfassendere Statistiken zu erhalten, verwenden wir einen Zusammenfassungskollektor, der ein spezielles Objekt mit Informationen zurückgibt: Minimal-, Maximal- und Durchschnittswerte, die Summe der Werte und die Anzahl der Elemente:
IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary);
Im folgenden Beispiel werden alle Namen in einer Zeile zusammengefasst:
String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase);
Der Verbindungskollektor akzeptiert ein Trennzeichen sowie ein optionales Präfix und Suffix.
Um die Elemente eines Streams in eine Anzeige umzuwandeln, müssen Sie festlegen, wie Schlüssel und Werte angezeigt werden sollen. Denken Sie daran, dass die Schlüssel in der Zuordnung eindeutig sein müssen. Andernfalls erhalten wir eine
IllegalStateException
. Sie können optional eine Zusammenführungsfunktion hinzufügen, um die Ausnahme zu umgehen:
Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map);
So haben wir einige der leistungsstärksten eingebauten Sammler kennengelernt. Lassen Sie uns versuchen, Ihre eigenen zu bauen. Wir möchten alle Elemente des Streams in eine einzelne Zeile konvertieren, die aus Großbuchstaben besteht, die durch einen vertikalen Balken | getrennt sind. Erstellen Sie dazu mit
Collector.of()
einen neuen
Collector.of()
. Wir benötigen die vier Komponenten unseres Kollektors: Lieferant, Batterie, Stecker, Finisher.
Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "),
Da Strings in Java unveränderlich sind, benötigen wir eine
StringJoiner
wie
StringJoiner
, mit der der Collector einen String für uns erstellen kann. Im ersten Schritt erstellt der Anbieter einen
StringJoiner
mit einem zugewiesenen Trennzeichen. Mit der Batterie wird jeder Name zu
StringJoiner
.
Der Connector weiß, wie zwei
StringJoiner
zu einem verbunden werden. Und am Ende erstellt der Finisher den gewünschten String aus
StringJoiner
s.
Flatmap
Daher haben wir gelernt, wie Stream-Objekte mithilfe der
map
in andere
map
werden.
Map
ist eine Art eingeschränkte Methode, da jedes Objekt nur einem anderen Objekt zugeordnet werden kann. Was aber, wenn Sie ein Objekt vielen anderen zuordnen oder gar nicht anzeigen möchten? Hier hilft die
flatMap
Methode.
FlatMap
verwandelt jedes Stream-Objekt in einen Stream anderer Objekte. Der Inhalt dieser Threads wird dann in den zurückgegebenen Stream der
flatMap
Methode
flatMap
.
Um
flatMap
in Aktion zu betrachten,
flatMap
wir eine geeignete
flatMap
für ein Beispiel:
class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } }
Lassen Sie uns einige Objekte erstellen:
List<Foo> foos = new ArrayList<>();
Jetzt haben wir eine Liste von drei
foo , von denen jeder drei
Balken enthält.
FlatMap
akzeptiert eine Funktion, die einen Strom von Objekten zurückgeben soll. Um auf die
Balkenobjekte jedes
Foo zugreifen zu können, müssen wir nur die entsprechende Funktion finden:
foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name));
Wir haben also erfolgreich einen Stream von drei
foo- Objekten in einen Stream von 9
Balkenobjekten umgewandelt .
Schließlich kann der gesamte obige Code auf eine einfache Pipeline von Operationen reduziert werden:
IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name));
FlatMap
auch in der in Java 8 eingeführten
Optional
Klasse verfügbar.
FlatMap
aus der
Optional
Klasse gibt ein optionales Objekt einer anderen Klasse zurück. Dies kann verwendet werden, um eine Reihe von
null
zu vermeiden.
Stellen Sie sich eine hierarchische Struktur wie diese vor:
class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; }
Um die verschachtelte Zeichenfolge
foo von einem externen Objekt
NullPointException
, müssen Sie mehrere
NullPointException
hinzufügen, um eine
NullPointException
zu vermeiden:
Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); }
Dasselbe kann mit der flatMap der optionalen Klasse erreicht werden:
Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println);
Jeder Aufruf von
flatMap
gibt einen
Optional
Wrapper für das gewünschte Objekt zurück, falls vorhanden, oder für
null
wenn das Objekt fehlt.
Reduzieren
Die Vereinfachungsoperation kombiniert alle Elemente eines Streams zu einem einzigen Ergebnis. Java 8 unterstützt drei verschiedene Arten von Reduktionsmethoden.
Der erste reduziert den Fluss von Elementen auf ein einzelnes Flusselement. Wir verwenden diese Methode, um das Element mit dem größten Alter zu bestimmen:
persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println);
Die
reduce
übernimmt eine Akkumulationsfunktion mit einem
Binäroperator (BinaryOperator). Hier ist
reduce
eine
Bi-Funktion (BiFunction), bei der beide Argumente zum selben Typ gehören. In unserem Fall zum Typ
Person . Eine Bi-Funktion ist fast die gleiche wie eine
, benötigt jedoch zwei Argumente. In unserem Beispiel vergleicht die Funktion das Alter von zwei Personen und gibt ein Element mit einem höheren Alter zurück.
Die nächste Form der
reduce
verwendet sowohl einen Anfangswert als auch eine Batterie mit einem binären Operator. Mit dieser Methode können Sie ein neues Element erstellen. Wir haben -
Person mit einem Namen und Alter, bestehend aus der Addition aller Namen und der Summe der gelebten Jahre:
Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age);
Die dritte
reduce
verwendet drei Parameter: den Anfangswert, den Akkumulator mit einer Bi-Funktion und eine Kombinationsfunktion wie ein binärer Operator. Da der Anfangswert des Typs nicht auf den Typ Person beschränkt ist, können Sie mithilfe der Reduzierung die Gesamtzahl der gelebten Jahre jeder Person bestimmen:
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum);
Wie Sie sehen können, haben wir das Ergebnis 76 erhalten, aber was passiert wirklich unter der Haube?
Wir erweitern das obige Codefragment mit der Ausgabe des Textes für das Debug:
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });
Wie Sie sehen können, führt die Akkumulationsfunktion die gesamte Arbeit aus. Es wird zuerst mit einem Anfangswert von 0 und der ersten Person max aufgerufen. In den nächsten drei Schritten erhöht sich die Summe vom letzten Schritt bis zum Erreichen eines Gesamtalters von 76 Jahren ständig um das Alter der Person.
Und was dann? Wird der Kombinierer nie gerufen? Betrachten Sie die parallele Ausführung dieses Threads:
Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });
Bei paralleler Ausführung erhalten wir eine völlig andere Konsolenausgabe. Jetzt wird der Kombinierer wirklich gerufen.
Da die Batterie parallel aufgerufen wurde, musste der Kombinierer die separat gespeicherten Werte zusammenfassen.Im nächsten Kapitel werden wir die parallele Ausführung von Threads genauer untersuchen.Parallele Fäden
Threads können parallel ausgeführt werden, um die Leistung bei der Verarbeitung einer großen Anzahl eingehender Elemente zu verbessern. Parallele Threads verwenden das übliche, ForkJoinPool
das durch einen Aufruf der statischen Methode verfügbar ist ForkJoinPool.commonPool()
. Die Größe des Haupt-Thread-Pools kann 5 Ausführungsthreads erreichen - die genaue Anzahl hängt von der Anzahl der verfügbaren physischen Prozessorkerne ab. ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism());
Auf meinem Computer wird standardmäßig ein regulärer Thread-Pool mit Parallelisierung in 3 Threads initialisiert. Dieser Wert kann durch Einstellen des folgenden JVM-Parameters erhöht oder verringert werden: -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Sammlungen unterstützen eine Methode parallelStream()
zum Erstellen paralleler Datenströme. Sie können auch eine Zwischenmethode aufrufen parallel()
, um einen seriellen Stream in einen parallelen zu verwandeln.Um das Verhalten eines Threads bei der parallelen Ausführung zu verstehen, werden im folgenden Beispiel Informationen zu jedem aktuellen Thread (Thread) gedruckt an System.out
: Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
Betrachten Sie die Schlussfolgerungen mit den Debug-Einträgen, um besser zu verstehen, welcher Thread zum Ausführen bestimmter Stream-Methoden verwendet wird: filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2]
Wie Sie sehen können, werden bei der parallelen Ausführung des Datenstroms alle verfügbaren Threads des aktuellen verwendet ForkJoinPool
. Die Ausgabesequenz kann unterschiedlich sein, da die Ausführungssequenz für jeden bestimmten Thread nicht definiert ist.Erweitern wir das Beispiel durch Hinzufügen einer Methode sort
: Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
Auf den ersten Blick mag das Ergebnis seltsam erscheinen: filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1]
Es scheint , als ob sort
konsequent und nur in dem Fluss des geführte Hauptes . Wenn der Stream parallel unter der Haube der Methode sort
aus der Stream-API ausgeführt wird Arrays
, ist die in Java 8 hinzugefügte Klassensortierungsmethode ausgeblendet Arrays.parallelSort()
. Wie in der Dokumentation angegeben, bestimmt diese Methode basierend auf der Länge der eingehenden Sammlung, wie sie parallel oder nacheinander sortiert wird:Wenn die Länge eines bestimmten Arrays kleiner als das minimale „Korn“ ist, wird die Sortierung durch Ausführen der Methode Arrays.sort durchgeführt.
Kehren wir zum Beispiel mit der Methode reduce
aus dem vorherigen Kapitel zurück. Wir haben bereits herausgefunden, dass die Vereinheitlichungsfunktion nur aufgerufen wird, wenn mit dem Thread parallel gearbeitet wird. Überlegen Sie, welche Threads betroffen sind: List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; });
Die Konsolenausgabe zeigt, dass beide Funktionen: Akkumulieren und Kombinieren parallel ausgeführt werden, wobei alle möglichen Abläufe verwendet werden: accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
Es kann argumentiert werden, dass die parallele Ausführung des Flusses zu einer signifikanten Steigerung der Effizienz beim Arbeiten mit großen Mengen eingehender Elemente beiträgt. Es ist jedoch zu beachten, dass einige Methoden bei der parallelen Ausführung zusätzliche Berechnungen (Kombinationsoperationen) erfordern, die bei der sequentiellen Ausführung nicht erforderlich sind.Darüber hinaus wird für die parallele Ausführung des Threads derselbe ForkJoinPool
verwendet, der in der JVM so häufig verwendet wird. Die Verwendung langsamer Blockierungsmethoden des Flusses kann sich daher negativ auf die Leistung des gesamten Programms auswirken, da Threads blockiert werden, die für die Verarbeitung in anderen Aufgaben verwendet werden.Das ist alles
Mein Tutorial zur Verwendung von Threads in Java 8 ist beendet. Weitere Informationen zum Arbeiten mit Streams finden Sie in der Dokumentation . Wenn Sie tiefer gehen und mehr über die Mechanismen erfahren möchten, die Threads zugrunde liegen, könnten Sie einen Artikel von Martin Fowler Collection Pipelines lesen .Wenn Sie sich auch für JavaScript interessieren, sollten Sie sich Stream.js ansehen - die JavaScript-Implementierung der Java 8 Streams-API. Vielleicht möchten Sie auch meine Artikel über das Java 8-Tutorial ( russische Übersetzung auf Habré) und das Java 8 Nashorn-Tutorial lesen .Ich hoffe, dieser Leitfaden war nützlich und interessant für Sie und Sie haben den Lesevorgang genossen. Der vollständige Code wird auf GitHub gespeichert . Fühlen Sie sich frei, einen Zweig im Repository zu erstellen.