Node.js 10.5.0中使用线程的实践

最近,Node.js平台的10.5.0版已经发布。 它的主要功能之一是对使用流的支持,该支持最初是添加到Node.js中的,但仍处于试验阶段。 考虑到该平台现在有这个机会,这一事实特别有趣,因为出色的异步I / O子系统不需要平台流,其平台的支持者一直为之感到骄傲。 但是,线程支持已出现在Node.js中。 为什么会这样呢? 对于谁,为什么他们可以派上用场?



简而言之,这是必需的,这样Node.js平台才能在以前未显示出最出色结果的领域中达到新的高度。 我们正在谈论执行密集使用处理器资源的计算。 这主要是因为Node.js在人工智能,机器学习和大量数据处理等领域表现不佳的原因。 为了使Node.js在解决此类问题方面表现得淋漓尽致,已经付出了很多努力,但是在这个平台上,与例如在微服务的开发中相比,该平台看起来仍然谦虚得多。

该材料的作者(我们今天将其翻译发表)说,他决定将技术文档(这些文档可以从原始的拉取请求官方资料中找到)减少为一组简单的实际示例。 他希望看到这些示例的任何人都能够充分了解以开始使用Node.js中的线程。

关于worker_threads模块和--experimental-worker标志


Node.js中的多线程支持实现为worker_threads模块。 因此,为了利用新功能,必须使用require命令连接该模块。

请注意,在运行脚本时,只能使用worker_threads experimental-worker标志来使用worker_threads ,否则系统将找不到此模块。

注意,该标志包括单词“ worker”,而不是“ thread”。 文档中提到了我们所说的确切内容,该文档使用术语“工作线程”(worker thread)或仅使用“ worker”(worker)。 将来,我们将采用相同的方法。

如果您已经编写了多线程代码,那么在探索Node.js的新功能时,您将看到很多您已经熟悉的东西。 如果您以前没有使用过此类工具,请继续阅读更多内容,此处将为新手提供适当的说明。

关于可以在Node.js中的工作人员帮助下解决的任务


如前所述,工作流旨在解决密集使用处理器功能的任务。 应当指出,将它们用于解决I / O问题是浪费资源,因为根据官方文档,旨在组织异步I / O的内部Node.js机制本身比使用I.O问题效率更高。解决工人流动的相同问题。 因此,我们立即决定不使用工作者处理数据的输入和输出。

让我们从一个简单的示例开始,该示例演示如何创建和使用工作程序。

示例1


 const { Worker, isMainThread,  workerData } = require('worker_threads'); let currentVal = 0; let intervals = [100,1000, 500] function counter(id, i){   console.log("[", id, "]", i)   return i; } if(isMainThread) {   console.log("this is the main thread")   for(let i = 0; i < 2; i++) {       let w = new Worker(__filename, {workerData: i});   }   setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[2], "MainThread"); } else {   console.log("this isn't")   setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[workerData], workerData); } 

该代码的输出看起来像一组显示计数器的行,这些计数器的值以不同的速度增加。


第一个例子的结果

我们将处理这里发生的事情:

  1. if表达式中的指令创建了2个线程,由于使用__filename参数,其代码来自运行示例时Node.js传递的同一脚本。 现在工作人员需要带有代码的文件的完整路径,他们不支持相对路径,这就是为什么在此使用此值的原因。
  2. 发送给这两个工作程序的数据workerData属性的形式作为全局参数发送,该参数在第二个参数中使用。 之后,可以通过具有相同名称的常量来访问该值(请注意如何在文件的第一行中创建相应的常量,以及如何在最后一行中使用该常量)。

这是一个使用worker_threads模块的非常简单的示例,目前还没有发生任何有趣的事情。 因此,请考虑另一个示例。

示例2


考虑一个示例,首先,我们将执行一些“大量”计算,其次,在主线程中执行异步操作。

 const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); const request = require("request"); if(isMainThread) {   console.log("This is the main thread")   let w = new Worker(__filename, {workerData: null});   w.on('message', (msg) => { //  !       console.log("First value is: ", msg.val);       console.log("Took: ", (msg.timeDiff / 1000), " seconds");   })   w.on('error', console.error);   w.on('exit', (code) => {       if(code != 0)           console.error(new Error(`Worker stopped with exit code ${code}`))   });   request.get('http://www.google.com', (err, resp) => {       if(err) {           return console.error(err);       }       console.log("Total bytes received: ", resp.body.length);   }) } else { //    function random(min, max) {       return Math.random() * (max - min) + min   }   const sorter = require("./list-sorter");   const start = Date.now()   let bigList = Array(1000000).fill().map( (_) => random(1,10000))   sorter.sort(bigList);   parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start}); } 

为了运行此示例,请注意以下事实:该代码需要request模块(可以使用npm进行安装,例如,使用npm init --yesnpm install request --save命令在空目录中,该文件包含上述代码npm install request --save ),以及它使用辅助模块的事实,该辅助模块通过命令const sorter = require("./list-sorter"); 。 该模块的文件( list-sorter.js )应该与上述文件位于同一位置,其代码如下所示:

 module.exports = {   firstValue: null,   sort: function(list) {       let sorted = list.sort();       this.firstValue = sorted[0]   } } 

这次我们正在同时解决两个问题。 首先,我们加载google.com主页,其次,我们对随机生成的一百万个数字进行排序。 这可能需要花费几秒钟的时间,这为我们提供了一个很好的机会来查看新的Node.js机制的运行情况。 另外,这里我们测量工作线程对数字进行排序所花费的时间,之后我们将测量结果(连同排序数组的第一个元素一起)发送到主流,该主流在控制台中显示结果。


第二个例子的结果

在此示例中,最重要的是演示线程之间的数据交换机制。
借助on方法,工作人员可以从主线程接收消息。 在代码中,您可以找到我们正在侦听的事件。 每当我们使用parentPort.postMessage方法从某个线程发送消息时,都会message事件。 另外,可以使用相同的方法通过访问工作程序实例将消息发送到线程,并使用parentPort对象接收parentPort

现在让我们来看另一个例子,它与我们已经看到的非常相似,但是这次我们将特别注意项目的结构。

示例3


作为最后一个示例,我们建议考虑实现与上一个示例相同的功能,但是这次我们将改进代码的结构,使其更整洁,并使其具有增强支持软件项目便利性的形式。

这是主程序的代码。

 const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); const request = require("request"); function startWorker(path, cb) {   let w = new Worker(path, {workerData: null});   w.on('message', (msg) => {       cb(null, msg)   })   w.on('error', cb);   w.on('exit', (code) => {       if(code != 0)           console.error(new Error(`Worker stopped with exit code ${code}`))  });   return w; } console.log("this is the main thread") let myWorker = startWorker(__dirname + '/workerCode.js', (err, result) => {   if(err) return console.error(err);   console.log("[[Heavy computation function finished]]")   console.log("First value is: ", result.val);   console.log("Took: ", (result.timeDiff / 1000), " seconds"); }) const start = Date.now(); request.get('http://www.google.com', (err, resp) => {   if(err) {       return console.error(err);   }   console.log("Total bytes received: ", resp.body.length);   //myWorker.postMessage({finished: true, timeDiff: Date.now() - start}) //     }) 

这是描述工作线程行为的代码(在上述程序中,使用__dirname + '/workerCode.js'形成具有此代码的文件的路径):

 const {  parentPort } = require('worker_threads'); function random(min, max) {   return Math.random() * (max - min) + min } const sorter = require("./list-sorter"); const start = Date.now() let bigList = Array(1000000).fill().map( (_) => random(1,10000)) /** //      : parentPort.on('message', (msg) => {   console.log("Main thread finished on: ", (msg.timeDiff / 1000), " seconds..."); }) */ sorter.sort(bigList); parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start}); 

这是此示例的功能:

  1. 现在,主线程和辅助线程的代码位于不同的文件中。 这有利于项目的支持和扩展。
  2. startWorker函数返回工作程序的新实例,如果需要,该实例允许从主流向该工作程序发送消息。
  3. 无需检查代码是否在主线程中执行(我们通过相应的检查删除了if )。
  4. 工作程序显示了注释掉的代码片段,演示了从主流接收消息的机制,在已经讨论过的消息发送机制的前提下,该机制允许在主流和工作流之间进行双向异步数据交换。

总结


在本文中,我们通过实际示例研究了使用新功能处理Node.js中流的功能。 如果您已经掌握了此处讨论的内容,则意味着您已经准备好查看文档并使用worker_threads模块开始自己的实验。 也许值得注意的是,该功能仅在Node.js中出现,尽管它是试验性的,所以随着时间的流逝,其实现方式可能会发生变化。 此外,如果您在使用worker_threads自己的实验时遇到错误,或者发现此模块不会干扰它缺少的某些功能,请让开发人员了解它并帮助改进Node.js.平台。

亲爱的读者们! 您如何看待Node.js中的多线程支持? 您打算在项目中使用此功能吗?

Source: https://habr.com/ru/post/zh-CN415659/


All Articles