我们使用RabbitMQ和TypeScript处理来自在线商店的订单


大家好! 互联网商务的普及与与贸易有关的所有活动的信息化所占的份额都在不断增长。 与此同时,信息处理的复杂性也在增长。 在线商店的客户做出的每笔订单都会生成大量与各种服务的集成。 此类服务可能包括付款处理,交付,会计和会员服务。 每个订单必须付款,记录,组装和交付,还可以进行进一步分析。 由于不是在线商店的用户在下订单时就不想长时间痛苦地等待某事,因此这种情况(而不是简单的情况)变得复杂。 在线商店的响应应该很快,因为每毫秒的延迟都会增加失去客户并随后获利的机会。 在本文中,我想讨论RabbitMQ消息代理,以及如何使用它来使用Node.js和TypeScript组织订单处理。 欢迎来到猫。


必要的理论


我想许多人听说过RabbitMQ,因为基于AMQP协议的此消息代理的第一个开源版本已于2007年发布。 需要消息代理将系统的不同组件连接为一个整体,因为需要使用胶水来恢复破损的花瓶。 使用消息代理,可以实现对系统中接收到的事件的异步处理。 在线商店需要的只是异步订单处理。 但是首先,您需要了解RabbitMQ的基本组件。 该代理具有三个主要组件,我们将使用它们来构建处理过程:


  • 留言内容 这是消息代理和我们的处理服务中可以处理的最小信息单元。 RabbitMQ本身以二进制形式存储消息,但是对于我们的系统和本文而言,这并不重要。 我们将以JSON形式接收和处理消息。 还值得一提的是RabbitMQ中的消息具有标头。 它们类似于http请求的标头。 这是一个关联数组,您可以在其中编写必要的信息。
  • 消息队列 。 这是RabbitMQ存储消息的队列。 消息队列可以由一个或多个使用者预订。 使用循环算法将兔子队列中的每个消息分发给消费者。
  • 交换 顾名思义,这是一个交换点。 队列或其他交换器可以附加到这一点。 交换点不存储消息;它的主要功能是将消息路由到一个或几个队列或相同的交换点。 每个队列或交换器都由路由键绑定。 RabbitMQ中有几种不同类型的交换器,它们会影响交换如何正确路由接收到的消息的方式。

为了描述不同类型的交换器是如何工作的,有必要了解什么是路由键。 路由密钥既在队列到交换器的绑定中,又在消息本身中。 路由密钥只是一个字符串,分为多个块。 每个块由一个点分隔。 例如,“ notify.sendEmail.sendSms”。 同时,可以使用特殊字符#和*为消息路由键设置模式。 *-表示在一个点之后任何一个程序段都可以执行,但是在#之后可以执行任意数量的程序段。 例如,“ notify.sendSms。*”或“ notify。#”。 现在,您可以继续进行交换点的类型。


交换器有四种类型:


  • 扇出 这种交换的路由逻辑很简单;它将进入的消息重定向到与之相连的所有队列或交换器。


  • 直达 此交换根据消息的路由密钥是否与绑定的路由密钥匹配来重定向消息。


  • 话题 这种类型的交换以及直接路由都取决于路由密钥。 但是模式可以充当路由键。


  • 标头 。 与其他交换不同,此交换使用消息头进行路由。 同时,交换器的队列也使用关联数组进行绑定。 可以使用特殊的“ x-match”键更改交换器路由消息的逻辑,该键在关联绑定数组中设置。 键可以设置为全部或任意两个值。 如果值为all,则消息头必须与匹配绑定数组完全匹配;如果值为any,则该值必须至少与一个键匹配。


这些是RabbitMQ的核心组件。 您可以在AMQP协议规范中阅读有关这些组件的更多信息。 接下来,我们将以TypeScript为例,设计并实现一个订单处理系统,同时了解每个组件的设置。


设计方案


为了简化示例,我们假设要成功处理在线订单,我们必须具有以下功能:


  • 保存收到的订单
  • 通过订单号以及订单状态向客户发送短信
  • 如果客户选择了这种送货方式,请向快递送货服务发送有关我们网上商店的新订单的消息

但是,仅仅实现该功能还不够,因为我们的在线商店计划扩展该功能,并在将来为客户提供更多不同的机会(而且这种情况经常发生)。 例如,通过电子邮件通知客户或为订单提供几种交付方式的选择。 因此,我们需要以一种简单的方式来添加功能来设计系统。


还值得一提的是,我将使用模板处理延迟的消息,以便在外部服务不可用时能够多次重复逻辑。 您可以在此处阅读有关此模板的信息。


为了更清楚地代表最终目标,我将绘制一个图表。



让我们依次查看订单处理流程在此图中的工作方式。 该方案分为块和不同的颜色。 白框表示我们不会考虑的外部服务。 灰色块表示RabbitMQ元素。 队列和交换器。 绿色反映了需要实现的业务逻辑块。 同样,与我们的逻辑相关的每个块都被编号。 数字按顺序指示流程和子流程。


首先,HTTP API消息进入我们的服务。 之后,我们必须为订单分配一个编号,将订单状态保存为数据库中的“新”,并发送有关成功创建订单的响应及其编号。 客户已经收到有关成功创建订单的消息,然后继续自己的生意。 通过发送肯定响应,我们将订单对象发送到后处理交换,交换对象从该对象落入路由键形成的工作程序中。 该工作人员从队列中接收到订单对象后,必须以此为基础(无论订单中是否有电子邮件或客户的电话,以及选择了哪种发送方式),都必须构成订单路由密钥。 形成路由键后,工作人员将消息发送回后处理交换,但是现在订单的路由键已更改,交换器可以在所需的路由上发送它。 根据密钥,可以将订单发送到交易所,后者负责通知,交易所集成或同时执行这两者。 并进一步按照相同的逻辑排队和工作。


SMS发送工作者和传递服务将尝试多次处理该消息。 可以在环境变量中传递此类尝试的次数。 但是您不应该无休止地处理消息,因为错误可能在于消息本身或工作人员的逻辑。 因此,在超过允许的尝试次数之后,该消息将从队列中删除,并发送到错误存储,从中可以将其重新发送回所需的处理级别。


实作


为了验证实现,您需要Rabbit本身。 我建议为此使用docker和官方代理映像。 使用以下命令安装并运行容器。


docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine 

这是在端口15672上具有Web界面的图像,用于方便调试。


我们将使用TypeScript和amqplib库(Node.js的RabbitMQ客户端实现)实施我们的计划,因此一开始您需要描述几个接口。 我们描述了订单的界面以及将发送给Rabbit的消息。


 //    export interface Product { id: string; name: string; price: number; } //    export interface Order { clientName: string; city: string; email?: string; phone?: string; products: Product[]; totalSum: number; deliveryAddress?: string; } //         export interface OrderWithPhone extends Order { phone: string; } //        export interface OrderWithDeliveryAddress extends Order { deliveryAddress: string; } // Types Guard'        export const isOrderWithPhone = (order: Order): order is OrderWithPhone => Boolean(order.phone); export const isOrderWithDeliveryAddress = (order: Order): order is OrderWithDeliveryAddress => Boolean(order.deliveryAddress); //    . export interface Message<O extends Order> { errors: string[]; retry: number; order: O; //         export interface FailOrder extends Message<Order> { exchange: string; routingKey: string; } 

现在我们需要描述队列和交换器的配置接口,在此基础上我们将构建Rabbit的处理结构。


 import { Types, ExchangeTypes } from '../constants'; import { Options } from 'amqplib'; //   RabbitMQ       export enum Types { QUEUE = 'queue', EXCHANGE = 'exchange', } //      export enum ExchangeTypes { TOPIC = 'topic', } //    export interface Queue { name: string; options: Options.AssertQueue; } //    export interface Exchange { name: string; type: ExchangeTypes; } //    export interface Binding { type: Types; destination: string; source: string; routingKey: string; } //   RabbitMQ export interface PipelineConfig { queues: Queue[]; exchanges: Exchange[]; bindings: Binding[]; } 

描述了系统的主要组件之后,我们描述了使用对象在图上绘制的配置。


Queue列


 export default [ //        routingKey { name: 'generateRoutingKey', options: { durable: true, }, }, //   sms { name: 'sendSms', options: { durable: true, }, }, //      { name: 'delivery', options: { durable: true, }, }, //         sms { name: 'sendSmsHold', options: { durable: true, deadLetterExchange: 'notify', deadLetterRoutingKey: 'sendSms', messageTtl: 60000, }, }, //            { name: 'deliveryHold', options: { durable: true, deadLetterExchange: 'integrates', deadLetterRoutingKey: 'delivery', messageTtl: 60000, }, }, ]; 

描述队列时,以下选项用于队列。


  • 耐用的 。 默认情况下,所有队列消息都存储在内存中。 因此,当代理重新启动时,消息将消失。 为避免这种情况,可以使用此选项。 使用此设置,rabbit会将消息刷新到磁盘。 但是有一个警告。 为了在代理重新启动后保存消息,此设置还不够;必须使用持久性选项将消息发送到队列。
  • messageTtl 。 消息生存期。 以毫秒为单位
  • deadLetterExchange 。 过期时将从队列中发送消息的交换器的名称
  • deadLetterRoutingKey 。 从上一个选项将消息发送到交换器的RoutingKey

交流交流


 import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, }, ]; 

绑定


 import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', }, ]; 

完整配置


 import { PipelineConfig } from '../interfaces'; import exchanges from './exchanges'; import queues from './queues'; import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings, }; 

要连接到Rabbit,请编写一个类。


 import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() { //    rabbit     this._uri = process.env.RABBIT_URI || 'amqp://localhost'; } protected async connect() { this._connection = await connect(this._uri); this._chanel = await this._connection.createChannel(); } protected async disconnect() { await this._chanel.close(); return this._connection.close(); } protected get chanel() { return this._chanel; } } 

让我们编写Pipeline类,该类在启动时将根据前面所述的配置在Rabbit中创建所有必要的基础结构。


 import { RabbitConnect } from './RabbitConnect'; import { PipelineConfig } from './interfaces'; import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect(); //   const createQueues = this._pipeline.queues.map(queue => this.chanel.assertQueue(queue.name, queue.options), ) as PromiseLike<any>[]; //   const createExchanges = this._pipeline.exchanges.map(exchange => this.chanel.assertExchange(exchange.name, exchange.type), ) as PromiseLike<any>[]; await Promise.all([...createQueues, ...createExchanges]); //       const createBindings = this._pipeline.bindings.map(binding => { if (binding.type === Types.QUEUE) { return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey); } return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey); }); await Promise.all(createBindings); return this.disconnect(); } catch (error) { console.error(error); throw new Error(error); } } } 

现在,我们将编写一个抽象的工作程序类,该类为所有可以从其继承的工作程序提供通用功能。


 import { RabbitConnect } from './RabbitConnect'; import { Message, Order, FailOrder } from './interfaces'; import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number; //     active: string; //    exchange: string; //       holdKey?: string; //      } export abstract class Worker<M extends Order> extends RabbitConnect { private _maxRetry: number; private _active: string; private _holdKey: string | undefined; protected exchange: string; private _currentMessage: Message<M>; private _currentConsumeMessage: ConsumeMessage; constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) { super(); this._maxRetry = maxRetry || 0; this._active = active; this._holdKey = holdKey; this.exchange = exchange; } public async subscribe() { await this.connect(); this.chanel.consume(this._active, this.checkMessage.bind(this)); } //          private async checkMessage(message: ConsumeMessage) { this._currentConsumeMessage = message; const orderMessage: Message<M> = JSON.parse(message.content.toString()); if (orderMessage.retry >= this._maxRetry) { await this.sendToErrorStorage('  '); } this._currentMessage = orderMessage; //           await this.handler(orderMessage.order || orderMessage); } //       protected async sendToErrorStorage(error: string) { const message: FailOrder = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, exchange: this.exchange, routingKey: this._active }; console.log('   ', message); this.ack(); } //       protected async hold(error: string) { if (!this._holdKey) { return; } const orderMessage = { order: this._currentMessage.order, errors: [...this._currentMessage.errors, error], retry: this._currentMessage.retry + 1, }; const orderData = Buffer.from(JSON.stringify(orderMessage)); return this.chanel.publish(this.exchange, this._holdKey, orderData); } //      protected async ack() { return this.chanel.ack(this._currentConsumeMessage); } protected abstract handler(message: M): void; } 

默认情况下,rabbit要求工作人员确认消息处理成功。 为此,连接通道具有ack方法。 如果工作人员无法处理该消息,则有一个nack方法告诉兔子将消息发送给另一个工作人员。


现在,我们可以从图中编写一些简单的工作程序。


生成路由密钥的工作人员。


 import { Worker } from '../Worker'; import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message, } from '../interfaces'; import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } } } 

工人发送短信。


 import { Worker } from '../Worker'; import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log(' sms  : ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

工人与送货服务的整合。


 import { Worker } from '../Worker'; import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log('      : ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } } 

应用程序的入口点。


 import { Pipeline } from './Pipeline'; import { pipelineConfig } from './pipeline'; import { GenerateRoutingKey } from './workers/GenerateRoutingKey'; import { SendSms } from './workers/SendSms'; import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); } })(); 

我不会提供用于将订单写入数据库并生成Internet订单号的代码类示例。 这超出了本文的范围。 要检查代码,可以通过将订单json发送到交换器posrprocessing来使用Rabbit Web界面。


结论


用于处理在线订单的这种构造方案使得扩展系统变得容易。 对于我们来说,为该方案添加几个队列和工作器以添加必要的功能并不困难。 例如,您可以添加通过电子邮件发送发送通知或发送1C中的记帐订单。 转换后的电路如下所示:



希望您喜欢这篇文章。 我将对任何评论和批评感到高兴。 所有提交的代码都可以在github找到

Source: https://habr.com/ru/post/zh-CN469991/


All Articles