使用任意机器学习模型在Apache Spark上对大量数据进行分类

第1部分:问题陈述


哈Ha! 我是CleverDATA的解决方案架构师。 今天,我将讨论如何使用几乎使用任何可用的机器学习库构建的模型对大量数据进行分类。 在这个分为两部分的系列中,我们将考虑以下问题。

  • 如何呈现机器学习模型即服务(Model as a Service)?
  • 使用Apache Spark物理上执行分布式处理大量数据的任务吗?
  • Apache Spark与外部服务交互时会出现什么问题?
  • 如何使用akka-streams和akka-http库以及Reactive Streams方法来组织Apache Spark与外部服务的交互?

最初,我计划写一篇文章,但是由于材料量很大,我决定将其分为两部分。 今天,在第一部分中,我们将考虑问题的一般说明以及在实施过程中需要解决的主要问题。 在第二部分中,我们将讨论使用Reactive Streams方法解决该问题的方法的实际实现。


我们的公司CleverDATA拥有一支数据分析人员团队,他们借助各种各样的工具(例如scikit-learn,facebook fastText,xgboost,tensorFlow等)来训练机器学习模型。 分析人员使用的事实上的核心编程语言是Python。 几乎所有用于机器学习的库,甚至最初都是用其他语言实现的,都具有Python界面,并且与主要的Python库(主要是NumPy)集成在一起。

另一方面,Hadoop生态系统被广泛用于存储和处理大量非结构化数据。 在其中,数据以一定大小(通常为128 MB,但可以配置)的分布式复制块的形式存储在HDFS文件系统上。 最有效的分布式数据处理算法试图最小化群集计算机之间的网络交互。 为此,必须在存储数据的同一台计算机上处​​理数据。

当然,在许多情况下,无法完全避免网络交互,但是,您仍然需要尝试在本地执行所有任务,并最大程度地减少需要通过网络传输的数据量。

处理分布式数据的这一原理称为“使计算靠近数据”。 所有主要框架(主要是Hadoop MapReduce和Apache Spark)都遵守该原则。 它们确定需要在存储所需数据块的计算机上运行的特定操作的组成和顺序。

图1. HDFS集群由几台计算机组成,其中一台是名称节点,其余是数据节点。 名称节点存储有关组成文件块的文件以及物理上位于它们的机器的信息。 块本身存储在数据节点上,然后将其复制到多台机器上以提高可靠性。 数据节点还运行数据处理任务。 任务由主流程(Master,M)组成,该主流程协调存储必要数据块的机器上工作流程(Worker,W)的启动。

Hadoop生态系统的几乎所有组件都是使用Java虚拟机(JVM)启动的,并且彼此紧密集成。 例如,要运行使用Apache Spark编写的任务来处理HDFS上存储的数据,几乎不需要进行其他操作:该框架提供了开箱即用的功能。

不幸的是,为机器学习而设计的大量库都假定数据是在本地存储和处理的。 同时,存在与Hadoop生态系统紧密集成的库,例如Spark ML或Apache Mahout。 然而,它们具有许多明显的缺点。 首先,它们提供的机器学习算法的实现要少得多。 其次,并非所有的数据分析师都可以与他们合作。 这些库的优点包括以下事实:它们可以用于使用分布式计算在大量数据上训练模型。

但是,数据分析人员经常使用替代方法来训练模型,尤其是能够使用GPU的库。 在本文中,我将不考虑培训模型的问题,因为我想着重于使用通过任何可用的机器学习库构建的现成模型来对大量数据进行分类。

因此,我们在此要解决的主要任务是将机器学习模型应用于HDFS上存储的大量数据。 如果我们可以使用Apache Spark库中的SparkML模块,该模块实现了基本的机器学习算法,那么对大量数据进行分类将是一项微不足道的任务:

val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset) 

不幸的是,这种方法仅适用于SparkML模块中实现的算法(可以在此处找到完整列表)。 此外,在使用其他库而不是在JVM上实现的情况下,一切都变得更加复杂。

为了解决此问题,我们决定将模型包装在REST服务中。 因此,当开始对存储在HDFS上的数据进行分类的任务时,有必要组织存储数据的机器与运行分类服务的机器(或机器集群)之间的交互。

图2.模型即服务的概念

Python分类服务说明


为了将模型作为服务呈现,有必要解决以下任务:

  1. 通过HTTP实现对模型的有效访问;
  2. 确保最有效地使用机器资源(主要是所有处理器核心和内存);
  3. 提供对高负载的抵抗力;
  4. 提供水平缩放的能力。

通过HTTP访问模型非常简单:已经为Python开发了许多库,这些库允许您使用少量代码来实现REST访问点。 这些微框架之一是Flask 。 Flask上的分类服务的实现如下:

 from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run() 

在这里,当服务启动时,我们将模型加载到内存中,然后在调用分类方法时使用它。 load_model函数从某个外部源(例如文件系统,键值存储等)加载模型。

模型是具有预测方法的对象。 在分类的情况下,它会输入一定大小的某个特征向量,并产生一个布尔值(指示指定的向量是否适用于此模型),或者生成一个介于0到1之间的值,然后您可以将截止阈值应用于该阈值:高于阈值的所有值,是分类的积极结果,其余则不是。

我们需要分类的特征向量以二进制形式传递并反序列化为numpy数组。 为每个向量发出一个HTTP请求将是开销。 例如,对于一个100维向量,并使用float32类型的值,一个完整的HTTP请求(包括标头)将如下所示:

 PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data] 

如您所见,这种请求的效率非常低(有效载荷400字节/(133字节头+ 400字节正文)= 75%)。 幸运的是,在几乎所有库中,预测方法都可以让您接收[mxn]矩阵,而不是[1 xn]向量,并因此立即输出m个输入值的结果。

此外,numpy库针对使用大型矩阵进行了优化,从而使您可以有效地使用所有可用的计算机资源。 因此,我们不能在一个请求中发送一个,而是发送大量特征向量,将它们反序列化为大小[mxn]的numpy矩阵,进行分类,然后从布尔值或float32值返回向量[mx 1]。 结果,使用1000行矩阵时HTTP交互的效率几乎等于100%。 在这种情况下,HTTP标头的大小可以忽略。

要在本地计算机上测试Flask服务,可以从命令行运行它。 但是,这种方法完全不适合工业使用。 事实是Flask是单线程的,如果在服务运行时查看处理器负载图,我们将看到一个内核100%加载,其余内核处于非活动状态。 幸运的是,有多种方法可以使用计算机的所有内核:为此,Flask需要通过uwsgi Web应用程序服务器运行。 它使您可以最佳地配置进程和线程数,以确保所有处理器核心上的负载均匀。 可在此处找到有关配置uwsgi的所有选项的更多详细信息。

最好使用nginx作为HTTP入口点,因为uwsgi在高负载的情况下会不稳定地工作。 另一方面,Nginx将请求的整个输入流带到自身上,过滤掉无效的请求,并分配uwsgi的负载。 Nginx使用进程文件通过linux套接字与uwsgi通信。 下面显示了一个示例nginx配置:

 server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } } 

正如我们所看到的,事实证明,对于一台机器来说,这是一个相当复杂的配置。 如果我们需要对大量数据进行分类,则此服务会收到大量请求,这可能会成为瓶颈。 解决此问题的方法是水平缩放。

为了方便起见,我们将服务打包在Docker容器中,然后将其部署在所需数量的计算机上。 如果需要,您可以使用自动部署工具,例如Kubernetes。 下面给出了一个用于创建带有服务的容器的示例Dockerfile结构。

 FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 RUN update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 RUN pip install uwsgi flask scipy scikit-learn #copying script files WORKDIR /etc/score COPY score.py . COPY score.ini . COPY start.sh . RUN chmod +x start.sh RUN rm /etc/nginx/sites-enabled/default COPY score.nginx /etc/nginx/sites-enabled/ EXPOSE 80 ENTRYPOINT ["./start.sh"] 

因此,分类服务的结构如下:

图3.分类服务方案

Hadoop生态系统中Apache Spark的工作摘要


现在考虑处理存储在HDFS上的数据的过程。 如前所述,为此使用了将计算转移到数据的原理。 要开始处理任务,您需要知道我们需要的数据块存储在哪台机器上,以便运行直接处理它们的过程。 还需要协调这些进程的启动,在紧急情况下重新启动它们,必要时汇总各种子任务的结果等。

所有这些任务都是通过与Hadoop生态系统一起使用的各种框架来完成的。 最受欢迎和便捷的工具之一是Apache Spark。 构建整个框架所基于的主要概念是RDD(弹性分布式数据集)。 通常,RDD可以看作是耐摔的分布式集合。 RDD可以通过两种主要方式获得:

  1. 从外部源创建,例如内存中的集合,文件系统上的文件或目录等;
  2. 通过应用转换操作从另一个RDD转换。 RDD支持使用集合的所有基本操作,例如map,flatMap,filter,groupBy,join等。

重要的是要理解,RDD与集合不同,它不是直接数据,而是必须对数据执行的一系列操作。 因此,当调用转换操作时,实际上没有任何工作发生,我们只得到了一个新的RDD,它将比上一个包含更多的操作。 当所谓的终端操作或动作被调用时,工作本身开始。 其中包括保存到文件,保存到内存中的集合,计数元素数等。

当启动终端操作时,Spark根据生成的RDD构建一个非循环操作图(DAG,有向非循环图),并根据收到的图在群集上顺序运行它们。 在基于RDD构建DAG时,Spark会执行许多优化,例如,如有可能,将几个连续的转换合并为一个操作。

RDD是与Spark 1.x版本中的Spark API交互的主要单元。 在Spark 2.x中,开发人员表示,现在交互的主要概念是数据集。 数据集是RDD的附加组件,支持类似SQL的交互。 使用数据集API时,Spark允许您使用各种优化,包括相当低级的优化。 但是总的来说,适用于RDD的基本原理也适用于数据集。

有关Spark工作的更多详细信息,请参见官方网站上的文档

让我们考虑一个不使用外部服务的Spark最简单分类的示例。 此处实现了一种相当无意义的算法,该算法考虑了文本中每个拉丁字母的比例,然后考虑了标准偏差。 首先,重要的是直接注意使用Spark时使用的基本步骤。

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ??? //(2) val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] //(3) val result: Dataset[Score] = ds.map {d: Data => //(4) val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) }.map {f: Features => Score(f.id, std(f.vector)) //(5) } result.write.parquet("/path/to/result") //(6) 

在此示例中,我们:

  1. 我们确定输入,中间和输出数据的结构(输入数据定义为与某些标识符相关联的某些文本,中间数据将标识符与特征向量匹配,输出将标识符与某些数值匹配);
  2. 我们定义了一个用于通过特征向量计算结果值的函数(例如,标准差,实现未显示);
  3. 将原始数据集定义为沿路径/路径/到/数据以拼花格式存储在HDFS上的数据;
  4. 定义一个中间数据集作为原始数据集的位图。
  5. 同样,我们通过从中间对象按位转换来确定结果数据集;
  6. 将结果数据集沿路径/ path /到/结果以拼合格式保存到HDFS。 由于保存到文件是一项终端操作,因此计算本身会在此阶段精确启动。

Apache Spark遵循熟练工人的原则。 当应用程序启动时,称为驱动程序的主进程启动。 它执行负责形成RDD的代码,在此基础上将进行计算。

调用终端操作时,驱动程序将根据生成的RDD生成DAG。 然后,驱动程序启动称为执行程序的工作流的启动,其中将直接处理数据。 在启动工作流之后,驱动程序将需要执行的可执行块传递给他们,并且还指示需要将其应用于数据的哪一部分。

下面是我们示例的代码,其中突出显示了在执行程序上执行的代码部分(在执行程序部分的开始和执行者部分的结束之间)。 其余代码在驱动程序上执行。

 case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map { // --------------- EXECUTOR PART BEGIN ----------------------- d: Data => val filteredText = d.text.toLowerCase.filter { letter => 'a' <= letter && letter <= 'z' } val featureVector = new Array[Float](26) if (filteredText.nonEmpty) { filteredText.foreach(letter => featureVector(letter) += 1) featureVector.indicies.foreach { i => featureVector(i) = featureVector(i) / filteredText.length() } } Features(d.id, featureVector) // --------------- EXECUTOR PART END ----------------------- }.map { // --------------- EXECUTOR PART BEGIN ----------------------- f: Features => Score(f.id, std(f.vector)) // --------------- EXECUTOR PART END ----------------------- } result.write.parquet(“/path/to/result”) 

在Hadoop生态系统中,所有应用程序都在容器中运行。 容器是在群集中的一台机器上运行的进程,该进程被分配了一定数量的资源。 容器的启动由YARN资源管理器处理。 它确定哪台机器具有足够数量的处理器内核和RAM,以及它是否包含用于处理的必要数据块。

启动Spark应用程序时,YARN会在其中启动驱动程序的一台群集计算机上创建并运行该容器。 然后,当驱动程序根据需要在执行程序上运行的操作准备DAG时,YARN将在所需的计算机上启动其他容器。

通常,驱动程序分配一个内核和少量内存就足够了(当然,除非计算结果不会在驱动程序上聚合到内存中)。 对于执行者,为了优化资源并减少系统中的进程总数,可以区分多个核心:在这种情况下,执行者将能够同时执行多个任务。

但是这里重要的是要了解,如果容器中运行的一项任务失败或资源不足,YARN可能会决定停止容器,然后在其中执行的所有任务都必须在另一个执行器上重新启动。 另外,如果我们为每个容器分配足够多的核心,那么YARN很可能将无法启动它。 例如,如果我们有两台未使用两个内核的机器,那么我们可以在每个需要两个内核的容器上启动,但是不能启动一个需要四个内核的容器。

现在,让我们看看示例中的代码将如何直接在集群上执行。 想象一下,源数据的大小为2 TB。 因此,如果HDFS上的块大小为128兆字节,则总共将有16384个块。 每个块都复制到多台机器上,以确保可靠性。 为简单起见,我们将复制因子设为2,即总共有32768个可用块。 假设我们使用一个由16台计算机组成的集群进行存储。 因此,在均匀分布的情况下,在每台机器上,大约有2048个块,即每台机器256 GB。 在每台计算机上,我们都有8个处理器内核和64 GB的RAM。

对于我们的任务,驱动程序不需要很多资源,因此我们将为其分配1个内核和1 GB内存。 我们将为表演者提供2个内核和4 GB的内存。 假设我们要最大程度地利用群集资源。 这样,我们得到了64个容器:一个用于驱动程序,一个用于表演人员63。

图4.在数据节点上运行的进程及其使用的资源。

由于在本例中,我们仅使用地图操作,因此DAG将包含一个操作。 它包含以下操作:

  1. 从本地硬盘中获取一块数据,
  2. 转换资料
  3. 将结果保存到您自己的本地磁盘上的新块中。

总共需要处理16384个块,因此每个执行程序必须执行16384 /(63个执行程序* 2个内核)= 130个操作。 因此,执行程序作为一个单独的过程的生命周期(万一发生而不会跌倒)将如下所示。

  1. 容器启动。
  2. 从驱动程序接收一个任务,其中将有一个块标识符和必要的操作。 由于我们为容器分配了两个核心,因此执行程序可以一次接收两个任务。
  3. 执行任务并将结果发送给驱动程序。
  4. 从驱动程序获取下一个任务,并重复步骤2和3,直到处理完该本地计算机的所有块。
  5. 集装箱停靠站

注意 :如果有必要在机器之间重新分配中间数据,通常是为了进行分组操作(groupBy,reduceByKey等)和连接(联接),则需要获得更复杂的DAG,其考虑不在本文的讨论范围之内。

Apache Spark与外部服务之间交互的主要问题


如果在map操作的框架内,我们需要访问某些外部服务,那么任务就变得不那么琐碎了。 假设ExternalServiceClient类的一个对象负责与外部服务进行交互。 通常,在开始工作之前,我们需要对其进行初始化,然后根据需要调用它:

 val client = ExternalServiceClient.create() // val score = client.score(featureVector) // . 

通常,客户端初始化需要一些时间,因此,通常,它在应用程序启动时进行初始化,然后用于从某些全局上下文或池中获取客户端实例。 因此,当具有Spark执行器的容器收到需要与外部服务交互的任务时,最好在开始对数据数组进行工作之前获取一个已初始化的客户端,然后将其重新用于每个元素。

在Spark中有两种方法可以做到这一点。 首先,如果客户端是可序列化的(客户端本身及其所有字段必须扩展java.io.Serializable接口),则可以在驱动程序上对其进行初始化,然后通过广播变量机制将其传递给执行程序。

 val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) } 

如果客户端不可序列化,或者客户端的初始化取决于其运行的特定计算机的设置(例如,为了平衡起见,来自一台计算机的请求必须发送到第一台服务计算机,而另一台则发送到第二台服务计算机),然后可以直接在执行程序上初始化客户端。

为此,RDD(和数据集)具有mapPartitions操作,它是map操作的通用版本(如果查看RDD类的源代码,则map操作是通过mapPartitions实现的)。 传递给mapPartitions操作的函数对每个块运行一次。 我们将从该块读取的数据的迭代器提供给该函数的输入,在输出处,它应为与输入块相对应的输出数据返回一个迭代器:

 ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } } 

在此代码中,为每个源数据块创建了一个外部服务的客户端。当然,这比每次创建一个客户端来处理每个元素都更好,并且在许多情况下,这是一个完全可以接受的解决方案。但是,我将进一步展示如何创建一个对象,该对象将在容器的开头初始化一次,然后用于启动该容器中的所有任务。

生成的迭代器的处理操作是单线程的。让我提醒您,访问迭代器类型结构的主要模式是对hasNext和next方法的顺序调用:

 while (i.hasNext()) { val item = i.next() … } 

如果我们为执行者分配了两个核心,那么它们将只有两个主要的工作流涉及数据处理。让我提醒您,如果一台机器上有8个内核,那么YARN将不允许它运行超过2个内核的执行程序的4个以上的进程,每台机器只有8个线程。对于本地计算,这是最佳选择,因为它将提供最大的计算能力负载,并且流量控制的开销最少。但是,在与外部服务交互的情况下,情况会发生变化。

使用外部服务时,最重要的问题之一是性能。实现此目的的最简单方法是使用同步客户端,在该客户端中,我们访问每个元素的服务,并在收到来自元素的响应后形成结果值。但是,此方法有一个重大缺陷:在同步交互期间,在与该服务交互的持续时间内,同步调用外部服务的线程被阻止。事实是,当调用hasNext方法时,我们希望得到关于是否还有更多要处理的元素的明确答案。在不确定的情况下(例如,当我们发送对外部服务的请求并且不知道它将返回空还是非空答案时),我们别无选择,只能等待答案,从而阻塞了调用此方法的线程。因此迭代器是阻塞的数据结构

图5.通过调用传递给mapPartitions的函数而获得的迭代器的按位处理发生在单个线程中。结果,我们无法充分利用资源。

您还记得,我们优化了分类服务,以便我们可以同时处理多个请求。因此,我们需要从原始迭代器中收集必要数量的请求,将其发送到服务,获得响应,然后将其发布给生成的迭代器。

图6.发送一组元素的分类请求时的同步交互

实际上,在这种情况下,性能不会好很多,因为,首先,在与外部服务交互时,我们被迫将主线程保持在阻塞状态,并且,其次,当我们处理结果时,外部服务处于空闲状态。

问题的最终陈述


因此,在使用外部服务时,必须解决同步访问的问题。理想情况下,将与外部服务的交互转移到单独的线程池将很方便。在这种情况下,对外部服务的请求将与先前请求的结果的处理同时执行,因此有可能更有效地使用机器的资源。对于线程之间的交互,可以使用阻塞队列,该队列将用作通信缓冲区。负责与外部服务进行交互的流会将数据放入队列中,而处理生成的迭代器的流将相应地从那里获取数据。

但是,这种异步处理带来了许多其他问题。

  1. , , , .
  2. , , . , . , .
  3. 为了使hasNext方法在生成的迭代器中返回false,您需要确保所有请求都已得到回答,并发出信号告知缓冲区中将没有更多数据。对于同步处理,这非常简单:如果在处理下一个响应之后,原始迭代器返回的hasNext = false,则相应地将不再有元素。在异步处理的情况下,特别是如果我们同时发送多个请求,则需要协调响应的接收,并且只有在接收到最后一个响应之后,才发送信号来完成处理。

关于我们如何有效解决这些问题,我将在下一部分中讲述敬请期待!

同时,查看我们公司的职位空缺,也许我们正在寻找您?

Source: https://habr.com/ru/post/zh-CN413137/


All Articles