Parte 2: solución
Hola de nuevo Hoy continuaré mi historia sobre cómo clasificamos grandes cantidades de datos en Apache Spark utilizando modelos arbitrarios de aprendizaje automático. En la
primera parte del artículo, examinamos la declaración del problema en sí, así como los principales problemas que surgen al organizar la interacción entre el clúster en el que se almacenan y procesan los datos iniciales, y el servicio de clasificación externo. En la segunda parte, consideraremos una de las opciones para resolver este problema usando el enfoque Reactive Streams y su implementación usando la biblioteca akka-streams.
Concepto de corrientes reactivas
Para resolver los problemas descritos en la primera parte, puede usar el enfoque, llamado
Corrientes reactivas . Le permite controlar el proceso de transferencia de flujos de datos entre etapas de procesamiento, operando a diferentes velocidades e independientemente entre sí sin la necesidad de almacenamiento en búfer. Si una de las etapas de procesamiento es más lenta que la anterior, entonces es necesario indicarle a la etapa más rápida cuántos datos de entrada están listos para procesar en este momento. Esta interacción se llama contrapresión. Consiste en el hecho de que las etapas más rápidas procesan exactamente tantos elementos como se requieren para la etapa más lenta, y no más, y luego liberan recursos informáticos.
En general, Reactive Streams es una especificación para implementar la plantilla
Publisher-Subscriber . Esta especificación define un conjunto de cuatro interfaces (editor, suscriptor, procesador y suscripción) y un contrato para sus métodos.
Consideremos estas interfaces con más detalle:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Hay dos lados en el modelo de editor-suscriptor: transmitir y recibir. Al implementar las secuencias reactivas, la clase que implementa la interfaz del editor es responsable de la transferencia de datos, y el suscriptor es responsable de la recepción. Para establecer comunicación entre ellos, el suscriptor debe estar registrado con Publisher llamando a su método de suscripción. Según la especificación, después de registrar un suscriptor, el editor debe llamar a sus métodos en el siguiente orden:
- onSubscribe. Este método se llama inmediatamente después de registrar al suscriptor con Publisher. Como parámetro, se le pasa un objeto de Suscripción a través del cual el Suscriptor solicitará datos del Editor. Este objeto debe almacenarse y llamarse solo en el contexto de este suscriptor.
- Después de que el Suscriptor haya solicitado datos del Editor llamando al método de solicitud en el objeto de Suscripción correspondiente, el Editor puede llamar al método Suscriptor onNext, pasando el siguiente elemento.
- El suscriptor puede llamar periódicamente al método de solicitud en la Suscripción, pero Publisher no puede llamar al método onNext más que el total solicitado a través del método de solicitud.
- Si el flujo de datos es finito, después de pasar todos los elementos a través del método onNext, Publisher debe llamar al método onComplete.
- Si se produjo un error en Publisher y el procesamiento posterior de los elementos no es posible, debe llamar al método onError
- Después de llamar a los métodos onComplete o onError, debe excluirse la interacción adicional del editor con el suscriptor.
Se puede considerar que las llamadas a métodos envían señales entre el publicador y el suscriptor. El suscriptor le indica al editor cuántos elementos está listo para procesar, y el editor, a su vez, le indica que hay el siguiente elemento, que no hay más elementos o que se ha producido algún error.
Para excluir otra influencia del publicador y del suscriptor entre sí, las llamadas a todos los métodos que implementan las interfaces de Reactive Streams deben ser sin bloqueo. En este caso, la interacción entre ellos será completamente asíncrona.
Puede encontrar más detalles sobre la especificación para las interfaces de Reactive Streams
aquí .
Por lo tanto, al vincular los iteradores originales y resultantes mediante la conversión a Publicador y Suscriptor, respectivamente, podemos resolver los problemas identificados en la parte anterior del artículo. El problema del desbordamiento del búfer entre etapas se resuelve solicitando un cierto número de elementos por el suscriptor. El problema de la finalización exitosa o incorrecta se resuelve enviando señales al suscriptor a través de los métodos onComplete u onError, respectivamente. El editor se hace responsable del envío de estas señales, que en nuestro caso deben controlar cuántas solicitudes HTTP se enviaron y cuántas de ellas recibieron respuestas. Después de recibir la última respuesta y procesar todos los resultados que se obtuvieron, debe enviar una señal onComplete. En caso de que una de las solicitudes fallara, debería enviar una señal onError y dejar de enviar más elementos al suscriptor, así como restar elementos del iterador original.
El iterador resultante debe implementarse como un suscriptor. En este caso, no podemos prescindir de un búfer en el que los elementos se escribirán cuando se llame al método onNext desde la interfaz del suscriptor y se reste utilizando los métodos hasNext y next de la interfaz Iterator. Como implementación de búfer, puede usar una cola de bloqueo, por ejemplo, LinkedBlockedQueue.
Un lector atento hará inmediatamente la pregunta: ¿por qué es la cola de bloqueo, porque de acuerdo con la especificación de Reactive Streams, la implementación de todos los métodos debe ser sin bloqueo? Pero todo está bien aquí: dado que le estamos pidiendo a Publisher un número estrictamente definido de elementos, el método onNext no se llamará más de este número de veces, y la cola siempre puede agregar un nuevo elemento sin bloquear.
Por otro lado, el bloqueo puede ocurrir cuando se llama al método hasNext en caso de una cola vacía. Sin embargo, esto está bien: el método hasNext no forma parte del contrato de la interfaz del suscriptor, se define en la interfaz Iterator, que, como explicamos anteriormente, es una estructura de datos de bloqueo. Al llamar al siguiente método, restamos el siguiente elemento de la cola, y cuando su tamaño se vuelve menor que un cierto umbral, necesitaremos solicitar la siguiente porción de los elementos a través de una llamada al método de solicitud.
Figura 7. Interacción asincrónica con un servicio externo utilizando el enfoque de Reactive StreamsPor supuesto, en este caso no eliminaremos por completo el bloqueo de llamadas. Esto se debe a una falta de coincidencia de paradigmas entre las secuencias reactivas, que suponen una interacción completamente asincrónica, y un iterador, que debe llamar a trueN o false al llamar al método hasNext. Sin embargo, a diferencia de la interacción síncrona con un servicio externo, el tiempo de inactividad debido a bloqueos puede reducirse significativamente al aumentar la carga general de los núcleos del procesador.
Sería conveniente que los desarrolladores de Apache Spark en futuras versiones implementaran un análogo del método mapPartitions, que funciona con Publisher y Subscriber. Esto permitiría una interacción completamente asincrónica, eliminando así la posibilidad de bloquear hilos.
Akka-streams y akka-http como implementación de la especificación Reactive Streams
Actualmente, ya hay más de una docena de implementaciones de la especificación Reactive Streams. Una de esas implementaciones es el módulo akka-streams de la biblioteca
akka . En el mundo de JVM, Akka se ha establecido como uno de los medios más efectivos para escribir sistemas paralelos y distribuidos. Esto se logra debido al hecho de que el principio básico establecido en su base es
el modelo de actor , que le permite escribir aplicaciones altamente competitivas sin el control directo de los hilos y sus grupos.
Se ha escrito mucha literatura sobre la implementación del concepto de actores en akka, por lo que no nos detendremos aquí (el
sitio oficial de akka es una muy buena fuente de información, también recomiendo el libro de
acción de akka ). Aquí veremos más de cerca el lado tecnológico de la implementación bajo la JVM.
En general, los actores no existen por sí mismos, sino que forman un sistema jerárquico. Para crear un sistema de actor, debe asignar recursos para él, por lo que el primer paso al trabajar con akka es crear una instancia del objeto ActorSystem. Cuando se inicia ActorSystem, se crea un grupo separado de subprocesos, denominado despachador, en el que se ejecuta todo el código definido en los actores. Típicamente, un solo hilo ejecuta el código de múltiples actores, sin embargo, si es necesario, puede configurar un despachador separado para un grupo específico de actores (por ejemplo, para actores que interactúan directamente con una API de bloqueo).
Una de las tareas más comunes resueltas usando actores es el procesamiento secuencial de flujos de datos. Anteriormente, para esto, era necesario construir manualmente cadenas de actores y asegurarse de que no hubiera cuellos de botella entre ellos (por ejemplo, si un actor procesa los mensajes más rápido que el siguiente, entonces puede tener un desbordamiento de la cola de mensajes entrantes, lo que lleva a un error OutOfMemoryError).
A partir de la versión 2.4, se agregó el módulo akka-streams a akka, que le permite definir declarativamente el proceso de procesamiento de datos y luego crear los actores necesarios para su ejecución. Akka-streams también implementa el principio de contrapresión, que elimina la posibilidad de desbordar la cola de mensajes entrantes para todos los actores involucrados en el procesamiento.
Los elementos principales para definir el esquema de procesamiento del flujo de datos en los flujos de akka son Source, Flow y Sink. Al combinarlos entre sí, obtenemos un gráfico ejecutable. Para comenzar el proceso de procesamiento, se utiliza un materializador, que crea actores que trabajan de acuerdo con el gráfico definido por nosotros (la interfaz del Materializador y su implementación ActorMaterializer).
Consideremos las etapas Fuente, Flujo y Sumidero con más detalle. Fuente define la fuente de datos. Akka-streams admite más de una docena de formas diferentes de crear fuentes, incluso desde un iterador:
val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator }
La fuente también se puede obtener convirtiendo una fuente existente:
val newSource: Source[String, NotUsed] = source.map(item => transform(item))
Si la transformación es una operación no trivial, se puede representar como una entidad Flow. Akka-streams admite muchas formas diferentes de crear Flow. La forma más fácil es crear desde una función:
val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length)
Al combinar Source y Flow, obtenemos una nueva Fuente.
val newSource: Source[Int, NotUsed] = oldSource.via(someFlow)
El sumidero se utiliza como la etapa final del procesamiento de datos. Como en el caso de Source, akka-streams proporciona más de una docena de opciones diferentes de Sink, por ejemplo, Sink.foreach realiza una determinada operación para cada elemento, Sink.seq recopila todos los elementos de una colección, etc.
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)
Fuente, flujo y sumidero se parametrizan por los tipos de elementos de entrada y / o salida, respectivamente. Además, cada etapa de procesamiento puede tener algún resultado de su trabajo. Para esto, Source, Flow y Sink también se parametrizan mediante un tipo adicional que determina el resultado de la operación. Este tipo se llama el tipo de valor materializado. Si la operación no implica la presencia de un resultado adicional de su trabajo, por ejemplo, cuando definimos Flujo a través de una función, entonces el tipo NotUsed se usa como el valor materializado.
Combinando la fuente, el flujo y el sumidero necesarios, obtenemos RunnableGraph. Está parametrizado por un tipo, que determina el tipo de valor obtenido como resultado de la ejecución de este gráfico. Si es necesario, al combinar las etapas, puede especificar el resultado de cuál de las etapas será el resultado de toda la gráfica de operaciones. Por defecto, se toma el resultado de la etapa Fuente:
val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println))
Sin embargo, si el resultado de la etapa Sink es más importante para nosotros, entonces debemos indicarlo explícitamente:
val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right)
Después de haber definido el gráfico de operaciones, debemos ejecutarlo. Para hacer esto, runnableGraph necesita llamar al método de ejecución. Como parámetro, este método toma un objeto ActorMaterializer (que también puede estar en un alcance implícito), que es responsable de crear actores que realizarán operaciones. Por lo general, un ActorMaterializer se crea inmediatamente después de la creación de un ActorSystem, adjunto a su ciclo de vida, y lo utiliza para crear actores. Considere un ejemplo:
En el caso de combinaciones simples, puede hacerlo sin crear un RunnableGraph separado, sino simplemente conectar Source a Sink e iniciarlos llamando al método runWith en Source. Este método también supone que un objeto ActorMaterializer está presente en el ámbito implícito. Además, en este caso, se utilizará el valor materializado definido en Sink. Por ejemplo, usando el siguiente código, podemos convertir Source a Publisher desde la especificación Reactive Streams:
val source: Source[Score, NotUsed] = Source.fromIterator(() => sourceIterator).map(item => transform(item)) val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false))
Entonces, ahora hemos mostrado cómo puede obtener Reactive Streams Publisher creando una Fuente desde el iterador de fuente y realizando algunas transformaciones en sus elementos. Ahora podemos asociarlo con un suscriptor que suministre datos al iterador resultante. Queda por considerar la última pregunta: cómo organizar la interacción HTTP con un servicio externo.
La estructura de akka incluye el módulo
akka-http , que le permite organizar la comunicación asincrónica sin bloqueo a través de HTTP. Además, este módulo está construido sobre la base de flujos de akka, lo que le permite agregar interacción HTTP como un paso adicional en el gráfico de las operaciones de procesamiento de flujo de datos.
Para conectarse a servicios externos, akka-http proporciona tres interfaces diferentes.
- API de nivel de solicitud: es la opción más simple para el caso de solicitudes individuales a una máquina arbitraria. En este nivel, las conexiones HTTP se gestionan de forma completamente automática, y en cada solicitud es necesario transferir la dirección completa de la máquina a la que se dirige la solicitud.
- API de nivel de host: adecuada cuando sabemos a qué puerto en qué máquina accederemos. En este caso, akka-http toma el control del conjunto de conexiones HTTP, y en las solicitudes es suficiente para especificar la ruta relativa al recurso solicitado.
- API de nivel de conexión: le permite obtener un control total sobre la administración de las conexiones HTTP, es decir, abrir, cerrar y distribuir solicitudes entre conexiones.
En nuestro caso, la dirección del servicio de clasificación la conocemos de antemano, por lo tanto, es necesario organizar la interacción HTTP solo con esta máquina en particular. Por lo tanto, la API de nivel de host es la mejor para nosotros. Ahora, veamos cómo se crea el grupo de conexiones HTTP cuando se usa:
val httpFlow: Flow[(HttpRequest,Id), (Try[HttpResponse],Id), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Id](hostAddress, portNumber)
Al llamar a Http (). CachedHostConnectionPool [T] (hostAddress, portNumber) en el ActorSystem, que está en un ámbito implícito, los recursos se asignan para crear un grupo de conexiones, pero las conexiones en sí no están establecidas. Como resultado de esta llamada, se devuelve Flow, que recibe un par de una solicitud HTTP y algún objeto de identificación Id como entrada. El objeto de identificación es necesario para hacer coincidir la solicitud con la respuesta correspondiente debido al hecho de que la llamada HTTP en akka-http es una operación asincrónica, y el orden en que se reciben las respuestas no corresponde necesariamente al orden en que se envían las solicitudes. Por lo tanto, en la salida, Flow da un par del resultado de la consulta y el objeto de identificación correspondiente.
Directamente, las conexiones HTTP se establecen cuando se inicia (materializa) un gráfico (incluido este Flujo). Akka-http se implementa de tal manera que no importa cuántas veces se hayan materializado los gráficos que contienen httpFlow, dentro de un ActorSystem siempre habrá un grupo común de conexiones HTTP que serán utilizadas por todas las materializaciones. Esto le permite controlar mejor el uso de los recursos de la red y evitar sobrecargarlos.
Por lo tanto, el ciclo de vida del grupo de conexiones HTTP está vinculado al ActorSystem. Como ya se mencionó, también se adjunta el ciclo de vida del grupo de subprocesos, en el que se realizan las operaciones definidas en los actores (o en nuestro caso, definidas como las etapas akka-streams y akka-http). Por lo tanto, para lograr la máxima eficiencia, debemos reutilizar una instancia de ActorSystem dentro del mismo proceso JVM.
Poniendo todo esto junto: un ejemplo de implementación de interacción con el servicio de clasificación
Entonces, ahora podemos pasar al proceso de clasificar grandes volúmenes de datos distribuidos en Apache Spark utilizando la interacción asincrónica con servicios externos. El esquema general de esta interacción ya se ha mostrado en la Figura 7.
Supongamos que tenemos un conjunto de datos inicial [Características] definido. Al aplicarle la operación mapPartitions, deberíamos obtener un conjunto de datos, en el que cada identificación del conjunto fuente se estampa con un cierto valor obtenido como resultado de la clasificación (conjunto de datos [puntuación]). Para organizar el procesamiento asincrónico en los ejecutores, debemos ajustar los iteradores de origen y resultantes en Publisher y Subscriber, respectivamente, de la especificación de flujos reactivos y vincularlos.
case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) val batchesRequestCount = config.getInt(“scoreService. batchesRequestCount”)
En esta implementación, se tiene en cuenta que el servicio de clasificación para una llamada puede procesar un grupo de vectores de características a la vez, por lo tanto, el resultado de la clasificación después de una llamada también estará disponible de inmediato para todo el grupo. Por lo tanto, como un tipo de parámetro para Publisher, no solo tenemos Score, como es de esperar, sino Iterable [Score]. Por lo tanto, enviamos los resultados de clasificación para este grupo al iterador resultante (que también es un Suscriptor) mediante una sola llamada al método onNext. Esto es mucho más eficiente que llamar a Next para cada elemento. Ahora analizaremos este código con más detalle.
- Determinamos la estructura de los datos de entrada y salida. Como entrada, tendremos un grupo de algunos identificadores con un vector de características, y como salida, tendremos un grupo de identificadores con un valor numérico obtenido como resultado de la clasificación.
- Determinamos el número de grupos que el suscriptor solicitará al editor a la vez. Dado que se supone que estos valores permanecerán en el búfer y esperarán hasta que se lean desde el iterador resultante, este valor depende de la cantidad de memoria asignada al ejecutor.
- Crear editor desde el iterador de origen. Será responsable de interactuar con el servicio de clasificación. La función createPublisher se analiza a continuación.
- Cree un suscriptor, que será el iterador resultante. El código de clase IteratorSubscriber también se proporciona a continuación.
- Registro de suscriptor con Publisher.
- Devuelve IteratorSubscriber como resultado de la operación mapPartitions.
Ahora considere la implementación de la función createPublisher.
type Ids = Seq[String]
- - , . httpFlow, .
- : , (batchSize) (parallelismLevel).
- implicit scope ActorSystem, ActorMaterializer httpFlow. Spark-. ActorSystemHolder .
- akka-streams . Source[Features] .
- batchSize .
- HttpRequest . HttpRequest createHttpRequest. createPublisher. feature-, , ( predict). , HTTP-. , HTTP-, HTTP-, URI .
- httpFlow.
- , . flatMapMerge, akka-http Source[ByteString], , . . parallelismLevel , ( ). HTTP-: , , , .
- : . akka ByteString. , ByteString O(1), ByteString . , , . , .
- HTTP- , Stream . , discardEntityBytes , , .
- . akka-http , .
- , Publisher, . , . false Sink.asPublisher , Publisher Subscriber-.
, akka ActorSystem, . , Spark , . Spark JVM , , , ActorSystem ActorMatrializer httpFlow.
object ActorSystemHolder { implicit lazy val actorSystem: ActorSystem = {
- , , , .
- ActorSystem .
- , , ActorSystem, terminate, , , , . , JVM-.
- ActorMaterializer, akka-streams, ActorSystem.
- , httpFlow . , HTTP- ActorSystem.
Subscriber- HTTP-.
sealed trait QueueItem[+T] case class Item[+T](item: T) extends QueueItem[T] case object Done extends QueueItem[Nothing] case class Failure(cause: Throwable) extends QueueItem[Nothing] //(1) class StreamErrorCompletionException(cause: Throwable) extends Exception(cause) //(2) class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] {
IteratorSubscriber Producer-Consumer. , Subscriber, Producer-, , Iterator, – Consumer-. , . Iterator Apache Spark, Subscriber – , ActorSystem.
IteratorSubscriber .
- . , Done, , Throwable, .
- , hasNext .
- , , Publisher-.
- , . LinkedBlockingQueue, . , .
- , . , , Publisher-. , , Publisher- . hasNext next ( requestNextBatches hasNext), , .
- subscriptionPromise subscription Subscription, Publisher onSubscribe. , Reactive Streams Subscriber- Publisher- , , hasNext , onSubscribe. , subscription, Publisher-. lazy subscription, Promise.
- . hasNext next, , .
- , , hasNext false . hasNext, .
- onSubscribe Publisher- Subscription Promise, subscription.
- onNext Publisher-, . .
- Publisher onComplete, Done.
- Publisher onError. .
- hasNext , . , true, . , .
- , false.
- , , requestSize, Publisher. , , , Publisher- , HTTP- .
- . , , , . , , ( , , subscription), , , , .
- , currentIterator. , . , hasNext , ( , ), .
- , false hasNext. , isDone, , . - , hasNext , false. , hasNext , false , . , .
- , , , .
- next . , hasNext, next .
- Publisher- , , subscription, Publisher-. requestSize. .
, , , :
8. .:
, , . , HTTP , . .
– . , , Hadoop , . , , - . , , hdfs, , , , .
, . , akka-http , . , -, - Apache Spark , , , -.
, , . , , http-, , .
, . , , . , . , .
, . , , Hadoop , , .
, , Hadoop- , , .
, ,
CleverDATA . . , , , , , . , .