PubSub hampir gratis: PEMBERITAHUAN fitur di PostgreSQL

Jika layanan microser Anda sudah menggunakan database PostgreSQL umum untuk menyimpan data, atau jika beberapa contoh layanan yang sama menggunakannya di server yang berbeda, Anda bisa mendapatkan pesan " PubSub " di antara mereka yang relatif murah tanpa integrasi ke dalam arsitektur Redis, cluster RabbitMQ, atau menanamkan ke dalam kode aplikasi sistem MQ lain .

Untuk melakukan ini, kami tidak akan menulis pesan ke tabel database , karena ini menyebabkan terlalu banyak biaya overhead, pertama menulis yang ditransmisikan , dan kemudian juga membersihkan dari yang sudah dibaca .

Kami akan mengirim dan menerima data menggunakan mekanisme NOTIFY / LISTEN , dan kami akan mengumpulkan implementasi model untuk Node.js.



Tetapi dengan cara ini ada garu yang harus dielakkan dengan hati-hati.

Fitur Protokol


Dengarkan


LISTEN  
Aplikasi yang menggunakan perpustakaan libpq mengeksekusi perintah LISTEN sebagai perintah SQL biasa, dan kemudian secara berkala harus memanggil fungsi PQnotifies untuk memeriksa notifikasi baru.
Jika Anda tidak menulis pustaka untuk bekerja dengan PG, tetapi sudah aplikasi tertentu, dalam kebanyakan kasus, Anda tidak akan memiliki akses ke panggilan ke fungsi ini.

Tetapi jika pustaka tersebut telah ditulis untuk Anda sesuai dengan rekomendasi untuk memproses permintaan dan pemberitahuan asinkron , Anda akan secara otomatis menerima pesan dalam kode aplikasi. Jika tidak, Anda cukup menjalankan SELECT 1 secara berkala pada koneksi, maka pemberitahuan akan muncul bersama dengan hasil permintaan:
Dalam rilis libpq yang sangat lama, hanya ada satu cara untuk memastikan penerimaan pesan tepat waktu dari perintah NOTIFY - untuk terus-menerus mengirim perintah, bahkan yang kosong, dan kemudian memeriksa PQnotify setelah setiap panggilan PQexec. Meskipun metode ini masih berfungsi, itu dianggap usang karena penggunaan prosesor yang tidak efisien.
Dalam hal, misalnya, psql, tampilannya seperti ini:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

Jika untuk tugas yang diterapkan, kami dapat menyetujui penundaan maksimum dalam mengirimkan pesan dalam waktu 1 detik, dengan interval seperti itu, kami melaksanakan permintaan tersebut. Pada saat yang sama, metode ini membantu untuk memantau "liveness" dari koneksi , memastikan bahwa tidak ada yang secara tidak sengaja pg_terminate_backend dari sisi server melalui pg_terminate_backend , atau tidak ada "crash" tiba-tiba dari PG tanpa pemberitahuan kepada klien.

PEMBERITAHUAN


 NOTIFY  [ ,  ] 

Perintah NOTIFY mengirimkan acara pemberitahuan bersama dengan baris "pesan" tambahan untuk semua aplikasi klien yang sebelumnya telah mengeksekusi saluran dengan nama saluran yang ditentukan dalam database LISTEN saat ini.
...
Baris "pesan" yang akan dikirim bersamaan dengan pemberitahuan ... harus berupa konstanta teks sederhana . Dalam konfigurasi standar, panjangnya harus kurang dari 8000 byte .
Yaitu, jika "pesan" kami tiba-tiba berisi sesuatu yang sangat berbeda dari ASCII, maka kami harus menyaringnya , dan jika itu melebihi ukuran 8000 byte (bukan karakter!), Lalu potong menjadi blok dan tempelkan . Pada saat yang sama, kita harus menghemat bandwidth saluran dan sumber daya server untuk memproses transfer blok seperti itu - yaitu, tambahkan sedikit layanan "mengikat" ke konten yang berguna mungkin, tetapi juga tidak "mencekik" aplikasi klien, memaksanya untuk berkemas dengan gzip -9 .

Dari keuntungan tambahan mekanisme, seseorang juga dapat mencatat pengikatan pada "sumber" pesan ...
... pekerjaan tambahan dapat dihindari dengan memeriksa apakah PID dari proses pensinyalan (ditunjukkan dalam data acara) cocok dengan PID sesi itu sendiri (Anda dapat menemukannya dengan menghubungi libpq). Jika mereka cocok, maka sesi menerima pemberitahuan tentang tindakannya sendiri, sehingga dapat diabaikan.
... dan pesanan pengiriman terjamin:
Selain memfilter pemberitahuan duplikat berikutnya, NOTIFY memastikan bahwa pemberitahuan dari satu transaksi selalu tiba dalam urutan yang sama dengan saat mereka dikirim. Juga dijamin bahwa pesan dari transaksi yang berbeda tiba sesuai urutan transaksi ini dilakukan .
Kami tidak akan secara khusus menggabungkan apa pun, sehingga setiap permintaan kami hanya akan sesuai dengan transaksi terpisah.

Tetapi ingat bahwa jika ada juga aktivitas aplikasi pada koneksi yang digunakan untuk pertukaran, PEMBERITAHUAN kami mungkin tidak berada dalam transaksi atas kehendak bebas kami sendiri, sehingga efek samping dapat terjadi :
Transaksi memiliki dampak signifikan pada PEMBERITAHUAN. Pertama, jika NOTIFY dijalankan di dalam suatu transaksi, pemberitahuan dikirimkan kepada penerima setelah transaksi dilakukan, dan hanya dalam kasus ini. Ini masuk akal, karena dalam hal transaksi sedang terputus, tindakan semua perintah di dalamnya dibatalkan, termasuk PEMBERITAHUAN .
Oleh karena itu, lebih baik menggunakan koneksi di mana jelas tidak ada transaksi atau permintaan panjang.

AccessExclusiveLock pada objek 0 dari kelas 1262 dari database 0


Jika tiba-tiba PEMBERITAHUAN Anda mulai tumpul dan mencatat ekspektasi kunci seperti itu, maka Anda masih "tumbuh dari celana pendek", dan inilah saatnya untuk memikirkan MQ "dewasa".

Bagaimanapun, antrian notifikasi, meskipun cukup besar (8GB pada build standar), masih terbatas. Menurut jawaban Tom Lane :
Kunci ini ditahan saat memasukkan pesan notifikasi transaksi, setelah itu transaksi melakukan dan melepaskan kunci.
Artinya, tidak ada terlalu banyak opsi untuk dikerjakan:

  • kirim tetapi lebih jarang
    Yaitu, untuk menggabungkan indikator yang dikirim, jika ini adalah beberapa penghitung, dalam interval yang lebih lama.
  • kirim lebih sedikit
    Misalnya, untuk menghapus "default" dari sudut pandang nilai-nilai kunci aplikasi dari JSON yang ditransmisikan.
  • hanya mengirim sinyal , tidak ada konten sama sekali
    Sebagai pilihan - untuk memulai beberapa saluran, nama masing-masing sudah akan memiliki beberapa pengertian yang berlaku.
  • masih melakukan pengiriman dari database

Mengirim pesan yang rumit


Pengodean tubuh


Dalam kasus umum, kami mungkin ingin mengirimkan tidak hanya karakter yang diizinkan dalam pesan, tetapi juga huruf Rusia, dan "sembarang binari" - oleh karena itu, akan lebih mudah untuk menggunakan konversi ke representasi hex untuk membentuk string yang ditransmisikan. Dan ya, metode ini bekerja dengan sangat baik:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

Tetapi mari kita kembali ke dokumentasi:
Anda harus memastikan bahwa urutan byte yang Anda buat dengan cara ini, terutama dalam notasi oktal dan heksadesimal, membentuk karakter yang valid dalam penyandian server . Ketika server beroperasi dengan pengkodean UTF-8, alih-alih rekaman byte seperti itu, gunakan urutan khusus Unicode atau sintaks Unicode alternatif yang dijelaskan dalam Bagian 4.1.2.3. (Jika tidak, Anda harus menyandikan karakter UTF-8 secara manual dan menulisnya dengan byte, yang sangat merepotkan.)
Oleh karena itu, bahkan dengan simbol umum tanda kutip dari win1251, kami menerima roti dalam kesedihan:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

Karena kami tidak ingin " menyandikan karakter UTF-8 secara manual dan menulisnya dengan byte ", kami akan langsung setuju untuk mengirim isi pesan yang dikemas dalam base64 jika ada karakter di luar rentang \x20-\x7E atau, jika perlu, segmentasi. Di satu sisi, metode pengemasan seperti itu tidak meningkatkan redundansi terlalu banyak (koefisien 4: 3), di sisi lain, metode ini diterapkan pada tingkat pustaka sistem dalam bahasa apa pun dan akan memberikan beban tambahan minimal.

Tetapi bahkan jika kita tidak memiliki karakter "aneh", dan pesannya cocok dalam satu segmen, masih ada satu fitur - lolos dari tanda kutip :
Untuk memasukkan tanda kutip dalam satu baris, tulis dua tanda kutip di sebelahnya , misalnya: 'Joan of Arc'. Perhatikan bahwa ini tidak sama dengan kutipan ganda (").

Identifikasi Segmen


Tugas selanjutnya adalah "memotong" pesan dengan benar ke dalam blok - blok yang diizinkan untuk pengiriman 7999 byte , jika ukurannya tiba-tiba melebihi nilai ini. Dan agar penerima dapat mengumpulkannya tanpa melanggar urutan atau jatuh ke dalam rantai segmen "alien". Untuk ini, masing-masing dari mereka perlu diidentifikasi.

Sebenarnya, kita sudah tahu dua "koordinat" - ini adalah PID dari proses pengiriman dan nama saluran yang datang di setiap notifikasi. Dan urutan kedatangan segmen dijamin oleh protokol itu sendiri.

Tetangga Menulis
Kami tidak akan mempertimbangkan kasus ketika beberapa penulis pada saluran yang sama aktif pada saat yang sama dengan menghubungkan ke database (yaitu, jelas dalam proses aplikasi yang sama). Secara teknis, ini dapat didukung dengan melewati pengenal tambahan di header segmen - tetapi lebih baik untuk "berbagi" objek PubSub tunggal di dalam aplikasi Anda.

Batas Kontainer


Untuk merakit sebuah wadah integral dari beberapa segmen, kita perlu mengetahui saat penyelesaiannya. Ada dua cara khas untuk ini:

  • transfer ukuran target (dalam byte atau segmen) di yang pertama
  • transmisi tanda [bukan] segmen terakhir di masing-masing

Karena kami menulis PubSub setelah semua, sebagian besar pesan kami akan pendek dan tidak menguntungkan untuk memesan banyak byte untuk transfer ukuran. Oleh karena itu, kami akan menggunakan metode kedua, setelah mencadangkan karakter pertama dari data segmen sebagai bendera kelanjutan / ujung wadah.

Transfer Objek


Untuk mengirimkan string teks biasa dan objek JSON sebagai "pesan", kami menambahkan satu lagi tanda simbol untuk transformasi terbalik di sisi penerima.

Karena kami memutuskan untuk menyandikan "non-format" di base64, untuk bendera kami dapat mengambil karakter yang diizinkan yang tidak ada dalam set ini.

Total, kami mendapat opsi berikut untuk segmen yang ditransmisikan:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

Seperti yang Anda lihat, itu cukup untuk menganalisis ketika menerima segmen hanya karakter pertama untuk memahami apa yang perlu dilakukan dengan itu lebih lanjut.

Menulis Implementasi PubSub


Aplikasi kita akan berada di Node.js, jadi kita akan menggunakan modul node-postgres untuk bekerja dengan PostgreSQL.

Kami menulis bingkai awal
Untuk memulai, mari buat PubSub sebagai pewaris EventEmitter , untuk dapat menghasilkan acara bagi mereka yang berlangganan saluran tertentu:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

Kami bekerja dengan saluran
Karena LISTEN / UNLISTEN tidak bersumpah dengan cara apa pun ketika berlangganan kembali ke saluran atau berhenti berlangganan dari apa yang tidak kami langgani, kami tidak akan mempersulit apa pun.

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

Mengirim dan menerima pesan
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

Beberapa tes
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

Semuanya sangat sederhana, sehingga Anda dapat dengan mudah menerapkannya pada PL lain yang digunakan dalam proyek Anda, dengan mengambil contoh sebagai dasar untuk bekerja dengan pemberitahuan asinkron:

Source: https://habr.com/ru/post/id484978/


All Articles