Parte 1: Declaración del problema
Hola Habr! Soy arquitecto de soluciones en CleverDATA. Hoy hablaré sobre cómo clasificamos grandes cantidades de datos usando modelos construidos usando casi cualquier biblioteca de aprendizaje automático disponible. En esta serie de dos partes, consideraremos las siguientes preguntas.
- ¿Cómo presentar un modelo de aprendizaje automático como servicio (Modelo como servicio)?
- ¿Cómo se realizan físicamente las tareas de procesamiento distribuido de grandes cantidades de datos con Apache Spark?
- ¿Qué problemas surgen cuando Apache Spark interactúa con servicios externos?
- ¿Cómo se puede organizar la interacción Apache Spark con servicios externos utilizando las bibliotecas akka-streams y akka-http, así como el enfoque de Reactive Streams?
Inicialmente, planeé escribir un artículo, pero como el volumen de material resultó ser bastante grande, decidí dividirlo en dos partes. Hoy, en la primera parte, consideraremos el enunciado general del problema, así como los problemas principales que deben resolverse durante la implementación. En la
segunda parte, hablaremos sobre la implementación práctica de la solución a este problema utilizando el enfoque de Reactive Streams.
Nuestra empresa
CleverDATA cuenta con un equipo de analistas de datos que, con la ayuda de una amplia gama de herramientas (como scikit-learn, facebook fastText, xgboost, tensorFlow, etc.), capacitan modelos de aprendizaje automático. El lenguaje de programación central de facto que usan los analistas es Python. Casi todas las bibliotecas para el aprendizaje automático, incluso implementadas originalmente en otros idiomas, tienen una interfaz en Python y están integradas con las bibliotecas principales de Python (principalmente con NumPy).
Por otro lado, el ecosistema Hadoop se usa ampliamente para almacenar y procesar grandes cantidades de datos no estructurados. En él, los datos se almacenan en el sistema de archivos HDFS en forma de bloques distribuidos replicados de cierto tamaño (generalmente 128 MB, pero es posible configurarlo). Los algoritmos de procesamiento de datos distribuidos más eficientes intentan minimizar la interacción de red entre máquinas de clúster. Para hacer esto, los datos deben procesarse en las mismas máquinas donde se almacenan.
Por supuesto, en muchos casos, la interacción de la red no se puede evitar por completo, pero, sin embargo, debe intentar realizar todas las tareas localmente y minimizar la cantidad de datos que deberán transmitirse a través de la red.
Este principio de procesamiento de datos distribuidos se denomina "mover cálculos cerca de los datos". Todos los marcos principales, principalmente Hadoop MapReduce y Apache Spark, se adhieren a este principio. Determinan la composición y secuencia de operaciones específicas que deberán ejecutarse en máquinas donde se almacenan los bloques de datos necesarios.
Figura 1. El clúster HDFS consta de varias máquinas, una de las cuales es un Nodo de nombre y el resto es un Nodo de datos. Name Node almacena información sobre los archivos que componen sus bloques y sobre las máquinas donde se encuentran físicamente. Los bloques mismos se almacenan en el nodo de datos, que se replican en varias máquinas para aumentar la confiabilidad. El nodo de datos también ejecuta tareas de procesamiento de datos. Las tareas consisten en el proceso principal (Master, M), que coordina el inicio de los procesos de trabajo (Worker, W) en las máquinas donde se almacenan los bloques de datos necesarios.Casi todos los componentes del ecosistema de Hadoop se lanzan utilizando la máquina virtual Java (JVM) y están estrechamente integrados entre sí. Por ejemplo, para ejecutar tareas escritas usando Apache Spark para trabajar con datos almacenados en HDFS, casi no se requieren manipulaciones adicionales: el marco proporciona esta funcionalidad de forma inmediata.
Desafortunadamente, la mayoría de las bibliotecas diseñadas para el aprendizaje automático asumen que los datos se almacenan y procesan localmente. Al mismo tiempo, hay bibliotecas que están estrechamente integradas con el ecosistema Hadoop, por ejemplo, Spark ML o Apache Mahout. Sin embargo, tienen una serie de inconvenientes importantes. Primero, proporcionan muchas menos implementaciones de algoritmos de aprendizaje automático. En segundo lugar, no todos los analistas de datos pueden trabajar con ellos. Las ventajas de estas bibliotecas incluyen el hecho de que se pueden usar para entrenar modelos en grandes volúmenes de datos utilizando computación distribuida.
Sin embargo, los analistas de datos a menudo usan métodos alternativos para entrenar modelos, en particular bibliotecas que permiten el uso de GPU. No consideraré los problemas de los modelos de capacitación en este artículo, porque quiero centrarme en el uso de modelos listos para usar creados con cualquier biblioteca de aprendizaje automático disponible para clasificar grandes cantidades de datos.
Entonces, la tarea principal que estamos tratando de resolver aquí es aplicar modelos de aprendizaje automático a grandes cantidades de datos almacenados en HDFS. Si pudiéramos usar el módulo SparkML de la biblioteca Apache Spark, que implementa los algoritmos básicos de aprendizaje automático, clasificar grandes cantidades de datos sería una tarea trivial:
val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset)
Desafortunadamente, este enfoque solo funciona para algoritmos implementados en el módulo SparkML (puede encontrar una lista completa
aquí ). En el caso de usar otras bibliotecas, además, implementadas no en la JVM, todo se vuelve mucho más complicado.
Para resolver este problema, decidimos incluir el modelo en un servicio REST. En consecuencia, al comenzar la tarea de clasificar los datos almacenados en HDFS, es necesario organizar la interacción entre las máquinas en las que se almacenan los datos y la máquina (o grupo de máquinas) en la que se ejecuta el servicio de clasificación.
Figura 2. El concepto de modelo como servicioDescripción del servicio de clasificación de Python
Para presentar el modelo como un servicio, es necesario resolver las siguientes tareas:
- implementar un acceso eficiente al modelo a través de HTTP;
- asegurar el uso más eficiente de los recursos de la máquina (principalmente todos los núcleos de procesador y memoria);
- proporcionar resistencia a altas cargas;
- Proporcionar la capacidad de escalar horizontalmente.
El acceso al modelo a través de HTTP es bastante simple de implementar: se ha desarrollado una gran cantidad de bibliotecas para Python que le permiten implementar un punto de acceso REST utilizando una pequeña cantidad de código. Uno de estos microframes es
Flask . La implementación del servicio de clasificación en Flask es la siguiente:
from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run()
Aquí, cuando se inicia el servicio, cargamos el modelo en la memoria y luego lo usamos cuando llamamos al método de clasificación. La función load_model carga el modelo desde alguna fuente externa, ya sea el sistema de archivos, el almacenamiento de valores clave, etc.
Un modelo es un objeto que tiene un método de predicción. En el caso de la clasificación, toma una entrada a algún vector de características de cierto tamaño y produce un valor booleano que indica si el vector especificado es adecuado para este modelo, o algún valor de 0 a 1, al que luego puede aplicar el umbral de corte: todo por encima del umbral, es un resultado positivo de la clasificación, el resto no lo es.
El vector de características que necesitamos clasificar se pasa en forma binaria y se deserializa en una matriz numpy. Sería una sobrecarga hacer una solicitud HTTP para cada vector. Por ejemplo, en el caso de un vector de 100 dimensiones y el uso de valores de tipo float32, una solicitud HTTP completa, incluidos los encabezados, se vería así:
PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data]
Como puede ver, la eficiencia de dicha solicitud es muy baja (400 bytes de carga útil / (encabezado de 133 bytes + cuerpo de 400 bytes) = 75%). Afortunadamente, en casi todas las bibliotecas, el método de predicción le permite recibir no el vector [1 xn], sino la matriz [mxn] y, en consecuencia, generar el resultado inmediatamente para m valores de entrada.
Además, la biblioteca numpy está optimizada para trabajar con matrices grandes, lo que le permite utilizar de manera efectiva todos los recursos disponibles de la máquina. Por lo tanto, podemos enviar no uno sino un número bastante grande de vectores de características en una solicitud, deserializarlos en una matriz numpy de tamaño [mxn], clasificar y devolver el vector [mx 1] desde valores booleanos o float32. Como resultado, la eficiencia de la interacción HTTP cuando se usa una matriz de 1000 filas se vuelve casi igual al 100%. El tamaño de los encabezados HTTP en este caso puede ser descuidado.
Para probar el servicio Flask en la máquina local, puede ejecutarlo desde la línea de comandos. Sin embargo, este método es completamente inadecuado para uso industrial. El hecho es que Flask tiene un solo subproceso y, si observamos el diagrama de carga del procesador mientras el servicio está en ejecución, veremos que un núcleo está cargado al 100% y el resto está inactivo. Afortunadamente, hay formas de usar todos los núcleos de la máquina: para esto, Flask debe ejecutarse a través del servidor de aplicaciones web uwsgi. Le permite configurar de manera óptima la cantidad de procesos y subprocesos para garantizar una carga uniforme en todos los núcleos de procesador. Puede encontrar más detalles sobre todas las opciones para configurar uwsgi
aquí .
Es mejor usar nginx como un punto de entrada HTTP, ya que uwsgi puede funcionar de manera inestable en caso de altas cargas. Nginx, por otro lado, toma todo el flujo de entrada de solicitudes en sí mismo, filtra las solicitudes no válidas y dosifica la carga en uwsgi. Nginx se comunica con uwsgi a través de sockets linux usando un archivo de proceso. A continuación se muestra un ejemplo de configuración nginx:
server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } }
Como podemos ver, resultó ser una configuración bastante complicada para una máquina. Si necesitamos clasificar grandes cantidades de datos, un gran número de solicitudes llegará a este servicio y puede convertirse en un cuello de botella. La solución a este problema es la escala horizontal.
Para mayor comodidad, empaquetamos el servicio en un contenedor Docker y luego lo implementamos en la cantidad requerida de máquinas. Si lo desea, puede usar herramientas de implementación automatizadas como Kubernetes. A continuación se muestra un ejemplo de estructura Dockerfile para crear un contenedor con un servicio.
FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives
Entonces, la estructura del servicio de clasificación es la siguiente:
Figura 3. Esquema de servicio para clasificaciónUn breve resumen del trabajo de Apache Spark en el ecosistema Hadoop
Ahora considere el proceso de procesamiento de datos almacenados en HDFS. Como ya señalé, para esto se usa el principio de transferir cálculos a datos. Para comenzar a procesar tareas, debe saber en qué máquinas se almacenan los bloques de datos que necesitamos para ejecutar procesos directamente involucrados en su procesamiento. También es necesario coordinar el lanzamiento de estos procesos, reiniciarlos en caso de emergencia, si es necesario, agregar los resultados de varias subtareas, etc.
Todas estas tareas se llevan a cabo mediante una variedad de marcos que trabajan con el ecosistema de Hadoop. Uno de los más populares y convenientes es Apache Spark. El concepto principal en torno al cual se construye todo el marco es RDD (conjunto de datos distribuidos resilientes). En general, RDD puede considerarse como una colección distribuida que es resistente a caídas. RDD se puede obtener de dos maneras principales:
- creación desde una fuente externa, como una colección en memoria, un archivo o directorio en el sistema de archivos, etc.
- conversión de otro RDD mediante la aplicación de operaciones de transformación. RDD admite todas las operaciones básicas de trabajar con colecciones, como map, flatMap, filter, groupBy, join, etc.
Es importante comprender que RDD, a diferencia de las colecciones, no son datos directamente, sino una secuencia de operaciones que deben realizarse en los datos. Por lo tanto, cuando se llaman las operaciones de transformación, en realidad no ocurre ningún trabajo, y solo obtenemos un nuevo RDD, que contendrá una operación más que en la anterior. El trabajo en sí comienza cuando se llaman las llamadas operaciones de terminal, o acciones. Estos incluyen guardar en un archivo, guardar en una colección en la memoria, contar el número de elementos, etc.
Al iniciar una operación de terminal, Spark crea un gráfico de operación acíclico (DAG, gráfico acíclico dirigido) basado en el RDD resultante y los ejecuta secuencialmente en el clúster de acuerdo con el gráfico recibido. Al construir un DAG basado en RDD, Spark realiza una serie de optimizaciones, por ejemplo, si es posible, combina varias transformaciones sucesivas en una sola operación.
RDD fue la unidad principal de interacción con la API de Spark en las versiones de Spark 1.x. En Spark 2.x, los desarrolladores dijeron que ahora el concepto principal para la interacción es Dataset. Dataset es un complemento para RDD con soporte para interacción similar a SQL. Al usar la API de conjunto de datos, Spark le permite usar una amplia gama de optimizaciones, incluidas las de nivel bastante bajo. Pero, en general, los principios básicos que se aplican a los RDD también se aplican al conjunto de datos.
Se pueden encontrar más detalles sobre el trabajo de Spark en la
documentación en el sitio web oficial .
Consideremos un ejemplo de la clasificación más simple en Spark sin usar servicios externos. Aquí se implementa un algoritmo bastante sin sentido, que considera la proporción de cada una de las letras latinas en el texto, y luego considera la desviación estándar. Aquí, en primer lugar, es importante prestar atención directamente a los pasos básicos que se utilizan al trabajar con Spark.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ???
En este ejemplo, nosotros:
- determinamos la estructura de los datos de entrada, intermedios y de salida (los datos de entrada se definen como texto con el que se asocia un determinado identificador, los datos intermedios coinciden con el identificador con el vector de características y la salida coincide con el identificador con algún valor numérico);
- definimos una función para calcular el valor resultante por un vector de características (por ejemplo, desviación estándar, implementación no mostrada);
- defina el conjunto de datos original como datos almacenados en HDFS en formato de parquet a lo largo de la ruta / ruta / a / datos;
- Defina un conjunto de datos intermedio como un mapa de mapa de bits del conjunto de datos original.
- Del mismo modo, determinamos el conjunto de datos resultante a través de una transformación bit a bit desde el intermedio;
- guarde el conjunto de datos resultante en HDFS en formato de parquet a lo largo de la ruta / ruta / a / resultado. Dado que guardar en un archivo es una operación de terminal, los cálculos mismos se inician precisamente en esta etapa.
Apache Spark funciona según el principio de maestro-trabajador. Cuando se inicia la aplicación, se inicia el proceso principal, llamado controlador. Ejecuta el código responsable de la formación del RDD, en base al cual se realizarán los cálculos.
Cuando se llama a una operación de terminal, el controlador genera un DAG basado en el RDD resultante. Luego, el controlador inicia el lanzamiento de flujos de trabajo llamados ejecutores, en los que los datos se procesarán directamente. Después de iniciar los flujos de trabajo, el controlador les pasa el bloque ejecutable que debe ejecutarse y también indica a qué parte de los datos debe aplicarse.
A continuación se muestra el código de nuestro ejemplo, en el que se resaltan las secciones de código que se ejecutan en el ejecutor (entre las líneas parte del ejecutor comienzan y finalizan la parte del ejecutor). El resto del código se ejecuta en el controlador.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map {
En el ecosistema de Hadoop, todas las aplicaciones se ejecutan en contenedores. Un contenedor es un proceso que se ejecuta en una de las máquinas en un clúster al que se le asigna una cierta cantidad de recursos. El lanzamiento del contenedor es manejado por el Administrador de recursos de YARN. Determina cuál de las máquinas tiene una cantidad suficiente de núcleos de procesador y RAM, así como si contiene los bloques de datos necesarios para el procesamiento.
Al iniciar la aplicación Spark, YARN crea y ejecuta el contenedor en una de las máquinas de clúster en las que inicia el controlador. Luego, cuando el conductor prepara el DAG de las operaciones que deben ejecutarse en los ejecutores, YARN lanza contenedores adicionales en las máquinas deseadas.
Como regla, es suficiente que el controlador asigne un núcleo y una pequeña cantidad de memoria (a menos que, por supuesto, el resultado del cálculo no se agregue al controlador en la memoria). Para los ejecutores, para optimizar los recursos y reducir el número total de procesos en el sistema, se puede distinguir más de un núcleo: en este caso, el ejecutor podrá realizar varias tareas simultáneamente.
Pero aquí es importante comprender que en caso de que falle una de las tareas que se ejecutan en el contenedor o en caso de recursos insuficientes, YARN puede decidir detener el contenedor, y luego todas las tareas que se ejecutaron en él deberán reiniciarse nuevamente en otro artista. Además, si asignamos un número suficientemente grande de núcleos por contenedor, entonces es probable que YARN no pueda iniciarlo. Por ejemplo, si tenemos dos máquinas en las que no se usan dos núcleos, entonces podemos comenzar en cada contenedor que requiera dos núcleos, pero no podemos iniciar un contenedor que requiera cuatro núcleos.
Ahora veamos cómo se ejecutará el código de nuestro ejemplo directamente en el clúster. Imagine que el tamaño de los datos de origen es de 2 terabytes. En consecuencia, si el tamaño de bloque en HDFS es de 128 megabytes, entonces habrá 16384 bloques en total. Cada bloque se replica en varias máquinas para garantizar la fiabilidad. Para simplificar, tomamos el factor de replicación igual a dos, es decir, habrá 32768 bloques disponibles en total. Supongamos que usamos un grupo de 16 máquinas para el almacenamiento. En consecuencia, en cada una de las máquinas en caso de distribución uniforme habrá aproximadamente 2048 bloques, o 256 Gigabytes por máquina. En cada una de las máquinas, tenemos 8 núcleos de procesador y 64 gigabytes de RAM.
Para nuestra tarea, el controlador no necesita muchos recursos, por lo que le asignaremos 1 núcleo y 1 GB de memoria. Les daremos a los artistas 2 núcleos y 4 GB de memoria. Supongamos que queremos maximizar el uso de los recursos del clúster. Por lo tanto, obtenemos 64 contenedores: uno para el conductor y 63 para los artistas.
Figura 4. Procesos que se ejecutan en el nodo de datos y los recursos que utilizan.Como en nuestro caso usamos solo operaciones de mapas, nuestro DAG consistirá en una operación. Consiste en las siguientes acciones:
- tomar un bloque de datos del disco duro local,
- Convertir datos
- guarde el resultado en un nuevo bloque en su propio disco local.
En total, necesitamos procesar 16384 bloques, por lo que cada ejecutor debe realizar 16384 / (63 ejecutores * 2 núcleos) = 130 operaciones. Por lo tanto, el ciclo de vida del ejecutor como un proceso separado (en caso de que todo ocurra sin caídas) se verá de la siguiente manera.
- Lanzamiento de contenedores.
- Recibir del controlador una tarea en la que habrá un identificador de bloque y la operación necesaria. Como asignamos dos núcleos al contenedor, el ejecutor recibe dos tareas a la vez.
- Realizar una tarea y enviar el resultado al controlador.
- Obtener la siguiente tarea del controlador y repetir los pasos 2 y 3 hasta que se procesen todos los bloques para esta máquina local.
- Parada de contenedores
Nota : se obtienen DAG más complejos si es necesario redistribuir datos intermedios entre máquinas, generalmente para operaciones de agrupación (groupBy, reduceByKey, etc.) y conexiones (join), cuya consideración está más allá del alcance de este artículo.
Los principales problemas de interacción entre Apache Spark y los servicios externos.
Si, dentro del marco de la operación del mapa, necesitamos acceder a algún servicio externo, la tarea se vuelve menos trivial. Suponga que un objeto de la clase ExternalServiceClient es responsable de interactuar con un servicio externo. En general, antes de comenzar a trabajar, debemos inicializarlo y luego llamarlo según sea necesario:
val client = ExternalServiceClient.create()
Por lo general, la inicialización del cliente lleva algún tiempo, por lo tanto, como regla general, se inicializa al inicio de la aplicación y luego se usa para obtener una instancia del cliente de algún contexto o grupo global. Por lo tanto, cuando un contenedor con el ejecutor Spark recibe una tarea que requiere interacción con un servicio externo, sería bueno obtener un cliente ya inicializado antes de comenzar a trabajar en la matriz de datos y luego reutilizarlo para cada elemento.
Hay dos formas de hacer esto en Spark. Primero, si el cliente es serializable (el cliente mismo y todos sus campos deben extender la interfaz java.io.Serializable), entonces se puede inicializar en el controlador y luego
pasar a los ejecutores a través del mecanismo de difusión variable .
val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) }
En el caso de que el cliente no sea serializable, o la inicialización del cliente es un proceso que depende de la configuración de la máquina en particular en la que se está ejecutando (por ejemplo, para equilibrar, las solicitudes de una parte de las máquinas deben ir a la primera máquina de servicio, y de la otra a la segunda), entonces el cliente puede inicializarse directamente en el ejecutor.
Para esto, RDD (y Dataset) tiene una operación mapPartitions, que es una versión generalizada de la operación de mapa (si observa el código fuente de la clase RDD, la operación de mapa se implementa a través de mapPartitions). La función pasada a la operación mapPartitions se ejecuta una vez para cada bloque. , , , :
ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } }
. , , , , . , , , .
. , hasNext next:
while (i.hasNext()) { val item = i.next() … }
, , . , 8 , YARN 4 2 , , 8 . , . .
. , , , , . : , , . , hasNext , . (, , ) , , , . ,
.
5. , , mapPartitions, . ., , . , , , .
6., , , -, , , , -, , .
, . , . , . , . , , , , , , .
.
- , , , .
- , , . , . , .
- , hasNext false, , , , . : hasNext = false, , , . , , , .
,
. Stay tuned!