
消息队列用于执行:未决操作,服务之间的交互,“批处理”等。 有专门的组织此类队列的解决方案,例如:RabbitMQ,ActiveMQ,ZeroMQ等,但是经常会遇到它们的需求不大,而且它们的安装和支持所带来的痛苦和痛苦要比带来的收益多。 假设您在注册时有一项服务可以向用户发送电子邮件进行确认,并且如果您使用Postgres,那么您很幸运-在Postgres中,几乎是开箱即用的,有一个PgQ扩展名将为您完成所有肮脏的工作。
在本文中,我将讨论使用PgQ扩展的PostgreSQL中的消息队列(任务)。 如果您尚未使用PgQ或在Postgres上使用自写队列,那么本文将非常有用。
如果您仅能创建平板电脑并在其中编写任务,为什么根本需要PgQ? 这似乎是可能的,但是您必须考虑对任务的并行访问,可能的错误(如果处理任务的流程失败,会发生什么?)以及性能(PgQ非常快,而自写解决方案通常不是,尤其是在事务进行中)数据库不会在任务的整个执行过程中关闭),但是在我看来,必须使用PgQ的最重要原因是PgQ已经编写并且可以工作,并且仍然需要编写自写解决方案(UPD:关于为什么不值得使用自写队列的信息) ,例如,您可以
在此处阅读)。
(UPD:由于PgQ在Postgres之上运行,因此所有交易的乐趣都可以在其中使用)
但是PgQ有一个很大的缺点-缺少文档,我试图通过本文弥补这一不足。
装置
PgQ包含部分(至少2个部分):1-用于postgres的pgq扩展,2-pgqd守护程序(稍后再安装)。
与队列的所有交互都是使用Postgres中的函数进行的。
例如,要创建队列,您必须运行
select * from pgq.create_queue({ } text);
创建队列后,可以向其中添加消息。
select * from pgq.insert_event({ } text, { } text, { } text);
现在我们需要学习如何接收记录的消息。 为此,存在一个诸如“消费者”(我将写一个消费者)之类的实体,它不接收消息(事件),而是接收“批处理”(批处理)。 bach是一组连续的消息,使用pgqd创建了baches。 pgqd定期(配置文件中的“ ticker_period”参数)获取所有累积的消息并写入新的bach。
重要的是,如果pgqd不起作用,那么就不会创建新的bach,这意味着消费者没有什么可阅读的。如果pgqd长时间不起作用然后再打开,它将根据这段时间内积累的消息创建一个大bach,因此pgqd不应只是把它关掉。
消费者的注册(
重要!消费者只有
在注册
之后才会收到记录的事件,因此您应该首先创建消费者,然后才写事件):
select * from pgq.register_consumer({ } text, { } text);
(类似于pgq.unregister_consumer)每个使用者都将获得创建后
绝对发生的
所有事件 (甚至由另一个使用者处理),这意味着您很可能只需要一个使用者即可处理一个队列。 接下来,我将告诉您如何在多台服务器上分配负载。
要获得单身汉,您首先需要找出其ID:
select * from pgq.next_batch({ } text, { } text);
如果使用者已经处理了所有bach,则该函数可以返回NULL。 在这种情况下,您只需要等到pgqd创建一个新的bach。
在这种情况下,当不处理bach时,此函数将返回相同的值。
您可以使用以下命令获取批处理中的所有事件:
select * from pgq.get_batch_events({id } bigint);
(巴赫可能是空的。)
如果在处理其中之一时发生错误,则可以稍后尝试处理此事件:
select * from pgq.event_retry({id } bigint, {id } bigint, { } integer);
为了通知bach结束并有机会启动一个新的bach
select * from pgq.finish_batch({id } bigint);
当然,这些不是扩展中的全部功能,我建议阅读
pgq.imtqy.com/extension/pgq/files/external-sql.html和
github.com/pgq/pgq/tree/master/functions (每个文件均包含定义和说明)相应的功能)。
负载分担
为了由多个处理程序同时处理事件,有一个pgq_coop扩展名,它与pgq一样工作,并添加了一个称为“子消费者”的新实体,该实体将自父消费者注册后立即接收所有事件,除了已处理的事件。
select * from pgq_coop.register_subconsumer({ } text, { } text, { } text);
select * from pgq_coop.next_batch({ } text, { } text, { } text);
select * from pgq_coop.next_batch({ } text, { } text, { } text, { , } interval);
select * from pgq_coop.finish_batch({id } bigint);
在
这里阅读所有功能。
安装方式
pgq扩展名和pgqd守护程序包含在PGDG存储库中,并且在大多数发行版中都非常简单地安装,例如在Debian中:
sudo apt install postgresql-XX-pgq3 pgqd
(XX是版本号)。
pgqd是一个小型程序,可以使用
pgqd --help
,不要忘记将其添加到自动运行中(
sudo systemctl enable pgqd.service
,默认配置为
/etc/pgqd.ini
)。
要开始使用PgQ,只需在数据库中连接扩展名:
create extension if not exists pgq;
使用pgq_coop,一切都有些复杂,它不在存储库中,但是从源代码编译它并不困难(例如Debian):
sudo apt install postgresql-server-dev-XX git clone https://github.com/pgq/pgq-coop.git cd pgq-coop sudo make install
并使用
create extension if not exists pgq_coop;
有用的链接
PGQ文档PGQ功能pgq_coop函数pgqd源代码github帐户以及所有相关项目Wiki Postgres