Bonne journée

Lors de mon travail, un conflit est survenu entre moi et les affiliés concernant les threads dans la nouvelle version de Node.JS et la nécessité de les synchroniser. Pour commencer, nous avons décidé de choisir la tâche d'écrire des lignes dans le fichier en parallèle. Le sujet avec worker_threads est chaud, s'il vous plaît, sous cat.
Un peu sur les ruisseaux eux-mêmes. Il s'agit d'une technologie expérimentale dans Node.JS 10.5.0, et pour avoir accès au module "worker_threads", vous devez exécuter notre application Node.JS avec le drapeau "--experimental-worker". J'ai enregistré cet indicateur dans le script de démarrage dans le fichier package.json:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" }
Maintenant sur la logique elle-même. Le thread principal génère N threads de travail; ils écrivent tous dans le fichier à un certain intervalle. Contrairement à tous les exemples où les flux principal et enfant commencent à partir d'un seul fichier, j'ai séparé les flux en un autre, il me semble plus propre et élégant.
En fait, le code.
Le fichier principal app.js est le point d'entrée.
const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); }
Ici, nous créons simplement des flux enfants en utilisant la classe Worker et en spécifiant le chemin d'accès au fichier de démarrage pour le flux './writer-worker-app/app.js'. Lors de la création du flux, nous transférons l'identifiant auto-écrit en tant que données workerData.
Fichier de démarrage pour le flux ./writer-worker-app/app.js:
const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); }
Eh bien, la classe d'enregistreur la plus simple: ./writer-worker-app/logger.js
const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log };
Au démarrage de cette application, nous espérions tous qu'à la fin nous obtiendrions un peu de gâchis dans le fichier et que les donateurs crieraient la nécessité des verrous avec des sémaphores et d'autres joies de l'exécution parallèle. Mais non! Dans le fichier, toutes les lignes passent sans interruption, sauf dans un ordre aléatoire:
Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
Une merveilleuse expérience, une autre petite victoire pour Noda :-) Mon hypothèse est que toute la synchronisation se produit au niveau des E / S des flux Noda, mais je serai heureux de connaître l'option correcte dans les commentaires. Juste au cas où, nous avons vérifié le travail en utilisant non
fs.appendFileSync , mais
fs.createWriteStream et la méthode
stream.write .
Le résultat est le même.
Mais nous ne nous sommes pas arrêtés là.
Un collègue a suggéré de synchroniser les threads. Pour notre exemple spécifique, que ce soit la tâche d'écrire séquentiellement dans un fichier dans l'ordre croissant des identifiants. Écrit d'abord le premier flux, puis le second, puis le troisième et ainsi de suite.
Pour ce faire, j'ai introduit un autre gestionnaire de threads. Il était possible de se débrouiller avec l'essentiel, mais je suis très heureux de créer ces travailleurs isolés et de développer la communication par le biais de messages. Avant de commencer à écrire l'implémentation de Stream-Manager, vous devez créer un canal de communication entre lui et les écrivains-travailleurs. La classe
MessageChannel a été utilisée pour cela. Les instances de cette classe ont deux champs:
port1 et
port2 , chacun pouvant écouter et envoyer des messages à l'autre à l'aide des
méthodes .on ('message') et
.postMessage () . Cette classe a été créée dans le cadre du module «worker_threads» pour la communication entre les threads, car généralement lorsqu'un objet est transféré, il est simplement cloné et dans un environnement d'exécution de thread isolé, il sera inutile.
Pour la communication entre 2 flux, il faut donner à chacun un port.
Un fait intéressant : à 10.5.0,
il est impossible de passer le port via le constructeur de l'ouvrier , vous devez le faire uniquement via worker.postMessage (), et assurez-vous de spécifier le port dans le paramètre transferList!
Le gestionnaire de threads lui-même enverra des commandes aux threads d'écriture dans l'ordre croissant de leurs identifiants, et il n'enverra la commande suivante qu'après avoir reçu une réponse de l'auteur sur l'opération réussie.
Diagramme d'application sous-UML:

Notre fichier principal ./app.js muté:
const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10);
Ici, nous créons d'abord des travailleurs, puis nous envoyons à chaque port un port de communication avec le gestionnaire (et seulement de cette façon, par le biais du constructeur, il est impossible de le faire).
Ensuite, nous créons un gestionnaire de threads, nous lui envoyons une liste de ports pour la communication avec les flux d'écriture.
Mise à jour : empiriquement, j'ai découvert que lorsque vous travaillez avec des flux, il est préférable de les laisser infuser en premier (initialiser au besoin). Pour de bon il a fallu écouter quelques réponses des flux dans le style «Je suis prêt!», Mais j'ai décidé d'aller plus facilement.
Nous allons également modifier le comportement du thread d'écriture afin qu'il n'envoie un message que lorsqu'il lui est dit, et renvoie également le résultat lorsque l'opération d'écriture est terminée:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); }
Nous avons correctement initialisé à partir du message du flux parent, commencé à se produire le canal du gestionnaire de flux, lorsque nous recevons la commande, nous écrivons d'abord dans le fichier, puis nous envoyons le résultat. Il convient de noter que le fichier est écrit de manière synchrone, donc sendResult () est appelé immédiatement après sendMessage ().
Il ne reste plus qu'à écrire l'implémentation de notre gestionnaire intelligent
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); }
Nous avons obtenu une liste de ports, ordonnés, pour chaque port, définissez un rappel à la réponse, eh bien, et avons envoyé la commande au premier. Dans le rappel lui-même, nous recherchons le prochain écrivain et lui envoyons une commande. Afin de ne pas trop solliciter le système, l'intervalle entre les équipes a été fixé.
Voilà, notre application de gestion des threads multithread est prête. Nous avons appris non seulement à générer des flux de travail dans Node.JS, mais aussi à créer des moyens de communication efficaces entre eux. À mon avis, l'architecture de threads isolés dans Node.JS avec des messages en attente et en envoi est plus que pratique et prometteuse. Merci à tous pour votre attention.
Tout le code source peut être trouvé ici .
MISE À JOUR
Afin de ne pas induire les lecteurs en erreur, et aussi de ne pas donner de raisons inutiles d'écrire que je triche avec des timeouts, j'ai mis à jour l'article et le référentiel.
Changements:
1) les intervalles dans les scénaristes originaux sont supprimés, maintenant alors que (vrai) va le long du hardcore
2) ajout de l'indicateur --max-old-space-size = 4096, juste au cas où, car l'implémentation actuelle des flux n'est pas très stable et j'espère que cela aide d'une manière ou d'une autre.
3) les intervalles d'envoi de messages depuis le gestionnaire de threads ont été supprimés. L'enregistrement est maintenant non-stop.
4) un délai d'attente a été ajouté lors de l'initialisation du gestionnaire, pourquoi - il est décrit ci-dessus.
À FAIRE:
1) ajouter des messages de longueur variable ou compter l'appel de l'enregistreur - merci FANAT1242
2) ajouter un benchmark, comparer le travail des première et deuxième versions (combien de lignes seront écrites en 10 secondes, par exemple)
MISE À JOUR 2
1) Le code de journalisation a été modifié: désormais chaque message a une longueur différente.
2) Writer-worker-app / app.old.js a été modifié: chaque thread écrit 1000 fois, puis se termine.
Cela a été fait pour tester les idées de l'utilisateur du FANAT1242. Les messages ne se réécrivent tout de même pas, les lignes du fichier sont exactement 1000 * N threads.