Nous traitons les commandes de la boutique en ligne en utilisant RabbitMQ et TypeScript


Bonjour à tous! La popularité du commerce sur Internet ne cesse de croître, tout comme la part de l'informatisation de tous les types d'activités liées au commerce. Parallèlement à cela, la complexité du traitement de l'information augmente. Chaque commande effectuée par un client d'une boutique en ligne génère un grand nombre d'intégrations avec différents services. Ces services peuvent inclure des services de traitement des paiements, de livraison, de comptabilité et de fidélité. Chaque commande doit être payée, enregistrée, assemblée et livrée, et également disponible pour une analyse plus approfondie. Cela, et donc pas une situation simple, est compliqué par le fait qu'un utilisateur d'une boutique en ligne ne veut pas attendre longtemps et douloureusement quelque chose lors de la commande. La réponse de la boutique en ligne doit être rapide, car chaque milliseconde de retard augmente les chances de perdre un client, et donc de réaliser des bénéfices. Dans cet article, je veux parler du courtier de messages RabbitMQ et comment l'utiliser pour organiser le traitement des commandes à l'aide de Node.js et TypeScript. Bienvenue au chat.


La théorie nécessaire


Je pense que beaucoup ont entendu parler de RabbitMQ, car la première version open source de ce courtier de messages, basée sur le protocole AMQP, a déjà été publiée en 2007. Un courtier de messages est nécessaire pour connecter différents composants du système en un seul ensemble, car de la colle est nécessaire pour ressusciter un vase cassé. À l'aide du courtier de messages, vous pouvez implémenter un traitement asynchrone des événements reçus dans le système. C'est juste un tel traitement de commande asynchrone dont la boutique en ligne a besoin. Mais vous devez d'abord comprendre les composants de base de RabbitMQ. Ce courtier a trois composants principaux avec lesquels nous allons construire le processus de traitement:


  • Message Il s'agit de la plus petite unité d'informations du courtier de messages et de notre service de traitement qui peut être traitée. RabbitMQ lui-même stocke les messages sous forme binaire, mais pour notre système et pour l'article, ce n'est pas important. Nous recevrons et traiterons les messages sous forme de JSON. Il convient également de mentionner que les messages dans RabbitMQ ont des en-têtes. Ils sont similaires aux en-têtes des requêtes http. Il s'agit d'un tableau associatif dans lequel vous pouvez écrire les informations nécessaires.
  • File d'attente de messages . Il s'agit de la file d'attente dans laquelle RabbitMQ stocke les messages. Une file d'attente de messages peut être abonnée par un ou plusieurs consommateurs. Chaque message dans la file d'attente de lapin est distribué aux consommateurs à l'aide de l'algorithme de tourniquet.
  • Échange Ceci, comme son nom l'indique, est un point d'échange. Des files d'attente ou d'autres échangeurs peuvent être attachés à ce point. Un point d'échange ne stocke pas de messages; sa fonction principale est d'acheminer les messages vers une ou plusieurs files d'attente, ou les mêmes points d'échange. Chaque file d'attente ou échangeur est lié par une clé de routage. Il existe plusieurs types d'échangeurs dans RabbitMQ qui affectent la manière exacte dont l'échange acheminera le message reçu.

Afin de décrire le fonctionnement de différents types d'échangeurs, il est nécessaire de comprendre ce que sont les clés de routage. La clé de routage se trouve à la fois dans la liaison de la file d'attente à l'échangeur et dans le message lui-même. La clé de routage est juste une chaîne divisée en blocs. Chaque bloc est séparé par un point. Par exemple, «notify.sendEmail.sendSms». Dans le même temps, des modèles peuvent être définis pour la clé de routage des messages à l'aide des caractères spéciaux # et *. * - dit qu'après un point n'importe quel bloc peut aller, mais après # n'importe quel nombre de blocs peut aller. Par exemple, "notify.sendSms. *" Ou "notify. # #". Vous pouvez maintenant passer aux types de points d'échange.


Il existe quatre types d'échangeurs:


  • Fanout La logique de routage de cet échange est simple; il redirige un message entrant vers toutes les files d'attente ou échangeurs qui lui sont attachés.


  • Direct Cet échange redirige le message selon que la clé de routage du message correspond à la clé de routage de la liaison.


  • Sujet L'échange de ce type ainsi que Direct acheminent le message en fonction de la clé de routage. Mais un modèle peut agir comme une clé de routage.


  • En-têtes . Cet échange, contrairement aux autres, utilise des en-têtes de message pour le routage. Dans le même temps, les files d'attente de l'échangeur sont également liées à l'aide d'un tableau associatif. La logique par laquelle l'échangeur achemine les messages peut être modifiée à l'aide de la clé spéciale «x-match», qui est définie dans le tableau de liaison associative. La clé peut être définie sur deux valeurs, toutes ou toutes. Si la valeur est all, les en-têtes de message doivent correspondre complètement au tableau de liaisons associatives; si la valeur est quelconque, la valeur doit correspondre à au moins une clé.


Ce sont les composants principaux de RabbitMQ. Vous pouvez en savoir plus sur ces composants dans la spécification du protocole AMQP . Ensuite, nous concevrons et mettrons en œuvre un système de traitement des commandes en utilisant TypeScript comme exemple, en comprenant simultanément les paramètres de chaque composant.


La conception


Pour simplifier l'exemple, nous supposons que pour le succès du traitement d'une commande en ligne, nous devons disposer des fonctionnalités suivantes:


  • Enregistrer les commandes entrantes
  • Envoyer un SMS au client avec le numéro de commande, ainsi que l'état de la commande
  • Envoyer un message au service de livraison par messagerie à propos d'une nouvelle commande dans notre boutique en ligne, si le client a choisi ce mode de livraison

Mais il ne suffit pas de mettre en œuvre cette fonctionnalité, car notre boutique en ligne prévoit d'étendre la fonctionnalité et de fournir plus d'opportunités différentes à ses clients à l'avenir (et cela se produit toujours). Par exemple, informez le client par e-mail ou indiquez un choix de plusieurs modes de livraison pour la commande. Il s'ensuit que nous devons concevoir le système de manière à ce que l'ajout de fonctionnalités soit simple.


Il convient également de mentionner que j'utiliserai le modèle pour les messages différés afin qu'il soit possible, si le service externe n'est pas disponible, de répéter la logique plusieurs fois. Vous pouvez lire sur ce modèle ici.


Pour représenter plus clairement le but ultime, je vais dessiner un diagramme.



Voyons dans l'ordre comment fonctionne le processus de traitement des commandes sur ce diagramme. Le schéma est divisé en blocs et en différentes couleurs. Les blocs blancs indiquent des services externes que nous ne considérerons pas. Les blocs gris indiquent les éléments RabbitMQ. Files d'attente et échangeurs. La couleur verte reflète les blocs de logique métier qui doivent être mis en œuvre. De plus, chaque bloc lié à notre logique est numéroté. Les nombres indiquent le processus et le sous-processus dans l'ordre.


Tout d'abord, le message de l'API HTTP entre dans notre service. Après cela, nous devons attribuer un numéro à la commande, enregistrer la commande dans la base de données avec le statut "nouveau" et renvoyer une réponse sur la création réussie de la commande, avec son numéro. Le client, après avoir reçu un message sur la réussite de la création de la commande, va à son compte. En envoyant une réponse positive, nous envoyons l'objet de commande à l'échange de post-traitement, à partir duquel il tombe dans le travailleur de la formation de clé de routage. Ce travailleur, après avoir reçu l'objet de commande de la file d'attente, sur la base de celui-ci (qu'il y ait un e-mail ou le téléphone d'un client dans la commande, quel mode de livraison a été choisi) doit former la clé de routage de la commande. Après avoir formé une clé de routage, le travailleur renvoie un message à l'échange de post-traitement, mais maintenant la clé de routage de la commande a changé et l'échangeur peut l'envoyer déjà sur la route souhaitée. Selon la clé, la commande peut être envoyée à Exchange, qui est responsable des notifications, des intégrations d'échange ou des deux à la fois. Et plus loin sur la même logique dans une file d'attente et des travailleurs.


Les employés qui envoient des SMS et les services de livraison essaieront de traiter le message plusieurs fois. Le nombre de ces tentatives peut être passé dans une variable d'environnement. Mais vous ne devez pas traiter le message sans fin, car l'erreur peut se trouver dans le message lui-même ou dans la logique du travailleur. Par conséquent, après avoir dépassé le nombre de tentatives autorisées, le message sera supprimé des files d'attente et envoyé au magasin d'erreurs, à partir duquel il peut être renvoyé au niveau de traitement souhaité.


Implémentation


Pour vérifier la mise en œuvre, vous aurez besoin de lapin lui-même. Je recommande d'utiliser Docker et une image de courtier officielle à cet effet. Installez et exécutez le conteneur avec la commande suivante.


docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine 

Il s'agit d'une image avec une interface Web disponible sur le port 15672 pour un débogage pratique.


Nous implémenterons nos plans avec TypeScript et la bibliothèque amqplib (implémentation du client RabbitMQ pour Node.js), donc pour commencer, vous devez décrire plusieurs interfaces. Nous décrivons les interfaces de la commande et les messages que nous enverrons à lapin.


 //    export interface Product { id: string; name: string; price: number; } //    export interface Order { clientName: string; city: string; email?: string; phone?: string; products: Product[]; totalSum: number; deliveryAddress?: string; } //         export interface OrderWithPhone extends Order { phone: string; } //        export interface OrderWithDeliveryAddress extends Order { deliveryAddress: string; } // Types Guard'        export const isOrderWithPhone = (order: Order): order is OrderWithPhone => Boolean(order.phone); export const isOrderWithDeliveryAddress = (order: Order): order is OrderWithDeliveryAddress => Boolean(order.deliveryAddress); //    . export interface Message<O extends Order> { errors: string[]; retry: number; order: O; //         export interface FailOrder extends Message<Order> { exchange: string; routingKey: string; } 

Nous devons maintenant décrire l'interface de configuration des files d'attente et des échangeurs, sur la base de laquelle nous allons construire la structure de traitement en lapin.


 import { Types, ExchangeTypes } from '../constants'; import { Options } from 'amqplib'; //   RabbitMQ       export enum Types { QUEUE = 'queue', EXCHANGE = 'exchange', } //      export enum ExchangeTypes { TOPIC = 'topic', } //    export interface Queue { name: string; options: Options.AssertQueue; } //    export interface Exchange { name: string; type: ExchangeTypes; } //    export interface Binding { type: Types; destination: string; source: string; routingKey: string; } //   RabbitMQ export interface PipelineConfig { queues: Queue[]; exchanges: Exchange[]; bindings: Binding[]; } 

Après avoir décrit les principaux composants du système, nous décrivons la configuration qui a été dessinée sur le diagramme en utilisant l'objet.


Files d'attente


 export default [ //        routingKey { name: 'generateRoutingKey', options: { durable: true, }, }, //   sms { name: 'sendSms', options: { durable: true, }, }, //      { name: 'delivery', options: { durable: true, }, }, //         sms { name: 'sendSmsHold', options: { durable: true, deadLetterExchange: 'notify', deadLetterRoutingKey: 'sendSms', messageTtl: 60000, }, }, //            { name: 'deliveryHold', options: { durable: true, deadLetterExchange: 'integrates', deadLetterRoutingKey: 'delivery', messageTtl: 60000, }, }, ]; 

Lors de la description des files d'attente, les options suivantes sont utilisées pour la file d'attente.


  • durable . Par défaut, tous les messages de file d'attente sont stockés en mémoire. Par conséquent, lorsque le courtier redémarre, les messages disparaissent. Pour éviter cela, vous pouvez utiliser cette option. Avec ce paramètre, rabbit videra les messages sur le disque. Mais il y a une mise en garde. Pour que les messages soient enregistrés après le redémarrage du courtier, ce paramètre n'est pas suffisant; les messages doivent être envoyés à la file d'attente avec l'option persistante.
  • messageTtl . La durée de vie du message. Donné en millisecondes
  • deadLetterExchange . Le nom de l'échangeur où le message sera envoyé de la file d'attente à son expiration
  • deadLetterRoutingKey . RoutingKey avec laquelle le message sera envoyé à l'échangeur à partir de l'option précédente

Échanges


 import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, }, ]; 

Liaisons


 import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', }, ]; 

Configuration complète


 import { PipelineConfig } from '../interfaces'; import exchanges from './exchanges'; import queues from './queues'; import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings, }; 

Pour vous connecter à Rabbit, écrivez une classe.


 import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() { //    rabbit     this._uri = process.env.RABBIT_URI || 'amqp://localhost'; } protected async connect() { this._connection = await connect(this._uri); this._chanel = await this._connection.createChannel(); } protected async disconnect() { await this._chanel.close(); return this._connection.close(); } protected get chanel() { return this._chanel; } } 

Écrivons la classe Pipeline qui, au démarrage, créera toute l'infrastructure nécessaire dans rabbit selon la configuration décrite précédemment.


 import { RabbitConnect } from './RabbitConnect'; import { PipelineConfig } from './interfaces'; import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect(); //   const createQueues = this._pipeline.queues.map(queue => this.chanel.assertQueue(queue.name, queue.options), ) as PromiseLike<any>[]; //   const createExchanges = this._pipeline.exchanges.map(exchange => this.chanel.assertExchange(exchange.name, exchange.type), ) as PromiseLike<any>[]; await Promise.all([...createQueues, ...createExchanges]); //       const createBindings = this._pipeline.bindings.map(binding => { if (binding.type === Types.QUEUE) { return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey); } return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey); }); await Promise.all(createBindings); return this.disconnect(); } catch (error) { console.error(error); throw new Error(error); } } } 

Nous allons maintenant écrire une classe de travail abstraite avec une fonctionnalité commune à tous les travailleurs dont il sera possible d'hériter.


 import { RabbitConnect } from './RabbitConnect'; import { Message, Order, FailOrder } from './interfaces'; import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number; //     active: string; //    exchange: string; //       holdKey?: string; //      } export abstract class Worker<M extends Order> extends RabbitConnect { private _maxRetry: number; private _active: string; private _holdKey: string | undefined; protected exchange: string; private _currentMessage: Message<M>; private _currentConsumeMessage: ConsumeMessage; constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) { super(); this._maxRetry = maxRetry || 0; this._active = active; this._holdKey = holdKey; this.exchange = exchange; } public async subscribe() { await this.connect(); this.chanel.consume(this._active, this.checkMessage.bind(this)); } //          private async checkMessage(message: ConsumeMessage) { this._currentConsumeMessage = message; const orderMessage: Message<M> = JSON.parse(message.content.toString()); if (orderMessage.retry >= this._maxRetry) { await this.sendToErrorStorage('  '); } this._currentMessage = orderMessage; //           await this.handler(orderMessage.order || orderMessage); } //       protected async sendToErrorStorage(error: string) { const message: FailOrder = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, exchange: this.exchange, routingKey: this._active }; console.log('   ', message); this.ack(); } //       protected async hold(error: string) { if (!this._holdKey) { return; } const orderMessage = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, }; const orderData = Buffer.from(JSON.stringify(orderMessage)); return this.chanel.publish(this.exchange, this._holdKey, orderData); } //      protected async ack() { return this.chanel.ack(this._currentConsumeMessage); } protected abstract handler(message: M): void; } 

Par défaut, le lapin requiert la confirmation d'un traitement de message réussi par le travailleur. Pour cela, le canal de connexion a une méthode d'acquittement. Si le travailleur n'a pas pu traiter le message, il existe une méthode nack qui indique à lapin d'envoyer le message à un autre travailleur.


Maintenant, nous pouvons écrire quelques ouvriers simples à partir du diagramme.


Travailleur générant une clé de routage.


 import { Worker } from '../Worker'; import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message, } from '../interfaces'; import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } } } 

Les travailleurs envoient des sms.


 import { Worker } from '../Worker'; import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log(' sms  : ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

Intégration des travailleurs avec le service de livraison.


 import { Worker } from '../Worker'; import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log('      : ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

Le point d'entrée de l'application.


 import { Pipeline } from './Pipeline'; import { pipelineConfig } from './pipeline'; import { GenerateRoutingKey } from './workers/GenerateRoutingKey'; import { SendSms } from './workers/SendSms'; import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); } })(); 

Je ne donnerai pas d'exemple de classe de code pour écrire une commande dans la base de données et générer un numéro de commande Internet. Cela dépasse le cadre de cet article. Pour vérifier le code, vous pouvez utiliser l'interface web de lapin en envoyant la commande json au posrprocessing de l'échangeur.


Conclusion


Un tel schéma de construction pour le traitement d'une commande en ligne facilite la mise à l'échelle du système. Il ne sera pas difficile pour nous d'ajouter plusieurs files d'attente et travailleurs à ce schéma afin d'ajouter les fonctionnalités nécessaires. Par exemple, vous pouvez ajouter l'envoi de notifications par e-mail ou l'envoi d'une commande de comptabilité dans 1C. Le circuit converti ressemblera à ceci:



J'espère que l'article vous a plu. Je serai heureux de tout commentaire et critique. Tout le code soumis peut être trouvé sur github

Source: https://habr.com/ru/post/fr469991/


All Articles