Processamos pedidos da loja on-line usando RabbitMQ e TypeScript


Olá pessoal! A popularidade do comércio na Internet está em constante crescimento, assim como a parcela de informatização de todos os tipos de atividades relacionadas ao comércio. Junto com isso, a complexidade do processamento de informações está aumentando. Cada pedido feito por um cliente de uma loja online gera um grande número de integrações com vários serviços. Esses serviços podem incluir serviços de processamento de pagamento, entrega, contabilidade e fidelidade. Cada pedido deve ser pago, registrado, montado e entregue, além de estar disponível para análise posterior. Isso, e não uma situação simples, é complicado pelo fato de um usuário de uma loja on-line não querer esperar muito e dolorosamente por algo ao fazer um pedido. A resposta da loja online deve ser rápida, pois cada milissegundo de atraso aumenta a chance de perda de um cliente e, posteriormente, o lucro. Neste artigo, quero falar sobre o broker de mensagens RabbitMQ e como usá-lo para organizar o processamento de pedidos usando o Node.js e o TypeScript. Bem-vindo ao gato.


A teoria necessária


Acho que muitos já ouviram falar do RabbitMQ, porque a primeira versão de código aberto desse intermediário de mensagens, com base no protocolo AMQP, já foi lançada em 2007. É necessário um intermediário de mensagens para conectar diferentes componentes do sistema em um único todo, pois é necessária cola para ressuscitar um vaso quebrado. Usando o intermediário de mensagens, é possível implementar o processamento assíncrono dos eventos recebidos no sistema. É apenas um processamento de pedidos assíncrono que a loja online precisa. Mas primeiro você precisa entender os componentes básicos do RabbitMQ. Esse intermediário possui três componentes principais com os quais iremos construir o processo de processamento:


  • Mensagem Essa é a menor unidade de informações no intermediário de mensagens e em nosso serviço de processamento que pode ser processado. O RabbitMQ armazena mensagens em formato binário, mas para o nosso sistema e para o artigo isso não é importante. Receberemos e processaremos mensagens na forma de JSON. Também vale a pena mencionar que as mensagens no RabbitMQ têm cabeçalhos. Eles são semelhantes aos cabeçalhos das solicitações http. Essa é uma matriz associativa na qual você pode escrever as informações necessárias.
  • Fila de mensagens . Essa é a fila na qual o RabbitMQ armazena mensagens. Uma fila de mensagens pode ser assinada por um ou mais consumidores. Cada mensagem na fila de coelhos é distribuída aos consumidores usando o algoritmo round-robin.
  • Troca Isso, como o nome indica, é um ponto de troca. Filas ou outros trocadores podem ser anexados a este ponto. Um ponto de troca não armazena mensagens; sua principal função é rotear mensagens para uma ou várias filas ou para os mesmos pontos de troca. Cada fila ou trocador é vinculado por uma chave de roteamento. Existem vários tipos diferentes de trocadores no RabbitMQ que afetam como exatamente a troca encaminhará a mensagem recebida nele.

Para descrever como os diferentes tipos de trocadores funcionam, é necessário entender o que são as chaves de roteamento. A chave de roteamento está na ligação da fila ao trocador e na própria mensagem. A chave de roteamento é apenas uma string dividida em blocos. Cada bloco é separado por um ponto. Por exemplo, "notify.sendEmail.sendSms". Ao mesmo tempo, é possível definir padrões para a chave de roteamento de mensagens usando caracteres especiais # e *. * - diz que após um ponto qualquer bloco pode ir, mas depois de # qualquer número de blocos pode ir. Por exemplo, "notify.sendSms. *" Ou "notify. #". Agora você pode passar para os tipos de pontos de troca.


Existem quatro tipos de trocadores:


  • Fanout A lógica de roteamento dessa troca é simples, ela redireciona a mensagem recebida para todas as filas ou trocadores anexados a ela.


  • Direto Essa troca redireciona a mensagem, dependendo se a chave de roteamento da mensagem corresponde à chave de roteamento da ligação.


  • Tópico A troca desse tipo, bem como o Direct, direciona a mensagem, dependendo da chave de roteamento. Mas um padrão pode atuar como uma chave de roteamento.


  • Cabeçalhos . Essa troca, diferente das outras, usa cabeçalhos de mensagens para roteamento. Ao mesmo tempo, as filas para o trocador também são vinculadas usando uma matriz associativa. A lógica pela qual o trocador encaminhará as mensagens pode ser alterada usando a tecla "x-match" especial, definida na matriz de ligação associativa. A chave pode ser configurada para dois valores, todos ou qualquer um. Se o valor for todo, os cabeçalhos da mensagem deverão corresponder completamente à matriz de ligação associativa; se o valor for algum, o valor deverá corresponder a pelo menos uma chave.


Estes são os principais componentes do RabbitMQ. Você pode ler mais sobre esses componentes na especificação do protocolo AMQP . Em seguida, projetaremos e implementaremos um sistema de processamento de pedidos usando o TypeScript como exemplo, entendendo simultaneamente as configurações de cada componente.


Desenho


Para simplificar o exemplo, assumimos que, para o processamento bem-sucedido de um pedido on-line, precisamos ter a seguinte funcionalidade:


  • Salvar pedidos recebidos
  • Envie SMS para o cliente com o número do pedido e o status do pedido
  • Envie uma mensagem ao serviço de entrega de correio sobre um novo pedido da nossa loja online, se o cliente tiver escolhido esse método de entrega

Mas não basta implementar essa funcionalidade, porque nossa loja online planeja expandir a funcionalidade e oferecer mais oportunidades diferentes aos seus clientes no futuro (e isso sempre acontece). Por exemplo, notifique o cliente por email ou forneça uma escolha de vários métodos de entrega para o pedido. Daqui resulta que precisamos projetar o sistema de forma que a adição de funcionalidades seja simples.


Também vale mencionar que usarei o modelo para mensagens adiadas, para que seja possível, se o serviço externo não estiver disponível, repetir a lógica várias vezes. Você pode ler sobre esse modelo aqui.


Para representar mais claramente o objetivo final, vou desenhar um diagrama.



Vamos ver em ordem como o processo de processamento de pedidos funciona neste diagrama. O esquema é dividido em blocos e cores diferentes. Blocos brancos indicam serviços externos que não consideraremos. Blocos cinza indicam elementos RabbitMQ. Filas e trocadores. A cor verde reflete os blocos da lógica de negócios que precisam ser implementados. Além disso, cada bloco relacionado à nossa lógica é numerado. Os números indicam o processo e o subprocesso em ordem.


Primeiro, a mensagem da API HTTP entra em nosso serviço. Depois disso, devemos atribuir um número ao pedido, salvar o pedido no banco de dados com o status “novo” e enviar uma resposta sobre a criação bem-sucedida do pedido, com seu número, de volta. O cliente, após receber uma mensagem sobre a criação bem-sucedida do pedido, cuida de seus próprios negócios. Ao enviar uma resposta positiva, enviamos o objeto de pedido para a central de pós-processamento, a partir da qual ele cai no trabalhador da formação da chave de roteamento. Esse trabalhador, tendo recebido o objeto de pedido da fila, com base nele (se há um e-mail ou telefone de um cliente no pedido, qual método de entrega foi escolhido) deve formar a chave de roteamento de pedidos. Após formar uma chave de roteamento, o trabalhador envia uma mensagem de volta à central de pós-processamento, mas agora a chave de roteamento do pedido foi alterada e o trocador já pode enviá-lo na rota desejada. Dependendo da chave, o pedido pode ser enviado para troca, responsável por notificações, integrações de troca ou ambas ao mesmo tempo. E ainda mais na mesma lógica em uma fila e trabalhadores.


Trabalhadores que enviam SMS e serviços de entrega tentarão processar a mensagem várias vezes. O número de tais tentativas pode ser passado em uma variável de ambiente. Mas você não deve processar a mensagem infinitamente, porque o erro pode estar na própria mensagem ou na lógica do trabalhador. Portanto, após exceder o número de tentativas permitidas, a mensagem será excluída das filas e enviada ao armazenamento de erros, da qual poderá ser reenviada para o nível de processamento desejado.


Implementação


Para verificar a implementação, você precisará do próprio coelho. Eu recomendo usar o docker e uma imagem oficial do broker para esse fim. Instale e execute o contêiner com o seguinte comando.


docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine 

Esta é uma imagem com uma interface da Web disponível na porta 15672 para depuração conveniente.


Implementaremos nossos planos com o TypeScript e a biblioteca amqplib (implementação do cliente RabbitMQ para Node.js.), portanto, para começar, é necessário descrever várias interfaces. Descrevemos as interfaces da ordem e as mensagens que enviaremos ao coelho.


 //    export interface Product { id: string; name: string; price: number; } //    export interface Order { clientName: string; city: string; email?: string; phone?: string; products: Product[]; totalSum: number; deliveryAddress?: string; } //         export interface OrderWithPhone extends Order { phone: string; } //        export interface OrderWithDeliveryAddress extends Order { deliveryAddress: string; } // Types Guard'        export const isOrderWithPhone = (order: Order): order is OrderWithPhone => Boolean(order.phone); export const isOrderWithDeliveryAddress = (order: Order): order is OrderWithDeliveryAddress => Boolean(order.deliveryAddress); //    . export interface Message<O extends Order> { errors: string[]; retry: number; order: O; //         export interface FailOrder extends Message<Order> { exchange: string; routingKey: string; } 

Agora precisamos descrever a interface de configuração de filas e trocadores, com base na qual iremos construir a estrutura de processamento em coelho.


 import { Types, ExchangeTypes } from '../constants'; import { Options } from 'amqplib'; //   RabbitMQ       export enum Types { QUEUE = 'queue', EXCHANGE = 'exchange', } //      export enum ExchangeTypes { TOPIC = 'topic', } //    export interface Queue { name: string; options: Options.AssertQueue; } //    export interface Exchange { name: string; type: ExchangeTypes; } //    export interface Binding { type: Types; destination: string; source: string; routingKey: string; } //   RabbitMQ export interface PipelineConfig { queues: Queue[]; exchanges: Exchange[]; bindings: Binding[]; } 

Tendo descrito os principais componentes do sistema, descrevemos a configuração que foi desenhada no diagrama usando o objeto.


Filas


 export default [ //        routingKey { name: 'generateRoutingKey', options: { durable: true, }, }, //   sms { name: 'sendSms', options: { durable: true, }, }, //      { name: 'delivery', options: { durable: true, }, }, //         sms { name: 'sendSmsHold', options: { durable: true, deadLetterExchange: 'notify', deadLetterRoutingKey: 'sendSms', messageTtl: 60000, }, }, //            { name: 'deliveryHold', options: { durable: true, deadLetterExchange: 'integrates', deadLetterRoutingKey: 'delivery', messageTtl: 60000, }, }, ]; 

Ao descrever filas, as seguintes opções são usadas para a fila.


  • durável . Por padrão, todas as mensagens da fila são armazenadas na memória. Portanto, quando o broker reiniciar, as mensagens desaparecerão. Para evitar isso, você pode usar esta opção. Com essa configuração, o coelho envia mensagens para o disco. Mas há uma ressalva. Para que as mensagens sejam salvas após a reinicialização do broker, essa configuração não é suficiente; as mensagens devem ser enviadas para a fila com a opção persistente.
  • messageTtl . A vida útil da mensagem. Dado em milissegundos
  • deadLetterExchange . O nome do trocador para o qual a mensagem será enviada da fila quando expirar
  • deadLetterRoutingKey . RoutingKey com a qual a mensagem será enviada ao trocador a partir da opção anterior

Trocas


 import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, }, ]; 

Ligações


 import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', }, ]; 

Configuração completa


 import { PipelineConfig } from '../interfaces'; import exchanges from './exchanges'; import queues from './queues'; import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings, }; 

Para se conectar ao coelho, escreva uma classe.


 import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() { //    rabbit     this._uri = process.env.RABBIT_URI || 'amqp://localhost'; } protected async connect() { this._connection = await connect(this._uri); this._chanel = await this._connection.createChannel(); } protected async disconnect() { await this._chanel.close(); return this._connection.close(); } protected get chanel() { return this._chanel; } } 

Vamos escrever a classe Pipeline, que na inicialização criará toda a infraestrutura necessária no rabbit, de acordo com a configuração descrita anteriormente.


 import { RabbitConnect } from './RabbitConnect'; import { PipelineConfig } from './interfaces'; import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect(); //   const createQueues = this._pipeline.queues.map(queue => this.chanel.assertQueue(queue.name, queue.options), ) as PromiseLike<any>[]; //   const createExchanges = this._pipeline.exchanges.map(exchange => this.chanel.assertExchange(exchange.name, exchange.type), ) as PromiseLike<any>[]; await Promise.all([...createQueues, ...createExchanges]); //       const createBindings = this._pipeline.bindings.map(binding => { if (binding.type === Types.QUEUE) { return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey); } return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey); }); await Promise.all(createBindings); return this.disconnect(); } catch (error) { console.error(error); throw new Error(error); } } } 

Agora, escreveremos uma classe abstrata de trabalhadores com uma funcionalidade comum para todos os trabalhadores dos quais será possível herdar.


 import { RabbitConnect } from './RabbitConnect'; import { Message, Order, FailOrder } from './interfaces'; import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number; //     active: string; //    exchange: string; //       holdKey?: string; //      } export abstract class Worker<M extends Order> extends RabbitConnect { private _maxRetry: number; private _active: string; private _holdKey: string | undefined; protected exchange: string; private _currentMessage: Message<M>; private _currentConsumeMessage: ConsumeMessage; constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) { super(); this._maxRetry = maxRetry || 0; this._active = active; this._holdKey = holdKey; this.exchange = exchange; } public async subscribe() { await this.connect(); this.chanel.consume(this._active, this.checkMessage.bind(this)); } //          private async checkMessage(message: ConsumeMessage) { this._currentConsumeMessage = message; const orderMessage: Message<M> = JSON.parse(message.content.toString()); if (orderMessage.retry >= this._maxRetry) { await this.sendToErrorStorage('  '); } this._currentMessage = orderMessage; //           await this.handler(orderMessage.order || orderMessage); } //       protected async sendToErrorStorage(error: string) { const message: FailOrder = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, exchange: this.exchange, routingKey: this._active }; console.log('   ', message); this.ack(); } //       protected async hold(error: string) { if (!this._holdKey) { return; } const orderMessage = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, }; const orderData = Buffer.from(JSON.stringify(orderMessage)); return this.chanel.publish(this.exchange, this._holdKey, orderData); } //      protected async ack() { return this.chanel.ack(this._currentConsumeMessage); } protected abstract handler(message: M): void; } 

Por padrão, o coelho exige a confirmação de um processamento de mensagens bem-sucedido do trabalhador. Para isso, o canal de conexão possui um método ack. Se o trabalhador não conseguiu processar a mensagem, existe um método nack que diz ao coelho para enviar a mensagem para outro trabalhador.


Agora podemos escrever alguns trabalhadores simples a partir do diagrama.


Trabalhador que gera uma chave de roteamento.


 import { Worker } from '../Worker'; import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message, } from '../interfaces'; import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } } } 

Trabalhadores enviando sms.


 import { Worker } from '../Worker'; import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log(' sms  : ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

Integração do trabalhador com o serviço de entrega.


 import { Worker } from '../Worker'; import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log('      : ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

O ponto de entrada para o aplicativo.


 import { Pipeline } from './Pipeline'; import { pipelineConfig } from './pipeline'; import { GenerateRoutingKey } from './workers/GenerateRoutingKey'; import { SendSms } from './workers/SendSms'; import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); } })(); 

Não darei um exemplo de classe de código para escrever um pedido no banco de dados e gerar um número de pedido na Internet. Isso está além do escopo deste artigo. Para verificar o código, você pode usar a interface da web do coelho enviando o pedido json ao pós-processamento do trocador.


Conclusão


Esse esquema de construção para processar um pedido on-line facilita o dimensionamento do sistema. Não será difícil adicionar várias filas e trabalhadores a esse esquema para adicionar a funcionalidade necessária. Por exemplo, você pode adicionar o envio de notificações por email ou um pedido de contabilidade em 1C. O circuito convertido terá a seguinte aparência:



Espero que tenham gostado do artigo. Ficarei feliz em quaisquer comentários e críticas. Todo o código enviado pode ser encontrado no github

Source: https://habr.com/ru/post/pt469991/


All Articles