
Halo semuanya! Popularitas perdagangan Internet terus meningkat, seperti halnya pembagian informatisasi semua jenis kegiatan yang terkait dengan perdagangan. Seiring dengan ini, kompleksitas pemrosesan informasi semakin meningkat. Setiap pesanan yang dibuat oleh pelanggan toko online menghasilkan sejumlah besar integrasi dengan berbagai layanan. Layanan tersebut dapat mencakup pemrosesan pembayaran, pengiriman, layanan akuntansi dan loyalitas. Setiap pesanan harus dibayar, dicatat, dikumpulkan dan dikirim, dan juga tersedia untuk analisis lebih lanjut. Ini, dan bukan situasi yang sederhana, diperumit oleh fakta bahwa pengguna toko online tidak ingin menunggu lama dan menyakitkan untuk sesuatu ketika melakukan pemesanan. Respons dari toko online harus cepat, karena setiap milidetik keterlambatan meningkatkan peluang kehilangan pelanggan, dan selanjutnya untung. Pada artikel ini saya ingin berbicara tentang broker pesan RabbitMQ dan bagaimana menggunakannya untuk mengatur pemrosesan pesanan menggunakan Node.js dan TypeScript. Selamat datang di kucing.
Teori yang diperlukan
Saya pikir banyak yang telah mendengar tentang RabbitMQ, karena versi open source pertama dari broker pesan ini, berdasarkan protokol AMQP, sudah dirilis pada tahun 2007. Perantara pesan diperlukan untuk menghubungkan komponen-komponen sistem yang berbeda menjadi satu kesatuan, karena lem diperlukan untuk menyadarkan kembali vas yang rusak. Dengan menggunakan pialang pesan, Anda dapat menerapkan pemrosesan peristiwa yang tidak sinkron yang diterima dalam sistem. Hanya pemrosesan pesanan yang tidak sinkron yang dibutuhkan toko online. Tetapi pertama-tama Anda harus memahami komponen dasar RabbitMQ. Pialang ini memiliki tiga komponen utama yang dengannya kami akan membangun proses pemrosesan:
- Pesan Ini adalah unit informasi terkecil di dalam broker pesan dan layanan pemrosesan kami yang dapat diproses. RabbitMQ sendiri menyimpan pesan dalam bentuk biner, tetapi untuk sistem kami dan untuk artikel ini tidak penting. Kami akan menerima dan memproses pesan dalam bentuk JSON. Perlu juga disebutkan bahwa pesan di RabbitMQ memiliki header. Mereka mirip dengan header permintaan http. Ini adalah array asosiatif di mana Anda dapat menulis informasi yang diperlukan.
- Antrian pesan . Ini adalah antrian tempat RabbitMQ menyimpan pesan. Antrian pesan dapat berlangganan oleh satu atau lebih konsumen. Setiap pesan dalam antrian kelinci didistribusikan ke konsumen menggunakan algoritma round-robin.
- Tukar Ini, seperti namanya, adalah titik pertukaran. Antrean atau penukar lainnya dapat dilampirkan pada poin ini. Titik pertukaran tidak menyimpan pesan, fungsi utamanya adalah untuk merutekan pesan ke satu atau beberapa antrian, atau titik pertukaran yang sama. Setiap antrian atau penukar terikat oleh kunci perutean. Ada beberapa jenis penukar di RabbitMQ yang memengaruhi bagaimana tepatnya pertukaran akan merutekan pesan yang diterima di dalamnya.
Untuk menggambarkan bagaimana berbagai jenis penukar bekerja, perlu dipahami apa itu kunci perutean. Kunci perutean ada dalam pengikatan antrian ke penukar, dan dalam pesan itu sendiri. Kunci perutean hanya string yang dibagi menjadi beberapa blok. Setiap blok dipisahkan oleh sebuah titik. Misalnya, โberi tahu.sendEmail.sendSmsโ. Pada saat yang sama, pola dapat diatur untuk tombol perutean pesan menggunakan karakter khusus # dan *. * - mengatakan bahwa setelah titik mana saja satu blok dapat pergi, tetapi setelah # sejumlah blok dapat pergi. Misalnya, "notify.sendSms. *" Atau "notify. #". Sekarang Anda dapat beralih ke jenis titik pertukaran.
Ada empat jenis penukar:
- Tanpa kipas Logika perutean dari pertukaran ini sederhana, ia mengarahkan pesan masuk ke semua antrian atau penukar yang terlampir padanya.

- Langsung Pertukaran ini mengalihkan pesan tergantung pada apakah kunci perutean pesan cocok dengan kunci perutean yang mengikat.

- Topik Pertukaran jenis ini serta Direct mengarahkan pesan tergantung pada tombol routing. Tetapi suatu pola dapat bertindak sebagai kunci perutean.

- Tajuk . Pertukaran ini, tidak seperti yang lain, menggunakan header pesan untuk perutean. Pada saat yang sama, antrian ke penukar juga terikat menggunakan array asosiatif. Logika yang digunakan penukar untuk merutekan pesan dapat diubah menggunakan kunci "x-match" khusus, yang diatur dalam larik pengikatan asosiatif. Kunci dapat diatur ke dua nilai semua atau apa pun. Jika nilainya semua, maka tajuk pesan harus benar-benar cocok dengan larik pengikatan asosiatif, jika nilainya ada, maka nilainya harus cocok dengan setidaknya satu kunci.

Ini adalah komponen inti dari RabbitMQ. Anda dapat membaca lebih lanjut tentang komponen ini dalam spesifikasi protokol AMQP . Selanjutnya, kita akan merancang dan mengimplementasikan sistem pemrosesan pesanan menggunakan TypeScript sebagai contoh, sekaligus memahami pengaturan masing-masing komponen.
Desain
Untuk menyederhanakan contoh, kami mengasumsikan bahwa agar pemrosesan pesanan online berhasil, kami harus memiliki fungsionalitas berikut:
- Simpan pesanan yang masuk
- Kirim SMS ke klien dengan nomor pesanan, serta status pesanan
- Kirim pesan ke layanan pengiriman kurir tentang pesanan baru dari toko online kami, jika klien telah memilih metode pengiriman ini
Tetapi itu tidak cukup untuk mengimplementasikan fungsi ini, karena toko online kami berencana untuk memperluas fungsionalitas dan memberikan lebih banyak peluang berbeda kepada pelanggannya di masa depan (dan ini selalu terjadi). Misalnya, beri tahu pelanggan melalui email atau berikan beberapa metode pengiriman untuk pesanan. Oleh karena itu kita perlu merancang sistem sedemikian rupa sehingga menambahkan fungsionalitas itu sederhana.
Perlu juga disebutkan bahwa saya akan menggunakan templat untuk pesan yang ditangguhkan sehingga dimungkinkan, jika layanan eksternal tidak tersedia, untuk mengulangi logika beberapa kali. Anda dapat membaca tentang templat ini di sini.
Untuk lebih jelas menggambarkan tujuan akhir, saya akan menggambar diagram.

Mari kita lihat bagaimana proses pemrosesan order bekerja pada diagram ini. Skema ini dibagi menjadi beberapa blok dan warna berbeda. Blok putih menunjukkan layanan eksternal yang tidak akan kami pertimbangkan. Blok abu-abu menunjukkan elemen RabbitMQ. Antrian dan penukar. Warna hijau mencerminkan blok logika bisnis yang perlu diimplementasikan. Juga, setiap blok yang terkait dengan logika kita diberi nomor. Angka-angka menunjukkan proses dan subproses secara berurutan.
Pertama-tama, pesan HTTP API masuk ke layanan kami. Setelah itu, kita harus menetapkan nomor ke pesanan, menyimpan pesanan di database dengan status "baru" dan mengirim respons tentang keberhasilan pembuatan pesanan, dengan nomornya, kembali. Klien, setelah menerima pesan tentang keberhasilan pembuatan pesanan, menjalankan bisnisnya sendiri. Dengan mengirimkan respons positif, kami mengirim objek pesanan ke pertukaran pasca-pemrosesan, yang darinya jatuh ke pekerja formasi kunci perutean. Pekerja ini, setelah menerima objek pesanan dari antrian, berdasarkan itu (apakah ada email atau telepon pelanggan dalam urutan, metode pengiriman yang dipilih) harus membentuk kunci routing order. Setelah membentuk kunci perutean, pekerja mengirim pesan kembali ke pertukaran pasca pemrosesan, tetapi sekarang kunci perutean pesanan telah berubah dan penukar dapat mengirimkannya pada rute yang diinginkan. Bergantung pada kuncinya, pesanan dapat dikirim untuk dipertukarkan, yang bertanggung jawab atas pemberitahuan, pertukaran integrasi, atau keduanya sekaligus. Dan selanjutnya pada logika yang sama dalam antrian dan pekerja.
Pekerja pengirim SMS dan layanan pengiriman akan mencoba memproses pesan beberapa kali. Jumlah upaya tersebut dapat dilewatkan dalam variabel lingkungan. Tetapi Anda tidak boleh memproses pesan tanpa henti, karena kesalahan mungkin terletak pada pesan itu sendiri atau logika pekerja. Oleh karena itu, setelah melebihi jumlah upaya yang diizinkan, pesan akan dihapus dari antrian dan dikirim ke toko kesalahan, yang darinya dapat dikirim kembali ke tingkat pemrosesan yang diinginkan.
Implementasi
Untuk memverifikasi implementasinya, Anda perlu kelinci itu sendiri. Saya sarankan menggunakan buruh pelabuhan dan gambar broker resmi untuk tujuan ini. Instal dan jalankan wadah dengan perintah berikut.
docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine
Ini adalah gambar dengan antarmuka web yang tersedia di port 15672 untuk memudahkan debugging.
Kami akan mengimplementasikan rencana kami dengan TypeScript dan perpustakaan amqplib (implementasi klien RabbitMQ untuk Node.js), jadi untuk permulaan Anda perlu menjelaskan beberapa antarmuka. Kami menggambarkan antarmuka dari pesanan dan pesan yang akan kami kirim ke kelinci.
Sekarang kita perlu menggambarkan antarmuka konfigurasi antrian dan penukar, atas dasar yang mana kita akan membangun struktur pemrosesan pada kelinci.
import { Types, ExchangeTypes } from '../constants'; import { Options } from 'amqplib';
Setelah mendeskripsikan komponen utama sistem, kami menjelaskan konfigurasi yang dibuat pada diagram menggunakan objek.
Antrian
export default [
Saat menjelaskan antrian, opsi berikut digunakan untuk antrian
- tahan lama . Secara default, semua pesan antrian disimpan dalam memori. Karena itu, ketika broker reboot, pesan akan hilang. Untuk menghindari ini, Anda dapat menggunakan opsi ini. Dengan pengaturan ini, kelinci akan menyiram pesan ke disk. Tapi ada satu peringatan. Agar pesan disimpan setelah broker memulai kembali, pengaturan ini tidak cukup, pesan harus dikirim ke antrian dengan opsi persisten.
- messageTtl . Pesan seumur hidup. Diberikan dalam milidetik
- deadLetterExchange . Nama penukar di mana pesan akan dikirim dari antrian saat pesan itu kedaluwarsa
- deadLetterRoutingKey . RoutingKey dengan mana pesan akan dikirim ke exchanger dari opsi sebelumnya
Pertukaran
import { ExchangeTypes } from '../constants'; export default [ { name: 'postprocessing', type: ExchangeTypes.TOPIC, }, { name: 'notify', type: ExchangeTypes.TOPIC, }, { name: 'integrates', type: ExchangeTypes.TOPIC, }, ];
Binding
import { Types } from '../constants'; export default [ { type: Types.EXCHANGE, destination: 'notify', source: 'postprocessing', routingKey: '#.notify.#', }, { type: Types.EXCHANGE, destination: 'integrates', source: 'postprocessing', routingKey: '#.integrates.#', }, { type: Types.QUEUE, destination: 'generateRoutingKey', source: 'postprocessing', routingKey: 'generateRoutingKey', }, { type: Types.QUEUE, destination: 'sendSms', source: 'notify', routingKey: '#.sendSms.#', }, { type: Types.QUEUE, destination: 'delivery', source: 'integrates', routingKey: '#.delivery.#', }, { type: Types.QUEUE, destination: 'sendSmsHold', source: 'notify', routingKey: 'sendSmsHold', }, { type: Types.QUEUE, destination: 'deliveryHold', source: 'integrates', routingKey: 'deliveryHold', }, ];
Konfigurasi penuh
import { PipelineConfig } from '../interfaces'; import exchanges from './exchanges'; import queues from './queues'; import bindings from './bindigs'; export const pipelineConfig: PipelineConfig = { exchanges, queues, bindings, };
Untuk terhubung ke kelinci, tulis kelas.
import { connect, Connection, Channel } from 'amqplib'; export class RabbitConnect { private _uri: string; private _connection: Connection; private _chanel: Channel; constructor() {
Mari kita tulis kelas Pipeline, yang saat startup akan membuat semua infrastruktur yang diperlukan pada kelinci sesuai dengan konfigurasi yang dijelaskan sebelumnya.
import { RabbitConnect } from './RabbitConnect'; import { PipelineConfig } from './interfaces'; import { Types } from './constants'; export class Pipeline extends RabbitConnect { private _pipeline: PipelineConfig; constructor(pipelineConfig: PipelineConfig) { super(); this._pipeline = pipelineConfig; } public async create() { try { await this.connect();
Sekarang kita akan menulis kelas pekerja abstrak dengan fungsi umum untuk semua pekerja yang darinya dimungkinkan untuk diwariskan.
import { RabbitConnect } from './RabbitConnect'; import { Message, Order, FailOrder } from './interfaces'; import { ConsumeMessage } from 'amqplib'; export interface WorkerParams { maxRetry?: number;
Secara default, kelinci membutuhkan konfirmasi untuk pemrosesan pesan yang berhasil dari pekerja. Untuk ini, saluran koneksi memiliki metode ack. Jika pekerja tidak dapat memproses pesan, maka ada metode nack yang memberitahu kelinci untuk mengirim pesan ke pekerja lain.
Sekarang kita dapat menulis beberapa pekerja sederhana dari diagram.
Pekerja menghasilkan kunci perutean.
import { Worker } from '../Worker'; import { isOrderWithPhone, isOrderWithDeliveryAddress, Order, Message, } from '../interfaces'; import { Keys } from '../constants'; export class GenerateRoutingKey extends Worker<Order> { constructor() { super({ active: 'generateRoutingKey', exchange: 'postprocessing', }); } protected async handler(order: Order) { try { const routingKey: string[] = []; if (isOrderWithPhone(order)) { routingKey.push(Keys.SEND_SMS); } if (isOrderWithDeliveryAddress(order)) { routingKey.push(Keys.SEND_TO_DELIVERY); } const message: Message<Order> = { retry: 0, errors: [], order, }; await this.chanel.publish( this.exchange, routingKey.join('.'), Buffer.from(JSON.stringify(message)), ); await this.ack(); } catch (error) { console.error(error); await this.sendToErrorStorage(error); } } }
Pekerja mengirim sms.
import { Worker } from '../Worker'; import { OrderWithPhone } from '../interfaces'; export class SendSms extends Worker<OrderWithPhone> { constructor() { super({ active: 'sendSms', exchange: 'notify', holdKey: 'sendSmsHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithPhone) { try { console.log(' sms : ', message.phone); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } }
Integrasi pekerja dengan layanan pengiriman.
import { Worker } from '../Worker'; import { OrderWithDeliveryAddress } from '../interfaces'; export class Delivery extends Worker<OrderWithDeliveryAddress> { constructor() { super({ active: 'delivery', exchange: 'interates', holdKey: 'deliveryHold', maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5, }); } protected async handler(message: OrderWithDeliveryAddress) { try { console.log(' : ', message.deliveryAddress); this.ack(); } catch (error) { console.error(error); await this.hold(error); } } }
Titik masuk ke aplikasi.
import { Pipeline } from './Pipeline'; import { pipelineConfig } from './pipeline'; import { GenerateRoutingKey } from './workers/GenerateRoutingKey'; import { SendSms } from './workers/SendSms'; import { Delivery } from './workers/Delivery'; (async () => { try { const pipeline = new Pipeline(pipelineConfig); const generateRoutingKey = new GenerateRoutingKey(); const sendSms = new SendSms(); const delivery = new Delivery(); await pipeline.create(); await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]); } catch (error) { console.error(error); process.exit(1); } })();
Saya tidak akan memberikan contoh kelas kode untuk menulis pesanan ke database dan menghasilkan nomor pesanan Internet. Ini di luar ruang lingkup artikel ini. Untuk memeriksa kode, Anda dapat menggunakan antarmuka web kelinci dengan mengirimkan json pesanan ke posrprocessing penukar.
Kesimpulan
Skema konstruksi seperti itu untuk memproses pesanan online membuatnya mudah untuk mengukur sistem. Tidak akan sulit bagi kami untuk menambahkan beberapa antrian dan pekerja ke skema ini untuk menambahkan fungsionalitas yang diperlukan. Misalnya, Anda dapat menambahkan pengiriman pemberitahuan melalui email atau mengirim pesanan untuk penghitungan dalam 1C. Sirkuit yang dikonversi akan terlihat seperti ini:

Saya harap Anda menikmati artikel ini. Saya akan senang dengan komentar dan kritik. Semua kode yang dikirimkan dapat ditemukan di github