
Antrian pesan digunakan untuk melakukan: operasi yang tertunda, interaksi antara layanan, “pemrosesan batch”, dll. Ada solusi khusus untuk mengatur antrian tersebut, seperti: RabbitMQ, ActiveMQ, ZeroMQ, dll., Tetapi sering terjadi bahwa mereka tidak sangat membutuhkan, dan pemasangan dan dukungannya akan menyebabkan lebih banyak rasa sakit dan penderitaan daripada kebaikan. Misalkan Anda memiliki layanan saat mendaftar di mana email dikirim kepada pengguna untuk konfirmasi, dan jika Anda menggunakan Postgres, maka Anda beruntung - di Postgres, hampir di luar kotak, ada ekstensi PgQ yang akan melakukan semua pekerjaan kotor untuk Anda.
Pada artikel ini saya akan berbicara tentang antrian pesan (tugas) di PostgreSQL menggunakan ekstensi PgQ. Artikel ini akan berguna jika Anda belum pernah menggunakan PgQ atau menggunakan antrian yang ditulis sendiri di atas Postgres.
Mengapa Anda membutuhkan PgQ sama sekali, jika Anda bisa membuat tablet dan menulis tugas di sana? Tampaknya mungkin, tetapi Anda harus memperhitungkan akses paralel ke tugas, kemungkinan kesalahan (apa yang akan terjadi jika proses pemrosesan tugas jatuh?), Serta kinerja (PgQ sangat cepat, dan solusi yang ditulis sendiri biasanya tidak, terutama jika transaksi dilakukan dalam database tidak menutup selama seluruh pelaksanaan tugas), tetapi alasan paling penting mengapa menurut pendapat saya perlu menggunakan PgQ adalah bahwa PgQ sudah ditulis dan bekerja, dan solusi yang ditulis sendiri masih perlu ditulis (UPD: tentang mengapa tidak layak menggunakan antrian yang ditulis sendiri) , Anda dapat membaca, misalnya, di
sini ).
(UPD: karena PgQ berjalan di atas Postgres, semua kesenangan transaksi juga dapat digunakan di dalamnya)
Tapi PgQ memiliki satu minus besar - kurangnya dokumentasi, saya mencoba untuk mengkompensasi kekurangan ini dengan artikel ini.
Perangkat
PgQ terdiri dari bagian-bagian (setidaknya 2): 1 - ekstensi pgq untuk postgres, 2 - daemon pgqd (tentang menginstalnya sedikit kemudian).
Semua interaksi dengan antrian dilakukan menggunakan fungsi di dalam Postgres.
Misalnya, untuk membuat antrian, Anda harus menjalankan
select * from pgq.create_queue({ } text);
Setelah antrian dibuat, Anda dapat menambahkan pesan ke dalamnya.
select * from pgq.insert_event({ } text, { } text, { } text);
Sekarang kita perlu belajar bagaimana menerima pesan yang direkam. Untuk ini, ada entitas seperti "konsumen" (saya akan menulis konsumen), yang menerima bukan pesan (peristiwa), tetapi "batch". Sebuah bach adalah sekelompok pesan berurutan, baches dibuat menggunakan pgqd. Secara berkala (parameter “ticker_ Period” dalam file konfigurasi) pgqd mengambil semua pesan yang terakumulasi dan menulis ke bach baru.
Penting jika pgqd tidak berfungsi, maka bach baru tidak dibuat, yang berarti bahwa konsumen tidak memiliki apa pun untuk dibaca, juga jika pgqd tidak berfungsi untuk waktu yang lama dan kemudian dihidupkan, itu akan membuat satu bach besar dari pesan yang terakumulasi selama waktu ini, oleh karena itu pgqd tidak boleh matikan saja.
Pendaftaran konsumen (
Penting! Seorang konsumen akan menerima acara yang direkam hanya
setelah pendaftarannya, jadi Anda harus terlebih dahulu membuat konsumen, dan baru kemudian menulis acara):
select * from pgq.register_consumer({ } text, { } text);
(mirip dengan pgq.unregister_consumer)Setiap konsumen akan menerima secara
mutlak semua peristiwa yang terjadi setelah pembuatannya (bahkan diproses oleh konsumen lain), yang berarti bahwa kemungkinan besar Anda hanya membutuhkan satu konsumen untuk satu antrian. Selanjutnya saya akan memberi tahu Anda cara membagi beban pada beberapa server.
Untuk mendapatkan bach, pertama-tama Anda harus mengetahui ID-nya:
select * from pgq.next_batch({ } text, { } text);
Fungsi ini dapat mengembalikan NULL jika konsumen telah memproses semua bach. Dalam hal ini, Anda hanya perlu menunggu sampai pgqd membuat bach baru.
Dalam hal ini, sementara bach tidak diproses, fungsi ini akan mengembalikan nilai yang sama.
Anda bisa mendapatkan semua acara dalam batch menggunakan:
select * from pgq.get_batch_events({id } bigint);
(Bach mungkin kosong.)
Jika terjadi kesalahan saat memproses salah satunya, Anda dapat mencoba memproses acara ini nanti:
select * from pgq.event_retry({id } bigint, {id } bigint, { } integer);
Untuk menginformasikan tentang akhir dari bach dan mendapatkan kesempatan untuk memulai yang baru, digunakan
select * from pgq.finish_batch({id } bigint);
Tentu saja, ini tidak semua fungsi dalam ekstensi, saya sarankan membaca
pgq.imtqy.com/extension/pgq/files/external-sql.html dan
github.com/pgq/pgq/tree/master/functions (setiap file berisi definisi dan deskripsi fungsi yang sesuai).
Muat pembagian
Untuk menangani acara secara bersamaan oleh beberapa penangan, ada ekstensi pgq_coop, yang berfungsi seperti pgq dan menambahkan entitas baru yang disebut "sub konsumen", yang akan menerima semua peristiwa dari saat pendaftaran konsumen induk, tentu saja, kecuali untuk yang sudah diproses.
select * from pgq_coop.register_subconsumer({ } text, { } text, { } text);
select * from pgq_coop.next_batch({ } text, { } text, { } text);
select * from pgq_coop.next_batch({ } text, { } text, { } text, { , } interval);
select * from pgq_coop.finish_batch({id } bigint);
Baca tentang semua fitur di
sini .
Instalasi
Ekstensi pgq dan daemon pgqd termasuk dalam repositori PGDG dan diinstal dengan sangat sederhana di sebagian besar distribusi, misalnya, di Debian:
sudo apt install postgresql-XX-pgq3 pgqd
(XX adalah nomor versi).
pgqd adalah program kecil yang dapat ditemukan tentang penggunaan
pgqd --help
, jangan lupa untuk menambahkannya ke autorun (
sudo systemctl enable pgqd.service
, dan konfigurasi defaultnya adalah
/etc/pgqd.ini
).
Untuk mulai menggunakan PgQ, dalam database Anda hanya perlu menghubungkan ekstensi:
create extension if not exists pgq;
Dengan pgq_coop, semuanya sedikit lebih rumit, tidak ada dalam repositori, tetapi tidak sulit untuk mengkompilasinya dari sumber (contoh untuk Debian):
sudo apt install postgresql-server-dev-XX git clone https://github.com/pgq/pgq-coop.git cd pgq-coop sudo make install
dan hubungkan ekstensi menggunakan
create extension if not exists pgq_coop;
Tautan yang bermanfaat
Dokumentasi pgqFungsi pgqFungsi pgq_coopKode sumber pgqdakun github dengan semua proyek terkaitPostingan wiki