Replicación lógica de PostgreSQL a Erlang

Un esquema bastante típico en el desarrollo del sistema, cuando la lógica de procesamiento principal se concentra en la aplicación (en nuestro caso, Erlang), y los datos para esta aplicación (configuraciones, perfiles de usuario, etc.) en la base de datos (PostgreSQL). La aplicación Erlang almacena en caché la configuración en ETS para acelerar el procesamiento y reducir la carga en la base de datos al rechazar las solicitudes persistentes. Al mismo tiempo, el cambio de estos datos ocurre a través de un servicio separado (posiblemente externo).


En tales situaciones, surge el desafío de mantener actualizados los datos almacenados en caché. Existen diferentes enfoques para resolver este problema. Una de ellas es la replicación lógica de PostgreSQL. Sobre esto y se discutirá a continuación.


Protocolo de replicación lógica de flujo


La replicación lógica utiliza el protocolo de replicación de transmisión de PostgreSQL para recuperar los cambios de datos en las tablas de PostgreSQL leyendo los registros de WAL, filtrando las tablas deseadas y enviando estos cambios al suscriptor. Este mecanismo es similar al utilizado para la replicación física para crear una base de datos en espera.


La replicación lógica proporciona los siguientes beneficios:


  • recibir cambios sin demora en tiempo real;
  • filtrado de cambios por tablas y operaciones (INSERT / DELETE / UPDATE);
  • integridad e integridad de los datos recibidos por el suscriptor. El suscriptor recibe los cambios en el mismo orden en que ocurrieron en la base de datos;
  • sin pérdida de datos en caso de una parada temporal del suscriptor. PostgreSQL recuerda dónde se detuvo la replicación;

Preparación de bases de datos


Para trabajar con la replicación lógica, necesita un complemento que decodifique los registros WAL del servidor en un formato más conveniente.
Antes de PostgreSQL 10, puede usar el complemento / extension pglogical_output .
Comenzando con el complemento de pgoutput PostgreSQL 10.
Este artículo cubrirá el complemento pgoutput.


En el lado de PostgreSQL, debe completar los siguientes pasos:


  • Establecer parámetros para admitir la replicación lógica en
    postgresql.conf


    wal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5 

  • Cree un rol para ser utilizado para la replicación. El rol debe tener el SUPERUSER o SUPERUSER .


     CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test'; 

  • Permitir acceso para este rol en pg_hba.conf con database = replication


     host replication epgl_test 127.0.0.1/32 trust 

  • Crea una publicación . Al crear una publicación, indicamos las tablas que planeamos recibir en la aplicación Erlang


     CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; --       


Parte de Erlang


No hace mucho tiempo, se agregó soporte para el protocolo de replicación de transmisión a la popular biblioteca Erlang para trabajar con PostgreSQL EPGSQL . En base a esta biblioteca, construiremos la lógica para recibir cambios en Erlang.
Dado que el formato de los datos directamente en el mensaje XlogData del protocolo depende de qué complemento se utiliza para la ranura de replicación, la biblioteca EPGSQL no decodifica los datos, sino que llama al método de devolución de llamada o envía el mensaje al proceso de forma asincrónica.


Conexión DB


Se debe crear una conexión de replicación especial a la base de datos, para esto debe pasar el indicador de replication .
Dentro de una conexión de base de datos de replicación, solo se pueden ejecutar comandos de replicación (por ejemplo, DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
No puede ejecutar una solicitud regular a través de esta conexión.


Crear un espacio de replicación


La ranura de replicación se utiliza para rastrear la posición actual del registro WAL transferido.
Al crear una ranura de replicación, se especifica un complemento para la decodificación.


PostgreSQL 10 introduce la capacidad de crear ranuras de replicación temporales que se eliminan automáticamente cuando se cierra la conexión de replicación.


Si la aplicación lee el estado inicial de las tablas cada vez que se inicia, le recomiendo usar ranuras de replicación temporales, en cuyo caso no tendrá que preocuparse por eliminar las ranuras de replicación creadas (DROP_REPLICATION_SLOT). La eliminación de ranuras de replicación antiguas / no utilizadas es extremadamente importante porque PostgreSQL no elimina los registros de WAL hasta que los suscriptores de todas las ranuras de replicación reciban el cambio. Si queda una ranura de replicación inactiva, los registros de WAL comenzarán a acumularse y, tarde o temprano, el sistema de archivos se desbordará.


Obtener el estado inicial de las tablas


Al crear una ranura de replicación (consulte el paso anterior), se crea automáticamente una instantánea que muestra el estado de la base de datos en el momento en que se creó la ranura. Esta instantánea se puede usar para cargar el estado inicial de las tablas, que estaba al comienzo de la replicación.


La instantánea solo está disponible hasta que se CREATE_REPLICATION_SLOT la conexión de replicación en la que se ejecutó el comando CREATE_REPLICATION_SLOT .


Para cargar los datos iniciales, se debe crear una nueva conexión regular / sin replicación a la base de datos, ya que SELECT no se puede realizar en la conexión de replicación. En esta conexión, configure la instantánea SET TRANSACTION SNAPSHOT SnapshotName y extraiga los datos necesarios.


Iniciar replicación


Comenzamos la replicación para la ranura de replicación creada. Al iniciar la replicación, pasamos parámetros adicionales para el complemento, para pgoutput este es el nombre de la publicación creada.


Todos los pasos juntos


 start_replication() -> %%    {ok, ReplConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]), %%    {ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn, "CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput"). %%     {ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]), {ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"), {ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]), %% select/load data epgsql:equery(NormalConn,... epgsql:close(NormalConn), %%   ReplSlot = "epgl_repl_slot", Callback = ?MODULE, CbInitState = #{}, WALPosition = "0/0", PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'", ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts). handle_x_log_data(StartLSN, EndLSN, Data, CbState) -> io:format("~p~n", [{StartLSN, EndLSN, Data}]), {ok, EndLSN, EndLSN, CbState}. 

Hay dos opciones para interactuar con la biblioteca EPGSQL :


  • Sincrónico El nombre del módulo se pasa como una devolución de llamada. La biblioteca para los datos recibidos llamará a la función CallbackModule:handle_x_log_data . La función debe devolver LastFlushedLSN, LastAppliedLSN, que se envía en la respuesta PostgreSQL para rastrear la posición actual de la ranura de replicación. En nuestros proyectos usamos solo esta opción;


  • Asincrónico La devolución de llamada es el pid del proceso que recibirá mensajes de la forma {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}} . Después del procesamiento, el proceso debe informar el LSN procesado a través de una llamada epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN) ;



En lugar de una conclusión


Además, para utilizar el enfoque descrito, es necesario implementar la decodificación de mensajes del formato del complemento de la ranura de replicación en estructuras más familiares para Erlang. O utilice la biblioteca con GitHub , que implementa la decodificación para dos complementos y simplifica la ejecución de comandos de replicación.

Source: https://habr.com/ru/post/482398/


All Articles