Panduan Terjemahan Aliran Benjamin Winterberg Stream

Halo, Habr! Saya hadir untuk Anda terjemahan artikel " Java 8 Stream Tutorial ".

Tutorial berbasis kode ini memberikan tinjauan komprehensif aliran di Jawa 8. Ketika saya pertama kali memperkenalkan API Stream, saya bingung dengan namanya karena sangat konsisten dengan InputStream dan OutputStream dari paket java.io; Namun, utas di Java 8 adalah sesuatu yang sangat berbeda. Thread adalah monad yang memainkan peran penting dalam pengembangan pemrograman fungsional di Jawa.
Dalam pemrograman fungsional, monad adalah struktur yang mewakili perhitungan dalam bentuk rangkaian langkah-langkah yang berurutan. Jenis dan struktur monad menentukan rantai operasi, dalam kasus kami, urutan metode dengan fungsi bawaan dari jenis yang diberikan.
Tutorial ini akan mengajarkan Anda cara bekerja dengan stream dan menunjukkan cara menangani berbagai metode yang tersedia di Stream API. Kami akan menganalisis urutan operasi dan melihat bagaimana urutan metode dalam rantai mempengaruhi kinerja. flatMap metode Stream API yang kuat seperti reduce , collect dan flatMap . Di akhir manual, kami akan memperhatikan pekerjaan paralel dengan stream.

Jika Anda tidak merasa bebas untuk bekerja dengan ekspresi lambda, antarmuka fungsional, dan metode referensi, akan berguna bagi Anda untuk membiasakan diri dengan panduan saya untuk inovasi di Java 8 ( terjemahan dalam Habré), dan setelah itu kembali mempelajari aliran.

Cara kerja utas


Aliran mewakili urutan elemen dan menyediakan berbagai metode untuk melakukan perhitungan pada elemen ini:

 List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); // C1 // C2 

Metode aliran adalah perantara (intermediate) dan terminal (terminal). Metode menengah mengembalikan aliran, yang memungkinkan banyak metode ini disebut secara berurutan. Metode terminal tidak mengembalikan nilai (batal) atau mengembalikan hasil dari jenis selain aliran. Dalam contoh di atas, filter , map dan sorted bersifat menengah, dan forEach adalah terminal. Untuk daftar lengkap metode aliran yang tersedia, lihat dokumentasi . Rantai operasi aliran semacam itu juga dikenal sebagai pipa operasi.

Sebagian besar metode dari Stream API menerima sebagai ekspresi parameter lambda, antarmuka fungsional yang menggambarkan perilaku spesifik metode. Sebagian besar dari mereka harus secara bersamaan tidak mencampuri dan berkewarganegaraan. Apa artinya ini?

Metode tidak mengganggu jika tidak mengubah data dasar yang mendasari aliran. Misalnya, dalam contoh di atas, tidak ada ekspresi lambda memodifikasi daftar array daftar.

Metode stateless jika urutan operasi dilakukan ditentukan. Misalnya, tidak satu pun ekspresi lambda dari contoh tergantung pada variabel yang bisa berubah atau keadaan ruang eksternal yang dapat berubah pada saat dijalankan.

Berbagai jenis utas


Streaming dapat dibuat dari berbagai sumber data, terutama dari koleksi. Daftar dan Set mendukung metode stream() baru stream() dan parllelStream() untuk membuat stream berurutan dan paralel. Utas paralel dapat bekerja dalam mode multi-utas (pada banyak utas) dan akan dibahas pada akhir manual. Sementara itu, pertimbangkan utas berurutan:

 Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1 

Di sini, memanggil metode stream() pada daftar mengembalikan objek stream normal.
Namun, untuk bekerja dengan aliran, sama sekali tidak perlu membuat koleksi:

 Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1 

Cukup gunakan Stream.of() untuk membuat aliran dari beberapa referensi objek.

Selain aliran objek biasa, Java 8 memiliki jenis aliran khusus untuk bekerja dengan tipe primitif: int, panjang, ganda. Seperti yang Anda duga, ini adalah IntStream , LongStream , DoubleStream .

Aliran IntStream dapat menggantikan reguler untuk (;;) IntStream.range() menggunakan IntStream.range() :

 IntStream.range(1, 4) .forEach(System.out::println); // 1 // 2 // 3 

Semua aliran ini untuk bekerja dengan tipe primitif bekerja seperti aliran objek biasa, kecuali untuk yang berikut:

  • Streaming primitif menggunakan ekspresi lambda khusus. Misalnya, IntFunction alih-alih Function, atau IntPredicate alih-alih Predicate.
  • Streaming primitif mendukung metode terminal tambahan: sum() dan average() - average()

     Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0 


Terkadang berguna untuk mengubah aliran objek menjadi aliran primitif atau sebaliknya. Untuk tujuan ini, aliran objek mendukung metode khusus: mapToInt() , mapToLong() , mapToDouble() :

 Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); // 3 

Streaming primitif dapat dikonversi menjadi stream objek dengan memanggil mapToObj() :

 IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

Dalam contoh berikut, aliran angka floating-point dipetakan ke aliran bilangan bulat dan kemudian dipetakan ke aliran objek:

 Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

Perintah eksekusi


Sekarang kami telah belajar cara membuat berbagai aliran dan cara bekerja dengannya, kami akan menyelam lebih dalam dan mempertimbangkan bagaimana operasi streaming terlihat di bawah tenda.

Karakteristik penting dari metode perantara adalah kemalasan mereka. Tidak ada metode terminal dalam contoh ini:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }); 

Ketika potongan kode ini dieksekusi, tidak ada yang akan ditampilkan ke konsol. Dan semua karena metode antara dijalankan hanya jika ada metode terminal. Mari kita memperluas contoh dengan menambahkan metode terminal forEach :

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s)); 

Eksekusi fragmen kode ini mengarah ke output ke konsol hasil berikut:

 filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c 

Urutan pengaturan hasil mungkin mengejutkan. Seseorang dengan naif dapat berharap bahwa metode akan dieksekusi "secara horizontal": satu demi satu untuk semua elemen aliran. Namun, sebaliknya, elemen bergerak di sepanjang rantai "secara vertikal". Pertama, baris pertama "d2" melewati metode filter , kemudian melalui forEach dan hanya kemudian, setelah melewati elemen pertama melalui seluruh rantai metode, elemen berikutnya mulai diproses.

Dengan perilaku ini, Anda dapat mengurangi jumlah operasi aktual:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // map: d2 // anyMatch: D2 // map: a2 // anyMatch: A2 

Metode anyMatch akan mengembalikan true segera setelah predikat diterapkan ke elemen yang masuk. Dalam hal ini, ini adalah elemen kedua dari urutan - "A2". Dengan demikian, karena eksekusi "vertikal" dari rantai utas, map akan dipanggil hanya dua kali. Jadi, alih-alih menampilkan semua elemen aliran, map akan dipanggil sesering mungkin.

Mengapa urutan itu penting


Contoh berikut terdiri dari dua metode menengah map dan filter dan metode terminal untuk masing-masing. Pertimbangkan bagaimana metode ini dilakukan:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // map: d2 // filter: D2 // map: a2 // filter: A2 // forEach: A2 // map: b1 // filter: B1 // map: b3 // filter: B3 // map: c // filter: C 

Sangat mudah untuk menebak bahwa kedua metode map dan filter disebut 5 kali saat runtime - satu kali untuk setiap elemen koleksi sumber, sedangkan forEach disebut hanya sekali - untuk elemen yang melewati filter.

Anda dapat secara signifikan mengurangi jumlah operasi dengan mengubah urutan panggilan metode dengan menempatkan filter di tempat pertama:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // map: a2 // forEach: A2 // filter: b1 // filter: b3 // filter: c 

Sekarang peta hanya dipanggil sekali. Dengan sejumlah besar elemen input, kami akan mengamati peningkatan nyata dalam produktivitas. Ingatlah hal ini ketika menyusun rantai metode yang kompleks.

Kami memperluas contoh di atas dengan menambahkan operasi pengurutan tambahan - metode yang diurutkan:

 Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); 

Penyortiran adalah jenis khusus dari operasi perantara. Ini adalah operasi yang disebut stateful, karena untuk mengurutkan koleksi, kondisinya harus diperhitungkan selama operasi.

Sebagai hasil dari eksekusi kode ini, kami mendapatkan output berikut ke konsol:

 sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2 

Pertama, seluruh koleksi diurutkan. Dengan kata lain, metode yang sorted berjalan secara horizontal. Dalam hal ini, sorted disebut 8 kali untuk beberapa kombinasi elemen dalam koleksi yang masuk.

Sekali lagi, kami mengoptimalkan eksekusi kode ini dengan mengubah urutan pemanggilan metode dalam rantai:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // filter: b1 // filter: b3 // filter: c // map: a2 // forEach: A2 

Dalam contoh ini, sorted tidak dipanggil sama sekali. filter mengurangi pengumpulan input ke satu elemen. Dalam hal data input besar, kinerja akan mendapat manfaat secara signifikan.

Gunakan kembali aliran


Di Java 8, utas tidak dapat digunakan kembali. Setelah memanggil metode terminal apa pun, utas berakhir:

 Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception 

Memanggil noneMatch setelah anyMatch dalam satu thread menghasilkan pengecualian berikut:

 java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) 

Untuk mengatasi batasan ini, utas baru harus dibuat untuk setiap metode terminal.

Misalnya, Anda dapat membuat pemasok untuk konstruktor utas baru di mana semua metode perantara akan dipasang:

 Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok 

Setiap panggilan ke metode get membuat thread baru di mana Anda dapat dengan aman memanggil metode terminal yang diinginkan.

Metode lanjutan


Utas mendukung sejumlah besar metode berbeda. Kami telah membiasakan diri dengan metode yang paling penting. Untuk membiasakan diri dengan sisanya, lihat dokumentasi . Dan sekarang menyelam lebih dalam ke metode yang lebih kompleks: collect , flatMap , dan reduce .

Sebagian besar contoh kode di bagian ini merujuk ke cuplikan kode berikut untuk menunjukkan operasi:

 class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); 

Kumpulkan


Collect metode terminal yang sangat berguna, yang digunakan untuk mengonversi elemen aliran ke hasil dari jenis yang berbeda, misalnya, Daftar, Setel atau Peta.

Collect menerima Collector yang berisi empat metode berbeda: pemasok. akumulator, combiner, finisher. Pada pandangan pertama, ini terlihat sangat rumit, tetapi Java 8 mendukung berbagai kolektor bawaan melalui kelas Collectors , di mana metode yang paling banyak digunakan diimplementasikan.

Kasus populer:

 List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered); // [Peter, Pamela] 

Seperti yang Anda lihat, membuat daftar dari item aliran sangat sederhana. Tidak perlu daftar tapi banyak? Gunakan Collectors.toSet() .

Dalam contoh berikut, orang dikelompokkan berdasarkan usia:

 Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 18: [Max] // age 23: [Peter, Pamela] // age 12: [David] 

Kolektor sangat beragam. Anda juga dapat mengagregasi elemen koleksi, misalnya, menentukan usia rata-rata:

 Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.0 

Untuk mendapatkan statistik yang lebih komprehensif, kami menggunakan pengumpul ringkasan yang mengembalikan objek khusus dengan informasi: nilai minimum, maksimum dan rata-rata, jumlah nilai dan jumlah elemen:

 IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23} 

Contoh berikut menggabungkan semua nama dalam satu baris:

 String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase); // In Germany Max and Peter and Pamela are of legal age. 

Kolektor penghubung menerima pemisah, serta awalan dan sufiks opsional.

Untuk mengubah elemen aliran menjadi tampilan, Anda harus menentukan bagaimana kunci dan nilai harus ditampilkan. Ingat bahwa kunci dalam pemetaan harus unik. Kalau tidak, kami mendapatkan IllegalStateException . Anda bisa menambahkan fungsi gabungan untuk memotong pengecualian:

 Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {18=Max, 23=Peter;Pamela, 12=David} 

Jadi, kami berkenalan dengan beberapa kolektor bawaan paling kuat. Ayo coba buat sendiri. Kami ingin mengonversi semua elemen aliran menjadi satu baris, yang terdiri atas nama huruf besar yang dipisahkan oleh bilah vertikal |. Untuk melakukan ini, buat kolektor baru menggunakan Collector.of() . Kami membutuhkan empat komponen kolektor kami: pemasok, baterai, konektor, finisher.

 Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // MAX | PETER | PAMELA | DAVID 

Karena string di Java tidak dapat diubah, kita membutuhkan kelas helper seperti StringJoiner yang memungkinkan kolektor untuk membangun string untuk kita. Pada langkah pertama, penyedia membangun StringJoiner dengan pembatas yang ditugaskan. Baterai digunakan untuk menambahkan setiap nama ke StringJoiner .

Konektor tahu bagaimana menghubungkan dua StringJoiner menjadi satu. Dan pada akhirnya, finisher membangun string yang diinginkan dari StringJoiner .

Flatmap


Jadi, kami belajar cara mengubah objek aliran menjadi jenis objek lain menggunakan metode map . Map adalah semacam metode terbatas, karena setiap objek hanya dapat dipetakan ke satu objek lainnya. Tetapi bagaimana jika Anda ingin memetakan satu objek ke banyak objek lainnya, atau tidak menampilkannya sama sekali? Di sinilah metode flatMap membantu. FlatMap mengubah setiap aliran objek menjadi aliran objek lain. Isi utas ini kemudian dikemas ke dalam aliran balik metode flatMap .

Untuk melihat flatMap dalam aksi, mari kita buat hierarki tipe yang cocok untuk contoh:

 class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } 

Mari kita membuat beberapa objek:

 List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name)))); 

Sekarang kami memiliki daftar tiga foo , yang masing-masing berisi tiga bilah .

FlatMap menerima fungsi yang harus mengembalikan aliran objek. Jadi, untuk mengakses objek bar dari setiap foo , kita hanya perlu menemukan fungsi yang sesuai:

 foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3 

Jadi, kami telah berhasil mengubah aliran tiga objek foo menjadi aliran 9 objek bar .

Akhirnya, semua kode di atas dapat direduksi menjadi pipeline operasi sederhana:

 IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); 

FlatMap juga tersedia di kelas Optional diperkenalkan di Java 8. FlatMap dari kelas Optional mengembalikan objek opsional dari kelas lain. Ini dapat digunakan untuk menghindari banyak cek null .

Bayangkan sebuah struktur hierarkis seperti ini:

 class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } 

Untuk mendapatkan foo string bersarang dari objek eksternal, Anda perlu menambahkan beberapa pemeriksaan null untuk menghindari NullPointException :

 Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } 

Hal yang sama dapat dicapai dengan menggunakan flatMap dari kelas opsional:

 Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); 

Setiap panggilan ke flatMap mengembalikan bungkus Optional untuk objek yang diinginkan, jika ada, atau null jika objek hilang.

Kurangi


Operasi penyederhanaan menggabungkan semua elemen aliran ke dalam satu hasil. Java 8 mendukung tiga jenis metode pengurangan.

Yang pertama mengurangi aliran elemen ke elemen aliran tunggal. Kami menggunakan metode ini untuk menentukan elemen dengan usia terbesar:

 persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Pamela 

Metode reduce mengambil fungsi akumulasi dengan operator biner (BinaryOperator). Reduksi di sini adalah fungsi ganda (BiFunction), di mana kedua argumen memiliki tipe yang sama. Dalam kasus kami, untuk tipe Person . Dua fungsi hampir sama dengan , tetapi dibutuhkan 2 argumen. Dalam contoh kami, fungsi membandingkan usia dua orang dan mengembalikan elemen dengan usia yang lebih besar.

Bentuk berikutnya dari metode reduce mengambil nilai awal dan baterai dengan operator biner. Metode ini dapat digunakan untuk membuat item baru. Kami memiliki - Orang dengan nama dan usia, yang terdiri dari penambahan semua nama dan jumlah tahun yang dijalani:

 Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=MaxPeterPamelaDavid; age=76 

Metode reduce ketiga mengambil tiga parameter: nilai awal, akumulator dengan fungsi ganda, dan fungsi kombinasi seperti operator biner. Karena nilai awal dari tipe ini tidak terbatas pada tipe Person, Anda dapat menggunakan pengurangan untuk menentukan tahun total setiap orang:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 76 

Seperti yang Anda lihat, kami mendapat hasil 76, tetapi apa yang sebenarnya terjadi di bawah tenda?

Kami memperluas fragmen kode di atas dengan output teks untuk debug:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David 

Seperti yang Anda lihat, fungsi akumulasi melakukan semua pekerjaan. Ini pertama kali disebut dengan nilai awal 0 dan Max orang pertama. Dalam tiga langkah berikutnya, jumlah terus meningkat berdasarkan usia orang tersebut dari langkah terakhir hingga mencapai usia 76 tahun.

Jadi apa selanjutnya? Apakah combiner tidak pernah dipanggil? Pertimbangkan eksekusi paralel utas ini:

 Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Pamela // accumulator: sum=0; person=David // accumulator: sum=0; person=Max // accumulator: sum=0; person=Peter // combiner: sum1=18; sum2=23 // combiner: sum1=23; sum2=12 // combiner: sum1=41; sum2=35 

Dengan eksekusi paralel, kami mendapatkan output konsol yang sama sekali berbeda. Sekarang combiner benar-benar dipanggil.Karena baterai disebut secara paralel, penggabung harus meringkas nilai yang disimpan secara terpisah.

Pada bab selanjutnya, kita akan memeriksa secara lebih rinci eksekusi paralel dari utas.

Utas paralel


Thread dapat berjalan secara paralel untuk meningkatkan kinerja saat berhadapan dengan sejumlah besar elemen yang masuk. Utas paralel menggunakan yang biasa ForkJoinPooltersedia melalui panggilan ke metode statis ForkJoinPool.commonPool(). Ukuran kumpulan utas utama dapat mencapai 5 utas eksekusi - jumlah pastinya tergantung pada jumlah inti prosesor fisik yang tersedia.

 ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 

Di komputer saya, kumpulan utas biasa diinisialisasi secara default dengan paralelisasi menjadi 3 utas. Nilai ini dapat ditingkatkan atau dikurangi dengan menetapkan parameter JVM berikut:

 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 

Koleksi mendukung metode parallelStream()untuk membuat aliran data paralel. Anda juga dapat memanggil metode perantara parallel()untuk mengubah aliran serial menjadi paralel.

Untuk memahami perilaku utas dalam eksekusi paralel, contoh berikut ini mencetak informasi tentang setiap utas saat ini (utas) untuk System.out:

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

Pertimbangkan kesimpulan dengan entri debug untuk lebih memahami utas mana yang digunakan untuk menjalankan metode aliran tertentu:

 filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] 

Seperti yang Anda lihat, dalam eksekusi paralel dari aliran data, semua utas yang tersedia saat ini digunakan ForkJoinPool. Urutan output mungkin berbeda, karena urutan eksekusi dari setiap utas tertentu tidak ditentukan.

Mari kita memperluas contoh dengan menambahkan metode sort:

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

Sekilas, hasilnya mungkin aneh:

 filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] 

Tampaknya sortdijalankan secara berurutan dan hanya di utas utama . Bahkan, ketika aliran dieksekusi secara paralel di bawah kap metode sortdari Stream API Arrays, metode penyortiran kelas , ditambahkan di Java 8, disembunyikan Arrays.parallelSort(). Seperti ditunjukkan dalam dokumentasi, metode ini, berdasarkan pada panjang koleksi yang masuk, menentukan bagaimana ia akan diurutkan secara paralel atau berurutan:
Jika panjang array tertentu kurang dari "butir" minimum, pengurutan dilakukan dengan mengeksekusi metode Array.sort.
Mari kita kembali ke contoh dengan metode reducedari bab sebelumnya. Kami telah menemukan bahwa fungsi pemersatu dipanggil hanya ketika bekerja dengan utas secara paralel. Pertimbangkan utas mana yang terlibat:

 List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); 

Keluaran konsol menunjukkan bahwa kedua fungsi: terakumulasi dan digabungkan, dilakukan secara paralel, menggunakan semua kemungkinan aliran:

 accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2] 

Dapat dikatakan bahwa eksekusi paralel dari aliran berkontribusi pada peningkatan efisiensi yang signifikan ketika bekerja dengan sejumlah besar elemen yang masuk. Namun, harus diingat bahwa beberapa metode dalam eksekusi paralel memerlukan perhitungan tambahan (menggabungkan operasi), yang tidak diperlukan dalam eksekusi berurutan.

Selain itu, untuk eksekusi paralel dari thread, yang sama ForkJoinPool, yang digunakan secara luas di JVM, digunakan. Jadi penggunaan metode pemblokiran lambat dari aliran dapat mempengaruhi kinerja seluruh program secara negatif, karena pemblokiran benang yang digunakan untuk memproses dalam tugas-tugas lain.

Itu semua


Tutorial saya tentang menggunakan utas di Java 8 sudah berakhir. Untuk studi yang lebih rinci tentang bekerja dengan stream, Anda dapat merujuk ke dokumentasi . Jika Anda ingin lebih dalam dan mempelajari lebih lanjut tentang mekanisme yang mendasari utas, Anda mungkin tertarik membaca artikel oleh Martin Fowler Collection Pipelines .

Jika Anda juga tertarik dengan JavaScript, Anda mungkin ingin melihat Stream.js - implementasi JavaScript dari Java 8 Streams API. Anda mungkin juga ingin membaca artikel saya tentang Tutorial Java 8 ( terjemahan bahasa Rusia tentang Habré) dan Tutorial Java 8 Nashorn .

Saya harap panduan ini bermanfaat dan menarik bagi Anda, dan Anda menikmati proses membaca. Kode lengkap disimpan di GitHub . Jangan ragu untuk membuat cabang di repositori.

Source: https://habr.com/ru/post/id437038/


All Articles