Ich denke, viele haben schon mehr als einmal von Node js Streams gehört, es aber nie benutzt oder benutzt, ohne darüber nachzudenken, wie sie funktionieren, Streams und Normen weitergeleitet. Lassen Sie uns herausfinden, was Streams sind: Pipe (Pipe), Chunks (Chunk - Teil der Daten) und all das.

Warum ist es wichtig zu verstehen, wie Streams in Node js funktionieren? Die Antwort ist einfach: Viele der in Node js integrierten Module implementieren Streams, z. B. HTTP-Anforderungen / -Antworten, fs-Lese- / Schreibzugriff, zlib, Krypto, TCP-Sockets und andere. Sie benötigen auch Streams, wenn Sie beispielsweise große Dateien verarbeiten oder mit Bildern arbeiten. Sie schreiben vielleicht nicht Ihren eigenen Stream, aber wenn Sie verstehen, wie er funktioniert, werden Sie zu einem kompetenteren Entwickler.
Also, was genau ist ein Stream (im Folgenden werde ich anstelle von Stream (stream) verwenden). Stream ist ein Konzept, mit dem Sie Daten in kleinen Teilen verarbeiten können, wodurch Sie eine kleine Menge an RAM verwenden können. Mit seiner Hilfe können wir auch die Verarbeitung jedes Teils in voneinander unabhängige Module (Funktionen oder Klassen) aufteilen. Zum Beispiel können wir einen Teil der Daten sofort komprimieren, dann verschlüsseln und in eine Datei schreiben. Die Hauptidee ist nicht, mit den gesamten Daten zu arbeiten, sondern einen Teil der Daten nach dem anderen zu verarbeiten.
Es gibt 4 Arten von Streams in Node js:
- Lesbar
- Beschreibbar - Schreiben
- Duplex - Lesen und Schreiben
- Transformieren - eine Art Duplex-Stream, der Daten ändern kann
Ausführlichere Informationen finden Sie auf der offiziellen Website. Lassen Sie uns nun mit dem Üben fortfahren.
Einfaches Beispiel
Ich denke, dass viele Streams bereits benutzt haben, ohne es zu merken. In diesem Beispiel senden wir die Datei einfach an den Client.
Der einzige Unterschied besteht darin, dass wir im ersten Fall einen Teil der Datei herunterladen und senden und somit nicht den Arbeitsspeicher des Servers laden. Im zweiten Fall laden wir sofort die gesamte Datei in den Arbeitsspeicher und senden sie erst dann.
Weiter im Artikel werden wir jeden Stream separat analysieren. Sie können einen Stream mithilfe der Vererbung oder der Konstruktorfunktion erstellen.
const { Readable } = require('stream');
In allen Beispielen verwende ich 2 Methoden.
Lesbarer Stream
Schauen wir uns an, wie wir in NodeJS einen lesbaren Stream erstellen können.
const { Readable } = require('stream'); class myReadable extends Readable { constructor(opt) { super(opt); } _read(size) {} }
Wie Sie aus dem obigen Beispiel sehen können, akzeptiert diese Klasse eine Reihe von Parametern. Wir betrachten nur diejenigen, die für ein allgemeines Verständnis des Readable-Streams erforderlich sind, den Rest können Sie in der Dokumentation sehen. Wir interessieren uns für den Parameter highWaterMark und die Methode _read.
highWaterMark - Dies ist die maximale Anzahl von Bytes des internen Stream-Puffers (standardmäßig 16 KB), bei deren Erreichen der Lesevorgang von der Ressource unterbrochen wird. Um weiterlesen zu können, müssen wir den internen Puffer freigeben. Wir können dies tun, indem wir die Pipe aufrufen, Methoden fortsetzen oder das Datenereignis abonnieren.
_read ist eine Implementierung einer privaten Methode, die von internen Methoden der Readable-Klasse aufgerufen wird. Es wird kontinuierlich aufgerufen, bis die Datengröße highWaterMark erreicht.
Nun, die letzte Methode, die uns interessiert, ist readable.push. Sie fügt Daten direkt zum internen Puffer hinzu. Es gibt true zurück, aber sobald der Puffer voll ist, beginnt ein Aufruf dieser Methode, false zurückzugeben. Sie kann mit der readable._read-Methode gesteuert werden.
Lassen Sie uns nun ein Beispiel sehen, um die Situation zu klären.
class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); console.log(`Added: ${this._index}. Could be added? `, this.push(buf)); } } } const counter = new Counter({ highWaterMark: 2 }); console.log(`Received: ${counter.read().toString()}`);
Zunächst möchte ich sagen, dass counter.read () nicht das _read ist, das wir in der Klasse implementiert haben. Diese Methode ist privat und diese ist öffentlich und gibt Daten aus dem internen Puffer zurück. Wenn wir diesen Code ausführen, sehen wir in der Konsole Folgendes:

Was ist hier passiert? Beim Erstellen des neuen Counter-Streams ({highWaterMark: 2}) haben wir angegeben, dass die Größe unseres internen Puffers 2 Byte betragen würde, d. H. kann 2 Zeichen speichern (1 Zeichen = 1 Byte). Nach dem Aufruf von counter.read () beginnt der Stream mit dem Lesen, schreibt '1' in den internen Puffer und gibt ihn zurück. Dann liest er weiter und schreibt '2'. Wenn '3' geschrieben wird, ist der Puffer voll, readable.push gibt false zurück und der Stream wartet, bis der interne Puffer freigegeben ist. Weil In unserem Beispiel gibt es keine Logik, um den Puffer freizugeben. Das Skript wird beendet.
Wie bereits erwähnt, müssen wir den internen Puffer ständig löschen, um sicherzustellen, dass der Lesevorgang nicht unterbrochen wird. Dazu abonnieren wir das Datenereignis. Ersetzen Sie die letzten 2 Zeilen durch den folgenden Code.
const counter = new Counter({ highWaterMark: 2 }); counter.on('data', chunk => { console.log(`Received: ${chunk.toString()}`); });
Wenn wir jetzt dieses Beispiel ausführen, werden wir sehen, dass alles so funktioniert hat, wie es sollte, und die Zahlen von 1 bis 1000 werden in der Konsole angezeigt.
Schriftlicher Stream
Tatsächlich ist es einem lesbaren Stream sehr ähnlich, der nur zum Schreiben von Daten vorgesehen ist.
const { Writable } = require('stream'); class myWritable extends Writable { constructor(opt) { super(opt); } _write(chunk, encoding, callback) {} }
Es akzeptiert ähnliche Parameter wie Readable Stream. Wir interessieren uns für highWaterMark und _write.
_write ist eine private Methode, die von den internen Methoden der Writable-Klasse zum Schreiben von Daten aufgerufen wird. Es werden 3 Parameter benötigt: Chunk (Teil der Daten), Codierung (Codierung, wenn Chunk eine Zeichenfolge ist), Rückruf (eine Funktion, die nach einem erfolgreichen oder erfolglosen Schreibvorgang aufgerufen wird).
highWaterMark ist die maximale Anzahl von Bytes des internen Stream-Puffers (standardmäßig 16 KB). Sobald stream.write erreicht ist, wird false zurückgegeben.
Lassen Sie uns das vorherige Beispiel mit einem Zähler umschreiben.
const { Writable } = require('stream'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); for (let i = 1; i < 1000; i += 1) { counter.write(Buffer.from(`${i}`, 'utf8')); }
In der Tat ist alles einfach, aber es gibt eine interessante Nuance, an die es sich zu erinnern lohnt! Beim Erstellen des neuen Counter-Streams ({highWaterMark: 2}) haben wir angegeben, dass die Größe unseres internen Puffers 2 Byte betragen würde, d. H. kann 2 Zeichen speichern (1 Zeichen = 1 Byte). Wenn der Zähler zehn erreicht, wird der Puffer mit jedem Aufruf zum Schreiben gefüllt. Wenn also der Schreibvorgang auf eine langsame Quelle ausgeführt wird, werden alle anderen Daten beim Aufrufen von write im RAM gespeichert, was zu einem Überlauf führen kann (in diesem Beispiel: Natürlich spielt es keine Rolle, da unser Puffer 2 Byte groß ist. Bei großen Dateien müssen Sie sich dies merken.) Wenn eine solche Situation auftritt, müssen wir warten, bis der Stream den aktuellen Teil der Daten schreibt, den internen Puffer freigibt (das Drain-Ereignis auslöst), und dann können wir die Aufzeichnung der Daten fortsetzen. Lassen Sie uns unser Beispiel umschreiben.
const { Writable } = require('stream'); const { once } = require('events'); class Counter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ highWaterMark: 2 }); (async () => { for (let i = 1; i < 1000; i += 1) { const canWrite = counter.write(Buffer.from(`${i}`, 'utf8')); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } } })();
Die events.once-Methode wurde in Version 11.13.0 hinzugefügt und ermöglicht es Ihnen, ein Versprechen zu erstellen und auf die einmalige Ausführung eines bestimmten Ereignisses zu warten. In diesem Beispiel wird geprüft, ob Daten in den Stream geschrieben werden können. Wenn nicht, warten Sie, bis der Puffer freigegeben ist, und fahren Sie mit der Aufzeichnung fort.
Auf den ersten Blick scheint dies eine unnötige Aktion zu sein. Wenn Sie jedoch mit großen Datenmengen arbeiten, z. B. Dateien mit einem Gewicht von mehr als 10 GB, kann dies zu einem Speicherverlust führen.
Duplex-Stream
Es kombiniert lesbare und schreibbare Streams, dh, wir müssen eine Implementierung der beiden Methoden _read und _write schreiben.
const { Duplex } = require('stream'); class myDuplex extends Duplex { constructor(opt) { super(opt); } _read(size) {} _write(chunk, encoding, callback) {} }
Hier interessieren uns 2 Parameter, die wir an den Konstruktor übergeben können, nämlich readableHighWaterMark und writableHighWaterMark, mit denen wir die Größe des internen Puffers für lesbare bzw. schreibbare Streams angeben können. So sieht die Implementierung der beiden vorherigen Beispiele mit Hilfe des Duplex-Streams aus.
const { Duplex } = require('stream'); const events = require('events'); class Counter extends Duplex { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } const counter = new Counter({ readableHighWaterMark: 2, writableHighWaterMark: 2 }); (async () => { let chunk = counter.read(); while (chunk !== null) { const canWrite = counter.write(chunk); console.log(`Can we write bunch of data? ${canWrite}`); if (!canWrite) { await events.once(counter, 'drain'); console.log('drain event fired.'); } chunk = counter.read(); } })();
Ich denke, dieser Code muss nicht erklärt werden, da er derselbe ist wie zuvor, nur in einer Klasse.
Stream transformieren
Dieser Stream ist ein Duplex-Stream. Es wird benötigt, um einen Teil der Daten zu konvertieren und weiter in der Kette zu senden. Es kann auf die gleiche Weise wie der Rest des Streams implementiert werden.
const { Transform } = require('stream'); class myTransform extends Transform { _ transform(chunk, encoding, callback) {} }
Wir interessieren uns für die _transform-Methode.
_transform ist eine private Methode, die von den internen Methoden der Transform-Klasse aufgerufen wird, um einen Datenblock zu transformieren. Es werden 3 Parameter benötigt: chunk (Teil der Daten), encoding (Codierung, wenn chunk eine Zeichenfolge ist), callback (eine Funktion, die nach einem erfolgreichen oder erfolglosen Schreibvorgang aufgerufen wird).
Mit dieser Methode wird ein Teil der Daten geändert. Innerhalb dieser Methode können wir transform.push () null oder mehrmals aufrufen, wodurch die Änderungen festgeschrieben werden. Wenn wir die Datenkonvertierung abgeschlossen haben, müssen wir einen Rückruf aufrufen, der alles sendet, was wir zu transform.push () hinzugefügt haben. Der erste Parameter dieser Rückruffunktion ist ein Fehler. Außerdem können wir transform.push () nicht verwenden, sondern die geänderten Daten als zweiten Parameter an die Rückruffunktion senden (Beispiel: Rückruf (null, Daten)). Um zu verstehen, wie dieser Streamtyp verwendet wird, analysieren wir die stream.pipe-Methode.
stream.pipe - Diese Methode wird verwendet, um den Readable-Stream mit dem Writable-Stream zu verbinden und Stream-Ketten zu erstellen. Dies bedeutet, dass wir einen Teil der Daten lesen und zur Verarbeitung an den nächsten Stream und dann an den nächsten usw. übertragen können.
Schreiben wir einen Transformations-Stream, der das Zeichen * am Anfang und am Ende jedes Datenelements anfügt.
class CounterReader extends Readable { constructor(opt) { super(opt); this._max = 1000; this._index = 0; } _read() { this._index += 1; if (this._index > this._max) { this.push(null); } else { const buf = Buffer.from(`${this._index}`, 'utf8'); this.push(buf); } } } class CounterWriter extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } } class CounterTransform extends Transform { _transform(chunk, encoding, callback) { try { const resultString = `*${chunk.toString('utf8')}*`; callback(null, resultString); } catch (err) { callback(err); } } } const counterReader = new CounterReader({ highWaterMark: 2 }); const counterWriter = new CounterWriter({ highWaterMark: 2 }); const counterTransform = new CounterTransform({ highWaterMark: 2 }); counterReader.pipe(counterTransform).pipe(counterWriter);
In diesem Beispiel habe ich Readable- und Writable-Streams aus den vorherigen Beispielen verwendet und außerdem Transform hinzugefügt. Wie Sie sehen, hat es sich als ziemlich einfach erwiesen.
Also haben wir uns angesehen, wie die Streams angeordnet sind. Ihr Hauptkonzept ist die teilweise Datenverarbeitung, die sehr komfortabel ist und keine großen Ressourcen erfordert. Streams können auch mit Iteratoren verwendet werden, was ihre Verwendung noch komfortabler macht, aber dies ist eine ganz andere Geschichte.