最近,Node.js平台的10.5.0版已经发布。 它的主要功能之一是对使用流的支持,该支持最初是添加到Node.js中的,但仍处于试验阶段。 考虑到该平台现在有这个机会,这一事实特别有趣,因为出色的异步I / O子系统不需要平台流,其平台的支持者一直为之感到骄傲。 但是,线程支持已出现在Node.js中。 为什么会这样呢? 对于谁,为什么他们可以派上用场?

简而言之,这是必需的,这样Node.js平台才能在以前未显示出最出色结果的领域中达到新的高度。 我们正在谈论执行密集使用处理器资源的计算。 这主要是因为Node.js在人工智能,机器学习和大量数据处理等领域表现不佳的原因。 为了使Node.js在解决此类问题方面表现得淋漓尽致,已经付出了很多努力,但是在这个平台上,与例如在微服务的开发中相比,该平台看起来仍然谦虚得多。
该材料的作者(我们今天将其翻译发表)说,他决定将技术文档(这些文档可以从原始的
拉取请求和
官方资料中找到)减少为一组简单的实际示例。 他希望看到这些示例的任何人都能够充分了解以开始使用Node.js中的线程。
关于worker_threads模块和--experimental-worker标志
Node.js中的多线程支持实现为
worker_threads
模块。 因此,为了利用新功能,必须使用
require
命令连接该模块。
请注意,在运行脚本时,只能使用
worker_threads
experimental-worker
标志来使用
worker_threads
,否则系统将找不到此模块。
注意,该标志包括单词“ worker”,而不是“ thread”。 文档中提到了我们所说的确切内容,该文档使用术语“工作线程”(worker thread)或仅使用“ worker”(worker)。 将来,我们将采用相同的方法。
如果您已经编写了多线程代码,那么在探索Node.js的新功能时,您将看到很多您已经熟悉的东西。 如果您以前没有使用过此类工具,请继续阅读更多内容,此处将为新手提供适当的说明。
关于可以在Node.js中的工作人员帮助下解决的任务
如前所述,工作流旨在解决密集使用处理器功能的任务。 应当指出,将它们用于解决I / O问题是浪费资源,因为根据官方文档,旨在组织异步I / O的内部Node.js机制本身比使用I.O问题效率更高。解决工人流动的相同问题。 因此,我们立即决定不使用工作者处理数据的输入和输出。
让我们从一个简单的示例开始,该示例演示如何创建和使用工作程序。
示例1
const { Worker, isMainThread, workerData } = require('worker_threads'); let currentVal = 0; let intervals = [100,1000, 500] function counter(id, i){ console.log("[", id, "]", i) return i; } if(isMainThread) { console.log("this is the main thread") for(let i = 0; i < 2; i++) { let w = new Worker(__filename, {workerData: i}); } setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[2], "MainThread"); } else { console.log("this isn't") setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[workerData], workerData); }
该代码的输出看起来像一组显示计数器的行,这些计数器的值以不同的速度增加。
第一个例子的结果我们将处理这里发生的事情:
if
表达式中的指令创建了2个线程,由于使用__filename
参数,其代码来自运行示例时Node.js传递的同一脚本。 现在工作人员需要带有代码的文件的完整路径,他们不支持相对路径,这就是为什么在此使用此值的原因。
- 发送给这两个工作程序的数据
workerData
属性的形式作为全局参数发送,该参数在第二个参数中使用。 之后,可以通过具有相同名称的常量来访问该值(请注意如何在文件的第一行中创建相应的常量,以及如何在最后一行中使用该常量)。
这是一个使用
worker_threads
模块的非常简单的示例,目前还没有发生任何有趣的事情。 因此,请考虑另一个示例。
示例2
考虑一个示例,首先,我们将执行一些“大量”计算,其次,在主线程中执行异步操作。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); const request = require("request"); if(isMainThread) { console.log("This is the main thread") let w = new Worker(__filename, {workerData: null}); w.on('message', (msg) => { // ! console.log("First value is: ", msg.val); console.log("Took: ", (msg.timeDiff / 1000), " seconds"); }) w.on('error', console.error); w.on('exit', (code) => { if(code != 0) console.error(new Error(`Worker stopped with exit code ${code}`)) }); request.get('http://www.google.com', (err, resp) => { if(err) { return console.error(err); } console.log("Total bytes received: ", resp.body.length); }) } else { // function random(min, max) { return Math.random() * (max - min) + min } const sorter = require("./list-sorter"); const start = Date.now() let bigList = Array(1000000).fill().map( (_) => random(1,10000)) sorter.sort(bigList); parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start}); }
为了运行此示例,请注意以下事实:该代码需要
request
模块(可以使用npm进行安装,例如,使用
npm init --yes
和
npm install request --save
命令在空目录中,该文件包含上述代码
npm install request --save
),以及它使用辅助模块的事实,该辅助模块通过命令
const sorter = require("./list-sorter");
。 该模块的文件(
list-sorter.js
)应该与上述文件位于同一位置,其代码如下所示:
module.exports = { firstValue: null, sort: function(list) { let sorted = list.sort(); this.firstValue = sorted[0] } }
这次我们正在同时解决两个问题。 首先,我们加载google.com主页,其次,我们对随机生成的一百万个数字进行排序。 这可能需要花费几秒钟的时间,这为我们提供了一个很好的机会来查看新的Node.js机制的运行情况。 另外,这里我们测量工作线程对数字进行排序所花费的时间,之后我们将测量结果(连同排序数组的第一个元素一起)发送到主流,该主流在控制台中显示结果。
第二个例子的结果在此示例中,最重要的是演示线程之间的数据交换机制。
借助
on
方法,工作人员可以从主线程接收消息。 在代码中,您可以找到我们正在侦听的事件。 每当我们使用
parentPort.postMessage
方法从某个线程发送消息时,都会
message
事件。 另外,可以使用相同的方法通过访问工作程序实例将消息发送到线程,并使用
parentPort
对象接收
parentPort
。
现在让我们来看另一个例子,它与我们已经看到的非常相似,但是这次我们将特别注意项目的结构。
示例3
作为最后一个示例,我们建议考虑实现与上一个示例相同的功能,但是这次我们将改进代码的结构,使其更整洁,并使其具有增强支持软件项目便利性的形式。
这是主程序的代码。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); const request = require("request"); function startWorker(path, cb) { let w = new Worker(path, {workerData: null}); w.on('message', (msg) => { cb(null, msg) }) w.on('error', cb); w.on('exit', (code) => { if(code != 0) console.error(new Error(`Worker stopped with exit code ${code}`)) }); return w; } console.log("this is the main thread") let myWorker = startWorker(__dirname + '/workerCode.js', (err, result) => { if(err) return console.error(err); console.log("[[Heavy computation function finished]]") console.log("First value is: ", result.val); console.log("Took: ", (result.timeDiff / 1000), " seconds"); }) const start = Date.now(); request.get('http://www.google.com', (err, resp) => { if(err) { return console.error(err); } console.log("Total bytes received: ", resp.body.length); //myWorker.postMessage({finished: true, timeDiff: Date.now() - start}) // })
这是描述工作线程行为的代码(在上述程序中,使用
__dirname + '/workerCode.js'
形成具有此代码的文件的路径):
const { parentPort } = require('worker_threads'); function random(min, max) { return Math.random() * (max - min) + min } const sorter = require("./list-sorter"); const start = Date.now() let bigList = Array(1000000).fill().map( (_) => random(1,10000)) sorter.sort(bigList); parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});
这是此示例的功能:
- 现在,主线程和辅助线程的代码位于不同的文件中。 这有利于项目的支持和扩展。
startWorker
函数返回工作程序的新实例,如果需要,该实例允许从主流向该工作程序发送消息。- 无需检查代码是否在主线程中执行(我们通过相应的检查删除了
if
)。 - 工作程序显示了注释掉的代码片段,演示了从主流接收消息的机制,在已经讨论过的消息发送机制的前提下,该机制允许在主流和工作流之间进行双向异步数据交换。
总结
在本文中,我们通过实际示例研究了使用新功能处理Node.js中流的功能。 如果您已经掌握了此处讨论的内容,则意味着您已经准备好查看文档并使用
worker_threads
模块开始自己的实验。 也许值得注意的是,该功能仅在Node.js中出现,尽管它是试验性的,所以随着时间的流逝,其实现方式可能会发生变化。 此外,如果您在使用
worker_threads
自己的实验时遇到错误,或者发现此模块不会干扰它缺少的某些功能,请让开发人员了解它并帮助改进Node.js.平台。
亲爱的读者们! 您如何看待Node.js中的多线程支持? 您打算在项目中使用此功能吗?