Bermain dengan utas di Node.JS 10.5.0

Hari yang baik



Di tempat kerja saya, perselisihan muncul antara saya dan afiliasi tentang utas di Node.JS versi baru dan kebutuhan untuk menyinkronkannya. Untuk memulainya, kami memutuskan untuk memilih tugas menulis baris ke file secara paralel. Topik dengan worker_threads panas, tolong, di bawah kucing.

Sedikit tentang aliran itu sendiri. Mereka adalah teknologi eksperimental di Node.JS 10.5.0, dan untuk memiliki akses ke modul "worker_threads", Anda perlu menjalankan aplikasi Node.JS kami dengan bendera "- pekerja eksperimen". Saya mendaftarkan bendera ini di skrip awal di file package.json:
{ "name": "worker-test", "version": "1.0.0", "description": "", "main": "app.js", "scripts": { "start": "node --max-old-space-size=4096 --experimental-worker app.js " }, "author": "", "license": "ISC" } 

Sekarang tentang logika itu sendiri. Utas utama memunculkan utas pekerja, semuanya menulis ke file pada interval tertentu. Tidak seperti semua contoh di mana stream utama dan anak mulai dari satu file, saya memisahkan stream menjadi yang terpisah, menurut saya lebih bersih dan elegan.

Sebenarnya kodenya.

File app.js utama adalah titik masuk.

 const { Worker } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Hello from main!'); for (var i = 1; i <= WORKERS_NUMBER ; i++) { const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); } 

Di sini kita cukup membuat stream anak menggunakan kelas Worker dan menentukan path ke file awal untuk stream './writer-worker-app/app.js'. Saat membuat aliran, kami mentransfer pengenal yang ditulis sendiri sebagai data data pekerja.

Mulai file untuk streaming ./writer-worker-app/app.js:

 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); while (true) { sendMessage(); } function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } 


Yah, kelas logger yang paling sederhana: ./writer-worker-app/logger.js
 const fs = require('fs'); function log(message) { return fs.appendFileSync('./my-file.txt', message); } module.exports = { log }; 

Ketika memulai aplikasi ini, kami semua berharap bahwa pada akhirnya kami akan mendapatkan beberapa kekacauan dalam file dan para donor akan berteriak betapa diperlukannya kunci dengan semaphore dan kegembiraan lain dari eksekusi paralel. Tapi tidak! Dalam file, semua baris tanpa gangguan, kecuali dalam urutan acak:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11

Eksperimen yang hebat, kemenangan kecil lain untuk Noda :-) Asumsi saya adalah bahwa semua sinkronisasi terjadi pada level I / O dari aliran Noda, tetapi saya akan senang mengetahui opsi yang benar dalam komentar. Untuk jaga-jaga, kami memeriksa pekerjaan menggunakan bukan fs.appendFileSync , tetapi fs.createWriteStream dan metode stream.write .

Hasilnya keluar sama.

Tapi kami tidak berhenti di situ.


Seorang kolega menyarankan tugas menyinkronkan utas. Untuk contoh khusus kami, biarkan itu tugas menulis berurutan ke file dalam urutan pengidentifikasi. Pertama menulis aliran pertama, lalu yang kedua, lalu yang ketiga dan seterusnya.

Untuk melakukan ini, saya memperkenalkan Thread manajer lain. Itu mungkin untuk bertahan dengan hal utama, tetapi saya sangat senang menciptakan pekerja yang terisolasi ini dan membangun komunikasi melalui pesan. Sebelum Anda mulai menulis implementasi Stream-Manager, Anda perlu membuat saluran komunikasi antara dia dan pekerja penulis. Kelas MessageChannel digunakan untuk ini. Contoh kelas ini memiliki dua bidang: port1 dan port2 , masing-masing dapat mendengarkan dan mengirim pesan ke yang lain menggunakan metode .on ('message') dan .postMessage () . Kelas ini dibuat sebagai bagian dari modul โ€œworker_threadsโ€ untuk komunikasi antar utas, karena biasanya ketika suatu objek ditransfer, ia hanya dikloning, dan dalam lingkungan eksekusi utas terisolasi itu tidak akan berguna.

Untuk komunikasi antara 2 aliran, kita harus memberi semua orang port.

Fakta menarik : pada 10.5.0 tidak mungkin untuk melewati port melalui konstruktor pekerja , Anda hanya perlu melakukan ini melalui pekerja.postMessage (), dan pastikan untuk menentukan port di parameter transferList!

Pengelola utas sendiri akan mengirim perintah ke utas penulis dengan urutan naik dari pengidentifikasi mereka, dan itu akan mengirim perintah berikutnya hanya setelah menerima respons dari penulis tentang operasi yang berhasil.

Diagram aplikasi UML:


File ./app.js bermutasi utama kami:
 const { Worker, MessageChannel } = require('worker_threads'); const path = require('path'); const WORKERS_NUMBER = 100; console.log('Main app initialized and started.'); const workersMeta = []; for (var i = 1; i <= WORKERS_NUMBER; i++) { const channel = new MessageChannel(); const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } }); workersMeta.push({ id: i, worker, channel }); } workersMeta.forEach(({ worker, channel }) => { worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]); }) setTimeout(() => { const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js')); const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 })); orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port)); console.log('All worker threads have been initialized'); }, WORKERS_NUMBER * 10); 

Di sini kita pertama-tama membuat pekerja, lalu kami mengirim setiap port port untuk komunikasi dengan manajer (dan hanya dengan cara ini, melalui konstruktor tidak mungkin untuk melakukan ini).

Kemudian kami membuat manajer utas, kami mengirimkan daftar port untuk komunikasi dengan penulis mengalir.
Diperbarui : secara empiris, saya mengetahui bahwa ketika bekerja dengan stream, lebih baik membiarkannya dibuat terlebih dahulu (diinisialisasi sesuai kebutuhan). Demi kebaikan itu perlu untuk mendengarkan beberapa jawaban dari aliran dalam gaya "Aku siap!", Tapi aku memutuskan untuk pergi dengan cara yang lebih mudah.

Kami juga akan mengubah perilaku utas penulis sehingga hanya mengirim pesan saat diperintahkan kepadanya, dan juga mengembalikan hasilnya saat operasi penulisan selesai:
./writer-worer-app/app.js
 const { workerData, parentPort } = require('worker_threads'); const logger = require('./logger'); const id = workerData.id; console.log(`Worker ${id} initializad.`); parentPort.on('message', value => { const orchestratorPort = value.orchestratorPort; orchestratorPort.on('message', data => { if (data.command == 'write') { console.log(`Worker ${id} received write command`); sendMessage(); sendResult(orchestratorPort); } }); console.log(`Worker ${id} started.`); }); function sendMessage() { logger.log(`Hello from worker number ${workerData.id}\r\n`); } function sendResult(port) { port.postMessage({ id, status: 'completed' }); } 

Kami diinisialisasi dengan benar dari pesan stream induk, mulai terjadi saluran manajer aliran, ketika kami menerima perintah, pertama-tama kami menulis ke file, kemudian kami mengirim hasilnya. Perlu dicatat bahwa file tersebut ditulis secara sinkron, jadi sendResult () dipanggil segera setelah sendMessage ().

Yang tersisa adalah menulis implementasi manajer cerdas kami
./orchestrator-worker-app/app.js:
 const { parentPort } = require('worker_threads'); console.log('Orchestrator initialized.') let workerPorts; parentPort.on('message', (value) => { workerPorts = value.workerPorts; workerPorts.forEach(wp => wp.port.on('message', handleResponse)); console.log('Orchestrator started.'); sendCommand(workerPorts[0]); }); function handleResponse(status) { const responseWorkerId = status.id; let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1); if (!nextWorker) { nextWorker = workerPorts[0]; } sendCommand(nextWorker); } function sendCommand(worker) { worker.port.postMessage({ command: 'write' }); } 

Kami mendapat daftar port, dipesan, untuk setiap port menetapkan panggilan balik ke respons, well, dan mengirim perintah ke yang pertama. Dalam panggilan balik itu sendiri, kami mencari penulis berikutnya dan mengirim perintah kepadanya. Agar tidak terlalu membebani sistem, interval antara tim ditetapkan.

Itu saja, aplikasi manajemen thread multithreaded kami sudah siap. Kami belajar tidak hanya untuk menghasilkan arus pekerja di Node.JS, tetapi juga untuk menciptakan cara komunikasi yang efektif di antara mereka. Menurut pendapat pribadi saya, arsitektur utas terisolasi di Node.JS dengan menunggu dan mengirim pesan lebih dari nyaman dan menjanjikan. Terima kasih atas perhatiannya.

Semua kode sumber dapat ditemukan di sini .

PEMBARUAN


Agar tidak menyesatkan pembaca, dan juga untuk tidak memberikan alasan yang tidak perlu untuk menulis bahwa saya curang dengan waktu habis, saya memperbarui artikel dan repositori.
Perubahan:
1) interval dalam penulis asli dihapus, sekarang sementara (benar) berjalan di sepanjang hardcore
2) menambahkan --max-old-space-size = 4096 flag, untuk berjaga-jaga, karena implementasi stream saat ini tidak terlalu stabil dan saya harap ini bisa membantu.
3) interval untuk mengirim pesan dari manajer utas telah dihapus. Sekarang rekamannya non-stop.
4) batas waktu ditambahkan ketika menginisialisasi manajer, mengapa - dijelaskan di atas.

YANG HARUS DILAKUKAN:
1) menambahkan pesan dengan panjang variabel atau menghitung panggilan logger - terima kasih FANAT1242
2) tambahkan patokan, bandingkan karya versi pertama dan kedua (berapa banyak baris akan ditulis dalam 10 detik, misalnya)

PEMBARUAN 2


1) Kode logging telah diubah: sekarang setiap pesan memiliki panjang yang berbeda.
2) Writer-worker-app / app.old.js telah diubah: setiap utas menulis 1000 kali, lalu berakhir.

Ini dilakukan untuk menguji ide-ide pengguna FANAT1242. Semua pesan yang sama tidak saling menulis ulang, baris dalam file persis 1000 * N utas.

Source: https://habr.com/ru/post/id416015/


All Articles