第2部分:解决方案
你好! 今天,我将继续讲述我们如何使用任意机器学习模型在Apache Spark上对大量数据进行分类的故事。 在本文的
第一部分中,我们研究了问题本身的陈述以及组织初始数据在其上存储和处理的集群与外部分类服务之间的交互时出现的主要问题。 在第二部分中,我们将考虑使用Reactive Streams方法解决此问题的一种方法,以及使用akka-streams库实现的方法。
反应流概念
要解决第一部分中描述的问题,您可以使用称为
Reactive Streams的方法。 它使您可以控制在处理阶段之间传输数据流的过程,以不同的速度运行并且彼此独立,而无需缓冲。 如果其中一个处理阶段比上一个阶段慢,则有必要向更快的阶段发出信号,告知它目前准备处理多少输入数据。 这种相互作用称为背压。 它包含以下事实:较快的阶段处理的元素数量与较慢的阶段所需的元素数量一样多,而不是更多,然后释放了计算资源。
通常,反应式流是用于实现
发布者-订阅者模板的规范。 该规范定义了一组四个接口(发布者,订阅者,处理器和订阅)以及它们的方法的协定。
让我们更详细地考虑以下接口:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
发布者-订阅者模型有两个方面:发送和接收。 在实现反应式流时,实现Publisher接口的类负责数据传输,而Subscriber负责接收。 若要在它们之间建立通信,必须通过调用其Subscriber方法向Publisher注册注册。 根据规范,在注册订阅服务器后,发布者必须按以下顺序调用其方法:
- onSubscribe。 向发布服务器注册订阅服务器后,立即调用此方法。 作为参数,订阅对象将传递给它,订阅者将通过该对象向发布者请求数据。 此对象应仅在此订户的上下文中存储和调用。
- 订阅服务器通过在相应的Subscription对象上调用request方法从发布服务器请求数据后,发布服务器可以调用订阅服务器的onNext方法,并传递下一个元素。
- 然后,订阅者可以定期在Subscription上调用request方法,但是Publisher不能调用onNext方法,而不能超过通过request方法请求的总数。
- 如果数据流是有限的,则在将所有元素传递给onNext方法之后,发布者必须调用onComplete方法。
- 如果Publisher中发生错误,并且无法进一步处理元素,则应调用onError方法
- 调用onComplete或onError方法后,应排除发布者与订阅者的进一步交互。
方法调用可以视为在发布者和订阅者之间发送信号。 订阅服务器向发布服务器发出信号,指示它准备处理多少个元素,而发布服务器又向其发出信号,表明存在下一个元素,或者没有其他元素,或者发生了一些错误。
为了排除发布者和订阅者彼此之间的另一个影响,对实现反应式流接口的所有方法的调用必须是非阻塞的。 在这种情况下,它们之间的交互将完全异步。
可在
此处找到有关Reactive Streams接口规范的更多详细信息。
因此,通过将原始迭代器和生成的迭代器分别转换为Publisher和Subscriber来链接它们,我们可以解决本文前面部分中确定的问题。 通过订阅服务器请求一定数量的元素,可以解决阶段之间的缓冲区溢出问题。 通过分别通过onComplete或onError方法向订户发送信号来解决成功或不成功完成的问题。 Publisher负责发送这些信号,在我们的情况下,这些信号必须控制发送了多少HTTP请求以及其中有多少接收了响应。 收到最后一个响应并处理了所有返回结果后,它应该发送一个onComplete信号。 万一其中一个请求失败,它应该发送一个onError信号,并停止向订阅服务器发送其他元素,并从原始迭代器中减去元素。
生成的迭代器应实现为订阅服务器。 在这种情况下,我们不能没有一个缓冲区,当从Subscriber接口调用onNext方法时,将在其中写入元素,并从Iterator接口使用hasNext和next方法将其减去。 作为缓冲区实现,可以使用阻塞队列,例如LinkedBlockedQueue。
细心的读者会立即问一个问题:阻塞队列为什么是阻塞队列,因为根据Reactive Streams规范,所有方法的实现都应该是非阻塞的? 但这没什么问题:由于我们要向Publisher要求严格定义数量的元素,因此onNext方法的调用次数不得超过此次数,并且队列始终可以添加新元素而不会阻塞。
另一方面,在队列为空的情况下调用hasNext方法时,可能会发生阻塞。 但是,没关系:hasNext方法不是Subscriber接口协定的一部分,它是在Iterator接口中定义的,正如我们前面所解释的,Iterator接口是一个阻塞数据结构。 调用next方法时,我们从队列中减去下一个元素,并且当其大小变得小于某个阈值时,我们将需要通过调用request方法来请求元素的下一部分。
图7.使用Reactive Streams方法与外部服务进行异步交互当然,在这种情况下,我们不会完全摆脱阻塞呼叫。 这是由于Reactive流之间的范式不匹配而导致的,后者假定完全异步交互,而后者在调用hasNext方法时必须调用trueN或false。 但是,与与外部服务的同步交互不同,可以通过增加处理器内核的总体负载来显着减少由于锁定导致的停机时间。
如果将来的版本中Apache Spark开发人员实现了可与Publisher和Subscriber一起使用的mapPartitions方法的类似物,将会很方便。 这将允许完全异步的交互,从而消除了阻塞线程的可能性。
Akka-streams和akka-http作为Reactive Streams规范的实现
当前,已经有十几个Reactive Streams规范的实现。 一种这样的实现是
akka库中的akka-streams模块。 在JVM的世界中,akka已经确立了其作为编写并行和分布式系统的最有效手段之一。 之所以能够实现这一目标,是因为其基础上奠定了基本原理,那就是
参与者模型 ,该
模型使您可以编写竞争激烈的应用程序而无需直接控制线程及其池。
关于在akka中执行演员概念的方法已有许多文献报道,所以我们不会在此止步(
akka的官方网站是很好的信息来源,我也推荐
akka在行动中使用 )。 在这里,我们将仔细研究JVM下实现的技术方面。
通常,参与者不是一个人存在,而是形成一个等级系统。 为了创建一个actor系统,您需要为其分配资源,因此使用akka的第一步是创建ActorSystem对象的实例。 当ActorSystem启动时,将创建一个单独的线程池,称为调度程序,其中将执行actor中定义的所有代码。 通常,一个线程执行多个参与者的代码,但是,如有必要,您可以为一组特定的参与者(例如,直接与阻塞API交互的参与者)配置单独的调度程序。
使用参与者解决的最常见任务之一是数据流的顺序处理。 以前,为此,必须手动构建actor链并确保它们之间没有瓶颈(例如,如果一个actor处理消息的速度比下一个actor快,那么他可能会传入消息队列溢出,从而导致OutOfMemoryError错误)。
从2.4版开始,akka-streams模块已添加到akka中,它允许您声明性地定义数据处理过程,然后为执行它创建必要的参与者。 Akka流还实现了反压原理,从而消除了处理中涉及的所有参与者溢出传入消息队列的可能性。
定义akka流中数据流处理方案的主要元素是Source,Flow和Sink。 通过将它们彼此组合,我们得到了一个可运行图。 要开始处理过程,将使用一个实现器,它会根据我们定义的图形(实现器接口及其实现ActorMaterializer)创建执行器。
让我们更详细地考虑阶段Source,Flow和Sink。 源定义数据源。 Akka-streams支持十几种不同的方法来创建源,包括从迭代器中进行创建:
val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator }
也可以通过转换现有源来获得源:
val newSource: Source[String, NotUsed] = source.map(item => transform(item))
如果转换是不平凡的操作,则可以将其表示为Flow实体。 Akka流支持多种创建Flow的方式。 最简单的方法是从函数创建:
val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length)
通过结合Source和Flow,我们得到了一个新Source。
val newSource: Source[Int, NotUsed] = oldSource.via(someFlow)
接收器用作数据处理的最后阶段。 与Source一样,akka-streams提供了十几个不同的Sink选项,例如,Sink.foreach为每个元素执行特定的操作,Sink.seq收集集合中的所有元素,等等。
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)
源,流和接收器分别通过输入和/或输出元素的类型进行参数化。 另外,每个处理阶段可能都有其工作结果。 为此,还可以通过确定操作结果的其他类型对Source,Flow和Sink进行参数化。 此类型称为实现值的类型。 如果该操作不表示存在其工作的其他结果,例如,当我们定义通过函数的流时,则将NotUsed类型用作实现值。
结合必要的Source,Flow和Sink,我们得到RunnableGraph。 它通过一种类型进行参数化,该类型确定了执行此图所获得的值的类型。 如有必要,在合并阶段时,您可以指定哪个阶段的结果将是整个操作图的结果。 默认情况下,采用Source阶段的结果:
val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println))
但是,如果接收阶段的结果对我们来说更重要,那么我们必须明确指出:
val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right)
定义操作图后,必须运行它。 为此,runnableGraph需要调用run方法。 作为一种参数,此方法采用ActorMaterializer对象(也可以在隐式范围内),该对象负责创建将执行操作的actor。 通常,ActorMaterializer是在ActorSystem创建后立即创建的,并附加到其生命周期,并使用它来创建actor。 考虑一个例子:
在简单组合的情况下,您可以不创建单独的RunnableGraph而做,而只需将Source连接到Sink并通过在Source上调用runWith方法来启动它们。 此方法还假定隐式范围中存在ActorMaterializer对象。 另外,在这种情况下,将使用Sink中定义的物化值。 例如,使用以下代码,我们可以从Reactive Streams规范将Source转换为Publisher:
val source: Source[Score, NotUsed] = Source.fromIterator(() => sourceIterator).map(item => transform(item)) val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false))
因此,现在我们展示了如何通过从源迭代器创建Source并对其元素执行一些转换来获取Reactive Streams Publisher。 现在,我们可以将其与为数据提供给最终迭代器的订阅服务器关联。 还有一个问题要考虑:如何组织与外部服务的HTTP交互。
akka的结构包括
akka-http模块,该模块允许您组织基于HTTP的异步非阻塞通信。 此外,该模块基于akka流构建,允许您在数据流处理操作图中添加HTTP交互作为附加步骤。
为了连接外部服务,akka-http提供了三种不同的接口。
- 请求级API-对于对任意计算机的单个请求,这是最简单的选项。 在此级别上,HTTP连接是完全自动管理的,并且在每个请求中,都必须将请求将发送到的计算机的完整地址。
- 主机级API-当我们知道要访问的计算机上的哪个端口时适用。 在这种情况下,akka-http将控制HTTP连接池,并且在请求中足以指定到所请求资源的相对路径。
- 连接级API-允许您完全控制HTTP连接的管理,即跨连接打开,关闭和分发请求。
在我们的情况下,分类服务的地址是我们事先知道的,因此,仅与该特定机器组织HTTP交互是必要的。 因此,主机级API最适合我们。 现在,让我们来看一下使用HTTP连接池时如何创建它:
val httpFlow: Flow[(HttpRequest,Id), (Try[HttpResponse],Id), Http.HostConnectionPool] = Http().cachedHostConnectionPool[Id](hostAddress, portNumber)
当在隐式作用域中的ActorSystem中调用Http()。CachedHostConnectionPool [T](hostAddress,portNumber)时,会分配资源来创建连接池,但不会建立连接本身。 调用的结果是,返回Flow,该Flow接收一对HTTP请求和一些Id标识对象作为输入。 由于akka-http中的HTTP调用是异步操作,因此需要标识对象才能将请求与相应的响应进行匹配,并且接收响应的顺序不一定与请求的发送顺序相对应。 因此,在输出处,Flow提供了一些查询结果和相应的标识对象。
直接启动(实现)图形(包括该流)时,将建立HTTP连接。 Akka-http的实现方式是,不管包含httpFlow的图已实现多少次,在一个ActorSystem中,始终将有一个公共的HTTP连接池,所有实现都将使用该HTTP连接。 这使您可以更好地控制网络资源的使用并避免过载。
因此,HTTP连接池的生命周期与ActorSystem有关。 如前所述,线程池的生命周期也已附加到该线程池中,在该线程池中执行参与者中定义的操作(或在我们的示例中,定义为akka-streams和akka-http阶段)。 因此,为了获得最大效率,我们必须在同一JVM进程中重用ActorSystem的一个实例。
全部放在一起:实现与分类服务交互的示例
因此,现在我们可以继续使用与外部服务的异步交互对Apache Spark上的大量分布式数据进行分类的过程。 这种交互的一般方案已在图7中显示。
假设我们已经定义了一些初始数据集[功能]。 对它应用mapPartitions操作,我们应该获得一个数据集,其中来自源集的每个id都标记有作为分类结果而获得的某个值(数据集[Score])。 要在执行程序上组织异步处理,我们必须将响应式流规范中的源迭代器和生成的迭代器分别包装在Publisher和Subscriber中,并将它们链接在一起。
case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) val batchesRequestCount = config.getInt(“scoreService. batchesRequestCount”)
在该实现中,考虑到一个呼叫的分类服务可以一次处理一组特征向量,因此,对其进行呼叫后的分类结果也将立即用于整个组。 因此,作为Publisher的参数类型,我们不仅拥有您可能期望的Score,还拥有Iterable [Score]。 因此,我们通过一次调用onNext方法将该组的分类结果发送到结果迭代器(也是订阅者)。 这比为每个元素调用onNext效率更高。 现在,我们将更详细地分析此代码。
- 我们确定输入和输出数据的结构。 作为输入,我们将具有一堆带有特征向量的id标识符,作为输出,我们将具有一堆具有分类结果的数值的标识符。
- 我们确定一次订阅服务器将要从发布服务器请求的组数。 由于假定这些值将位于缓冲区中并等待直到从生成的迭代器中读取它们,所以此值取决于分配给执行程序的内存量。
- 从源迭代器创建Publisher。 他将负责与分类服务进行交互。 下面将讨论createPublisher函数。
- 创建一个订户,它将成为结果的迭代器。 IteratorSubscriber类代码也在下面给出。
- 向发布者注册订户。
- 返回iteratorSubscriber作为mapPartitions操作的结果。
现在考虑createPublisher函数的实现。
type Ids = Seq[String]
- - , . httpFlow, .
- : , (batchSize) (parallelismLevel).
- implicit scope ActorSystem, ActorMaterializer httpFlow. Spark-. ActorSystemHolder .
- akka-streams . Source[Features] .
- batchSize .
- HttpRequest . HttpRequest createHttpRequest. createPublisher. feature-, , ( predict). , HTTP-. , HTTP-, HTTP-, URI .
- httpFlow.
- , . flatMapMerge, akka-http Source[ByteString], , . . parallelismLevel , ( ). HTTP-: , , , .
- : . akka ByteString. , ByteString O(1), ByteString . , , . , .
- HTTP- , Stream . , discardEntityBytes , , .
- . akka-http , .
- , Publisher, . , . false Sink.asPublisher , Publisher Subscriber-.
如前一节所述,要使用akka,您需要一个ActorSystem,该ActorSystem必须创建一次然后再使用。不幸的是,我们无法调用Spark执行程序的全局环境,但是我们可以使用标准方法来创建全局对象。由于Spark执行程序是一个单独的JVM进程,因此,在其中,我们可以创建一个全局对象,在其中将使用它存储ActorSystem和ActorMatrializer和httpFlow。 object ActorSystemHolder { implicit lazy val actorSystem: ActorSystem = {
- 我们使用延迟初始化创建所有全局变量,也就是说,实际上将在首次需要它们时创建它们。
- 这将创建一个具有特定名称的新ActorSystem。
- 为了正确地终止ActorSystem框架中执行的所有进程,我们必须在其上调用终止方法,该方法将依次使用其标准停止机制停止所有参与者。为此,我们必须注册在JVM进程终止时调用的钩子。
- 接下来,我们创建一个ActorMaterializer,它将使用我们的ActorSystem开始执行akka-streams流程。
- 最后,我们创建httpFlow与外部服务进行交互。如前一节所述,我们在ActorSystem框架内为HTTP连接池分配资源。
现在,在我们的HTTP交互过程中,将结果迭代器的实现视为订户。 sealed trait QueueItem[+T] case class Item[+T](item: T) extends QueueItem[T] case object Done extends QueueItem[Nothing] case class Failure(cause: Throwable) extends QueueItem[Nothing] //(1) class StreamErrorCompletionException(cause: Throwable) extends Exception(cause) //(2) class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] {
IteratorSubscriber类是Producer-Consumer模型的实现。实现Subscriber接口的部分是生产者,而实现Iterator的部分是消费者。被实现为阻塞队列的缓冲区被用作通信手段。从Apache Spark执行程序池的流中调用Iterator接口中的方法,并从ActorSystem拥有的池中调用Subscriber接口的方法。现在,让我们详细了解给定的IteratorSubscriber实现代码。- 首先,我们为缓冲区元素定义代数数据类型。在缓冲区中,我们可以是以下元素组,或者是成功完成Done的信号,或者是包含Throwable的导致失败的信号的失败。
- , hasNext .
- , , Publisher-.
- , . LinkedBlockingQueue, . , .
- , . , , Publisher-. , , Publisher- . hasNext next ( requestNextBatches hasNext), , .
- subscriptionPromise subscription Subscription, Publisher onSubscribe. , Reactive Streams Subscriber- Publisher- , , hasNext , onSubscribe. , subscription, Publisher-. lazy subscription, Promise.
- . hasNext next, , .
- , , hasNext false . hasNext, .
- onSubscribe Publisher- Subscription Promise, subscription.
- onNext Publisher-, . .
- Publisher onComplete, Done.
- Publisher onError. .
- hasNext , . , true, . , .
- , false.
- , , requestSize, Publisher. , , , Publisher- , HTTP- .
- . , , , . , , ( , , subscription), , , , .
- , currentIterator. , . , hasNext , ( , ), .
- , false hasNext. , isDone, , . - , hasNext , false. , hasNext , false , . , .
- , , , .
- next方法从当前迭代器返回下一个元素。根据其调用的语义,在此之前,调用方必须调用hasNext方法,因此在调用next时,next元素应始终是next元素。
- 在这里,我们向发布服务器发送信号,表明我们准备使用在发布服务器注册时收到的订阅对象来处理下一组结果。组的数量由requestSize的值确定。我们还通过此值增加了预期元素的数量。
因此,在执行程序上启动的对数据块进行完整处理的一般方案如下所示:图8.参与者与源迭代器和结果迭代器的交互。结论:此解决方案的优缺点
该方案的主要优点是,它允许您使用使用任何可用方法实现的机器学习模型。之所以能够实现此目标,是因为使用了HTTP协议来访问模型,这是应用程序之间进行通信的标准方式。因此,模型的实现不依赖于其接口。另一个优点-这种方案允许您实现所有元素的水平缩放。根据负载最重的部分,我们可以将机器添加到Hadoop集群,也可以运行其他模型实例。结果,该方案是容错的,因为在任何一台机器出现问题的情况下,我们都可以轻松地替换它。这是由于以下事实而实现的:存储在hdfs上的数据被复制,并且分类服务不依赖于某些常规可变状态,因此可以在几种情况下进行部署。另外,该方案为调优和优化提供了充足的机会。例如,由于akka-http在整个进程的整个生命周期内都使用一个连接池,因此很容易控制与外部服务的连接数。或者,如果集群分布在多个数据中心中,我们可以在每个数据中心中提升模型的实例,并配置Apache Spark进程,使其仅引用其实例,从而消除数据中心之间的调用。最后,使用设置,此方案几乎可以避免停机。通过选择发送用于分类的组的大小,分类服务的实例数和http连接池的大小,可以在服务端和群集端实现最大的计算能力负载。该方案的主要缺点之一是其相对复杂性,这是由于组件分离以及需要组织它们之间的交互而引起的。此外,部分计算能力将用于确保通信,这会稍微降低效率。可能还会发生其他通信错误。结果,需要附加设置来增加交互的有效性。为了排除网络交互,可以考虑将服务实例与数据部署在同一台计算机上。但是,通常,Hadoop集群中的机器数量非常大,因此在每个机器上部署模型实例将是无利可图的,特别是在大型模型的情况下。此外,通常,在Hadoop群集上,同时执行多个任务以争夺其资源,因此启动其他服务会降低群集的整体性能。总结一下,我想指出的是,该解决方案已在我们的CleverDATA公司成功实现。多亏了他,数据分析人员和应用程序开发人员团队可以使用任何合适的方式进行工作,而不会互相限制。实际上,唯一需要共同讨论和协调的地方是机器学习模型提供的界面,所有其他问题都可以在一个团队中解决。因此,团队可以彼此独立地解决常见问题。