在实现Web应用程序和移动应用程序的后端时,即使是最简单的后端,也已习惯使用以下工具:数据库,邮件(smtp)服务器,redis服务器。 使用的工具集正在不断扩展。 例如,根据
amqplib软件包的安装数量(每周65万次安装)判断消息队列,并与关系数据库一起使用(mysql软件包每周46万次安装和pg 80万每周安装)。
今天,我要谈的是工作队列,尽管几乎在所有实际项目中都需要它们,但到目前为止使用的数量要少一个数量级。
因此,作业队列使您可以异步执行某些任务,实际上是在给定的时间使用给定的输入参数执行功能。
根据参数,可以执行任务:
作业队列使您可以将参数传输到正在运行的作业,跟踪和重新运行失败的作业,并设置同时运行的作业数量的限制。
Node.js上的绝大多数应用程序都与针对Web和移动应用程序的REST-API的开发相关。 减少REST-API的执行时间对于用户舒适地使用应用程序很重要。 同时,对REST-API的调用可能会启动冗长的和/或占用大量资源的操作。 例如,购买后,您必须向用户发送推送消息到移动应用程序,或发送请求在CRM REST-API上进行购买。 这些查询可以异步执行。 如果您没有用于处理工作队列的工具,该怎么办? 例如,您可以将消息发送到消息队列,启动将阅读这些消息并根据这些消息执行必要工作的工作人员。
实际上,这就是作业队列所做的。 但是,如果仔细观察,作业队列与消息队列有几个基本区别。 首先,将消息(静态)放入消息队列中,而作业队列涉及某种工作(函数调用)。 其次,作业队列暗示将执行给定工作的某些处理器(工人)的存在。 在这种情况下,需要其他功能。 在增加负载的情况下,应该透明地缩放处理器的数量。 另一方面,有必要限制一个处理器工人上同时运行的任务的数量,以便使峰值负载变得平稳并防止拒绝服务。 这表明需要一种可以通过设置各种参数来运行异步任务的工具,就像使用REST-API发出请求一样容易(如果更容易,甚至更好)。
使用消息队列,实现在队列排队后立即运行的作业队列相对简单。 但是通常需要在设定的时间或按照时间表一次完成任务。 对于这些任务,广泛使用了许多在Linux中实现cron逻辑的软件包。 为了避免毫无根据,我将说node-cron程序包每周有48万次安装,而节点计划-每周有17万次安装。
当然,使用node-cron比使用苦行setInterval()更方便,但是就我个人而言,使用它时遇到了许多问题。 如果要表达一个普遍的缺点,那就是缺乏对同时执行的任务数量的控制(这会激发峰值负载:增加负载会减慢任务的工作,减慢任务会增加同时执行的任务的数量,进而给系统带来更多负载),无法运行节点来提高生产率-cron在多个内核上(在这种情况下,所有任务都在每个内核上独立执行),并且缺少跟踪和重新启动已完成任务的工具 霞出现错误。
我希望我已经表明,对诸如工作队列之类的工具的需求与诸如数据库之类的工具一样。 尽管还没有广泛使用,但已经出现了这类资金。 我将列出其中最受欢迎的:
今天,我将考虑使用与我自己合作的Bull Bull软件包。 我为什么选择这个特殊的包装(尽管我没有将选择强加给其他人)。 那时,当我开始寻找消息队列的便捷实现时,bee-queue项目已经停止。 根据bee-queue存储库中给出的基准,kue实现远远落后于其他实现,此外,它不包含运行定期执行的任务的方法。 这个议程项目实现了将队列存储在mongodb数据库中的功能。 如果在将任务放入队列时需要超级可靠性,则在某些情况下这是一大优势。 但是,这不仅是决定性因素。 自然,我测试了库的所有耐用性选项,在队列中生成了大量任务,但仍然无法从议程中获得不间断的工作。 当超过一定数量的任务时,议程停止并停止将任务投入工作。
因此,由于Bull包使用Redis服务器作为后端,因此我选择了bull,该Bull实现了便捷的API,具有足够的速度和可伸缩性。 特别是,您可以使用Redis服务器集群。
创建队列时,为作业队列选择最佳参数非常重要。 有许多参数,其中一些参数的价值并没有立即传给我。 经过大量的实验,我确定了以下参数:
const Bull = require('bull'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const defaultJobOptions = { removeOnComplete: true, removeOnFail: false, }; const limiter = { max: 10000, duration: 1000, bounceBack: false, }; const settings = { lockDuration: 600000,
在平凡的情况下,不需要创建许多队列,因为您可以在每个队列中为不同的任务指定名称,并将处理器工作人员与每个名称相关联:
const { bull } = require('../bull'); bull.process('push:news', 1, `${__dirname}/push-news.js`); bull.process('push:status', 2, `${__dirname}/push-status.js`); ... bull.process('some:job', function(...args) { ... });
我利用了“开箱即用”的机会-使多个内核上的处理器工作并行化。 为此,第二个参数设置将在其上启动处理器工作程序的内核数,第三个参数设置具有作业处理功能定义的文件名。 如果不需要此功能,则只需将回调函数作为第二个参数即可。
任务通过调用add()方法排队,该队列名称和对象在参数中传递给该方法,稍后将传递给任务处理程序。 例如,在ORM挂钩中,创建带有新新闻的条目后,我可以异步向所有客户端发送推送消息:
afterCreate(instance) { bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options); }
事件处理程序在参数中接受带有传递给add()方法和done()函数的参数的任务对象,必须调用它们以确认任务已完成或通知任务以错误结束:
const { firebase: { admin } } = require('../firebase'); const { makePayload } = require('./makePayload'); module.exports = (job, done) => { const { id, title, message } = job.data; const data = { id: String(id), type: 'news', }; const payloadRu = makePayload(title.ru, message.ru, data); const payloadEn = makePayload(title.en, message.en, data); return Promise.all([ admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }), admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }), ]) .then(response => done(null, response)) .catch(done); };
要查看作业队列的状态,可以使用arena-bull工具:
const Arena = require('bull-arena'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const arena = Arena({ queues: [ { name: 'my_gueue', hostId: 'My Queue', redis, }, ], }, { basePath: '/', disableListen: true, }); module.exports = { arena };
最后,一点生活技巧。 如我所说,bull使用redis服务器作为后端。 重新启动Redis服务器时,作业消失的可能性很小。 但是,知道系统管理员有时可以“清除萝卜缓存”,尤其是删除所有任务时,我主要关心的是定期运行的任务,在这种情况下,该任务将永远停止。 在这方面,我找到了恢复此类定期任务的机会:
const cron = '*/10 * * * * *'; const { bull } = require('./app/services/bull'); bull.getRepeatableJobs() .then(jobs => Promise.all(_.map(jobs, (job) => { const [name, cron] = job.key.split(/:{2,}/); return bull.removeRepeatable(name, { cron }); }))) .then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } })); setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);
也就是说,首先将任务从队列中排除,然后再次进行设置,并通过setInterval()重新设置所有这些(alas)。 实际上,如果没有这样的生活技巧,我可能不会决定对公牛使用定期任务。
apapacy@gmail.com
2019年7月3日