Traducción de la guía API de Benjamin Winterberg Stream

Hola Habr! Le presento la traducción del artículo " Java 8 Stream Tutorial ".

Este tutorial, basado en ejemplos de código, proporciona una descripción general completa de las secuencias en Java 8. Cuando presenté por primera vez la API de Stream, me sorprendió el nombre porque es muy acorde con InputStream y OutputStream del paquete java.io; Sin embargo, los hilos en Java 8 son algo completamente diferente. Los hilos son mónadas que juegan un papel importante en el desarrollo de la programación funcional en Java.
En la programación funcional, una mónada es una estructura que representa un cálculo en forma de una cadena de pasos sucesivos. El tipo y la estructura de la mónada determinan la cadena de operaciones, en nuestro caso, una secuencia de métodos con funciones integradas de un tipo dado.
Este tutorial le enseñará cómo trabajar con transmisiones y le mostrará cómo manejar los diversos métodos disponibles en la API de transmisiones. Analizaremos el orden de las operaciones y veremos cómo la secuencia de métodos en la cadena afecta el rendimiento. Conozca flatMap potentes métodos de Stream API, como reduce , collect y flatMap . Al final del manual, prestaremos atención al trabajo paralelo con flujos.

Si no se siente libre de trabajar con expresiones lambda, interfaces funcionales y métodos de referencia, será útil que se familiarice con mi guía de innovaciones en Java 8 ( traducción en Habré), y luego vuelva a estudiar los flujos.

Cómo funcionan los hilos


Una secuencia representa una secuencia de elementos y proporciona varios métodos para realizar cálculos en estos elementos:

 List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1"); myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println); // C1 // C2 

Los métodos de flujo son intermedios (intermedios) y terminales (terminales). Los métodos intermedios devuelven una secuencia, lo que permite que muchos de estos métodos se llamen secuencialmente. Los métodos de terminal no devuelven un valor (vacío) o devuelven un resultado de un tipo que no sea una secuencia. En el ejemplo anterior, el filter , el map y los sorted son intermedios y forEach son terminales. Para obtener una lista completa de los métodos de flujo disponibles, consulte la documentación . Tal cadena de operaciones de flujo también se conoce como una tubería de operación.

La mayoría de los métodos de Stream API aceptan como parámetros expresiones lambda, una interfaz funcional que describe el comportamiento específico del método. La mayoría de ellos deben ser simultáneamente no interferentes y apátridas. ¿Qué significa esto?

Un método no interfiere si no modifica los datos subyacentes subyacentes a la secuencia. Por ejemplo, en el ejemplo anterior, ninguna expresión lambda modifica la matriz de lista myList.

Un método no tiene estado si se especifica el orden en que se realiza la operación. Por ejemplo, ni una sola expresión lambda del ejemplo depende de variables mutables o estados de espacio externo que podrían cambiar en tiempo de ejecución.

Diferentes tipos de hilos


Las secuencias se pueden crear a partir de varios datos de origen, principalmente de colecciones. Las listas y conjuntos admiten los nuevos métodos stream() y parllelStream() para crear secuencias secuenciales y paralelas. Los subprocesos paralelos pueden funcionar en modo de subprocesos múltiples (en subprocesos múltiples) y se analizarán al final del manual. Mientras tanto, considere hilos secuenciales:

 Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1 

Aquí, llamar al método stream() en una lista devuelve un objeto stream normal.
Sin embargo, para trabajar con una secuencia, no es necesario crear una colección:

 Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1 

Simplemente use Stream.of() para crear una secuencia a partir de múltiples referencias de objeto.

Además de las secuencias de objetos normales, Java 8 tiene tipos especiales de secuencias para trabajar con tipos primitivos: int, long, double. Como puede suponer, esto es IntStream , LongStream , DoubleStream .

Las secuencias IntStream pueden reemplazar los IntStream.range() regulares para (;;) usando IntStream.range() :

 IntStream.range(1, 4) .forEach(System.out::println); // 1 // 2 // 3 

Todos estos flujos para trabajar con tipos primitivos funcionan igual que los flujos de objetos normales, excepto los siguientes:

  • Las secuencias primitivas usan expresiones lambda especiales. Por ejemplo, IntFunction en lugar de Function, o IntPredicate en lugar de Predicate.
  • Los flujos primitivos admiten métodos terminales adicionales: sum() y average()

     Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0 


A veces es útil convertir un flujo de objetos en un flujo de primitivas o viceversa. Para este propósito, los flujos de objetos admiten métodos especiales: mapToInt() , mapToLong() , mapToDouble() :

 Stream.of("a1", "a2", "a3") .map(s -> s.substring(1)) .mapToInt(Integer::parseInt) .max() .ifPresent(System.out::println); // 3 

Las secuencias de primitivas se pueden convertir en secuencias de objetos llamando a mapToObj() :

 IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

En el siguiente ejemplo, una secuencia de números de punto flotante se asigna a una secuencia de enteros y luego se asigna a una secuencia de objetos:

 Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println); // a1 // a2 // a3 

Orden de ejecución


Ahora que hemos aprendido cómo crear varias transmisiones y cómo trabajar con ellas, profundizaremos y consideraremos cómo se ven las operaciones de transmisión bajo el capó.

Una característica importante de los métodos intermedios es su pereza . No hay un método terminal en este ejemplo:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }); 

Cuando se ejecuta este fragmento de código, no se enviará nada a la consola. Y todo porque los métodos intermedios se ejecutan solo si hay un método terminal. forEach el ejemplo agregando el método de terminal forEach :

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s)); 

La ejecución de este fragmento de código conduce a la salida a la consola del siguiente resultado:

 filter: d2 forEach: d2 filter: a2 forEach: a2 filter: b1 forEach: b1 filter: b3 forEach: b3 filter: c forEach: c 

El orden en que se organizan los resultados puede sorprender. Uno puede esperar ingenuamente que los métodos se ejecutarán "horizontalmente": uno tras otro para todos los elementos de la secuencia. Sin embargo, en cambio, el elemento se mueve a lo largo de la cadena "verticalmente". Primero, la primera línea de "d2" pasa por el método de filter , luego por forEach y solo entonces, después de pasar el primer elemento por toda la cadena de métodos, el siguiente elemento comienza a procesarse.

Dado este comportamiento, puede reducir el número real de operaciones:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> { System.out.println("anyMatch: " + s); return s.startsWith("A"); }); // map: d2 // anyMatch: D2 // map: a2 // anyMatch: A2 

El método anyMatch devolverá verdadero tan pronto como el predicado se aplique al elemento entrante. En este caso, este es el segundo elemento de la secuencia: "A2". En consecuencia, debido a la ejecución "vertical" de la cadena de subprocesos, el map se llamará solo dos veces. Por lo tanto, en lugar de mostrar todos los elementos de la secuencia, se llamará a map lo menos posible.

Por qué la secuencia importa


El siguiente ejemplo consta de dos métodos intermedios de map y filter y un método terminal para cada forEach . Considere cómo se realizan estos métodos:

 Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("A"); }) .forEach(s -> System.out.println("forEach: " + s)); // map: d2 // filter: D2 // map: a2 // filter: A2 // forEach: A2 // map: b1 // filter: B1 // map: b3 // filter: B3 // map: c // filter: C 

Es fácil adivinar que tanto el map como filter métodos de filter se llaman 5 veces en tiempo de ejecución, una vez para cada elemento de la colección de origen, mientras que forEach se llama solo una vez, para el elemento que pasó el filtro.

Puede reducir significativamente el número de operaciones cambiando el orden de las llamadas a métodos colocando el filter en primer lugar:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // map: a2 // forEach: A2 // filter: b1 // filter: b3 // filter: c 

Ahora el mapa se llama solo una vez. Con una gran cantidad de elementos de entrada, observaremos un notable aumento en la productividad. Tenga esto en cuenta al componer cadenas de métodos complejos.

Ampliamos el ejemplo anterior agregando una operación de ordenación adicional: el método ordenado:

 Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); 

La clasificación es un tipo especial de operación intermedia. Esta es la llamada operación con estado, porque para ordenar una colección, su estado debe tenerse en cuenta durante toda la operación.

Como resultado de la ejecución de este código, obtenemos el siguiente resultado en la consola:

 sort: a2; d2 sort: b1; a2 sort: b1; d2 sort: b1; a2 sort: b3; b1 sort: b3; d2 sort: c; b3 sort: c; d2 filter: a2 map: a2 forEach: A2 filter: b1 filter: b3 filter: c filter: d2 

Primero, se ordena toda la colección. En otras palabras, el método sorted se ejecuta horizontalmente. En este caso, sorted se llama 8 veces para varias combinaciones de los elementos en la colección entrante.

Una vez más, optimizamos la ejecución de este código cambiando el orden de las llamadas a métodos en la cadena:

 Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s)); // filter: d2 // filter: a2 // filter: b1 // filter: b3 // filter: c // map: a2 // forEach: A2 

En este ejemplo, sorted no se llama en absoluto. filter reduce la colección de entrada a un elemento. En el caso de grandes datos de entrada, el rendimiento se beneficiará significativamente.

Reutilizar transmisiones


En Java 8, los hilos no se pueden reutilizar. Después de llamar a cualquier método de terminal, el hilo termina:

 Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception 

Llamar a noneMatch después de anyMatch en un hilo da como resultado la siguiente excepción:

 java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28) 

Para superar esta limitación, se debe crear un nuevo hilo para cada método de terminal.

Por ejemplo, puede crear un proveedor para un nuevo constructor de hilos en el que se instalarán todos los métodos intermedios:

 Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok 

Cada llamada al método get crea un nuevo hilo en el que puede llamar de forma segura al método de terminal deseado.

Métodos avanzados


Los hilos admiten una gran cantidad de métodos diferentes. Ya nos hemos familiarizado con los métodos más importantes. Para familiarizarse con el resto, consulte la documentación . Y ahora sumérgete aún más en métodos más complejos: collect , flatMap y reduce .

La mayoría de los ejemplos de código en esta sección se refieren al siguiente fragmento de código para demostrar la operación:

 class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; } } List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); 

Recoger


Collect método de terminal muy útil, que se utiliza para convertir elementos de flujo a un resultado de un tipo diferente, por ejemplo, Lista, Conjunto o Mapa.

Collect acepta un Collector que contiene cuatro métodos diferentes: un proveedor. acumulador, combinador, finalizador. A primera vista, esto parece muy complicado, pero Java 8 admite varios recopiladores integrados a través de la clase Collectors , donde se implementan los métodos más utilizados.

Caso popular:

 List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList()); System.out.println(filtered); // [Peter, Pamela] 

Como puede ver, crear una lista a partir de elementos de transmisión es muy simple. ¿No necesitas una lista sino mucho? Use Collectors.toSet() .

En el siguiente ejemplo, las personas se agrupan por edad:

 Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age)); personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p)); // age 18: [Max] // age 23: [Peter, Pamela] // age 12: [David] 

Los coleccionistas son increíblemente diversos. También puede agregar elementos de la colección, por ejemplo, determinar la edad promedio:

 Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age)); System.out.println(averageAge); // 19.0 

Para obtener estadísticas más completas, utilizamos un recopilador de resumen que devuelve un objeto especial con información: valores mínimos, máximos y promedio, la suma de los valores y el número de elementos:

 IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age)); System.out.println(ageSummary); // IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23} 

El siguiente ejemplo combina todos los nombres en una línea:

 String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); System.out.println(phrase); // In Germany Max and Peter and Pamela are of legal age. 

El recopilador de conexión acepta un separador, así como un prefijo y sufijo opcionales.

Para convertir los elementos de una secuencia en una pantalla, debe determinar cómo se deben mostrar las claves y los valores. Recuerde que las claves en el mapeo deben ser únicas. De lo contrario, obtenemos una IllegalStateException . Opcionalmente, puede agregar una función de fusión para evitar la excepción:

 Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2)); System.out.println(map); // {18=Max, 23=Peter;Pamela, 12=David} 

Entonces, nos familiarizamos con algunos de los colectores integrados más potentes. Tratemos de construir el suyo propio. Queremos convertir todos los elementos de la secuencia en una sola línea, que consiste en nombres en mayúsculas separados por una barra vertical |. Para hacer esto, cree un nuevo recopilador utilizando Collector.of() . Necesitamos los cuatro componentes de nuestro colector: proveedor, batería, conector, finalizador.

 Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisher String names = persons .stream() .collect(personNameCollector); System.out.println(names); // MAX | PETER | PAMELA | DAVID 

Como las cadenas en Java son inmutables, necesitamos una clase auxiliar como StringJoiner que permita al recopilador construir una cadena para nosotros. En el primer paso, el proveedor construye un StringJoiner con un delimitador asignado. La batería se usa para agregar cada nombre a StringJoiner .

El conector sabe cómo conectar dos StringJoiner en uno. Y al final, el finalizador construye la cadena deseada a partir de StringJoiner s.

Mapa plano


Entonces, aprendimos cómo convertir objetos de flujo en otros tipos de objetos usando el método de map . Map es un tipo de método limitado, ya que cada objeto se puede asignar a otro solo. Pero, ¿qué sucede si desea asignar un objeto a muchos otros, o no mostrarlo en absoluto? Aquí es donde ayuda el método flatMap . FlatMap convierte cada objeto de flujo en un flujo de otros objetos. El contenido de estos subprocesos se empaqueta en la secuencia devuelta del método flatMap .

Para ver flatMap en acción, construyamos una jerarquía de tipos adecuada para un ejemplo:

 class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; } } class Bar { String name; Bar(String name) { this.name = name; } } 

Vamos a crear algunos objetos:

 List<Foo> foos = new ArrayList<>(); // create foos IntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i))); // create bars foos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name)))); 

Ahora tenemos una lista de tres foo , cada uno de los cuales contiene tres barras .

FlatMap acepta una función que debería devolver una secuencia de objetos. Por lo tanto, para acceder a los objetos de barra de cada foo , solo necesitamos encontrar la función adecuada:

 foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); // Bar1 <- Foo1 // Bar2 <- Foo1 // Bar3 <- Foo1 // Bar1 <- Foo2 // Bar2 <- Foo2 // Bar3 <- Foo2 // Bar1 <- Foo3 // Bar2 <- Foo3 // Bar3 <- Foo3 

Por lo tanto, hemos convertido con éxito una secuencia de tres objetos foo en una secuencia de 9 objetos de barra .

Finalmente, todo el código anterior se puede reducir a una simple tubería de operaciones:

 IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name)); 

FlatMap también FlatMap disponible en la clase Optional introducida en Java 8. FlatMap de la clase Optional devuelve un objeto opcional de otra clase. Esto se puede usar para evitar un montón de comprobaciones null .

Imagine una estructura jerárquica como esta:

 class Outer { Nested nested; } class Nested { Inner inner; } class Inner { String foo; } 

Para obtener la cadena anidada de un objeto externo, debe agregar varias comprobaciones null para evitar una NullPointException :

 Outer outer = new Outer(); if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo); } 

Lo mismo se puede lograr usando el flatMap de la clase Opcional:

 Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println); 

Cada llamada a flatMap devuelve un contenedor Optional para el objeto deseado, si está presente, o para null si falta el objeto.

Reducir


La operación de simplificación combina todos los elementos de una secuencia en un solo resultado. Java 8 admite tres tipos diferentes de métodos de reducción.

El primero reduce el flujo de elementos a un solo elemento de flujo. Utilizamos este método para determinar el elemento con mayor edad:

 persons .stream() .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2) .ifPresent(System.out::println); // Pamela 

El método reduce toma una función acumulativa con un operador binario (BinaryOperator). Aquí reduce es una bi-función (BiFunction), donde ambos argumentos pertenecen al mismo tipo. En nuestro caso, al tipo Persona . Una bi-función es casi lo mismo que una , pero requiere 2 argumentos. En nuestro ejemplo, la función compara la edad de dos personas y devuelve un elemento con una edad mayor.

La siguiente forma del método de reduce toma tanto un valor inicial como una batería con un operador binario. Este método se puede usar para crear un nuevo elemento. Tenemos - Persona con nombre y edad, que consiste en agregar todos los nombres y la suma de años vividos:

 Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; }); System.out.format("name=%s; age=%s", result.name, result.age); // name=MaxPeterPamelaDavid; age=76 

El tercer método de reduce toma tres parámetros: el valor inicial, el acumulador con una función bi y una función de combinación como un operador binario. Como el valor inicial del tipo no se limita al tipo Persona, puede usar la reducción para determinar el total de años vividos de cada persona:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2); System.out.println(ageSum); // 76 

Como puede ver, obtuvimos el resultado 76, pero ¿qué sucede realmente bajo el capó?

Expandimos el fragmento de código anterior con la salida del texto para la depuración:

 Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David 

Como puede ver, la función de acumulación realiza todo el trabajo. Primero se llama con un valor inicial de 0 y la primera persona Max. En los siguientes tres pasos, la suma aumenta constantemente según la edad de la persona desde el último paso hasta que alcanza los 76 años.

Entonces, ¿qué sigue? ¿Nunca se llama al combinador? Considere la ejecución paralela de este hilo:

 Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Pamela // accumulator: sum=0; person=David // accumulator: sum=0; person=Max // accumulator: sum=0; person=Peter // combiner: sum1=18; sum2=23 // combiner: sum1=23; sum2=12 // combiner: sum1=41; sum2=35 

Con ejecución paralela, obtenemos una salida de consola completamente diferente. Ahora se llama realmente al combinador.Como la batería se llamó en paralelo, el combinador tuvo que resumir los valores almacenados por separado.

En el próximo capítulo, examinaremos con más detalle la ejecución paralela de hilos.

Hilos paralelos


Los subprocesos pueden ejecutarse en paralelo para mejorar el rendimiento cuando se trata con un gran número de elementos entrantes. Los subprocesos paralelos utilizan el habitual ForkJoinPooldisponible a través de una llamada al método estático ForkJoinPool.commonPool(). El tamaño del grupo de subprocesos principales puede alcanzar los 5 subprocesos de ejecución: el número exacto depende del número de núcleos de procesadores físicos disponibles.

 ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 

En mi computadora, un grupo de subprocesos regulares se inicializa de forma predeterminada con paralelización en 3 subprocesos. Este valor se puede aumentar o disminuir configurando el siguiente parámetro JVM:

 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5 

Las colecciones admiten un método parallelStream()para crear flujos de datos paralelos. También puede llamar a un método intermedio parallel()para convertir un flujo en serie en uno paralelo.

Para comprender el comportamiento de un subproceso en ejecución paralela, el siguiente ejemplo imprime información sobre cada subproceso actual (subproceso) en System.out:

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

Considere las conclusiones con las entradas de depuración para comprender mejor qué hilo se utiliza para ejecutar métodos de flujo específicos:

 filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2] 

Como puede ver, en la ejecución paralela del flujo de datos, se utilizan todos los subprocesos disponibles del actual ForkJoinPool. La secuencia de salida puede diferir, ya que la secuencia de ejecución de cada subproceso específico no está definida.

Expandamos el ejemplo agregando un método sort:

 Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName())); 

A primera vista, el resultado puede parecer extraño:

 filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1] 

Parece sortejecutarse secuencialmente y solo en el hilo principal . De hecho, cuando la transmisión se ejecuta en paralelo bajo el capó del método sortdesde la API de transmisión Arrays, el método de clasificación de clase , agregado en Java 8, está oculto Arrays.parallelSort(). Como se indica en la documentación, este método, basado en la longitud de la colección entrante, determina cómo se ordenará en paralelo o secuencialmente:
Si la longitud de una matriz particular es menor que el "grano" mínimo, la clasificación se realiza ejecutando el método Arrays.sort.
Volvamos al ejemplo con el método reducedel capítulo anterior. Ya hemos descubierto que la función unificadora solo se llama cuando se trabaja con el hilo en paralelo. Considere qué hilos están involucrados:

 List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; }); 

La salida de la consola muestra que ambas funciones: acumulación y combinación, se realizan en paralelo, utilizando todos los flujos posibles:

 accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2] 

Se puede argumentar que la ejecución paralela del flujo contribuye a un aumento significativo en la eficiencia cuando se trabaja con grandes cantidades de elementos entrantes. Sin embargo, debe recordarse que algunos métodos en ejecución paralela requieren cálculos adicionales (operaciones de combinación), que no son necesarios en la ejecución secuencial.

Además, para la ejecución paralela del hilo, ForkJoinPoolse usa el mismo , tan ampliamente utilizado en la JVM. Por lo tanto, el uso de métodos de bloqueo lento del flujo puede afectar negativamente el rendimiento de todo el programa, debido al bloqueo de los hilos utilizados para el procesamiento en otras tareas.

Eso es todo


Mi tutorial sobre el uso de hilos en Java 8 ha terminado. Para un estudio más detallado del trabajo con transmisiones, puede consultar la documentación . Si desea profundizar y aprender más sobre los mecanismos que subyacen a los hilos, podría estar interesado en leer un artículo de Martin Fowler Collection Pipelines .

Si también está interesado en JavaScript, puede echarle un vistazo a Stream.js , la implementación de JavaScript de la API Java 8 Streams. También es posible que desee leer mis artículos sobre el Tutorial Java 8 ( traducción al ruso en Habré) y el Tutorial Nashorn Java 8 .

Espero que esta guía te haya sido útil e interesante, y que hayas disfrutado el proceso de lectura. El código completo se almacena en GitHub . Siéntase libre de crear una rama en el repositorio.

Source: https://habr.com/ru/post/437038/


All Articles