Alat Pengembang Node.js Antrian pekerjaan

Ketika mengimplementasikan aplikasi web dan aplikasi mobile back-end, bahkan yang paling sederhana, sudah menjadi kebiasaan untuk menggunakan alat-alat seperti: database, server mail (smtp), server redis. Seperangkat alat yang digunakan terus berkembang. Misalnya, antrian pesan, dilihat dari jumlah instalasi paket amqplib (650 ribu instalasi per minggu), digunakan bersama dengan basis data relasional (paket mysql 460 ribu instalasi per minggu dan pg 800 ribu instalasi per minggu).

Hari ini saya ingin berbicara tentang antrian pekerjaan, yang sejauh ini menggunakan urutan besarnya kurang, meskipun kebutuhan mereka muncul, di hampir semua proyek nyata

Jadi, antrian pekerjaan memungkinkan Anda untuk melakukan beberapa tugas secara serempak, pada kenyataannya, menjalankan fungsi dengan parameter input yang diberikan dan pada waktu yang ditentukan.

Bergantung pada parameter, tugas dapat dilakukan:

  • segera setelah menambahkan ke antrian pekerjaan;
  • sekali pada waktu yang ditentukan;
  • berkali-kali sesuai jadwal.

Antrian pekerjaan memungkinkan Anda mentransfer parameter ke pekerjaan yang sedang dijalankan, melacak dan menjalankan kembali pekerjaan yang gagal, dan menetapkan batas jumlah pekerjaan yang sedang berjalan secara bersamaan.

Sebagian besar aplikasi di Node.js dikaitkan dengan pengembangan REST-API untuk aplikasi web dan seluler. Mengurangi waktu eksekusi REST-API penting untuk kenyamanan pengguna dengan aplikasi tersebut. Pada saat yang sama, panggilan ke REST-API dapat memulai operasi yang panjang dan / atau intensif sumber daya. Misalnya, setelah melakukan pembelian, Anda harus mengirim pesan push kepada pengguna ke aplikasi seluler, atau mengirim permintaan untuk melakukan pembelian di CRM REST-API. Pertanyaan ini dapat dilakukan secara tidak sinkron. Bagaimana melakukannya dengan benar jika Anda tidak memiliki alat untuk bekerja dengan antrian pekerjaan? Misalnya, Anda dapat mengirim pesan ke antrian pesan, memulai pekerja yang akan membaca pesan-pesan ini dan melakukan pekerjaan yang diperlukan berdasarkan pesan-pesan ini.

Sebenarnya, inilah yang dilakukan oleh antrian pekerjaan. Namun, jika Anda perhatikan lebih dekat, antrian pekerjaan memiliki beberapa perbedaan mendasar dari antrian pesan. Pertama, pesan (statis) dimasukkan ke dalam antrian pesan, dan antrian pekerjaan melibatkan beberapa jenis pekerjaan (panggilan fungsi). Kedua, antrian pekerjaan menyiratkan adanya beberapa prosesor (pekerja) yang akan melakukan pekerjaan yang diberikan. Dalam hal ini, fungsionalitas tambahan diperlukan. Jumlah prosesor prosesor harus diskalakan secara transparan jika ada peningkatan beban. Di sisi lain, perlu untuk membatasi jumlah tugas yang berjalan secara bersamaan pada satu pekerja prosesor untuk memuluskan beban puncak dan mencegah penolakan layanan. Ini menunjukkan bahwa ada kebutuhan untuk alat yang dapat menjalankan tugas yang tidak sinkron dengan mengatur berbagai parameter, semudah membuat permintaan menggunakan REST-API (atau lebih baik jika lebih mudah).

Menggunakan antrian pesan, relatif mudah untuk mengimplementasikan antrian pekerjaan yang berjalan segera setelah suatu pekerjaan antri. Namun seringkali diperlukan untuk menyelesaikan tugas sekali pada waktu yang ditentukan atau sesuai dengan jadwal. Untuk tugas-tugas ini, sejumlah paket banyak digunakan yang mengimplementasikan logika cron di linux. Agar tidak berdasar, saya akan mengatakan bahwa paket node-cron memiliki 480 ribu instalasi per minggu, jadwal-simpul - 170 ribu instalasi per minggu.

Menggunakan node-cron, tentu saja, lebih nyaman daripada asket setInterval (), tetapi secara pribadi, saya telah menemui sejumlah masalah saat menggunakannya. Jika untuk menyatakan kelemahan umum, ini adalah kurangnya kontrol atas jumlah tugas yang dijalankan secara bersamaan (ini merangsang beban puncak: meningkatkan beban memperlambat pekerjaan tugas, memperlambat pekerjaan tugas meningkatkan jumlah tugas yang dijalankan secara bersamaan, yang pada gilirannya memuat sistem lebih banyak), ketidakmampuan menjalankan simpul untuk meningkatkan produktivitas -cron pada beberapa inti (dalam hal ini, semua tugas dieksekusi secara independen pada setiap inti) dan kurangnya alat untuk melacak dan memulai kembali tugas yang telah menyelesaikan Xia dengan kesalahan.

Saya berharap bahwa saya telah menunjukkan bahwa kebutuhan akan alat seperti antrian pekerjaan setara dengan alat-alat seperti database. Dan dana tersebut telah muncul, meskipun belum digunakan secara luas. Saya akan daftar yang paling populer di antara mereka:

Nama paketJumlah instalasi per mingguJumlah suka
kue291908753
antrian lebahtidak ada informasi1431
agenda254595488
banteng562325909


Hari ini saya akan mempertimbangkan penggunaan paket banteng, yang saya kerjakan sendiri. Mengapa saya memilih paket khusus ini (walaupun saya tidak memaksakan pilihan saya pada orang lain). Pada saat itu, ketika saya mulai mencari implementasi antrian pesan yang nyaman, proyek antrian lebah sudah dihentikan. Implementasi kue, sesuai dengan tolok ukur yang diberikan dalam repositori bee-queue, tertinggal jauh di belakang implementasi lainnya dan, di samping itu, tidak mengandung sarana untuk menjalankan tugas yang dieksekusi secara berkala. Proyek agenda mengimplementasikan antrian dengan menyimpan dalam database mongodb. Ini adalah nilai tambah yang besar untuk beberapa kasus, jika Anda membutuhkan keandalan super saat menempatkan tugas dalam antrian. Namun, ini bukan saja faktor penentu. Secara alami, saya menguji semua opsi ketahanan perpustakaan, menghasilkan banyak tugas dalam antrian, dan masih tidak bisa mendapatkan pekerjaan yang tidak terputus dari agenda. Ketika melebihi sejumlah tugas tertentu, agenda berhenti dan berhenti menjalankan tugas.

Oleh karena itu, saya memilih bull yang mengimplementasikan API yang nyaman, dengan kecepatan dan skalabilitas yang memadai, karena paket bull menggunakan server redis sebagai backend. Secara khusus, Anda dapat menggunakan sekelompok server redis.

Saat membuat antrian, sangat penting untuk memilih parameter antrian pekerjaan yang optimal. Ada banyak parameter, dan nilai beberapa dari mereka tidak langsung menjangkau saya. Setelah banyak percobaan, saya menetapkan parameter berikut:

const Bull = require('bull'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const defaultJobOptions = { removeOnComplete: true, removeOnFail: false, }; const limiter = { max: 10000, duration: 1000, bounceBack: false, }; const settings = { lockDuration: 600000, // Key expiration time for job locks. stalledInterval: 5000, // How often check for stalled jobs (use 0 for never checking). maxStalledCount: 2, // Max amount of times a stalled job will be re-processed. guardInterval: 5000, // Poll interval for delayed jobs and added jobs. retryProcessDelay: 30000, // delay before processing next job in case of internal error. drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs). }; const bull = new Bull('my_queue', { redis, defaultJobOptions, settings, limiter }); module.exports = { bull }; 

Dalam kasus sepele, tidak perlu membuat banyak antrian, karena di setiap antrian Anda dapat menentukan nama untuk tugas yang berbeda, dan mengaitkan pekerja prosesor dengan masing-masing nama:

 const { bull } = require('../bull'); bull.process('push:news', 1, `${__dirname}/push-news.js`); bull.process('push:status', 2, `${__dirname}/push-status.js`); ... bull.process('some:job', function(...args) { ... }); 

Saya menggunakan kesempatan yang masuk ke bull "out of the box" - untuk memparalelkan pekerja prosesor pada beberapa core. Untuk melakukan ini, parameter kedua menetapkan jumlah core di mana pekerja prosesor akan diluncurkan, dan pada parameter ketiga, nama file dengan definisi fungsi pemrosesan pekerjaan. Jika fitur seperti itu tidak diperlukan, Anda dapat dengan mudah melewatkan fungsi panggilan balik sebagai parameter kedua.

Tugas di-antri dengan panggilan ke metode add (), di mana nama dan objek antrian diteruskan dalam parameter, yang selanjutnya akan diteruskan ke penangan tugas. Misalnya, dalam kait ORM, setelah membuat entri dengan berita baru, saya dapat secara sinkron mengirim pesan push ke semua klien:

  afterCreate(instance) { bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options); } 

Event handler menerima dalam parameter objek tugas dengan parameter yang diteruskan ke metode add () dan fungsi done (), yang harus dipanggil untuk mengkonfirmasi tugas selesai atau untuk menginformasikan bahwa tugas berakhir dengan kesalahan:

 const { firebase: { admin } } = require('../firebase'); const { makePayload } = require('./makePayload'); module.exports = (job, done) => { const { id, title, message } = job.data; const data = { id: String(id), type: 'news', }; const payloadRu = makePayload(title.ru, message.ru, data); const payloadEn = makePayload(title.en, message.en, data); return Promise.all([ admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }), admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }), ]) .then(response => done(null, response)) .catch(done); }; 

Untuk melihat status antrian pekerjaan, Anda dapat menggunakan alat arena-bull:

 const Arena = require('bull-arena'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const arena = Arena({ queues: [ { name: 'my_gueue', hostId: 'My Queue', redis, }, ], }, { basePath: '/', disableListen: true, }); module.exports = { arena }; 

Dan akhirnya, sedikit hack hidup. Seperti yang saya katakan, bull menggunakan server redis sebagai backend. Ketika server redis dihidupkan ulang, kemungkinan pekerjaan menghilang sangat kecil. Tetapi mengetahui fakta bahwa administrator sistem kadang-kadang hanya dapat "membersihkan cache lobak", sambil menghapus semua tugas khususnya, saya terutama prihatin tentang menjalankan tugas secara berkala, yang dalam hal ini berhenti selamanya. Dalam hal ini, saya menemukan peluang untuk melanjutkan tugas-tugas berkala seperti itu:

 const cron = '*/10 * * * * *'; const { bull } = require('./app/services/bull'); bull.getRepeatableJobs() .then(jobs => Promise.all(_.map(jobs, (job) => { const [name, cron] = job.key.split(/:{2,}/); return bull.removeRepeatable(name, { cron }); }))) .then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } })); setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000); 

Artinya, tugas pertama dikecualikan dari antrian, dan kemudian diatur lagi, dan semua ini (sayangnya) oleh setInterval (). Sebenarnya, tanpa hack seumur hidup, saya mungkin tidak akan memutuskan untuk menggunakan tugas berkala pada banteng.

apapacy@gmail.com
3 Juli 2019

Source: https://habr.com/ru/post/id458608/


All Articles