Um esquema bastante típico no desenvolvimento do sistema, quando a lógica de processamento principal está concentrada na aplicação (no nosso caso, Erlang) e os dados dessa aplicação (configurações, perfis de usuário etc.) no banco de dados (PostgreSQL). O aplicativo Erlang armazena em cache as configurações no ETS para acelerar o processamento e reduzir a carga no banco de dados, rejeitando solicitações persistentes. Ao mesmo tempo, a alteração desses dados ocorre por meio de um serviço separado (possivelmente externo).
Em tais situações, surge o desafio de manter os dados armazenados em cache atualizados. Existem diferentes abordagens para resolver esse problema. Um deles é a replicação lógica do PostgreSQL. Sobre isso e será discutido abaixo.
Protocolo de replicação lógica de fluxo
A replicação lógica usa o protocolo de replicação de streaming do PostgreSQL para receber alterações de dados nas tabelas do PostgreSQL, lendo os logs do WAL, filtrando as tabelas necessárias e enviando essas alterações ao assinante. Esse mecanismo é semelhante ao usado para replicação física para criar um banco de dados em espera.
A replicação lógica fornece os seguintes benefícios:
- receber alterações sem demora em tempo real;
- filtrar alterações por tabelas e operações (INSERT / DELETE / UPDATE);
- integridade e integridade dos dados recebidos pelo assinante. O assinante recebe as alterações na mesma ordem em que ocorreram no banco de dados;
- sem perda de dados em caso de parada temporária do assinante. O PostgreSQL lembra onde a replicação parou;
Preparação de banco de dados
Para trabalhar com replicação lógica, você precisa de um plug-in que decodifique os registros WAL do servidor em um formato mais conveniente.
Antes do PostgreSQL 10, você pode usar o plugin / extension pglogical_output .
Começando com o plug-in PostgreSQL 10 pgoutput .
Este artigo abordará o plugin pgoutput.
No lado do PostgreSQL, você deve concluir as seguintes etapas:
Defina parâmetros para suportar replicação lógica em
postgresql.conf
wal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5
Crie uma função a ser usada para replicação. A função deve ter o SUPERUSER
REPLICATION
ou SUPERUSER
.
CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test';
Permitir acesso para esta função no pg_hba.conf com database = replication
host replication epgl_test 127.0.0.1/32 trust
Crie uma publicação . Ao criar uma publicação, indicamos as tabelas que planejamos receber no aplicativo Erlang
CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2;
Parte Erlang
Há pouco tempo , o suporte ao protocolo de replicação de streaming foi adicionado à popular biblioteca Erlang para trabalhar com o PostgreSQL EPGSQL . Com base nesta biblioteca, construiremos a lógica para receber alterações em Erlang.
Como o formato dos dados diretamente na mensagem XlogData
do protocolo depende de qual plug-in é usado para o slot de replicação, a biblioteca EPGSQL
não decodifica os dados, mas chama o método de retorno de chamada ou envia a mensagem ao processo de forma assíncrona.
Conexão de banco de dados
Uma conexão de replicação especial com o banco de dados deve ser criada, para isso você precisa passar o sinalizador de replication
.
Em uma conexão com o banco de dados de replicação, apenas os comandos de replicação podem ser executados (por exemplo, DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Você não pode executar uma solicitação regular através desta conexão.
Crie um slot de replicação
O slot de replicação é usado para rastrear a posição atual do log WAL transferido.
Ao criar um slot de replicação, um plug-in para decodificação é especificado.
O PostgreSQL 10 apresenta a capacidade de criar slots de replicação temporários que são excluídos automaticamente quando a conexão de replicação é fechada.
Se o aplicativo ler o estado inicial das tabelas toda vez que for iniciado, recomendamos o uso de slots de replicação temporários. Nesse caso, você não precisará se preocupar em excluir os slots de replicação criados (DROP_REPLICATION_SLOT). A remoção de slots de replicação antigos / não utilizados é extremamente importante porque o PostgreSQL não exclui os logs do WAL até que os assinantes de todos os slots de replicação recebam a alteração. Se ainda houver um slot de replicação inativo, os logs do WAL começarão a se acumular e, mais cedo ou mais tarde, o sistema de arquivos será excedido.
Obtendo o estado inicial das tabelas
Ao criar um slot de replicação (consulte a etapa anterior), é criado automaticamente um instantâneo que mostra o estado do banco de dados no momento em que o slot foi criado. Esse instantâneo pode ser usado para carregar o estado inicial das tabelas, que estava no início da replicação.
A CREATE_REPLICATION_SLOT
instantânea só está disponível até que a conexão de replicação na qual o comando CREATE_REPLICATION_SLOT
foi executado seja fechada.
Para carregar os dados iniciais, uma nova conexão regular / sem replicação com o banco de dados deve ser criada, pois SELECT não pode ser executado na conexão de replicação. Nesta conexão, defina o instantâneo SET TRANSACTION SNAPSHOT SnapshotName
e extraia os dados necessários.
Iniciar replicação
Iniciamos a replicação para o slot de replicação criado. Ao iniciar a replicação, passamos parâmetros adicionais para o plug-in, para pgoutput esse é o nome da publicação criada.
Todos os passos juntos
start_replication() ->
Existem duas opções para interagir com a biblioteca EPGSQL
:
Síncrona. O nome do módulo é passado como um retorno de chamada. A biblioteca para os dados recebidos chamará a função CallbackModule:handle_x_log_data
. A função deve retornar LastFlushedLSN, LastAppliedLSN, que é enviado na resposta do PostgreSQL para rastrear a posição atual do slot de replicação. Nos nossos projetos, usamos apenas essa opção;
Assíncrono. O retorno de chamada é o pid do processo que receberá mensagens no formato {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}}
. Após o processamento, o processo deve relatar o LSN processado por meio de uma chamada epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN)
;
Em vez de uma conclusão
Além disso, para usar a abordagem descrita, é necessário implementar a decodificação de mensagens do formato de plug-in do slot de replicação em estruturas mais familiares para Erlang. Ou use a biblioteca com o GitHub , que implementa a decodificação para dois plug-ins e simplifica a execução de comandos de replicação.