
Kebetulan bahwa dalam proses bekerja di MegaFon, seseorang harus menghadapi tugas yang sama ketika bekerja dengan RabbitMQ. Pertanyaan yang wajar muncul: "Bagaimana menyederhanakan dan mengotomatiskan pelaksanaan tugas-tugas seperti itu?"
Solusi pertama yang muncul dalam pikiran adalah dengan menggunakan antarmuka HTTP, dan, tentu saja, di luar kotak, RabbitMQ memiliki antarmuka web yang baik dan HTTP API. Namun demikian, penggunaan HTTP API tidak selalu nyaman, dan kadang-kadang bahkan tidak mungkin (katakanlah Anda tidak memiliki hak akses yang cukup, tetapi saya benar-benar ingin menerbitkan pesan) pada saat-saat seperti itu, menjadi perlu untuk bekerja menggunakan protokol AMQP
Tidak menemukan solusi siap pakai yang cocok untuk saya di ruang terbuka jaringan, diputuskan untuk menulis aplikasi kecil untuk bekerja dengan RabbitMQ menggunakan protokol AMQP dengan kemampuan untuk mentransfer parameter startup melalui baris perintah dan menyediakan set fitur minimum yang diperlukan, yaitu:
- Posting
- Mengoreksi Pesan
- Membuat dan mengedit elemen rute dasar
Python dipilih sebagai alat paling sederhana (dan menurut saya indah) untuk mengimplementasikan tugas semacam itu. (Orang dapat berdebat di sini, tetapi apa yang akan berubah?)
Terjemahan panduan resmi ( satu , dua ) oleh RabbitMQ disajikan di hub, namun, kadang-kadang contoh sederhana dari praktik berguna. Dalam artikel tersebut, saya akan mencoba mengilustrasikan masalah utama yang muncul ketika bekerja dengan kelinci menggunakan saluran AMQP dari Python menggunakan contoh aplikasi kecil. Aplikasi itu sendiri tersedia di GitHub .
Secara singkat tentang protokol AMQP dan broker pesan RabbitMQ
AMQP adalah salah satu protokol perpesanan yang paling umum di antara komponen-komponen sistem terdistribusi. Fitur pembeda utama dari protokol ini adalah konsep membangun rute pesan, yang mengandung dua elemen struktural utama: antrian dan titik pertukaran . Antrian mengakumulasikan pesan sampai diterima. Titik pertukaran adalah distributor pesan yang mengarahkan mereka ke antrian yang diinginkan atau ke titik pertukaran lain. Aturan distribusi (binding) , dimana titik pertukaran menentukan tempat untuk mengarahkan pesan, didasarkan pada memeriksa kunci perutean pesan untuk kepatuhan dengan mask yang ditentukan. Anda dapat membaca lebih lanjut tentang cara kerja AMQP di sini .
RabbitMQ adalah aplikasi open source yang sepenuhnya mendukung AMQP dan menawarkan sejumlah fitur tambahan. Untuk bekerja dengan RabbitMQ, sejumlah besar perpustakaan telah ditulis dalam berbagai bahasa pemrograman, termasuk Python.
Implementasi python
Anda selalu dapat melempar beberapa skrip untuk penggunaan pribadi dan tidak tahu masalahnya. Ketika harus menyebar mereka di antara rekan kerja, semuanya menjadi lebih rumit. Semua orang perlu menunjukkan dan memberi tahu bagaimana dan apa yang harus diluncurkan, apa dan di mana harus berubah, di mana untuk mendapatkan versi terbaru, dan apa yang telah berubah di dalamnya ... Secara tidak sadar Anda sampai pada kesimpulan bahwa lebih mudah untuk membuat antarmuka yang sederhana sekali, sehingga Anda tidak membuang waktu di masa depan. Untuk kemudahan penggunaan, diputuskan untuk membagi aplikasi menjadi 4 modul:
- Modul yang bertanggung jawab untuk posting
- Modul yang bertanggung jawab untuk mengurangi pesan dari antrian
- Modul yang dirancang untuk membuat perubahan pada konfigurasi broker RabbitMQ
- Modul yang berisi parameter dan metode yang umum untuk modul sebelumnya
Pendekatan ini menyederhanakan set parameter startup. Kami memilih modul yang diperlukan, memilih salah satu mode operasinya dan melewati parameter yang diperlukan (untuk informasi lebih lanjut tentang mode operasi dan parameter dalam bantuan –bantu).
Karena struktur "kelinci" di MegaFon terdiri dari sejumlah besar node, untuk kenyamanan penggunaan, data untuk menghubungkan ke node ditransfer ke modul dengan parameter umum dan metode rmq_common_tools.py
Untuk bekerja di AMQP dengan Python, kita akan menggunakan pustaka Pika .
import pika
Menggunakan perpustakaan ini, bekerja dengan RabbitMQ akan terdiri dari tiga tahap utama:
- Buat koneksi
- Melakukan Operasi yang Diperlukan
- Tutup koneksi
Tahap pertama dan terakhir adalah sama untuk semua modul dan diimplementasikan di rmq_common_tools.py
Untuk membuat koneksi:
rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel()
Perpustakaan Pika memungkinkan Anda untuk menggunakan berbagai opsi desain untuk menghubungkan ke RabbitMQ. Dalam hal ini, opsi paling mudah adalah dengan melewatkan parameter dalam bentuk string URL dalam format berikut:
'amqp://rabbit_user:rabbit_password@host:port/vhost'
Untuk menutup koneksi:
rmq_connection.close()
Posting
Menerbitkan pesan mungkin adalah yang termudah, tetapi pada saat yang sama operasi yang paling populer saat bekerja dengan kelinci.
Alat penerbitan pos dikompilasi di rmq_publish.py
Untuk mengirim pesan, gunakan metode ini
rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text)
dimana:
exchange - nama dari titik pertukaran dimana pesan akan dipublikasikan
routing_key - kunci perutean dengan mana pesan akan dipublikasikan
isi - isi pesan
rmq_publish.py mendukung dua mode input pesan untuk penerbitan:
- Pesan dimasukkan sebagai parameter melalui baris perintah (from_console)
- Pesan dibaca dari file (from_file)
Mode kedua, menurut saya, lebih nyaman ketika bekerja dengan pesan besar atau array pesan. Yang pertama, pada gilirannya, memungkinkan Anda untuk mengirim pesan tanpa file tambahan, yang nyaman saat mengintegrasikan modul ke dalam skenario lain.
Menerima Pesan
Masalah menerima pesan tidak lagi sepele seperti penerbitan. Ketika berbicara tentang membaca pesan, Anda perlu memahami:
- Setelah mengkonfirmasi penerimaan pesan, pesan akan dihapus dari antrian. Jadi, membaca pesan dari baris "pertempuran", kami "memilih" mereka dari konsumen utama. Jika kita tidak ingin kehilangan aliran pesan, tetapi hanya ingin memahami pesan apa yang bergerak di "kelinci", maka opsi yang paling logis adalah membuat antrian "logging" yang terpisah, atau seperti juga disebut, "trap queue".
- Membaca pesan, sebagai suatu peraturan, memerlukan pemrosesan atau analisis lebih lanjut, yang berarti mereka perlu disimpan di suatu tempat jika pemrosesan waktu nyata tidak mungkin atau tidak diperlukan.
Pembaca pesan diimplementasikan dalam file rmq_consume.py
Dua mode operasi disediakan:
- Baca pesan dari antrian yang ada
- Membuat antrian waktu dan rute untuk membaca pesan dari antrian ini
Pertanyaan membuat antrian dan rute akan dipertimbangkan di bawah ini.
Proofreading langsung diimplementasikan sebagai berikut:
channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc())
dimana
on_message - prosedur penanganan pesan
params.queue - nama antrian dari mana pengurangan akan dilakukan
Penangan pesan harus melakukan beberapa operasi pada pesan yang sudah dibaca dan mengkonfirmasi (atau tidak mengkonfirmasi, jika perlu) pengiriman pesan.
def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log(' .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag)
dimana
all_cnt - penghitung global
lim - jumlah pesan yang akan dibaca
Dalam implementasi handler seperti itu, sejumlah pesan dikurangkan dan informasi tentang perkembangan pengurangan tersebut dikeluarkan ke konsol jika perekaman terjadi dalam file.
Dimungkinkan juga untuk menulis pesan yang sudah dibaca ke dalam basis data. Dalam implementasi saat ini, peluang seperti itu tidak disajikan, tetapi tidak sulit untuk ditambahkan.
Rekam dalam DBKami akan mempertimbangkan contoh penulisan pesan ke database untuk database Oracle dan pustaka cx_oracle .
Hubungkan ke database
ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor()
Di add handler on_message
global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1
dimana
cnt adalah penghitung lain
commit_int - jumlah penyisipan ke dalam basis data, setelah itu perlu untuk melakukan "komit". Kehadiran parameter seperti itu disebabkan oleh keinginan untuk mengurangi beban pada basis data. Namun, menginstalnya tidak terlalu besar, karena jika terjadi kegagalan, ada kemungkinan kehilangan pesan yang dibaca setelah komit berhasil terakhir.
Dan, seperti yang diharapkan, pada akhir pekerjaan kami membuat komitmen akhir dan menutup koneksi
ora_cursor.execute('commit') connection_ora.close()
Sesuatu seperti ini sedang membaca pesan. Jika Anda menghapus batasan pada jumlah pesan yang dibaca, Anda dapat membuat proses latar belakang untuk membaca pesan yang terus-menerus dari "kelinci".
Konfigurasi
Terlepas dari kenyataan bahwa protokol AMQP terutama ditujukan untuk menerbitkan dan membaca pesan, protokol ini juga memungkinkan Anda untuk melakukan manipulasi sederhana dengan konfigurasi rute (kami tidak berbicara tentang mengonfigurasi koneksi jaringan dan pengaturan RabbitMQ lainnya sebagai aplikasi).
Operasi konfigurasi utama adalah:
- Membuat antrian atau titik pertukaran
- Membuat Aturan Penerusan (mengikat)
- Menghapus antrian atau titik pertukaran
- Menghapus Aturan Penerusan (mengikat)
- Kliring antrian
Karena untuk masing-masing dari mereka ada prosedur yang sudah jadi di perpustakaan pika, untuk kemudahan peluncuran, mereka hanya dikompilasi dalam file rmq_setup.py . Selanjutnya, kami membuat daftar prosedur dari pika library dengan beberapa komentar tentang parameter.
Membuat antrian
rmq_channel.queue_declare(queue=params.queue, durable = params.durable)
semuanya sederhana di sini
antrian - nama antrian yang akan dibuat
tahan lama - parameter logis, nilai True akan berarti bahwa ketika kelinci reboot, antrian akan terus ada. Jika Salah, antrian akan dihapus setelah reboot. Opsi kedua biasanya digunakan untuk antrian sementara yang dijamin tidak diperlukan di masa depan.
Membuat titik pertukaran (exchange)
rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable)
di sini muncul parameter baru exchange_type - jenis titik pertukaran. Tentang jenis titik pertukaran apa yang dibaca di sini .
exchange - nama dari titik pertukaran yang dibuat
Menghapus antrian atau titik pertukaran
rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch)
Membuat Aturan Penerusan (mengikat)
rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
exchange - nama titik pertukaran dari mana transfer akan dilakukan
queue - nama antrian yang akan diteruskan
routing_key - mask dari kunci routing, yang akan digunakan untuk penerusan.
Entri berikut ini valid:
- rk.my_key. * - dalam topeng ini, tanda bintang berarti set karakter yang tidak kosong. Dengan kata lain, topeng seperti itu akan melewatkan kunci apa pun dari jenis rk.my_key. + sesuatu yang lain, tetapi tidak akan melewatkan kunci rk.my_key
- rk.my_key. # - topeng ini akan melewati segalanya seperti + kunci rk.my_key sebelumnya
Menghapus Aturan Penerusan (mengikat)
rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
semuanya mirip dengan membuat aturan penerusan.
Kliring antrian
rmq_channel.queue_purge(queue=params.queue)
queue - nama antrian yang akan dihapus
Tentang menggunakan antarmuka baris perintah di aplikasi PythonOpsi startup membuat hidup jauh lebih mudah. Agar tidak mengedit kode sebelum setiap peluncuran, adalah logis untuk menyediakan mekanisme untuk melewatkan parameter saat startup. Perpustakaan argparse dipilih untuk tujuan ini. Saya tidak akan merinci seluk-beluk penggunaannya, ada cukup panduan tentang hal ini ( satu , dua , tiga ). Saya hanya mencatat bahwa alat ini sangat membantu saya menyederhanakan proses menggunakan aplikasi (jika Anda bisa menyebutnya begitu). Bahkan setelah melemparkan urutan perintah sederhana dan membungkusnya dalam antarmuka yang sama, Anda bisa mendapatkan alat yang lengkap dan mudah digunakan.
Aplikasi dalam kehidupan sehari-hari. Apa yang paling berguna.
Nah, sekarang sedikit kesan tentang penggunaan AMQP dalam kehidupan sehari-hari.
Fitur yang paling banyak diminta adalah publikasi pesan. Hak akses pengguna tertentu tidak selalu memungkinkan penggunaan antarmuka web, meskipun terkadang hanya perlu untuk menguji layanan tertentu. Di sini AMQP dan otorisasi atas nama layanan menggunakan saluran ini untuk bantuan.
Yang paling populer kedua adalah kemampuan membaca pesan dari antrian waktu. Fitur ini berguna untuk mengonfigurasi rute baru dan aliran pesan, serta mencegah kecelakaan.
Kemungkinan lain juga ditemukan aplikasi dalam berbagai tugas.