Logische Replikation von PostgreSQL nach Erlang

Ein ziemlich typisches Schema bei der Entwicklung des Systems, bei dem die Hauptverarbeitungslogik in der Anwendung (in unserem Fall Erlang) und die Daten für diese Anwendung (Einstellungen, Benutzerprofile usw.) in der Datenbank (PostgreSQL) konzentriert sind. Die Erlang-Anwendung speichert Einstellungen in der ETS zwischen, um die Verarbeitung zu beschleunigen und die Datenbankbelastung zu verringern, indem permanente Anforderungen abgelehnt werden. Gleichzeitig erfolgt die Änderung dieser Daten über einen separaten (ggf. externen) Dienst.


In solchen Situationen besteht die Herausforderung darin, die zwischengespeicherten Daten auf dem neuesten Stand zu halten. Es gibt verschiedene Ansätze, um dieses Problem zu lösen. Eine davon ist die logische PostgreSQL-Replikation. Darüber und wird weiter unten diskutiert.


Logisches Replikationsprotokoll streamen


Die logische Replikation verwendet das PostgreSQL-Streaming-Replikationsprotokoll, um Datenänderungen in PostgreSQL-Tabellen zu empfangen, indem WAL-Protokolle gelesen, die benötigten Tabellen gefiltert und diese Änderungen an den Abonnenten gesendet werden. Dieser Mechanismus ähnelt dem für die physische Replikation zum Erstellen einer Standby-Datenbank verwendeten.


Die logische Replikation bietet die folgenden Vorteile:


  • Änderungen unverzüglich in Echtzeit empfangen;
  • Filtern von Änderungen nach Tabellen und Operationen (INSERT / DELETE / UPDATE);
  • Vollständigkeit und Integrität der vom Abonnenten empfangenen Daten. Der Abonnent erhält die Änderungen in der Reihenfolge, in der sie in der Datenbank vorgenommen wurden.
  • Kein Datenverlust bei vorübergehender Unterbrechung des Teilnehmers. PostgreSQL merkt sich, wo die Replikation gestoppt wurde.

Datenbankvorbereitung


Um mit der logischen Replikation arbeiten zu können, benötigen Sie ein Plugin, das WAL-Einträge vom Server in einem bequemeren Format decodiert.
Vor PostgreSQL 10 können Sie das Plugin / extension pglogical_output verwenden .
Beginnend mit dem PostgreSQL 10- Plugin .
Dieser Artikel behandelt das Plugin pgoutput.


Auf der PostgreSQL-Seite müssen Sie die folgenden Schritte ausführen:


  • Legen Sie die Parameter für die Unterstützung der logischen Replikation in fest
    postgresql.conf


    wal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5 

  • Erstellen Sie eine Rolle, die für die Replikation verwendet werden soll. Die Rolle muss das SUPERUSER REPLICATION oder SUPERUSER .


     CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test'; 

  • Ermöglichen Sie den Zugriff für diese Rolle in pg_hba.conf mit database = replication


     host replication epgl_test 127.0.0.1/32 trust 

  • Erstellen Sie eine Publikation . Beim Erstellen einer Publikation geben wir die Tabellen an, die wir in der Erlang-Anwendung erhalten möchten


     CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; --       


Erlang Teil


Vor nicht allzu langer Zeit wurde die Unterstützung für das Streaming-Replikationsprotokoll zur beliebten Erlang-Bibliothek für die Arbeit mit PostgreSQL EPGSQL hinzugefügt . Basierend auf dieser Bibliothek erstellen wir die Logik für den Empfang von Änderungen in Erlang.
Da das Format der Daten direkt in der XlogData Nachricht des Protokolls davon abhängt, welches Plug-In für den Replikationssteckplatz verwendet wird, dekodiert die EPGSQL Bibliothek die Daten nicht, sondern ruft die Callback-Methode auf oder sendet die Nachricht asynchron an den Prozess.


DB-Verbindung


Es muss eine spezielle Replikationsverbindung zur Datenbank hergestellt werden, dazu muss das replication .
Innerhalb einer Replikationsdatenbankverbindung können nur Replikationsbefehle ausgeführt werden (z. B. DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Sie können keine reguläre Anforderung über diese Verbindung ausführen.


Erstellen Sie einen Replikationssteckplatz


Der Replikations-Slot wird verwendet, um die aktuelle Position des übertragenen WAL-Protokolls zu verfolgen.
Beim Erstellen eines Replikationssteckplatzes wird ein Plug-In zum Dekodieren angegeben.


PostgreSQL 10 bietet die Möglichkeit, temporäre Replikations-Slots zu erstellen, die automatisch gelöscht werden, wenn die Replikationsverbindung geschlossen wird.


Wenn die Anwendung bei jedem Start den Anfangszustand der Tabellen liest, empfehle ich die Verwendung von temporären Replikationssteckplätzen. In diesem Fall müssen Sie sich nicht um das Löschen der erstellten Replikationssteckplätze (DROP_REPLICATION_SLOT) kümmern. Das Entfernen alter / nicht verwendeter Replikationssteckplätze ist äußerst wichtig, da PostgreSQL WAL-Protokolle erst löscht, wenn die Abonnenten aller Replikationssteckplätze die Änderung erhalten. Wenn noch ein inaktiver Replikationsbereich vorhanden ist, sammeln sich WAL-Protokolle an und früher oder später läuft das Dateisystem über.


Den Anfangszustand von Tabellen abrufen


Beim Erstellen eines Replikationssteckplatzes (siehe vorherigen Schritt) wird automatisch ein Snapshot erstellt, der den Status der Datenbank zum Zeitpunkt der Erstellung des Steckplatzes anzeigt. Dieser Snapshot kann zum Laden des Anfangszustands von Tabellen verwendet werden, der sich zu Beginn der Replikation befand.


Snapshot ist nur verfügbar, bis die Replikationsverbindung geschlossen wird, in der der Befehl CREATE_REPLICATION_SLOT ausgeführt wurde.


Um die anfänglichen Daten zu laden, muss eine neue reguläre / Nicht-Replikationsverbindung zur Datenbank erstellt werden, da SELECT nicht für die Replikationsverbindung ausgeführt werden kann. Setzen Sie in diesem Zusammenhang den Snapshot SET TRANSACTION SNAPSHOT SnapshotName und extrahieren Sie die notwendigen Daten.


Starten Sie die Replikation


Wir starten die Replikation für den erstellten Replikations-Slot. Beim Start der Replikation übergeben wir zusätzliche Parameter für das Plugin, für pgoutput ist dies der Name der erstellten Publikation.


Alle Schritte zusammen


 start_replication() -> %%    {ok, ReplConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]), %%    {ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn, "CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput"). %%     {ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]), {ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"), {ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]), %% select/load data epgsql:equery(NormalConn,... epgsql:close(NormalConn), %%   ReplSlot = "epgl_repl_slot", Callback = ?MODULE, CbInitState = #{}, WALPosition = "0/0", PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'", ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts). handle_x_log_data(StartLSN, EndLSN, Data, CbState) -> io:format("~p~n", [{StartLSN, EndLSN, Data}]), {ok, EndLSN, EndLSN, CbState}. 

Es gibt zwei Möglichkeiten, mit der EPGSQL Bibliothek zu interagieren:


  • Synchron. Der Name des Moduls wird als Callback übergeben. Die Bibliothek für die empfangenen Daten ruft die CallbackModule:handle_x_log_data Funktion auf CallbackModule:handle_x_log_data . Die Funktion sollte LastFlushedLSN, LastAppliedLSN, zurückgeben, das in der PostgreSQL-Antwort gesendet wird, um die aktuelle Position des Replikationsslots zu verfolgen. In unseren Projekten verwenden wir nur diese Option;


  • Asynchron. Der Rückruf ist die PID des Prozesses, der Nachrichten der Form {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}} . Nach der Verarbeitung sollte der Prozess die verarbeitete LSN über einen epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN) Aufruf epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN) ;



Anstelle einer Schlussfolgerung


Um den beschriebenen Ansatz zu verwenden, ist es außerdem erforderlich, das Dekodieren von Nachrichten aus dem Replikationssteckplatz- Plug-In-Format in Strukturen zu implementieren, die Erlang vertrauter sind. Oder verwenden Sie die Bibliothek mit GitHub , das die Dekodierung für zwei Plug-Ins implementiert und die Ausführung von Replikationsbefehlen vereinfacht.

Source: https://habr.com/ru/post/de482398/


All Articles