Replikasi logis dari PostgreSQL ke Erlang

Skema yang cukup khas dalam pengembangan sistem, ketika logika pemrosesan utama terkonsentrasi dalam aplikasi (dalam kasus kami, Erlang), dan data untuk aplikasi ini (pengaturan, profil pengguna, dll.) Dalam database (PostgreSQL). Aplikasi Erlang melakukan cache pengaturan di ETS untuk mempercepat pemrosesan dan mengurangi beban pada database dengan menolak permintaan persisten. Pada saat yang sama, perubahan data ini terjadi melalui layanan yang terpisah (mungkin eksternal).


Dalam situasi seperti itu, tantangan muncul untuk menjaga agar data cache tidak diperbarui. Ada berbagai pendekatan untuk menyelesaikan masalah ini. Salah satunya adalah replikasi logis PostgreSQL. Tentang itu dan akan dibahas di bawah.


Protokol Replikasi Logika Stream


Replikasi logis menggunakan protokol replikasi streaming PostgreSQL untuk mengambil perubahan data dalam tabel PostgreSQL dengan membaca log WAL, memfilter tabel yang diinginkan, dan mengirimkan perubahan ini ke pelanggan. Mekanisme ini mirip dengan yang digunakan untuk replikasi fisik untuk membuat database siaga.


Replikasi logis memberikan manfaat berikut:


  • menerima perubahan tanpa penundaan dalam waktu nyata;
  • memfilter perubahan berdasarkan tabel dan operasi (INSERT / DELETE / UPDATE);
  • kelengkapan dan integritas data yang diterima oleh pelanggan. Pelanggan menerima perubahan dalam urutan yang sama seperti yang terjadi dalam database;
  • tidak ada kehilangan data jika terjadi penghentian sementara pelanggan. PostgreSQL mengingat di mana replikasi berhenti;

Persiapan Basis Data


Untuk bekerja dengan replikasi logis, Anda memerlukan plugin yang menerjemahkan kode WAL dari server dalam format yang lebih nyaman.
Sebelum PostgreSQL 10, Anda dapat menggunakan plugin / extension pglogical_output .
Dimulai dengan plugin PostgreSQL 10 pgoutput .
Artikel ini akan membahas plugin pgoutput.


Di sisi PostgreSQL, Anda harus menyelesaikan langkah-langkah berikut:


  • Tetapkan parameter untuk mendukung replikasi logis di
    postgresql.conf


    wal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5 

  • Buat peran yang akan digunakan untuk replikasi. Peran harus memiliki SUPERUSER REPLICATION atau SUPERUSER .


     CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test'; 

  • Izinkan akses untuk peran ini di pg_hba.conf dengan database = replication


     host replication epgl_test 127.0.0.1/32 trust 

  • Buat publikasi . Saat membuat publikasi, kami menunjukkan tabel yang kami rencanakan untuk terima di aplikasi Erlang


     CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; --       


Bagian erlang


Belum lama ini, dukungan untuk protokol replikasi streaming telah ditambahkan ke perpustakaan Erlang yang populer untuk bekerja dengan PostgreSQL EPGSQL . Berdasarkan pustaka ini, kami akan membangun logika untuk menerima perubahan di Erlang.
Karena format data langsung dalam pesan XlogData dari protokol tergantung pada plug-in mana yang digunakan untuk slot replikasi, perpustakaan EPGSQL tidak men-decode data, tetapi memanggil metode Callback atau mengirim pesan ke proses secara tidak sinkron.


Koneksi DB


Koneksi replikasi khusus ke database harus dibuat, untuk ini Anda harus melewati bendera replication .
Dalam koneksi basis data replikasi, hanya perintah replikasi yang dapat dijalankan (misalnya, DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Anda tidak dapat menjalankan permintaan reguler melalui koneksi ini.


Buat slot replikasi


Slot replikasi digunakan untuk melacak posisi saat log WAL yang ditransfer.
Saat membuat slot replikasi, sebuah plug-in untuk decoding ditentukan.


PostgreSQL 10 memperkenalkan kemampuan untuk membuat slot replikasi sementara yang secara otomatis dihapus ketika koneksi replikasi ditutup.


Jika aplikasi membaca keadaan awal tabel setiap kali dimulai, maka saya sarankan menggunakan slot replikasi sementara, dalam hal ini Anda tidak perlu khawatir menghapus slot replikasi yang dibuat (DROP_REPLICATION_SLOT). Menghapus slot replikasi lama / tidak terpakai sangat penting karena PostgreSQL tidak menghapus log WAL sampai pelanggan dari semua slot replikasi menerima perubahan. Jika masih ada slot replikasi tidak aktif, maka log WAL akan mulai menumpuk dan cepat atau lambat sistem file akan meluap.


Mendapatkan status awal tabel


Saat membuat slot replikasi (lihat langkah sebelumnya), sebuah snapshot dibuat secara otomatis yang menunjukkan keadaan database pada saat slot tersebut dibuat. Cuplikan ini dapat digunakan untuk memuat kondisi awal tabel, yang berada di awal replikasi.


Snapshot hanya tersedia sampai koneksi replikasi di mana perintah CREATE_REPLICATION_SLOT dieksekusi ditutup.


Untuk memuat data awal, koneksi reguler / non-replikasi ke database harus dibuat, karena SELECT tidak dapat dilakukan pada koneksi replikasi. Dalam koneksi ini, atur snapshot SET TRANSACTION SNAPSHOT SnapshotName dan ekstrak data yang diperlukan.


Mulai replikasi


Kami memulai replikasi untuk slot replikasi yang dibuat. Saat memulai replikasi, kami memberikan parameter tambahan untuk plugin, untuk pgoutput ini adalah nama publikasi yang dibuat.


Semua langkah bersama


 start_replication() -> %%    {ok, ReplConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]), %%    {ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn, "CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput"). %%     {ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]), {ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"), {ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]), %% select/load data epgsql:equery(NormalConn,... epgsql:close(NormalConn), %%   ReplSlot = "epgl_repl_slot", Callback = ?MODULE, CbInitState = #{}, WALPosition = "0/0", PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'", ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts). handle_x_log_data(StartLSN, EndLSN, Data, CbState) -> io:format("~p~n", [{StartLSN, EndLSN, Data}]), {ok, EndLSN, EndLSN, CbState}. 

Ada dua opsi untuk berinteraksi dengan perpustakaan EPGSQL :


  • Sinkron. Nama modul dilewatkan sebagai Callback. Pustaka untuk data yang diterima akan memanggil fungsi CallbackModule:handle_x_log_data . Fungsi harus mengembalikan LastFlushedLSN, LastAppliedLSN, yang dikirim dalam respons PostgreSQL untuk melacak posisi saat ini dari slot replikasi. Dalam proyek kami, kami hanya menggunakan opsi ini;


  • Tidak sinkron. Callback adalah pid dari proses, yang akan menerima pesan dalam bentuk {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}} . Setelah diproses, proses tersebut harus melaporkan LSN yang diproses melalui panggilan epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN) ;



Alih-alih sebuah kesimpulan


Selain itu, untuk menggunakan pendekatan yang dijelaskan, perlu untuk mengimplementasikan decoding pesan dari format plug-in slot replikasi ke dalam struktur yang lebih akrab dengan Erlang. Atau gunakan pustaka dengan GitHub , yang mengimplementasikan decoding untuk dua plug-in dan menyederhanakan eksekusi perintah replikasi.

Source: https://habr.com/ru/post/id482398/


All Articles