当主要处理逻辑集中在应用程序(在我们的示例中为Erlang)中,而该应用程序的数据(设置,用户配置文件等)集中在数据库(PostgreSQL)中时,这是系统开发中的一个相当典型的方案。 Erlang应用程序在ETS中缓存设置,以通过拒绝持久性请求来加快处理速度并减少数据库的负载。 同时,通过单独的(可能是外部)服务来更改此数据。
在这种情况下,要保持高速缓存的数据是最新的挑战。 解决此问题有不同的方法。 其中之一是PostgreSQL逻辑复制。 关于它,将在下面讨论。
流逻辑复制协议
逻辑复制通过读取WAL日志,过滤所需的表并将这些更改发送给订户,从而使用PostgreSQL流复制协议来接收PostgreSQL表中的数据更改。 此机制类似于用于物理复制以创建备用数据库的机制。
逻辑复制具有以下优点:
- 实时接收更改而没有延迟;
- 通过表和操作过滤更改(INSERT / DELETE / UPDATE);
- 订户接收到的数据的完整性和完整性。 订户以与数据库中发生的更改相同的顺序接收更改;
- 在订户暂时停止的情况下不会丢失数据。 PostgreSQL记住复制停止的地方。
数据库准备
要使用逻辑复制,您需要一个插件,以更方便的格式解码来自服务器的WAL记录。
在PostgreSQL 10之前,您可以使用/扩展名pglogical_output插件 。
从PostgreSQL 10 pgoutput插件开始 。
本文将介绍pgoutput插件。
在PostgreSQL方面,您必须完成以下步骤:
设置参数以支持逻辑复制
postgresql.conf
wal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5
创建一个用于复制的角色。 该角色必须具有REPLICATION
或SUPERUSER
。
CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test';
允许通过database = replication
访问pg_hba.conf中的该角色
host replication epgl_test 127.0.0.1/32 trust
创建出版物 。 创建发布时,我们指出计划在Erlang应用程序中接收的表
CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2;
二郎部分
不久前,流行的Erlang库中添加了对流复制协议的支持,以用于PostgreSQL EPGSQL 。 基于此库,我们将构建用于接收Erlang中的更改的逻辑。
由于协议的XlogData
消息中直接包含的数据格式取决于复制插槽所使用的插件,因此EPGSQL
库EPGSQL
解码数据,而是会调用Callback方法或将消息异步发送给进程。
数据库连接
必须创建到数据库的特殊复制连接,为此,您需要传递replication
标志。
在复制数据库连接内,只能执行复制命令(例如DROP_REPLICATION_SLOT,CREATE_REPLICATION_SLOT)。
您不能通过此连接运行常规请求。
创建复制插槽
复制插槽用于跟踪传输的WAL日志的当前位置。
创建复制插槽时,将指定用于解码的插件。
PostgreSQL 10引入了创建临时复制插槽的功能,该复制插槽在复制连接关闭时会自动删除。
如果应用程序每次启动时都读取表的初始状态,则建议使用临时复制插槽,在这种情况下,您不必担心删除创建的复制插槽(DROP_REPLICATION_SLOT)。 删除旧的/未使用的复制插槽非常重要,因为在所有复制插槽的订户收到更改之前,PostgreSQL不会删除WAL日志。 如果仍然有一个非活动的复制插槽,则WAL日志将开始积累,并且文件系统迟早会溢出。
获取表的初始状态
创建复制插槽时(请参阅上一步),将自动创建快照,该快照显示创建插槽时数据库的状态。 此快照可用于加载表的初始状态,该状态是在复制开始时进行的。
只有在执行CREATE_REPLICATION_SLOT
命令的复制连接关闭之前,快照才可用。
要加载初始数据,必须创建与数据库的新常规/非复制连接,因为无法对复制连接执行SELECT。 在此连接中,设置快照SET TRANSACTION SNAPSHOT SnapshotName
并提取必要的数据。
开始复制
我们开始为创建的复制插槽进行复制。 开始复制时,我们为插件传递其他参数,对于pgoutput,这是创建的发布的名称。
所有步骤在一起
start_replication() ->
与EPGSQL
库进行交互有两种选择:
同步的。 模块的名称作为回调传递。 接收到的数据的库将调用CallbackModule:handle_x_log_data
函数CallbackModule:handle_x_log_data
。 该函数应返回LastFlushedLSN,LastAppliedLSN,该值在PostgreSQL响应中发送,以跟踪复制插槽的当前位置。 在我们的项目中,我们仅使用此选项;
异步的。 回调是进程的pid,它将接收以下格式的消息: {epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}}
。 处理后,该进程应通过epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN)
调用报告已处理的LSN epgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN)
;
而不是结论
另外,为了使用所描述的方法,有必要实现将消息从复制插槽插件格式解码为Erlang更熟悉的结构。 或将库与GitHub一起使用,该库可实现两个插件的解码并简化复制命令的执行。