Spielen mit Threads in Node.JS 10.5.0

Guten Tag



Bei meiner Arbeit kam es zu einem Streit zwischen mir und den verbundenen Unternehmen über die Threads in der neuen Version von Node.JS und die Notwendigkeit, sie zu synchronisieren. Zunächst haben wir uns entschieden, parallel Zeilen in die Datei zu schreiben. Das Thema mit worker_threads ist heiß, bitte unter cat.

Ein wenig über die Bäche selbst. Es handelt sich um experimentelle Technologie in Node.JS 10.5.0. Um auf das Modul "worker_threads" zugreifen zu können, müssen Sie unsere Node.JS-Anwendung mit dem Flag "--experimental-worker" ausführen. Ich habe dieses Flag im Startskript in der Datei package.json registriert:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" } 

Nun zur Logik selbst. Der Hauptthread erzeugt N Arbeitsthreads, die alle in einem bestimmten Intervall in die Datei schreiben. Im Gegensatz zu allen Beispielen, bei denen der Haupt- und der untergeordnete Stream von einer Datei ausgehen, habe ich die Streams in einen separaten getrennt. Dies scheint mir sauberer und eleganter zu sein.

Eigentlich der Code.

Die Hauptdatei app.js ist der Einstiegspunkt.

 const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); } 

Hier erstellen wir einfach untergeordnete Streams mit der Worker-Klasse und geben den Pfad zur Startdatei für den Stream './writer-worker-app/app.js' an. Beim Erstellen des Streams übertragen wir den selbstgeschriebenen Bezeichner als WorkerData-Daten.

Datei für Stream starten ./writer-worker-app/app.js:

 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } 


Nun, die einfachste Logger-Klasse: ./writer-worker-app/logger.js
 const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log }; 

Als wir diese Anwendung starteten, hofften wir alle, dass wir am Ende etwas Unordnung in der Datei bekommen würden und die Spender schreien würden, wie wichtig Sperren mit Semaphoren und anderen Freuden der parallelen Ausführung sind. Aber nein! In der Datei werden alle Zeilen ohne Unterbrechung angezeigt, außer in zufälliger Reihenfolge:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11

Ein wunderbares Experiment, ein weiterer kleiner Sieg für Noda :-) Ich gehe davon aus, dass die gesamte Synchronisation auf der E / A-Ebene der Noda-Flows erfolgt, aber ich bin froh, die richtige Option in den Kommentaren zu kennen. Nur für den Fall, dass wir die Arbeit nicht mit fs.appendFileSync , sondern mit fs.createWriteStream und der stream.write- Methode überprüft haben .

Das Ergebnis war das gleiche.

Aber wir haben hier nicht aufgehört.


Ein Kollege schlug die Aufgabe vor, Threads zu synchronisieren. In unserem speziellen Beispiel ist es die Aufgabe, nacheinander in aufsteigender Reihenfolge der Bezeichner in eine Datei zu schreiben. Schreibt zuerst den ersten Stream, dann den zweiten, dann den dritten und so weiter.

Zu diesem Zweck habe ich einen weiteren Thread-Manager eingeführt. Es war möglich, mit der Hauptsache auszukommen, aber ich freue mich sehr, diese isolierten Arbeiter zu schaffen und die Kommunikation durch Botschaften aufzubauen. Bevor Sie mit der Implementierung des Stream-Managers beginnen, müssen Sie einen Kommunikationskanal zwischen ihm und den Autoren erstellen. Hierfür wurde die MessageChannel- Klasse verwendet. Die Instanzen dieser Klasse haben zwei Felder: port1 und port2 , von denen jedes mit den Methoden .on ('message') und .postMessage () Nachrichten abhören und an den anderen senden kann . Diese Klasse wurde als Teil des Moduls "worker_threads" für die Kommunikation zwischen Threads erstellt, da ein übertragenes Objekt normalerweise einfach geklont wird und in einer isolierten Thread-Ausführungsumgebung unbrauchbar ist.

Für die Kommunikation zwischen zwei Flows müssen wir jedem einen Port geben.

Eine interessante Tatsache : Bei 10.5.0 ist es unmöglich, den Port durch den Konstruktor des Workers zu übergeben . Sie müssen dies nur über worker.postMessage () tun und den Port im Parameter transferList angeben!

Der Thread-Manager selbst sendet Befehle in aufsteigender Reihenfolge ihrer Bezeichner an die Writer-Threads und sendet den nächsten Befehl erst, nachdem er vom Writer eine Antwort über den erfolgreichen Vorgang erhalten hat.

Under-UML-Anwendungsdiagramm:


Unsere mutierte Hauptdatei ./app.js:
 const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10); 

Hier erstellen wir zuerst Worker, dann senden wir jedem Port einen Port für die Kommunikation mit dem Manager (und nur so ist es über den Konstruktor unmöglich, dies zu tun).

Dann erstellen wir einen Thread-Manager und senden ihm eine Liste von Ports für die Kommunikation mit Writer-Flows.
Aktualisiert : empirisch habe ich herausgefunden, dass es bei der Arbeit mit Streams besser ist, sie zuerst brauen zu lassen (nach Bedarf initialisieren). Für immer war es notwendig, einige Antworten aus den Flüssen im Stil von „Ich bin bereit!“ Zu hören, aber ich entschied mich für den einfacheren Weg.

Wir werden auch das Verhalten des Writer-Threads so ändern, dass er nur dann eine Nachricht sendet, wenn er dazu aufgefordert wird, und das Ergebnis auch zurückgibt, wenn der Schreibvorgang abgeschlossen ist:
./writer-worer-app/app.js
 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); } 

Wir haben aus der Nachricht des übergeordneten Streams korrekt initialisiert und den Kanal des Stream-Managers gestartet. Wenn wir den Befehl erhalten, schreiben wir zuerst in die Datei und senden dann das Ergebnis. Es ist zu beachten, dass die Datei synchron geschrieben wird, sodass sendResult () unmittelbar nach sendMessage () aufgerufen wird.

Alles was bleibt ist, die Implementierung unseres intelligenten Managers zu schreiben
./orchestrator-worker-app/app.js:
 const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); } 

Wir haben eine Liste der geordneten Ports erhalten, für jeden Port einen Rückruf auf die Antwort gesetzt und den Befehl an den ersten gesendet. Im Rückruf selbst suchen wir den nächsten Schreiber und senden ihm einen Befehl. Um das System nicht zu stark zu belasten, wurde das Intervall zwischen den Teams festgelegt.

Das war's, unsere Multithread-Thread-Management-Anwendung ist fertig. Wir haben gelernt, in Node.JS nicht nur Worker-Flows zu generieren, sondern auch effektive Kommunikationswege zwischen ihnen zu schaffen. Meiner persönlichen Meinung nach ist die Architektur isolierter Threads in Node.JS mit Warten und Senden von Nachrichten mehr als praktisch und vielversprechend. Vielen Dank für Ihre Aufmerksamkeit.

Den gesamten Quellcode finden Sie hier .

UPDATE


Um die Leser nicht irrezuführen und auch keine unnötigen Gründe für das Schreiben anzugeben, dass ich mit Timeouts betrüge, habe ich den Artikel und das Repository aktualisiert.
Änderungen:
1) Die Intervalle in den ursprünglichen Autoren werden entfernt, während (true) den Hardcore durchläuft
2) --max-old-space-size = 4096 Flag hinzugefügt, nur für den Fall, weil Die aktuelle Implementierung der Streams ist nicht sehr stabil und ich hoffe das hilft irgendwie.
3) Die Intervalle zum Senden von Nachrichten vom Thread-Manager wurden gelöscht. Jetzt ist die Aufnahme ohne Unterbrechung.
4) Bei der Initialisierung des Managers wurde ein Timeout hinzugefügt, warum - es ist oben beschrieben.

ZU TUN:
1) Nachrichten mit variabler Länge hinzufügen oder den Anruf des Loggers zählen - danke FANAT1242
2) Fügen Sie einen Benchmark hinzu, vergleichen Sie die Arbeit der ersten und zweiten Version (wie viele Zeilen werden beispielsweise in 10 Sekunden geschrieben)

UPDATE 2


1) Der Protokollierungscode wurde geändert: Jetzt hat jede Nachricht eine andere Länge.
2) Writer-worker-app / app.old.js wurde geändert: Jeder Thread schreibt 1000 Mal und wird dann beendet.

Dies wurde durchgeführt, um die Ideen des FANAT1242-Benutzers zu testen. Nachrichten schreiben sich trotzdem nicht neu, die Zeilen in der Datei sind genau 1000 * N Threads.

Source: https://habr.com/ru/post/de416015/


All Articles