用于假人的Node.js流或如何使用流

我认为许多人不止一次听说过Node js Streams,但是从未使用过它,或者在不考虑它们如何工作,管道化流和规范的情况下使用了它。 让我们找出什么是流,管道(管道),块(块(数据的一部分)以及所有这些))



为什么了解流在Node js中的工作原理为何很重要? 答案很简单:Node js中的许多内置模块都实现了流,例如HTTP请求/响应,fs读/写,zlib,crypto,TCP套接字等。 您还需要流,例如,在处理大文件,处理图片时。 您可能没有在编写自己的流,但是了解它的工作原理将使您变得更有能力。

因此,什么是流(在下文中,我将代替流(流)使用)。 流是一个概念,您可以用它来处理小部分的数据,从而可以使用少量的RAM。 同样,在它的帮助下,我们可以将每个部分的处理分成彼此独立的模块(函数或类)。 例如,我们可以立即压缩部分数据,然后加密并写入文件。 主要思想不是处理整个数据,而是一个接一个地处理部分数据。

Node js中有4种流:

  • 可读的
  • 可写-写作
  • 双面-读写
  • 转换-一种可以修改数据的双工流

您可以在官方网站上找到更多详细信息,现在让我们继续练习。

简单的例子


我认为许多人甚至没有意识到就已经使用了流。 在此示例中,我们仅将文件发送给客户端。

// 1 - ( )      ,         const getFile = async (req, res, next) => { const fileStream = fs.createReadStream('path to file'); res.contentType('application/pdf'); fileStream.pipe(res); }; // 2 - (  )         const getFile = async (req, res, next) => { const file = fs.readFileSync('path to file'); res.contentType('application/pdf'); res.send(file); }; 

唯一的区别是,在第一种情况下,我们下载了文件的一部分并发送,因此没有加载服务器的RAM。 在第二种情况下,我们立即将整个文件加载到RAM中,然后再发送。

在本文的进一步部分,我们将分别分析每个流。 您可以使用继承或使用构造函数创建流。

 const { Readable } = require('stream'); // 1 -   const myReadable = new Readable(opt); // 2 -   class myReadable extends Readable { constructor(opt) { super(opt); } } 

在所有示例中,我将使用2方法。

可读流


让我们看看如何在NodeJS中创建Readable流。

 const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} } 

从上面的示例中可以看到,该类接受一组参数。 我们将只考虑对Readable流具有一般理解所必需的那些,其余的您可以在文档中看到。 我们对highWaterMark参数和_read方法感兴趣。

highWaterMark是内部流缓冲区的最大字节数(默认为16kb),达到该值时,将从资源中读取的数据将被挂起。 为了继续阅读,我们需要释放内部缓冲区。 我们可以通过调用管道,恢复或订阅数据事件来做到这一点。

_read是私有方法的实现,该方法由Readable类的内部方法调用。 连续调用它,直到数据大小达到highWaterMark。

好吧,我们感兴趣的最后一个方法是可读。 它返回true,但是一旦缓冲区已满,对此方法的调用将开始返回false。 可以由可读._read方法控制。

现在让我们看一个例子来澄清这种情况。

 class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`); 

首先,我要说counter.read()不是我们在该类中实现的_read。 该方法是私有的,而该方法是公共的,它从内部缓冲区返回数据。 当我们执行此代码时,在控制台中,我们将看到以下内容:



发生什么事了 当创建新的Counter({highWaterMark:2})流时,我们表示内部缓冲区的大小为2个字节,即 可以存储2个字符(1个字符= 1个字节)。 调用counter.read()之后,流开始读取,将“ 1”写入内部缓冲区并返回。 然后他继续阅读,写为“ 2”。 当它写为“ 3”时,缓冲区将已满,可读性。push将返回false,流将等待直到内部缓冲区被释放。 因为 在我们的示例中,没有释放缓冲区的逻辑,脚本将结束。

如前所述,为了确保读取不被中断,我们需要不断清除内部缓冲区。 为此,我们订阅了data事件。 用以下代码替换最后两行。

 const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); }); 

现在,如果运行此示例,我们将看到一切正常运行,并且控制台中显示了1到1000之间的数字。

书面流


实际上,它与Readable流非常相似,仅用于写入数据。

 const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} } 

它接受类似的参数,例如Readable Stream。 我们对highWaterMark和_write感兴趣。

_write是一个私有方法,由Writable类的内部方法调用以写入一段数据。 它包含3个参数:块(数据的一部分),编码(如果块是字符串,则进行编码),回调(在成功或不成功写入之后调用的函数)。

highWaterMark是内部流缓冲区的最大字节数(默认为16kb),达到该值时stream.write将开始返回false。

让我们用计数器重写前面的示例。

 const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); } 

实际上,一切都很简单,但是有一个有趣的细微差别值得记住! 当创建新的Counter({highWaterMark:2})流时,我们表示内部缓冲区的大小为2个字节,即 可以存储2个字符(1个字符= 1个字节)。 当计数器达到10时,每次写调用都会填充缓冲区;因此,如果写是针对慢速源的,则在调用write时所有其他数据将保存到RAM,这可能导致其溢出(在此示例中,当然没关系,因为我们的缓冲区是2个字节,但是对于大文件,您需要记住这一点)。 当出现这种情况时,我们需要等到流写入数据的当前部分,释放内部缓冲区(触发耗用事件),然后才能恢复记录数据。 让我们重写我们的示例。

 const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })(); 

在v11.13.0中添加了events.once方法,该方法使您可以创建promise并等待特定事件执行一次。 在此示例中,我们检查是否可以将数据写入流,否则,请等待直到缓冲区释放后继续记录。

乍一看,这似乎是不必要的操作,但是当处理大量数据(例如,文件大小超过10GB的文件)而忘记执行此操作时,可能会遇到内存泄漏。

双工流


它结合了Readable和Writable流,也就是说,我们必须编写_read和_write两个方法的实现。

 const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} } 

在这里,我们对可传递给构造函数的2个参数感兴趣,它们是可读高水位标记和可写高水位标记,这两个参数使我们可以分别为可读流和可写流指定内部缓冲区的大小。 这就是在Duplex流的帮助下前两个示例的实现的样子。

 const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })(); 

我认为此代码不需要解释,因为它与以前相同,仅在一个类中。

转换流


该流是双工流。 需要转换数据块并将其进一步发送到链下。 可以与流的其余部分相同的方式来实现它。

 const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} } 

我们对_transform方法感兴趣。

_transform是一个私有方法,由Transform类的内部方法调用以转换数据块。 它包含3个参数:块(数据的一部分),编码(如果块是字符串,则进行编码),回调(在成功或不成功写入后调用的函数)。

使用此方法,数据部分将发生变化。 在此方法内部,我们可以调用transform.push()零次或多次,以提交更改。 完成数据转换后,我们需要调用一个回调,该回调将发送添加到transform.push()中的所有内容。 此回调函数的第一个参数是错误。 另外,我们不能使用transform.push(),而是将更改后的数据作为第二个参数发送到回调函数(例如:callback(空,数据))。 为了了解如何使用这种类型的流,让我们分析stream.pipe方法。

stream.pipe-此方法用于将Readable流连接到Writable流,并创建流链。 这意味着我们可以读取部分数据并将其传输到下一个流进行处理,然后再传输到下一个流,依此类推。

让我们编写一个Transform流,它将在每个数据的开头和结尾添加*字符。

 class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter); 

在此示例中,我使用了先前示例中的Readable和Writable流,还添加了Transform。 如您所见,结果非常简单。

因此,我们研究了流的排列方式。 它们的主要概念是部分处理数据,这非常方便并且不需要大量资源。 而且,流可以与迭代器一起使用,这使它们使用起来更加方便,但这是一个完全不同的故事。

Source: https://habr.com/ru/post/zh-CN479048/


All Articles