Jugando con hilos en Node.JS 10.5.0

Buen dia



En mi trabajo, surgió una disputa entre mí y los afiliados sobre los hilos en la nueva versión de Node.JS y la necesidad de sincronizarlos. Para empezar, decidimos elegir la tarea de escribir líneas en el archivo en paralelo. El tema con worker_threads es candente, por favor, debajo de cat.

Un poco sobre las corrientes en sí. Son tecnologías experimentales en Node.JS 10.5.0, y para tener acceso al módulo "worker_threads", debe ejecutar nuestra aplicación Node.JS con el indicador "--experimental-worker". Registré este indicador en el script de inicio en el archivo package.json:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" } 

Ahora sobre la lógica misma. El hilo principal genera N hilos de trabajo; todos escriben en el archivo en algún intervalo. A diferencia de todos los ejemplos en los que las secuencias principales y secundarias comienzan desde un archivo, separé las secuencias en una separada, me parece más limpio y elegante.

En realidad, el código.

El archivo principal app.js es el punto de entrada.

 const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); } 

Aquí simplemente creamos secuencias secundarias utilizando la clase Worker y especificando la ruta al archivo de inicio de la secuencia './writer-worker-app/app.js'. Al crear la secuencia, transferimos el identificador autoescrito como datos de dataData de trabajo.

Archivo de inicio para la secuencia ./writer-worker-app/app.js:

 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } 


Bueno, la clase de registrador más simple: ./writer-worker-app/logger.js
 const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log }; 

Al iniciar esta aplicación, todos esperábamos que al final nos metiéramos un poco en el archivo y los donantes gritaran cómo se necesitaban bloqueos con semáforos y otras alegrías de ejecución paralela. Pero no! En el archivo, todas las líneas van sin interrupción, excepto en orden aleatorio:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11

Un experimento maravilloso, otra pequeña victoria para Noda :-) Mi suposición es que toda la sincronización ocurre en el nivel de E / S de los flujos de Noda, pero me alegrará saber la opción correcta en los comentarios. Por si acaso, verificamos el trabajo utilizando no fs.appendFileSync , sino fs.createWriteStream y el método stream.write .

El resultado salió igual.

Pero no nos detuvimos allí.


Un colega sugirió la tarea de sincronizar hilos. Para nuestro ejemplo específico, que sea la tarea de escribir secuencialmente en un archivo en orden ascendente de identificadores. Primero escribe la primera secuencia, luego la segunda, luego la tercera y así sucesivamente.

Para hacer esto, presenté otro administrador de hilos. Era posible salir adelante con lo principal, pero estoy muy contento de crear estos trabajadores aislados y construir la comunicación a través de mensajes. Antes de comenzar a escribir la implementación de Stream-Manager, debe crear un canal de comunicación entre él y los escritores-trabajadores. La clase MessageChannel se usó para esto. Las instancias de esta clase tienen dos campos: puerto1 y puerto2 , cada uno de los cuales puede escuchar y enviar mensajes al otro utilizando los métodos .on ('mensaje') y .postMessage () . Esta clase se creó como parte del módulo "trabajador_procesos" para la comunicación entre subprocesos, porque generalmente cuando se transfiere un objeto, simplemente se clona, ​​y en un entorno de ejecución de subproceso aislado será inútil.

Para la comunicación entre 2 flujos, debemos dar a todos un puerto.

Un hecho interesante : en 10.5.0 es imposible pasar el puerto a través del constructor del trabajador , solo debe hacerlo a través de worker.postMessage (), ¡y asegúrese de especificar el puerto en el parámetro transferList!

El administrador de subprocesos mismo enviará comandos a los subprocesos del escritor en orden ascendente de sus identificadores, y enviará el siguiente comando solo después de recibir una respuesta del escritor sobre la operación exitosa.

Diagrama de aplicación bajo UML:


Nuestro archivo principal mutado ./app.js:
 const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10); 

Aquí primero creamos trabajadores, luego enviamos a cada puerto un puerto para la comunicación con el gerente (y solo de esta manera, a través del constructor es imposible hacer esto).

Luego creamos un administrador de hilos, le enviamos una lista de puertos para la comunicación con los flujos de escritor.
Actualizado : empíricamente, descubrí que cuando se trabaja con transmisiones, es mejor dejar que se preparen primero (inicializar según sea necesario). Para bien, era necesario escuchar algunas respuestas de los flujos al estilo de "Estoy listo", pero decidí seguir el camino más fácil.

También cambiaremos el comportamiento del hilo del escritor para que envíe un mensaje solo cuando se lo indiquen, y también devuelva el resultado cuando se complete la operación de escritura:
./writer-worer-app/app.js
 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); } 

Inicializamos correctamente desde el mensaje de la secuencia principal, comenzamos a pasar el canal del administrador de secuencia, cuando recibimos el comando, primero escribimos en el archivo, luego enviamos el resultado. Cabe señalar que el archivo se escribe de forma sincrónica, por lo que se llama a sendResult () inmediatamente después de sendMessage ().

Todo lo que queda es escribir la implementación de nuestro administrador inteligente
./orchestrator-worker-app/app.js:
 const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); } 

Obtuvimos una lista de puertos, ordenados, para cada puerto establecimos una devolución de llamada a la respuesta, bueno, y enviamos el comando al primero. En la devolución de llamada en sí, estamos buscando al próximo escritor y le enviamos un comando. Para no forzar demasiado el sistema, se estableció el intervalo entre los equipos.

Eso es todo, nuestra aplicación de gestión de subprocesos multiproceso está lista. Aprendimos no solo a generar flujos de trabajadores en Node.JS, sino también a crear formas efectivas de comunicación entre ellos. En mi opinión personal, la arquitectura de hilos aislados en Node.JS con espera y envío de mensajes es más que conveniente y prometedora. Gracias a todos por su atención.

Todo el código fuente se puede encontrar aquí .

ACTUALIZAR


Para no confundir a los lectores, y también para no dar razones innecesarias para escribir que hago trampa con los tiempos de espera, actualicé el artículo y el repositorio.
Cambios:
1) los intervalos en los escritores originales se eliminan, ahora mientras que (verdadero) va a lo largo del hardcore
2) agregado --max-old-space-size = 4096 flag, por si acaso, porque La implementación actual de las transmisiones no es muy estable y espero que esto ayude de alguna manera.
3) se han eliminado los intervalos para enviar mensajes desde el administrador de hilos. Ahora la grabación es continua.
4) se agregó un tiempo de espera al inicializar el administrador, por qué, se describe anteriormente.

Para hacer:
1) agregue mensajes de longitud variable o contando la llamada del registrador - gracias FANAT1242
2) agregue un punto de referencia, compare el trabajo de la primera y la segunda versión (cuántas líneas se escribirán en 10 segundos, por ejemplo)

ACTUALIZACIÓN 2


1) El código de registro ha cambiado: ahora cada mensaje tiene una longitud diferente.
2) Writer-worker-app / app.old.js ha cambiado: cada hilo escribe 1000 veces, luego termina.

Esto se hizo para probar las ideas del usuario de FANAT1242. Los mensajes aún no se sobrescriben entre sí, hay exactamente 1000 * N hilos en el archivo.

Source: https://habr.com/ru/post/es416015/


All Articles