基于队列的系统

嗨,habrozhiteli!

我们决定分享即将发行的新颖性“分布式系统”中“基于任务队列的系统”一章的翻译。 设计模式”(已经在印刷厂使用了)。

图片

批处理的最简单形式是任务队列。 在具有任务队列的系统中,必须完成一组任务。 每个任务完全独立于其他任务,可以在不与它们进行任何交互的情况下进行处理。 在一般情况下,具有任务队列的系统的目标是确保每个工作阶段都在给定的时间内完成。 工作流的数量根据负载的变化而增加或减少。 广义任务队列的方案如图1所示。 10.1。

基于广义任务队列的系统


任务线是一个理想的示例,它演示了分布式系统设计模式的全部功能。 任务队列的大多数逻辑不取决于执行的工作类型。 在许多情况下,交付任务本身也是如此。

让我们使用图10中所示的任务队列来说明该语句。 10.1。 再次查看之后,确定一组共享的容器可以提供哪些功能。 很明显,容器化任务队列的大多数实现都可以被广泛的用户使用。

图片

基于容器的任务排队需要在库容器和具有用户逻辑的容器之间匹配接口。 在容器化任务队列中,区分两个接口:源容器接口(提供需要处理的任务流)和执行容器接口(知道如何处理它们)。

源容器接口


任何任务队列都基于一组需要处理的任务进行操作。 根据基于任务队列实施的特定应用程序,有许多任务源可以落入其中。 但是在接收到一组任务之后,队列操作方案非常简单。 因此,我们可以将任务源的特定于应用程序的逻辑与处理任务队列的通用方案分开。 回顾之前讨论的容器组模式,在这里您可以看到Ambassador模式的实现。 通用任务队列容器是主应用程序容器,特定于应用程序的源容器是将请求从队列调度程序容器广播到特定任务执行者的大使。 这组容器如图所示。 10.2。

图片

顺便说一句,尽管容器大使是特定于应用程序的(这很明显),但是任务源API也有许多通用的实现。 例如,来源可以是位于某个云存储中的照片列表,网络驱动器上的一组文件,甚至是按照“发布/订阅”原理运行的系统中的队列,例如Kafka或Redis。 尽管用户可以选择最适合其任务的容器大使,但他们应该使用容器本身的通用“库”实现。 这将减少工作量并最大化代码重用。

任务队列API 给定任务队列和依赖于应用程序的容器之间的交互机制,我们应该为两个容器之间的接口制定正式的定义。 有许多不同的协议,但是HTTP RESTful API易于实现,并且是此类接口的事实上的标准。 任务队列期望在after容器中实现以下URL:

您问为什么将v1添加到您的API定义中? 介面会有第二版吗? 看起来不合逻辑,但是最初定义API时版本控制的成本很小。 稍后进行适当的重构将非常昂贵。 即使您不确定版本是否会更改,也必须将版本添加到所有API中。 上帝保存了保险箱。
URL / items /返回所有任务的列表:

{ kind: ItemList, apiVersion: v1, items: [ "item-1", "item-2", …. ] } 

URL / items / <item-name>提供有关特定任务的详细信息:

 { kind: Item, apiVersion: v1, data: { "some": "json", "object": "here", } } 

请注意,API不提供任何解决任务事实的机制。 可以开发一种更复杂的API,然后将大部分实现转移给容器大使。 但是请记住,我们的目标是将尽可能多的总体实现集中在任务队列管理器中。 在这方面,任务队列管理器本身必须监视哪些任务已经被处理,哪些尚未被处理。

通过此API,我们可以获得有关特定任务的信息,然后传递执行程序的容器接口的item.data字段的值。

执行容器接口


队列管理器收到下一个任务后,应立即将其委托给某些执行者。 这是广义任务队列中的第二个接口。 由于多种原因,容器本身及其接口与源容器接口略有不同。 首先,它是一次性API。 执行程序的工作从一个调用开始,并且在容器的生命周期内不再进行任何调用。 其次,执行容器和任务队列管理器位于不同的容器组中。 容器执行程序通过容器编排API在其自己的组中启动。 这意味着任务队列管理器必须进行远程调用以启动执行容器。 这也意味着您必须更加注意安全性问题,因为群集的恶意用户可以将其加载到不必要的工作中。

在源容器中,我们使用了一个简单的HTTP调用将任务列表发送到任务管理器。 这样做的前提是该API调用需要进行多次,并且没有考虑安全问题,因为一切都在localhost框架内进行。 容器API必须仅被调用一次,并且重要的是要确保系统的其他用户即使在偶然或恶意的情况下也不能向执行者添加工作。 因此,对于执行容器,我们将使用文件API。 创建后,我们将向容器传递一个名为WORK_ITEM_FILE的环境变量,该变量的值引用容器的内部文件系统中的文件。 该文件包含有关要完成的任务的数据。 如下所示,可以通过ConfigMap Kubernetes对象实现这种API。 它可以作为文件安装在一组容器中(图10.3)。

图片

使用容器更容易实现这种文件API机制。 任务队列中的执行程序通常是访问多个工具的简单shell脚本。 为任务管理提升整个Web服务器是不切实际的-这导致了体系结构的复杂化。 与任务源一样,大多数容器执行器将是用于某些任务的专用容器,但是也有适用于解决多个不同任务的通用执行器。

考虑一个执行容器的示例,该容器从云存储中下载文件,在其上运行shell脚本,然后将结果复制回云存储中。 这样的容器在大多数情况下可以是通用的,但是可以将特定方案作为参数传递给它。 因此,大多数用户/任务队列都可以重用大多数文件处理代码。 最终用户只需要提供一个包含文件处理细节的脚本。

通用任务队列基础架构


如果您已经具有前面描述的两个容器接口的实现,那么在可重用队列实现中还有哪些要实现? 任务队列的基本算法非常简单。

  1. 从源容器下载当前可用的任务。
  2. 澄清任务队列的状态,以了解哪些任务已经完成或仍在执行。
  3. 对于每个未解决的任务,请使用适当的接口创建容器容器。
  4. 成功完成执行容器后,记录任务已完成。

该算法用词简单,但实际上却不那么容易实现。 幸运的是,Kubernetes乐团具有多项功能,可以大大简化其实施。 即:Kubernetes具有一个Job对象,以确保任务队列的可靠操作。 您可以配置Job对象,以使其一次性启动相应的执行容器,或者直到成功完成任务为止。 如果将执行容器配置为在任务完成之前运行,那么即使集群中的计算机发生故障,任务最终仍将成功完成。

因此,由于管弦乐队负责可靠地执行任务,因此大大简化了任务排队。

此外,Kubernetes允许您注释任务,这使我们可以使用已处理任务队列元素的名称标记每个任务对象。 区分成功完成和错误执行的任务变得越来越容易。

这意味着我们可以在Kubernetes编排器的顶部实现任务队列,而无需使用我们自己的存储库。 所有这些大大简化了构建任务队列基础结构的任务。

因此,用于容器操作的详细算法(任务队列管理器)如下。

无休止地重复。

  1. 通过容器的界面获取任务列表-任务的来源。
  2. 获取服务于此任务队列的任务列表。
  3. 在这些列表的基础上,选择未处理任务的列表。
  4. 对于每个未处理的任务,创建一个Job对象,该对象会生成相应的执行容器。

这是实现此队列的Python脚本:

 import requests import json from kubernetes import client, config import time namespace = "default" def make_container(item, obj): container = client.V1Container() container.image = "my/worker-image" container.name = "worker" return container def make_job(item): response = requests.get("http://localhost:8000/items/{}".format(item)) obj = json.loads(response.text) job = client.V1Job() job.metadata = client.V1ObjectMeta() job.metadata.name = item job.spec = client.V1JobSpec() job.spec.template = client.V1PodTemplate() job.spec.template.spec = client.V1PodTemplateSpec() job.spec.template.spec.restart_policy = "Never" job.spec.template.spec.containers = [ make_container(item, obj) ] return job def update_queue(batch): response = requests.get("http://localhost:8000/items") obj = json.loads(response.text) items = obj['items'] ret = batch.list_namespaced_job(namespace, watch=False) for item in items: found = False for i in ret.items: if i.metadata.name == item: found = True if not found: #    Job,  #   job = make_job(item) batch.create_namespaced_job(namespace, job) config.load_kube_config() batch = client.BatchV1Api() while True: update_queue(batch) time.sleep(10) 

工作坊 视频文件缩略图生成器的实现


作为使用任务队列的示例,请考虑生成视频文件缩略图的任务。 根据这些缩略图,用户可以决定要观看哪些视频。

要实现缩略图,您需要两个容器。 首先是任务的来源。 将任务放置在例如通过NFS(网络文件系统,网络文件系统)连接的共享网络驱动器上将是最容易的。 任务源在此目录中接收文件列表,并将它们传递给调用方。

我将在NodeJS上给出一个简单的程序:

 const http = require('http'); const fs = require('fs'); const port = 8080; const path = process.env.MEDIA_PATH; const requestHandler = (request, response) => { console.log(request.url); fs.readdir(path + '/*.mp4', (err, items) => { var msg = { 'kind': 'ItemList', 'apiVersion': 'v1', 'items': [] }; if (!items) { return msg; } for (var i = 0; i < items.length; i++) { msg.items.push(items[i]); } response.end(JSON.stringify(msg)); }); } const server = http.createServer(requestHandler); server.listen(port, (err) => { if (err) { return console.log('  ', err); } console.log(`    ${port}`) }); 

此源定义了要处理的电影列表。 ffmpeg实用程序用于提取缩略图。

您可以创建一个运行以下命令的容器:

 ffmpeg -i ${INPUT_FILE} -frames:v 100 thumb.png 

该命令每100帧(-frames:v 100参数)提取一个,并将其保存为PNG格式(例如thumb1.png,thumb2.png等)。

可以基于现有的ffmpeg Docker映像来实现这种处理。 jrottenberg / ffmpeg图像很受欢迎。

通过定义一个简单的源容器和一个甚至更简单的容器执行器,可以很容易地看到面向容器的通用队列管理系统的好处。 它大大减少了设计和执行任务队列之间的时间。

动态缩放艺术家


较早考虑的任务队列非常适合处理任务,因为它们变得可用,但会导致容器集群协调器资源的突然负载。 当您有许多不同类型的任务会在不同时间创建负载峰值,从而随着时间的推移在群集上平均分配负载时,这很好。

但是,如果您没有足够的负载类型,则“先厚然后空”的方法来扩展任务队列可能需要预留额外的资源以支持突发负载。 其余时间,资源将处于空闲状态,不必要地清空您的钱包。

要解决此问题,您可以限制任务队列生成的Job对象的总数。 这自然会限制并行处理的作业数量,从而减少高峰负载期间的资源使用。 另一方面,每个单独任务的持续时间将随着群集上的高负载而增加。

如果负载是零散的,那么这并不可怕,因为可以使用停机时间间隔来完成累积的任务。 但是,如果稳定负载太高,任务队列将没有时间处理传入的任务,并且将花费越来越多的时间来执行它们。

在这种情况下,您将必须动态调整并行任务的最大数量,并相应地调整可用的计算资源,以维持所需的性能水平。 幸运的是,有一些数学公式可让您确定何时需要扩展任务队列以处理更多请求。

考虑一个任务队列,其中新任务平均每分钟出现一次,并且平均需要30秒才能完成。 这样的队列能够应付进入其中的任务流。 即使大量任务同时到达,造成了交通拥堵,随着时间的流逝,交通拥堵也会被消除,因为在下一个任务到达之前,队列平均要处理两个任务。

如果一个新任务每分钟到达,并且平均需要1分钟来处理一个任务,那么这样的系统在理想情况下是平衡的,但是它对负载的变化响应不好。 她能够应付突发的负载,但是要花很多时间。 系统将不会处于空闲状态,但是将没有计算机时间来补偿新任务接收速度的长期提高。 为了保持系统稳定性,在长期负载增长或处理任务意外延迟的情况下,必须保留一个备用空间。

最后,考虑一种系统,其中每分钟完成一项任务,而任务处理需要两分钟。 这样的系统将不断失去性能。 任务队列的长度将随着任务的接收和处理之间的延迟(以及用户的恼怒程度)而增加。

必须持续监控这两个指标的值。 通过平均长时间接收任务之间的时间(例如,基于每天的任务数),我们可以得出任务间间隔的估计值。 还必须监视任务的平均处理时间(不包括在队列中花费的时间)。 在稳定的任务队列中,平均任务处理时间应小于任务间间隔。 为了确保满足此条件,有必要动态调整计算资源的可用队列数。 如果作业是并行处理的,则处理时间应除以并行处理的作业数。 例如,如果一分钟处理一个任务,但并行处理四个任务,则一个任务的有效处理时间为15秒,这意味着任务间间隔至少应为16秒。

这种方法使您可以轻松创建用于向上扩展任务队列的模块。 缩小比例会带来更多问题。 但是,可以使用与以前相同的计算,另外还可以放置由启发式方法确定的计算资源。 例如,您可以减少并行任务的数量,直到一个任务的处理时间为任务间间隔的90%。

多工模式


本书的主要主题之一是使用容器封装和重用代码。 它也与本章描述的任务排队模式有关。 除了管理队列本身的容器之外,您还可以重用组成表演者实现的容器组。 假设您需要以三种不同的方式处理队列中的每个任务。 例如,要检测照片中的脸部,将其与特定的人匹配,然后使图像的相应部分模糊。 您可以将所有处理放入一个执行容器中,但这是一次性的解决方案,无法重复使用。 要掩盖照片中的其他东西(例如汽车),您必须从头开始创建容器艺术家。

这种重用的可能性可以通过应用Multi-Worker模式来实现,这实际上是本书开头所描述的Adapter模式的特例。 Multi-Worker模式通过执行容器的软件接口将一组容器转换为一个公共容器。 此共享容器将处理委托给几个单独的可重用容器。 该过程在图1中示意性地示出。 10.4。

图片

通过组合执行容器来重用代码,从而减少了设计分布式批处理系统的工作量。

»这本书的更多信息可以在出版商的网站上找到
» 目录
» 摘录

对于habrozhitelami,可在优惠券- 分布式系统上享受20%的折扣。

Source: https://habr.com/ru/post/zh-CN440444/


All Articles