Bei der Implementierung der Sicherung von Webanwendungen und mobilen Anwendungen, auch der einfachsten, ist es üblich geworden, Tools wie Datenbanken, Mail-Server (SMTP) und Redis-Server zu verwenden. Der Werkzeugsatz wird ständig erweitert. Beispielsweise werden Nachrichtenwarteschlangen, gemessen an der Anzahl der Installationen des
amqplib- Pakets (
650.000 Installationen pro Woche), zusammen mit relationalen Datenbanken verwendet (
MySQL- Paket
460.000 Installationen pro Woche und S. 800.000 Installationen pro Woche).
Heute möchte ich über Job-Warteschlangen sprechen, die bisher in fast allen realen Projekten um eine Größenordnung weniger genutzt werden, obwohl sie benötigt werden
Mit Jobwarteschlangen können Sie also eine Aufgabe asynchron ausführen. Tatsächlich können Sie eine Funktion mit den angegebenen Eingabeparametern und zur festgelegten Zeit ausführen.
Abhängig von den Parametern kann die Aufgabe ausgeführt werden:
- unmittelbar nach dem Hinzufügen zur Jobwarteschlange;
- einmal zu einer festgelegten Zeit;
- viele Male im Zeitplan.
Mit Jobwarteschlangen können Sie Parameter an einen Job übertragen, der ausgeführt wird, fehlgeschlagene Jobs verfolgen und erneut ausführen sowie die Anzahl der gleichzeitig ausgeführten Jobs begrenzen.
Die überwiegende Mehrheit der Anwendungen auf Node.js ist mit der Entwicklung einer REST-API für Web- und mobile Anwendungen verbunden. Die Verkürzung der Ausführungszeit der REST-API ist wichtig für eine komfortable Arbeit des Benutzers mit der Anwendung. Gleichzeitig kann ein Aufruf der REST-API langwierige und / oder ressourcenintensive Vorgänge auslösen. Nach dem Kauf müssen Sie dem Benutzer beispielsweise eine Push-Nachricht an die mobile Anwendung senden oder eine Anforderung zum Kauf über die CRM REST-API senden. Diese Abfragen können asynchron ausgeführt werden. Wie mache ich es richtig, wenn Sie kein Tool zum Arbeiten mit Jobwarteschlangen haben? Sie können beispielsweise eine Nachricht an die Nachrichtenwarteschlange senden, einen Mitarbeiter starten, der diese Nachrichten liest, und die erforderlichen Arbeiten basierend auf diesen Nachrichten ausführen.
Genau das tun Jobwarteschlangen. Wenn Sie genau hinschauen, unterscheiden sich die Jobwarteschlangen jedoch grundlegend von der Nachrichtenwarteschlange. Erstens werden Nachrichten (statisch) in die Nachrichtenwarteschlange gestellt, und Jobwarteschlangen beinhalten irgendeine Art von Arbeit (Funktionsaufruf). Zweitens impliziert die Jobwarteschlange das Vorhandensein eines Prozessors (Arbeiters), der die angegebene Arbeit ausführt. In diesem Fall wird zusätzliche Funktionalität benötigt. Die Anzahl der Prozessorprozessoren sollte bei erhöhter Auslastung transparent skaliert werden. Andererseits ist es notwendig, die Anzahl der gleichzeitig ausgeführten Aufgaben auf einem Prozessor-Worker zu begrenzen, um Spitzenlasten auszugleichen und Denial-of-Service zu verhindern. Dies zeigt, dass ein Tool erforderlich ist, das asynchrone Aufgaben durch Festlegen verschiedener Parameter ausführen kann, so einfach wie das Anfordern einer Anforderung mithilfe der REST-API (oder besser, wenn es noch einfacher ist).
Mithilfe von Nachrichtenwarteschlangen ist es relativ einfach, eine Jobwarteschlange zu implementieren, die unmittelbar nach dem Einreihen eines Jobs ausgeführt wird. Oft ist es jedoch erforderlich, die Aufgabe einmal zu einem festgelegten Zeitpunkt oder nach einem Zeitplan auszuführen. Für diese Aufgaben werden häufig eine Reihe von Paketen verwendet, die die Cron-Logik unter Linux implementieren. Um nicht unbegründet zu sein, werde ich sagen, dass das Node-Cron-Paket 480.000 Installationen pro Woche hat, Node-Schedule - 170.000 Installationen pro Woche.
Die Verwendung von Node-Cron ist natürlich bequemer als das asketische setInterval (), aber ich persönlich habe bei der Verwendung eine Reihe von Problemen festgestellt. Um einen allgemeinen Nachteil auszudrücken, ist dies die mangelnde Kontrolle über die Anzahl der gleichzeitig ausgeführten Aufgaben (dies stimuliert Spitzenlasten: Durch Erhöhen der Last wird die Arbeit der Aufgaben verlangsamt, durch Verlangsamen der Aufgaben wird die Anzahl der gleichzeitig ausgeführten Aufgaben erhöht, was wiederum das System noch mehr belastet) und durch die Unfähigkeit, Knoten auszuführen, um die Produktivität zu steigern -cron auf mehreren Kernen (in diesem Fall werden alle Aufgaben unabhängig auf jedem Kern ausgeführt) und das Fehlen von Tools zum Verfolgen und Neustarten abgeschlossener Aufgaben Xia mit einem Fehler.
Ich hoffe, dass ich gezeigt habe, dass der Bedarf an einem Tool wie der Jobwarteschlange mit Tools wie Datenbanken vergleichbar ist. Und solche Mittel sind erschienen, obwohl sie noch nicht weit verbreitet sind. Ich werde die beliebtesten auflisten:
Heute werde ich über die Verwendung des Bullenpakets nachdenken, das ich mit mir selbst arbeite. Warum habe ich dieses spezielle Paket gewählt (obwohl ich meine Wahl nicht anderen auferlege)? In diesem Moment, als ich nach einer bequemen Implementierung der Nachrichtenwarteschlange suchte, wurde das Bienenwarteschlangenprojekt bereits gestoppt. Die kue-Implementierung blieb gemäß den im Bienenwarteschlangen-Repository angegebenen Benchmarks weit hinter anderen Implementierungen zurück und enthielt außerdem nicht die Mittel zum Ausführen periodisch ausgeführter Aufgaben. Das Agenda-Projekt implementiert Warteschlangen mit Speicherung in der Mongodb-Datenbank. Dies ist in einigen Fällen ein großes Plus, wenn Sie beim Platzieren von Aufgaben in der Warteschlange eine hohe Zuverlässigkeit benötigen. Dies ist jedoch nicht nur ein entscheidender Faktor. Natürlich habe ich alle Ausdaueroptionen der Bibliothek getestet, eine große Anzahl von Aufgaben in der Warteschlange generiert und konnte trotzdem keine ununterbrochene Arbeit von der Tagesordnung streichen. Bei Überschreitung einer bestimmten Anzahl von Aufgaben wurde die Agenda angehalten und die Ausführung von Aufgaben eingestellt.
Aus diesem Grund habe ich mich für bull entschieden, das eine praktische API mit ausreichender Geschwindigkeit und Skalierbarkeit implementiert, da das bull-Paket einen Redis-Server als Backend verwendet. Insbesondere können Sie einen Cluster von Redis-Servern verwenden.
Beim Erstellen einer Warteschlange ist es sehr wichtig, die optimalen Parameter für die Jobwarteschlange auszuwählen. Es gibt viele Parameter, und der Wert einiger von ihnen hat mich nicht sofort erreicht. Nach zahlreichen Experimenten habe ich mich auf folgende Parameter festgelegt:
const Bull = require('bull'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const defaultJobOptions = { removeOnComplete: true, removeOnFail: false, }; const limiter = { max: 10000, duration: 1000, bounceBack: false, }; const settings = { lockDuration: 600000,
In trivialen Fällen müssen nicht viele Warteschlangen erstellt werden, da Sie in jeder Warteschlange Namen für verschiedene Aufgaben angeben und jedem Namen einen Prozessorarbeiter zuordnen können:
const { bull } = require('../bull'); bull.process('push:news', 1, `${__dirname}/push-news.js`); bull.process('push:status', 2, `${__dirname}/push-status.js`); ... bull.process('some:job', function(...args) { ... });
Ich nutze die Gelegenheit, die der Bulle "out of the box" bietet - um Prozessorarbeiter auf mehreren Kernen zu parallelisieren. Dazu legt der zweite Parameter die Anzahl der Kerne fest, auf denen der Prozessor-Worker gestartet wird, und im dritten Parameter den Dateinamen mit der Definition der Jobverarbeitungsfunktion. Wenn eine solche Funktion nicht benötigt wird, können Sie einfach eine Rückruffunktion als zweiten Parameter übergeben.
Die Aufgabe wird durch einen Aufruf der add () -Methode in die Warteschlange gestellt, an die der Warteschlangenname und das Objekt in den Parametern übergeben werden, die anschließend an den Aufgabenhandler übergeben werden. In einem ORM-Hook kann ich beispielsweise nach dem Erstellen eines Eintrags mit neuen Nachrichten asynchron eine Push-Nachricht an alle Clients senden:
afterCreate(instance) { bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options); }
Der Ereignishandler akzeptiert in den Parametern das Task-Objekt mit den an die add () -Methode und die done () -Funktion übergebenen Parametern, die aufgerufen werden müssen, um zu bestätigen, dass die Task abgeschlossen ist, oder um zu informieren, dass die Task mit einem Fehler beendet wurde:
const { firebase: { admin } } = require('../firebase'); const { makePayload } = require('./makePayload'); module.exports = (job, done) => { const { id, title, message } = job.data; const data = { id: String(id), type: 'news', }; const payloadRu = makePayload(title.ru, message.ru, data); const payloadEn = makePayload(title.en, message.en, data); return Promise.all([ admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }), admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }), ]) .then(response => done(null, response)) .catch(done); };
Um den Status der Jobwarteschlange anzuzeigen, können Sie das Arena-Bull-Tool verwenden:
const Arena = require('bull-arena'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const arena = Arena({ queues: [ { name: 'my_gueue', hostId: 'My Queue', redis, }, ], }, { basePath: '/', disableListen: true, }); module.exports = { arena };
Und zum Schluss noch ein kleiner Life-Hack. Wie gesagt, bull verwendet einen Redis-Server als Backend. Wenn der Redis-Server neu gestartet wird, ist die Wahrscheinlichkeit, dass der Job verschwindet, sehr gering. Da ich jedoch wusste, dass Systemadministratoren manchmal nur den Rettich-Cache leeren können, während sie insbesondere alle Aufgaben löschen, war ich in erster Linie besorgt über die regelmäßige Ausführung von Aufgaben, die in diesem Fall für immer gestoppt wurden. In dieser Hinsicht fand ich eine Gelegenheit, solche periodischen Aufgaben wieder aufzunehmen:
const cron = '*/10 * * * * *'; const { bull } = require('./app/services/bull'); bull.getRepeatableJobs() .then(jobs => Promise.all(_.map(jobs, (job) => { const [name, cron] = job.key.split(/:{2,}/); return bull.removeRepeatable(name, { cron }); }))) .then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } })); setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);
Das heißt, die Aufgabe wird zuerst aus der Warteschlange ausgeschlossen und dann erneut festgelegt, und dies alles (leider) durch setInterval (). Ohne einen solchen Life-Hack hätte ich mich wahrscheinlich nicht entschieden, regelmäßige Aufgaben für Bullen zu verwenden.
apapacy@gmail.com
3. Juli 2019