Jika layanan microser Anda
sudah menggunakan database PostgreSQL umum untuk menyimpan data, atau jika beberapa contoh layanan yang sama menggunakannya di server yang berbeda, Anda bisa
mendapatkan pesan " PubSub
" di antara mereka yang relatif murah tanpa integrasi ke dalam arsitektur Redis, cluster RabbitMQ, atau menanamkan ke dalam kode aplikasi
sistem MQ lain .
Untuk melakukan ini, kami
tidak akan menulis pesan ke tabel database , karena ini menyebabkan terlalu banyak biaya overhead, pertama
menulis yang ditransmisikan , dan kemudian juga
membersihkan dari yang sudah dibaca .
Kami akan mengirim dan menerima data menggunakan mekanisme
NOTIFY /
LISTEN , dan kami akan mengumpulkan implementasi model untuk Node.js.

Tetapi dengan cara ini ada garu yang harus dielakkan dengan hati-hati.
Fitur Protokol
Dengarkan
LISTEN
Aplikasi yang menggunakan perpustakaan libpq mengeksekusi perintah LISTEN sebagai perintah SQL biasa, dan kemudian secara berkala harus memanggil fungsi PQnotifies untuk memeriksa notifikasi baru.
Jika Anda tidak menulis pustaka untuk bekerja dengan PG, tetapi sudah aplikasi tertentu, dalam kebanyakan kasus, Anda tidak akan memiliki akses ke panggilan ke fungsi ini.
Tetapi jika pustaka tersebut telah ditulis untuk Anda sesuai dengan rekomendasi untuk memproses
permintaan dan
pemberitahuan asinkron , Anda akan secara otomatis menerima pesan dalam kode aplikasi. Jika tidak, Anda cukup
menjalankan SELECT 1
secara berkala pada koneksi, maka pemberitahuan akan muncul bersama dengan hasil permintaan:
Dalam rilis libpq yang sangat lama, hanya ada satu cara untuk memastikan penerimaan pesan tepat waktu dari perintah NOTIFY - untuk terus-menerus mengirim perintah, bahkan yang kosong, dan kemudian memeriksa PQnotify setelah setiap panggilan PQexec. Meskipun metode ini masih berfungsi, itu dianggap usang karena penggunaan prosesor yang tidak efisien.
Dalam hal, misalnya,
psql, tampilannya seperti ini:
_tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991.
Jika untuk tugas yang diterapkan, kami dapat menyetujui penundaan maksimum dalam mengirimkan pesan dalam waktu 1 detik, dengan interval seperti itu, kami melaksanakan permintaan tersebut. Pada saat yang sama, metode ini membantu untuk
memantau "liveness" dari koneksi , memastikan bahwa tidak ada yang secara tidak sengaja
pg_terminate_backend
dari sisi server melalui
pg_terminate_backend
, atau tidak ada "crash" tiba-tiba dari PG tanpa pemberitahuan kepada klien.
PEMBERITAHUAN
NOTIFY [ , ]
Perintah NOTIFY mengirimkan acara pemberitahuan bersama dengan baris "pesan" tambahan untuk semua aplikasi klien yang sebelumnya telah mengeksekusi saluran dengan nama saluran yang ditentukan dalam database LISTEN saat ini.
...
Baris "pesan" yang akan dikirim bersamaan dengan pemberitahuan ... harus berupa konstanta teks sederhana . Dalam konfigurasi standar, panjangnya harus kurang dari 8000 byte .
Yaitu, jika "pesan" kami tiba-tiba berisi sesuatu yang sangat berbeda dari ASCII, maka kami
harus menyaringnya , dan jika itu melebihi ukuran 8000 byte (bukan karakter!), Lalu
potong menjadi blok dan tempelkan . Pada saat yang sama, kita harus menghemat bandwidth saluran dan sumber daya server untuk memproses transfer blok seperti itu - yaitu, tambahkan sedikit layanan "mengikat" ke konten yang berguna mungkin, tetapi juga tidak "mencekik" aplikasi klien, memaksanya untuk berkemas dengan
gzip -9
.
Dari keuntungan tambahan mekanisme, seseorang juga dapat mencatat pengikatan pada "sumber" pesan ...
... pekerjaan tambahan dapat dihindari dengan memeriksa apakah PID dari proses pensinyalan (ditunjukkan dalam data acara) cocok dengan PID sesi itu sendiri (Anda dapat menemukannya dengan menghubungi libpq). Jika mereka cocok, maka sesi menerima pemberitahuan tentang tindakannya sendiri, sehingga dapat diabaikan.
... dan pesanan pengiriman terjamin:
Selain memfilter pemberitahuan duplikat berikutnya, NOTIFY memastikan bahwa pemberitahuan dari satu transaksi selalu tiba dalam urutan yang sama dengan saat mereka dikirim. Juga dijamin bahwa pesan dari transaksi yang berbeda tiba sesuai urutan transaksi ini dilakukan .
Kami tidak akan secara khusus menggabungkan apa pun, sehingga setiap permintaan kami hanya akan sesuai dengan transaksi terpisah.
Tetapi ingat bahwa jika ada juga aktivitas aplikasi pada koneksi yang digunakan untuk pertukaran, PEMBERITAHUAN kami mungkin tidak berada dalam transaksi atas kehendak bebas kami sendiri, sehingga
efek samping dapat terjadi :
Transaksi memiliki dampak signifikan pada PEMBERITAHUAN. Pertama, jika NOTIFY dijalankan di dalam suatu transaksi, pemberitahuan dikirimkan kepada penerima setelah transaksi dilakukan, dan hanya dalam kasus ini. Ini masuk akal, karena dalam hal transaksi sedang terputus, tindakan semua perintah di dalamnya dibatalkan, termasuk PEMBERITAHUAN .
Oleh karena itu, lebih baik menggunakan koneksi di mana jelas tidak ada transaksi atau permintaan panjang.
AccessExclusiveLock pada objek 0 dari kelas 1262 dari database 0
Jika tiba-tiba PEMBERITAHUAN Anda mulai tumpul dan mencatat ekspektasi kunci seperti itu, maka Anda masih "tumbuh dari celana pendek", dan inilah saatnya untuk memikirkan MQ "dewasa".
Bagaimanapun, antrian notifikasi, meskipun cukup besar (8GB pada build standar), masih terbatas. Menurut
jawaban Tom Lane :
Kunci ini ditahan saat memasukkan pesan notifikasi transaksi, setelah itu transaksi melakukan dan melepaskan kunci.
Artinya, tidak ada terlalu banyak opsi untuk dikerjakan:
- kirim tetapi lebih jarang
Yaitu, untuk menggabungkan indikator yang dikirim, jika ini adalah beberapa penghitung, dalam interval yang lebih lama. - kirim lebih sedikit
Misalnya, untuk menghapus "default" dari sudut pandang nilai-nilai kunci aplikasi dari JSON yang ditransmisikan. - hanya mengirim sinyal , tidak ada konten sama sekali
Sebagai pilihan - untuk memulai beberapa saluran, nama masing-masing sudah akan memiliki beberapa pengertian yang berlaku. - masih melakukan pengiriman dari database
Mengirim pesan yang rumit
Pengodean tubuh
Dalam kasus umum, kami mungkin ingin mengirimkan tidak hanya karakter yang diizinkan dalam pesan, tetapi juga huruf Rusia, dan "sembarang binari" - oleh karena itu, akan lebih mudah untuk menggunakan
konversi ke representasi hex untuk membentuk string yang ditransmisikan. Dan ya, metode ini bekerja dengan sangat baik:
NOTIFY test, E'\x20\x21'
Asynchronous notification "test" with payload " !" received from server process with PID 63991.
Tetapi mari kita kembali ke dokumentasi:
Anda harus memastikan bahwa urutan byte yang Anda buat dengan cara ini, terutama dalam notasi oktal dan heksadesimal, membentuk karakter yang valid dalam penyandian server . Ketika server beroperasi dengan pengkodean UTF-8, alih-alih rekaman byte seperti itu, gunakan urutan khusus Unicode atau sintaks Unicode alternatif yang dijelaskan dalam Bagian 4.1.2.3. (Jika tidak, Anda harus menyandikan karakter UTF-8 secara manual dan menulisnya dengan byte, yang sangat merepotkan.)
Oleh karena itu, bahkan dengan simbol umum tanda kutip dari win1251, kami menerima roti dalam kesedihan:
NOTIFY test, E'\x98'
Karena kami tidak ingin "
menyandikan karakter UTF-8 secara manual dan menulisnya dengan byte ", kami akan langsung setuju untuk mengirim isi pesan yang
dikemas dalam base64 jika ada karakter di luar rentang
\x20-\x7E
atau, jika perlu, segmentasi. Di satu sisi, metode pengemasan seperti itu tidak meningkatkan redundansi terlalu banyak (koefisien 4: 3), di sisi lain, metode ini diterapkan pada tingkat pustaka sistem dalam bahasa apa pun dan akan memberikan beban tambahan minimal.
Tetapi bahkan jika kita tidak memiliki karakter "aneh", dan pesannya cocok dalam satu segmen, masih ada satu fitur -
lolos dari tanda kutip :
Untuk memasukkan tanda kutip dalam satu baris, tulis dua tanda kutip di sebelahnya , misalnya: 'Joan of Arc'. Perhatikan bahwa ini tidak sama dengan kutipan ganda (").
Identifikasi Segmen
Tugas selanjutnya adalah "memotong" pesan dengan benar ke dalam
blok -
blok yang diizinkan untuk pengiriman
7999 byte , jika ukurannya tiba-tiba melebihi nilai ini. Dan agar penerima dapat mengumpulkannya tanpa melanggar urutan atau jatuh ke dalam rantai segmen "alien". Untuk ini, masing-masing dari mereka perlu diidentifikasi.
Sebenarnya, kita sudah tahu dua "koordinat" - ini adalah
PID dari proses pengiriman dan
nama saluran yang datang di setiap notifikasi. Dan urutan kedatangan segmen dijamin oleh protokol itu sendiri.
Tetangga MenulisKami tidak akan mempertimbangkan kasus ketika beberapa penulis pada saluran yang sama aktif pada saat yang sama dengan menghubungkan ke database (yaitu, jelas dalam proses aplikasi yang sama). Secara teknis, ini dapat didukung dengan melewati pengenal tambahan di header segmen - tetapi lebih baik untuk "berbagi" objek PubSub tunggal di dalam aplikasi Anda.
Batas Kontainer
Untuk merakit sebuah wadah integral dari beberapa segmen, kita perlu mengetahui saat penyelesaiannya. Ada dua cara khas untuk ini:
- transfer ukuran target (dalam byte atau segmen) di yang pertama
- transmisi tanda [bukan] segmen terakhir di masing-masing
Karena kami menulis PubSub setelah semua, sebagian besar pesan kami akan pendek dan tidak menguntungkan untuk memesan banyak byte untuk transfer ukuran. Oleh karena itu, kami akan menggunakan metode kedua, setelah mencadangkan
karakter pertama dari data segmen sebagai bendera kelanjutan / ujung wadah.
Transfer Objek
Untuk mengirimkan string teks biasa dan objek JSON sebagai "pesan", kami menambahkan
satu lagi tanda simbol untuk transformasi terbalik di sisi penerima.
Karena kami memutuskan untuk menyandikan "non-format" di base64, untuk bendera kami dapat mengambil karakter yang diizinkan yang tidak ada dalam set ini.
Total, kami mendapat opsi berikut untuk segmen yang ditransmisikan:
-- "" !simple string -- " " @{"a":1} -- base64 #<segment> -- base64 $<segment>
Seperti yang Anda lihat, itu cukup untuk menganalisis ketika menerima segmen hanya karakter pertama untuk memahami apa yang perlu dilakukan dengan itu lebih lanjut.
Menulis Implementasi PubSub
Aplikasi kita akan berada di Node.js, jadi kita akan menggunakan
modul node-postgres untuk bekerja dengan PostgreSQL.
Kami menulis bingkai awalUntuk memulai, mari buat PubSub sebagai pewaris
EventEmitter , untuk dapat menghasilkan acara bagi mereka yang berlangganan saluran tertentu:
const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) {
Kami bekerja dengan saluranKarena LISTEN / UNLISTEN tidak bersumpah dengan cara apa pun ketika berlangganan kembali ke saluran atau berhenti berlangganan dari apa yang tidak kami langgani, kami tidak akan mempersulit apa pun.
Mengirim dan menerima pesan const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD;
Beberapa tes const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, ' '); psB.publish(chA, {a : 1}); psA.publish(chB, ' 100 '.repeat(100)); });
Semuanya sangat sederhana, sehingga Anda dapat dengan mudah menerapkannya pada PL lain yang digunakan dalam proyek Anda, dengan mengambil contoh sebagai dasar untuk bekerja dengan pemberitahuan asinkron: