Replicação lógica do PostgreSQL para Erlang

Um esquema bastante típico no desenvolvimento do sistema, quando a lógica de processamento principal está concentrada na aplicação (no nosso caso, Erlang) e os dados dessa aplicação (configurações, perfis de usuário etc.) no banco de dados (PostgreSQL). O aplicativo Erlang armazena em cache as configurações no ETS para acelerar o processamento e reduzir a carga no banco de dados, rejeitando solicitações persistentes. Ao mesmo tempo, a alteração desses dados ocorre por meio de um serviço separado (possivelmente externo).


Em tais situações, surge o desafio de manter os dados armazenados em cache atualizados. Existem diferentes abordagens para resolver esse problema. Um deles é a replicação lógica do PostgreSQL. Sobre isso e será discutido abaixo.


Protocolo de replicação lógica de fluxo


A replicação lógica usa o protocolo de replicação de streaming do PostgreSQL para receber alterações de dados nas tabelas do PostgreSQL, lendo os logs do WAL, filtrando as tabelas necessárias e enviando essas alterações ao assinante. Esse mecanismo é semelhante ao usado para replicação física para criar um banco de dados em espera.


A replicação lógica fornece os seguintes benefícios:


  • receber alterações sem demora em tempo real;
  • filtrar alterações por tabelas e operações (INSERT / DELETE / UPDATE);
  • integridade e integridade dos dados recebidos pelo assinante. O assinante recebe as alterações na mesma ordem em que ocorreram no banco de dados;
  • sem perda de dados em caso de parada temporária do assinante. O PostgreSQL lembra onde a replicação parou;

Preparação de banco de dados


Para trabalhar com replicação lógica, você precisa de um plug-in que decodifique os registros WAL do servidor em um formato mais conveniente.
Antes do PostgreSQL 10, você pode usar o plugin / extension pglogical_output .
Começando com o plug-in PostgreSQL 10 pgoutput .
Este artigo abordará o plugin pgoutput.


No lado do PostgreSQL, você deve concluir as seguintes etapas:


  • Defina parâmetros para suportar replicação lógica em
    postgresql.conf


    wal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5 

  • Crie uma função a ser usada para replicação. A função deve ter o SUPERUSER REPLICATION ou SUPERUSER .


     CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test'; 

  • Permitir acesso para esta função no pg_hba.conf com database = replication


     host replication epgl_test 127.0.0.1/32 trust 

  • Crie uma publicação . Ao criar uma publicação, indicamos as tabelas que planejamos receber no aplicativo Erlang


     CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; --       


Parte Erlang


Há pouco tempo , o suporte ao protocolo de replicação de streaming foi adicionado à popular biblioteca Erlang para trabalhar com o PostgreSQL EPGSQL . Com base nesta biblioteca, construiremos a lógica para receber alterações em Erlang.
Como o formato dos dados diretamente na mensagem XlogData do protocolo depende de qual plug-in é usado para o slot de replicação, a biblioteca EPGSQL não decodifica os dados, mas chama o método de retorno de chamada ou envia a mensagem ao processo de forma assíncrona.


Conexão de banco de dados


Uma conexão de replicação especial com o banco de dados deve ser criada, para isso você precisa passar o sinalizador de replication .
Em uma conexão com o banco de dados de replicação, apenas os comandos de replicação podem ser executados (por exemplo, DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Você não pode executar uma solicitação regular através desta conexão.


Crie um slot de replicação


O slot de replicação é usado para rastrear a posição atual do log WAL transferido.
Ao criar um slot de replicação, um plug-in para decodificação é especificado.


O PostgreSQL 10 apresenta a capacidade de criar slots de replicação temporários que são excluídos automaticamente quando a conexão de replicação é fechada.


Se o aplicativo ler o estado inicial das tabelas toda vez que for iniciado, recomendamos o uso de slots de replicação temporários. Nesse caso, você não precisará se preocupar em excluir os slots de replicação criados (DROP_REPLICATION_SLOT). A remoção de slots de replicação antigos / não utilizados é extremamente importante porque o PostgreSQL não exclui os logs do WAL até que os assinantes de todos os slots de replicação recebam a alteração. Se ainda houver um slot de replicação inativo, os logs do WAL começarão a se acumular e, mais cedo ou mais tarde, o sistema de arquivos será excedido.


Obtendo o estado inicial das tabelas


Ao criar um slot de replicação (consulte a etapa anterior), é criado automaticamente um instantâneo que mostra o estado do banco de dados no momento em que o slot foi criado. Esse instantâneo pode ser usado para carregar o estado inicial das tabelas, que estava no início da replicação.


A CREATE_REPLICATION_SLOT instantânea só está disponível até que a conexão de replicação na qual o comando CREATE_REPLICATION_SLOT foi executado seja fechada.


Para carregar os dados iniciais, uma nova conexão regular / sem replicação com o banco de dados deve ser criada, pois SELECT não pode ser executado na conexão de replicação. Nesta conexão, defina o instantâneo SET TRANSACTION SNAPSHOT SnapshotName e extraia os dados necessários.


Iniciar replicação


Iniciamos a replicação para o slot de replicação criado. Ao iniciar a replicação, passamos parâmetros adicionais para o plug-in, para pgoutput esse é o nome da publicação criada.


Todos os passos juntos


 start_replication() -> %%    {ok, ReplConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]), %%    {ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn, "CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput"). %%     {ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]), {ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"), {ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]), %% select/load data epgsql:equery(NormalConn,... epgsql:close(NormalConn), %%   ReplSlot = "epgl_repl_slot", Callback = ?MODULE, CbInitState = #{}, WALPosition = "0/0", PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'", ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts). handle_x_log_data(StartLSN, EndLSN, Data, CbState) -> io:format("~p~n", [{StartLSN, EndLSN, Data}]), {ok, EndLSN, EndLSN, CbState}. 

Existem duas opções para interagir com a biblioteca EPGSQL :


  • Síncrona. O nome do módulo é passado como um retorno de chamada. A biblioteca para os dados recebidos chamará a função CallbackModule:handle_x_log_data . A função deve retornar LastFlushedLSN, LastAppliedLSN, que é enviado na resposta do PostgreSQL para rastrear a posição atual do slot de replicação. Nos nossos projetos, usamos apenas essa opção;


  • Assíncrono. O retorno de chamada é o pid do processo que receberá mensagens no formato {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}} . Após o processamento, o processo deve relatar o LSN processado por meio de uma chamada epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN) ;



Em vez de uma conclusão


Além disso, para usar a abordagem descrita, é necessário implementar a decodificação de mensagens do formato de plug-in do slot de replicação em estruturas mais familiares para Erlang. Ou use a biblioteca com o GitHub , que implementa a decodificação para dois plug-ins e simplifica a execução de comandos de replicação.

Source: https://habr.com/ru/post/pt482398/


All Articles