Procesamos pedidos de la tienda en línea usando RabbitMQ y TypeScript


Hola a todos! La popularidad del comercio por Internet está en constante crecimiento, al igual que la cuota de información de todo tipo de actividades relacionadas con el comercio. Junto con esto, la complejidad del procesamiento de la información está creciendo. Cada pedido realizado por un cliente de una tienda en línea genera una gran cantidad de integraciones con diversos servicios. Dichos servicios pueden incluir servicios de procesamiento de pagos, entrega, contabilidad y lealtad. Cada pedido debe ser pagado, registrado, ensamblado y entregado, y también disponible para su posterior análisis. Esto, y por lo tanto no es una situación simple, se complica por el hecho de que un usuario de una tienda en línea no quiere esperar mucho y dolorosamente por algo al hacer un pedido. La respuesta de la tienda en línea debe ser rápida, porque cada milisegundo de retraso aumenta la posibilidad de perder un cliente y, posteriormente, obtener ganancias. En este artículo quiero hablar sobre el agente de mensajes RabbitMQ y cómo usarlo para organizar el procesamiento de pedidos usando Node.js y TypeScript. Bienvenido a cat.


La teoria necesaria


Creo que muchos han oído hablar de RabbitMQ, porque la primera versión de código abierto de este intermediario de mensajes, basada en el protocolo AMQP, ya se lanzó en 2007. Se necesita un intermediario de mensajes para conectar diferentes componentes del sistema en un solo conjunto, ya que se necesita pegamento para resucitar un florero roto. Con el intermediario de mensajes, puede implementar el procesamiento asincrónico de eventos recibidos en el sistema. Es un procesamiento de pedidos tan asíncrono que la tienda en línea necesita. Pero primero debe comprender los componentes básicos de RabbitMQ. Este corredor tiene tres componentes principales con los que construiremos el proceso de procesamiento:


  • Mensaje Esta es la unidad de información más pequeña dentro del agente de mensajes y nuestro servicio de procesamiento que puede procesarse. RabbitMQ en sí mismo almacena mensajes en forma binaria, pero para nuestro sistema y para el artículo esto no es importante. Recibiremos y procesaremos mensajes en forma de JSON. También vale la pena mencionar que los mensajes en RabbitMQ tienen encabezados. Son similares a los encabezados de las solicitudes http. Esta es una matriz asociativa en la que puede escribir la información necesaria.
  • Cola de mensajes Esta es la cola en la que RabbitMQ almacena mensajes. Uno o más consumidores pueden suscribirse a una cola de mensajes. Cada mensaje en la cola de conejo se distribuye a los consumidores utilizando el algoritmo round-robin.
  • Intercambio Esto, como su nombre lo indica, es un punto de intercambio. Se pueden adjuntar colas u otros intercambiadores a este punto. Un punto de intercambio no almacena mensajes; su función principal es enrutar mensajes a una o varias colas, o los mismos puntos de intercambio. Cada cola o intercambiador está vinculado por una clave de enrutamiento. Hay varios tipos diferentes de intercambiadores en RabbitMQ que afectan cómo exactamente el intercambio enrutará el mensaje recibido en él.

Para describir cómo funcionan los diferentes tipos de intercambiadores, es necesario comprender qué son las claves de enrutamiento. La clave de enrutamiento está tanto en el enlace de la cola al intercambiador como en el mensaje mismo. La clave de enrutamiento es solo una cadena dividida en bloques. Cada bloque está separado por un punto. Por ejemplo, "notify.sendEmail.sendSms". Al mismo tiempo, se pueden establecer patrones para la clave de enrutamiento de mensajes utilizando caracteres especiales # y *. * - dice que después de un punto cualquier bloque puede ir, pero después de # cualquier número de bloques puede ir. Por ejemplo, "notify.sendSms. *" O "notifique. #". Ahora puede pasar a los tipos de puntos de intercambio.


Hay cuatro tipos de intercambiadores:


  • Fanout La lógica de enrutamiento de este intercambio es simple; redirige un mensaje entrante a todas las colas o intercambiadores que están conectados a él.


  • Directo Este intercambio redirige el mensaje dependiendo de si la clave de enrutamiento del mensaje coincide con la clave de enrutamiento del enlace.


  • Tema El intercambio de este tipo, así como el enrutamiento directo del mensaje, depende de la clave de enrutamiento. Pero un patrón puede actuar como una clave de enrutamiento.


  • Encabezados Este intercambio, a diferencia de los otros, utiliza encabezados de mensajes para el enrutamiento. Al mismo tiempo, las colas al intercambiador también se unen mediante una matriz asociativa. La lógica por la cual el intercambiador enrutará los mensajes se puede cambiar utilizando la tecla especial "x-match", que se establece en la matriz de enlace asociativo. La clave se puede establecer en dos valores, todos o cualquiera. Si el valor es todo, los encabezados del mensaje deben coincidir completamente con la matriz de enlace asociativo; si el valor es cualquiera, entonces el valor debe coincidir con al menos una clave.


Estos son los componentes principales de RabbitMQ. Puede leer más sobre estos componentes en la especificación del protocolo AMQP . A continuación, diseñaremos e implementaremos un sistema de procesamiento de pedidos utilizando TypeScript como ejemplo, entendiendo simultáneamente la configuración de cada componente.


Diseño


Para simplificar el ejemplo, suponemos que para el procesamiento exitoso de un pedido en línea, debemos tener la siguiente funcionalidad:


  • Guardar pedidos entrantes
  • Enviar SMS al cliente con el número de pedido, así como el estado del pedido
  • Envíe un mensaje al servicio de entrega de mensajería sobre un nuevo pedido de nuestra tienda en línea, si el cliente ha elegido este método de entrega

Pero no es suficiente implementar esta funcionalidad, porque nuestra tienda en línea planea expandir la funcionalidad y brindar más oportunidades diferentes a sus clientes en el futuro (y esto siempre sucede). Por ejemplo, notifique al cliente por correo electrónico o brinde una selección de varios métodos de entrega para el pedido. Se deduce que necesitamos diseñar el sistema de tal manera que agregar funcionalidad sea simple.


También vale la pena mencionar que usaré la plantilla para mensajes diferidos para que sea posible, si el servicio externo no está disponible, repetir la lógica varias veces. Puedes leer sobre esta plantilla aquí.


Para representar más claramente el objetivo final, dibujaré un diagrama.



Veamos en orden cómo funciona el proceso de procesamiento de pedidos en este diagrama. El esquema se divide en bloques y diferentes colores. Los bloques blancos indican servicios externos que no consideraremos. Los bloques grises indican elementos RabbitMQ. Colas e intercambiadores. El color verde refleja los bloques de lógica empresarial que deben implementarse. Además, cada bloque relacionado con nuestra lógica está numerado. Los números indican el proceso y el subproceso en orden.


En primer lugar, el mensaje API HTTP ingresa a nuestro servicio. Después de eso, debemos asignar un número al pedido, guardar el pedido en la base de datos con el estado "nuevo" y enviar una respuesta sobre la creación exitosa del pedido, con su número, de regreso. El cliente, después de recibir un mensaje sobre la creación exitosa de la orden, se dedica a su propio negocio. Al enviar una respuesta positiva, enviamos el objeto de pedido al intercambio de posprocesamiento, desde el cual recae en el trabajador de la formación de la clave de enrutamiento. Este trabajador, después de haber recibido el objeto de pedido de la cola, sobre la base de este (si hay un correo electrónico o el teléfono de un cliente en el pedido, qué método de entrega se eligió) debe formar la clave de enrutamiento del pedido. Después de haber formado una clave de enrutamiento, el trabajador envía un mensaje de vuelta al intercambio de procesamiento posterior, pero ahora la clave de enrutamiento de la orden ha cambiado y el intercambiador ya puede enviarla en la ruta deseada. Dependiendo de la clave, el pedido se puede enviar al intercambio, que es responsable de las notificaciones, integraciones de intercambio o ambas a la vez. Y más adelante en la misma lógica en una cola y trabajadores.


Los trabajadores de envío de SMS y los servicios de entrega intentarán procesar el mensaje varias veces. El número de tales intentos se puede pasar en una variable de entorno. Pero no debe procesar el mensaje interminablemente, porque el error puede estar en el mensaje mismo o en la lógica del trabajador. Por lo tanto, después de exceder el número de intentos permitidos, el mensaje se eliminará de las colas y se enviará al almacén de errores, desde donde se puede volver a enviar al nivel de procesamiento deseado.


Implementación


Para verificar la implementación, necesitarás el propio conejo. Recomiendo usar Docker y una imagen oficial de corredor para este propósito. Instala y ejecuta el contenedor con el siguiente comando.


docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine 

Esta es una imagen con una interfaz web disponible en el puerto 15672 para una depuración conveniente.


Implementaremos nuestros planes con TypeScript y la biblioteca amqplib (implementación del cliente RabbitMQ para Node.js), por lo que, para empezar, debe describir varias interfaces. Describimos las interfaces de la orden y los mensajes que enviaremos a rabbit.


 //    export interface Product { id: string; name: string; price: number; } //    export interface Order { clientName: string; city: string; email?: string; phone?: string; products: Product[]; totalSum: number; deliveryAddress?: string; } //         export interface OrderWithPhone extends Order { phone: string; } //        export interface OrderWithDeliveryAddress extends Order { deliveryAddress: string; } // Types Guard'        export const isOrderWithPhone = (order: Order): order is OrderWithPhone => Boolean(order.phone); export const isOrderWithDeliveryAddress = (order: Order): order is OrderWithDeliveryAddress => Boolean(order.deliveryAddress); //    . export interface Message<O extends Order> { errors: string[]; retry: number; order: O; //         export interface FailOrder extends Message<Order> { exchange: string; routingKey: string; } 

Ahora necesitamos describir la interfaz de configuración de colas e intercambiadores, sobre la base de la cual construiremos la estructura de procesamiento en conejo.


 import { Types, ExchangeTypes } from '../constants'; import { Options } from 'amqplib'; //   RabbitMQ       export enum Types { QUEUE = 'queue', EXCHANGE = 'exchange', } //      export enum ExchangeTypes { TOPIC = 'topic', } //    export interface Queue { name: string; options: Options.AssertQueue; } //    export interface Exchange { name: string; type: ExchangeTypes; } //    export interface Binding { type: Types; destination: string; source: string; routingKey: string; } //   RabbitMQ export interface PipelineConfig { queues: Queue[]; exchanges: Exchange[]; bindings: Binding[]; } 

Habiendo descrito los componentes principales del sistema, describimos la configuración que se dibujó en el diagrama usando el objeto.


Colas


 export default [ //        routingKey { name: 'generateRoutingKey', options: { durable: true, }, }, //   sms { name: 'sendSms', options: { durable: true, }, }, //      { name: 'delivery', options: { durable: true, }, }, //         sms { name: 'sendSmsHold', options: { durable: true, deadLetterExchange: 'notify', deadLetterRoutingKey: 'sendSms', messageTtl: 60000, }, }, //            { name: 'deliveryHold', options: { durable: true, deadLetterExchange: 'integrates', deadLetterRoutingKey: 'delivery', messageTtl: 60000, }, }, ]; 

Al describir las colas, se utilizan las siguientes opciones para la cola.


  • duradero Por defecto, todos los mensajes de la cola se almacenan en la memoria. Por lo tanto, cuando el corredor se reinicia, los mensajes desaparecerán. Para evitar esto, puede usar esta opción. Con esta configuración, el conejo vaciará los mensajes al disco. Pero hay una advertencia. Para que los mensajes se guarden después del reinicio del intermediario, esta configuración no es suficiente; los mensajes deben enviarse a la cola con la opción persistente.
  • messageTtl . El mensaje de por vida. Dado en milisegundos
  • deadLetterExchange . El nombre del intercambiador donde se enviará el mensaje desde la cola cuando caduque.
  • deadLetterRoutingKey . RoutingKey con la que se enviará el mensaje al intercambiador desde la opción anterior

Intercambios


 import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, }, ]; 

Fijaciones


 import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', }, ]; 

Configuración completa


 import { PipelineConfig } from '../interfaces'; import exchanges from './exchanges'; import queues from './queues'; import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings, }; 

Para conectarte al conejo, escribe una clase.


 import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() { //    rabbit     this._uri = process.env.RABBIT_URI || 'amqp://localhost'; } protected async connect() { this._connection = await connect(this._uri); this._chanel = await this._connection.createChannel(); } protected async disconnect() { await this._chanel.close(); return this._connection.close(); } protected get chanel() { return this._chanel; } } 

Escribamos la clase Pipeline, que al inicio creará toda la infraestructura necesaria en conejo de acuerdo con la configuración descrita anteriormente.


 import { RabbitConnect } from './RabbitConnect'; import { PipelineConfig } from './interfaces'; import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect(); //   const createQueues = this._pipeline.queues.map(queue => this.chanel.assertQueue(queue.name, queue.options), ) as PromiseLike<any>[]; //   const createExchanges = this._pipeline.exchanges.map(exchange => this.chanel.assertExchange(exchange.name, exchange.type), ) as PromiseLike<any>[]; await Promise.all([...createQueues, ...createExchanges]); //       const createBindings = this._pipeline.bindings.map(binding => { if (binding.type === Types.QUEUE) { return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey); } return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey); }); await Promise.all(createBindings); return this.disconnect(); } catch (error) { console.error(error); throw new Error(error); } } } 

Ahora escribiremos una clase de trabajador abstracta con una funcionalidad común para todos los trabajadores de la que será posible heredar.


 import { RabbitConnect } from './RabbitConnect'; import { Message, Order, FailOrder } from './interfaces'; import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number; //     active: string; //    exchange: string; //       holdKey?: string; //      } export abstract class Worker<M extends Order> extends RabbitConnect { private _maxRetry: number; private _active: string; private _holdKey: string | undefined; protected exchange: string; private _currentMessage: Message<M>; private _currentConsumeMessage: ConsumeMessage; constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) { super(); this._maxRetry = maxRetry || 0; this._active = active; this._holdKey = holdKey; this.exchange = exchange; } public async subscribe() { await this.connect(); this.chanel.consume(this._active, this.checkMessage.bind(this)); } //          private async checkMessage(message: ConsumeMessage) { this._currentConsumeMessage = message; const orderMessage: Message<M> = JSON.parse(message.content.toString()); if (orderMessage.retry >= this._maxRetry) { await this.sendToErrorStorage('  '); } this._currentMessage = orderMessage; //           await this.handler(orderMessage.order || orderMessage); } //       protected async sendToErrorStorage(error: string) { const message: FailOrder = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, exchange: this.exchange, routingKey: this._active }; console.log('   ', message); this.ack(); } //       protected async hold(error: string) { if (!this._holdKey) { return; } const orderMessage = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, }; const orderData = Buffer.from(JSON.stringify(orderMessage)); return this.chanel.publish(this.exchange, this._holdKey, orderData); } //      protected async ack() { return this.chanel.ack(this._currentConsumeMessage); } protected abstract handler(message: M): void; } 

Por defecto, el conejo requiere la confirmación de un procesamiento exitoso del mensaje por parte del trabajador. Para esto, el canal de conexión tiene un método ack. Si el trabajador no pudo procesar el mensaje, entonces hay un método oculto que le dice a conejo que envíe el mensaje a otro trabajador.


Ahora podemos escribir algunos trabajadores simples del diagrama.


Trabajador que genera una clave de enrutamiento.


 import { Worker } from '../Worker'; import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message, } from '../interfaces'; import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } } } 

Trabajadores enviando sms.


 import { Worker } from '../Worker'; import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log(' sms  : ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

Integración de trabajadores con servicio a domicilio.


 import { Worker } from '../Worker'; import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log('      : ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

El punto de entrada a la aplicación.


 import { Pipeline } from './Pipeline'; import { pipelineConfig } from './pipeline'; import { GenerateRoutingKey } from './workers/GenerateRoutingKey'; import { SendSms } from './workers/SendSms'; import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); } })(); 

No daré un ejemplo de una clase de código para escribir un pedido en la base de datos y generar un número de pedido por Internet. Esto está más allá del alcance de este artículo. Para verificar el código, puede usar la interfaz web de conejo enviando el pedido json al intercambiador después del procesamiento.


Conclusión


Tal esquema de construcción para procesar un pedido en línea facilita la ampliación del sistema. No será difícil para nosotros agregar varias colas y trabajadores a este esquema para agregar la funcionalidad necesaria. Por ejemplo, puede agregar notificaciones de envío por correo electrónico o enviar un pedido de contabilidad en 1C. El circuito convertido se verá así:



Espero que hayas disfrutado el artículo. Estaré encantado de cualquier comentario y crítica. Todo el código enviado se puede encontrar en github

Source: https://habr.com/ru/post/469991/


All Articles