
Hallo allerseits! Die Popularität des Internethandels wächst stetig, ebenso wie der Anteil der Informatisierung aller Arten von Aktivitäten im Zusammenhang mit dem Handel. Gleichzeitig wächst die Komplexität der Informationsverarbeitung. Jede Bestellung eines Kunden eines Online-Shops generiert eine große Anzahl von Integrationen mit verschiedenen Diensten. Solche Dienstleistungen können Zahlungsabwicklungs-, Liefer-, Buchhaltungs- und Loyalitätsdienste umfassen. Jede Bestellung muss bezahlt, aufgezeichnet, zusammengestellt und geliefert werden und auch für weitere Analysen zur Verfügung stehen. Dies und damit keine einfache Situation wird durch die Tatsache erschwert, dass ein Benutzer eines Online-Shops bei der Bestellung nicht lange und schmerzhaft auf etwas warten möchte. Die Antwort des Online-Shops sollte schnell sein, da jede Millisekunde Verzögerung die Wahrscheinlichkeit erhöht, einen Kunden zu verlieren und anschließend zu profitieren. In diesem Artikel möchte ich über den RabbitMQ-Nachrichtenbroker und dessen Verwendung zur Organisation der Auftragsabwicklung mithilfe von Node.js und TypeScript sprechen. Willkommen bei Katze.
Die notwendige Theorie
Ich denke, viele haben von RabbitMQ gehört, weil die erste Open-Source-Version dieses Nachrichtenbrokers, die auf dem AMQP-Protokoll basiert, bereits 2007 veröffentlicht wurde. Ein Nachrichtenbroker wird benötigt, um verschiedene Komponenten des Systems zu einem Ganzen zu verbinden, da Klebstoff benötigt wird, um eine kaputte Vase wiederzubeleben. Mit dem Nachrichtenbroker können Sie die asynchrone Verarbeitung von im System empfangenen Ereignissen implementieren. Es ist eine solche asynchrone Auftragsabwicklung, die der Online-Shop benötigt. Aber zuerst müssen Sie die grundlegenden Komponenten von RabbitMQ verstehen. Dieser Broker besteht aus drei Hauptkomponenten, mit denen wir den Verarbeitungsprozess erstellen:
- Nachricht Dies ist die kleinste Informationseinheit innerhalb des Nachrichtenbrokers und unseres Verarbeitungsdienstes, die verarbeitet werden kann. RabbitMQ selbst speichert Nachrichten in binärer Form, aber für unser System und für den Artikel ist dies nicht wichtig. Wir werden Nachrichten in Form von JSON empfangen und verarbeiten. Erwähnenswert ist auch, dass Nachrichten in RabbitMQ Header haben. Sie ähneln den Headern von http-Anforderungen. Dies ist ein assoziatives Array, in das Sie die erforderlichen Informationen schreiben können.
- Nachrichtenwarteschlange . Dies ist die Warteschlange, in der RabbitMQ Nachrichten speichert. Eine Nachrichtenwarteschlange kann von einem oder mehreren Verbrauchern abonniert werden. Jede Nachricht in der Kaninchenwarteschlange wird mithilfe des Round-Robin-Algorithmus an die Verbraucher verteilt.
- Austausch Dies ist, wie der Name schon sagt, ein Austauschpunkt. An diesem Punkt können Warteschlangen oder andere Austauscher angebracht werden. Ein Austauschpunkt speichert keine Nachrichten. Seine Hauptfunktion besteht darin, Nachrichten an eine oder mehrere Warteschlangen oder an dieselben Austauschpunkte weiterzuleiten. Jede Warteschlange oder jeder Austauscher ist an einen Routing-Schlüssel gebunden. In RabbitMQ gibt es verschiedene Arten von Austauschern, die sich darauf auswirken, wie genau der Austausch die darin empfangene Nachricht weiterleitet.
Um zu beschreiben, wie verschiedene Arten von Austauschern funktionieren, muss man verstehen, was Routing-Schlüssel sind. Der Routing-Schlüssel befindet sich sowohl in der Bindung der Warteschlange an den Austauscher als auch in der Nachricht selbst. Der Routing-Schlüssel ist nur eine Zeichenfolge, die in Blöcke unterteilt ist. Jeder Block ist durch einen Punkt getrennt. Zum Beispiel "notify.sendEmail.sendSms". Gleichzeitig können Muster für den Nachrichtenweiterleitungsschlüssel mit den Sonderzeichen # und * festgelegt werden. * - sagt, dass nach einem Punkt jeder Block gehen kann, aber nach # eine beliebige Anzahl von Blöcken gehen kann. Zum Beispiel "notify.sendSms. *" Oder "notify. #". Jetzt können Sie zu den Arten von Austauschpunkten übergehen.
Es gibt vier Arten von Austauschern:
- Fanout Die Routing-Logik dieses Austauschs ist einfach: Sie leitet eine eingehende Nachricht an alle damit verbundenen Warteschlangen oder Austauscher um.

- Direkt Dieser Austausch leitet die Nachricht um, abhängig davon, ob der Routing-Schlüssel der Nachricht mit dem Routing-Schlüssel der Bindung übereinstimmt.

- Thema Exchange dieses Typs sowie Direct leiten die Nachricht abhängig vom Routing-Schlüssel weiter. Ein Muster kann jedoch als Routing-Schlüssel fungieren.

- Überschriften . Dieser Austausch verwendet im Gegensatz zu den anderen Nachrichtenkopfzeilen für das Routing. Gleichzeitig werden Warteschlangen an den Austauscher auch über ein assoziatives Array gebunden. Die Logik, mit der der Austauscher Nachrichten weiterleitet, kann mithilfe des speziellen Schlüssels „x-match“ geändert werden, der im assoziativen Bindungsarray festgelegt ist. Der Schlüssel kann auf zwei oder alle Werte eingestellt werden. Wenn der Wert alle ist, müssen die Nachrichtenkopfzeilen vollständig mit dem assoziativen Bindungsarray übereinstimmen. Wenn der Wert einer ist, muss der Wert mit mindestens einem Schlüssel übereinstimmen.

Dies sind die Kernkomponenten von RabbitMQ. Weitere Informationen zu diesen Komponenten finden Sie in der AMQP-Protokollspezifikation . Als Nächstes entwerfen und implementieren wir ein Auftragsabwicklungssystem am Beispiel von TypeScript und verstehen gleichzeitig die Einstellungen der einzelnen Komponenten.
Design
Um das Beispiel zu vereinfachen, gehen wir davon aus, dass wir für die erfolgreiche Bearbeitung einer Online-Bestellung über folgende Funktionen verfügen müssen:
- Eingehende Bestellungen speichern
- Senden Sie dem Kunden eine SMS mit der Bestellnummer sowie dem Status der Bestellung
- Senden Sie eine Nachricht an den Kurierdienst über eine neue Bestellung aus unserem Online-Shop, wenn der Kunde diese Versandmethode gewählt hat
Die Implementierung dieser Funktionalität reicht jedoch nicht aus, da unser Online-Shop plant, die Funktionalität zu erweitern und seinen Kunden in Zukunft mehr verschiedene Möglichkeiten zu bieten (und dies geschieht immer). Benachrichtigen Sie beispielsweise den Kunden per E-Mail oder wählen Sie verschiedene Versandmethoden für die Bestellung aus. Daraus folgt, dass wir das System so gestalten müssen, dass das Hinzufügen von Funktionen einfach war.
Es ist auch erwähnenswert, dass ich die Vorlage für zurückgestellte Nachrichten verwenden werde, damit es möglich ist, die Logik mehrmals zu wiederholen, wenn der externe Dienst nicht verfügbar ist. Über diese Vorlage können Sie hier lesen .
Um das Endziel klarer darzustellen, werde ich ein Diagramm zeichnen.

Lassen Sie uns sehen, wie der Auftragsabwicklungsprozess in diesem Diagramm funktioniert. Das Schema ist in Blöcke und verschiedene Farben unterteilt. Weiße Blöcke kennzeichnen externe Dienste, die wir nicht berücksichtigen werden. Graue Blöcke zeigen RabbitMQ-Elemente an. Warteschlangen und Austauscher. Die grüne Farbe spiegelt die Blöcke der Geschäftslogik wider, die implementiert werden müssen. Außerdem ist jeder Block, der sich auf unsere Logik bezieht, nummeriert. Die Zahlen geben den Prozess und den Teilprozess in der Reihenfolge an.
Zunächst wird die HTTP-API-Nachricht in unseren Dienst aufgenommen. Danach müssen wir der Bestellung eine Nummer zuweisen, die Bestellung in der Datenbank mit dem Status „Neu“ speichern und eine Antwort über die erfolgreiche Erstellung der Bestellung mit ihrer Nummer zurücksenden. Der Kunde, der eine Nachricht über die erfolgreiche Erstellung des Auftrags erhalten hat, geht seinem eigenen Geschäft nach. Durch das Senden einer positiven Antwort senden wir das Auftragsobjekt an die Nachbearbeitungsbörse, von der es in den Mitarbeiter der Routing-Schlüsselformation fällt. Dieser Mitarbeiter, der das Bestellobjekt aus der Warteschlange erhalten hat (unabhängig davon, ob die Bestellung eine E-Mail oder ein Kundentelefon enthält, welche Versandmethode ausgewählt wurde), muss den Bestellungsrouting-Schlüssel bilden. Nachdem der Mitarbeiter einen Routing-Schlüssel gebildet hat, sendet er eine Nachricht zurück an die Nachbearbeitungsbörse. Jetzt hat sich der Routing-Schlüssel der Bestellung geändert und der Austauscher kann ihn bereits auf der gewünschten Route senden. Abhängig vom Schlüssel kann die Bestellung an den Austausch gesendet werden, der für Benachrichtigungen, Austauschintegrationen oder beides gleichzeitig verantwortlich ist. Und weiter die gleiche Logik in einer Warteschlange und Arbeiter.
SMS-Mitarbeiter und Zustelldienste versuchen mehrmals, die Nachricht zu verarbeiten. Die Anzahl solcher Versuche kann in einer Umgebungsvariablen übergeben werden. Sie sollten die Nachricht jedoch nicht endlos verarbeiten, da der Fehler möglicherweise in der Nachricht selbst oder in der Logik des Arbeiters liegt. Daher wird die Nachricht nach Überschreiten der Anzahl der zulässigen Versuche aus den Warteschlangen gelöscht und an den Fehlerspeicher gesendet, von dem aus sie erneut an die gewünschte Verarbeitungsebene zurückgesendet werden kann.
Implementierung
Um die Implementierung zu überprüfen, benötigen Sie Kaninchen selbst. Ich empfehle, zu diesem Zweck Docker und ein offizielles Broker-Image zu verwenden. Installieren Sie den Container und führen Sie ihn mit dem folgenden Befehl aus.
docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine
Dies ist ein Image mit einer Webschnittstelle, die an Port 15672 zum bequemen Debuggen verfügbar ist.
Wir werden unsere Pläne mit TypeScript und der amqplib- Bibliothek (RabbitMQ-Client-Implementierung für Node.js) implementieren. Daher müssen Sie zunächst mehrere Schnittstellen beschreiben. Wir beschreiben die Schnittstellen der Bestellung und die Nachrichten, die wir an Kaninchen senden.
Nun müssen wir die Konfigurationsschnittstelle von Warteschlangen und Austauschern beschreiben, auf deren Grundlage wir die Verarbeitungsstruktur in Kaninchen aufbauen werden.
import { Types, ExchangeTypes } from '../constants'; import { Options } from 'amqplib';
Nachdem wir die Hauptkomponenten des Systems beschrieben haben, beschreiben wir die Konfiguration, die mit dem Objekt im Diagramm gezeichnet wurde.
Warteschlangen
export default [
Bei der Beschreibung von Warteschlangen werden die folgenden Optionen für die Warteschlange verwendet.
- langlebig . Standardmäßig werden alle Warteschlangennachrichten im Speicher gespeichert. Daher werden beim Neustart des Brokers Nachrichten ausgeblendet. Um dies zu vermeiden, können Sie diese Option verwenden. Mit dieser Einstellung spült Rabbit Nachrichten auf die Festplatte. Aber es gibt eine Einschränkung. Damit Nachrichten nach dem Neustart des Brokers gespeichert werden, reicht diese Einstellung nicht aus. Nachrichten müssen mit der Option persistent an die Warteschlange gesendet werden.
- messageTtl . Die Lebensdauer der Nachricht. In Millisekunden angegeben
- deadLetterExchange . Der Name des Austauschers, an den die Nachricht nach Ablauf der Warteschlange gesendet wird
- deadLetterRoutingKey . RoutingKey, mit dem die Nachricht von der vorherigen Option an den Austauscher gesendet wird
Börsen
import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, }, ];
Bindungen
import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', }, ];
Vollständige Konfiguration
import { PipelineConfig } from '../interfaces'; import exchanges from './exchanges'; import queues from './queues'; import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings, };
Schreiben Sie eine Klasse, um eine Verbindung zum Kaninchen herzustellen.
import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() {
Schreiben wir die Pipeline-Klasse, die beim Start die gesamte erforderliche Infrastruktur in Rabbit gemäß der zuvor beschriebenen Konfiguration erstellt.
import { RabbitConnect } from './RabbitConnect'; import { PipelineConfig } from './interfaces'; import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect();
Jetzt schreiben wir eine abstrakte Worker-Klasse mit einer gemeinsamen Funktionalität für alle Worker, von denen geerbt werden kann.
import { RabbitConnect } from './RabbitConnect'; import { Message, Order, FailOrder } from './interfaces'; import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number;
Standardmäßig erfordert Kaninchen die Bestätigung einer erfolgreichen Nachrichtenverarbeitung durch den Arbeiter. Hierzu verfügt der Verbindungskanal über eine Bestätigungsmethode. Wenn der Arbeiter die Nachricht nicht verarbeiten konnte, gibt es eine Nack-Methode, die Rabbit anweist, die Nachricht an einen anderen Arbeiter zu senden.
Jetzt können wir einige einfache Arbeiter aus dem Diagramm schreiben.
Arbeiter, der einen Routing-Schlüssel generiert.
import { Worker } from '../Worker'; import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message, } from '../interfaces'; import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } } }
Arbeiter senden SMS.
import { Worker } from '../Worker'; import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log(' sms : ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } }
Arbeitnehmerintegration mit dem Lieferservice.
import { Worker } from '../Worker'; import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log(' : ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } }
Der Einstiegspunkt in die Anwendung.
import { Pipeline } from './Pipeline'; import { pipelineConfig } from './pipeline'; import { GenerateRoutingKey } from './workers/GenerateRoutingKey'; import { SendSms } from './workers/SendSms'; import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); } })();
Ich werde kein Beispiel für eine Codeklasse zum Schreiben einer Bestellung in die Datenbank und zum Generieren einer Internet-Bestellnummer geben. Dies würde den Rahmen dieses Artikels sprengen. Um den Code zu überprüfen, können Sie die Kaninchen-Weboberfläche verwenden, indem Sie die Bestellung json an den Austausch des Herstellers senden.
Fazit
Ein solches Konstruktionsschema zur Bearbeitung einer Online-Bestellung erleichtert die Skalierung des Systems. Es wird für uns nicht schwierig sein, diesem Schema mehrere Warteschlangen und Worker hinzuzufügen, um die erforderlichen Funktionen hinzuzufügen. Sie können beispielsweise das Senden von Benachrichtigungen per E-Mail oder das Senden einer Bestellung für die Buchhaltung in 1C hinzufügen. Die konvertierte Schaltung sieht folgendermaßen aus:

Ich hoffe dir hat der Artikel gefallen. Ich freue mich über Kommentare und Kritik. Alle eingereichten Codes finden Sie auf github