如果您的微服务
已经使用通用的PostgreSQL数据库存储数据,或者同一服务的多个实例在不同的服务器上使用它,则可以在它们之间相对便宜地
获得“ PubSub
”消息 ,而无需集成到Redis架构,RabbitMQ集群或嵌入代码中
另一个MQ系统的应用。
为此,我们
不会将 消息 写入 数据库表 ,因为这会导致过多的开销,首先要
写入传输的内容 ,然后再
清除已读取的内容 。
我们将使用
NOTIFY /
LISTEN机制发送和接收数据,并收集Node.js的模型实现。

但是以这种方式,必须仔细规避耙子。
协议功能
听着
LISTEN
使用libpq库的应用程序将LISTEN命令作为常规SQL命令执行,然后必须定期调用PQnotify函数以检查是否有新通知。
如果您不是在编写用于PG的库,而是在编写特定的应用程序,则在大多数情况下,您将无权访问此函数。
但是,如果已经按照处理
异步请求和
通知的建议为您编写了这样的库,那么您将自动在应用程序代码中收到一条消息。 如果没有,则可以简单
地在连接上
定期执行SELECT 1
,然后将在查询结果中附带一条通知:
在非常老的libpq版本中,只有一种方法可以确保及时接收来自NOTIFY命令的消息-不断发送命令,甚至是空命令,然后在每次PQexec调用之后检查PQnotification。 尽管此方法仍然有效,但由于处理器使用效率低而被认为已过时。
就
psql而言,它看起来像这样:
_tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991.
如果对于已应用的任务,我们可以同意在1秒钟内以这样的间隔在1秒内传递消息,则需要最大的延迟,然后执行请求。 同时,此方法有助于
监视连接的“活动性” ,确保没有人通过
pg_terminate_backend
从服务器端意外断开
连接 ,或者确保在没有任何通知客户端的情况下PG不会突然“崩溃”。
通知
NOTIFY [ , ]
NOTIFY命令将通知事件以及附加的“消息”行发送到以前在当前LISTEN数据库中执行了具有指定通道名称的通道的所有客户端应用程序。
...
随通知一起发送的“消息”行必须是一个简单的文本常量 。 在标准配置中,其长度应小于8000个字节 。
也就是说,如果我们的“消息”突然包含与ASCII完全不同的内容,那么我们
将不得不对其进行筛选 ,如果它超过了8000字节(不是字符!)的大小,
则将其切成块然后粘贴 。 同时,我们应该节省通道带宽和服务器资源,以处理此类块的传输-也就是说,将尽可能少的服务“绑定”添加到有用的内容中,但不要“扼杀”客户端应用程序,从而迫使其打包
gzip -9
。
在该机制的其他优点中,还可以注意到绑定到消息的“源” ...
...通过检查信令过程的PID (在事件数据中指示)是否与会话自己的PID相匹配(可以通过与libpq联系找到它)来避免额外的工作。 如果它们匹配,则会话将收到有关其自身操作的通知,因此可以忽略它。
...并保证交货单:
除了过滤重复通知的后续实例外,NOTIFY还确保来自单个事务的通知始终以发送时的顺序到达。 还可以确保来自不同事务的消息以提交这些事务的顺序到达 。
我们不会具体组合任何东西,因此我们的每个请求都只会对应一个单独的事务。
但是请记住,如果用于交换的连接上也有应用程序活动,则我们的NOTIFY可能不在我们自由意志的交易之内,因此
可能会产生副作用 :
交易对NOTIFY有重大影响。 首先,如果在事务内部执行了NOTIFY,则仅在这种情况下,在事务提交后才将通知传递给收件人。 这是合理的,因为在事务中断的情况下,将取消其中的所有命令(包括NOTIFY)的操作 。
因此,最好在显然没有事务或长查询的地方使用连接。
数据库0类别1262的对象0上的AccessExclusiveLock
如果突然您的NOTIFY通知开始变钝并记录了这种锁定的期望,那么您仍然“从短裤中长出来”,现在该考虑“成人” MQ了。
毕竟,通知队列虽然很大(标准版本为8GB),但仍然是有限的。 根据
汤姆·莱恩的回答 :
在插入事务的通知消息时,将保持此锁,此后,事务将提交并释放该锁。
也就是说,没有太多选择可以解决:
- 发送但不经常
也就是说,如果是一些计数器,则在更长的时间间隔内汇总发送的指标。 - 少发送
例如,从传输的JSON的应用程序键值的角度删除“默认”。 - 仅发送信号 ,完全不发送内容
作为一种选择-启动多个通道,每个通道的名称本身已经具有某种应用意义。 - 仍然从数据库中发货
发送复杂的消息
身体编码
在一般情况下,我们可能希望不仅在消息中传输允许的字符,还传输俄语字母和“任何二进制文件”-因此,使用
转换为十六进制表示形式来形成传输的字符串将很方便。 是的,此方法效果很好:
NOTIFY test, E'\x20\x21'
Asynchronous notification "test" with payload " !" received from server process with PID 63991.
但是,让我们再次转到文档:
您必须确保以这种方式创建的字节序列(尤其是八进制和十六进制表示法)形成有效的服务器编码字符 。 当服务器使用UTF-8编码而不是字节记录时,请使用Unicode特殊序列或第4.1.2.3节中介绍的替代Unicode语法。 (否则,您将必须手动编码UTF-8字符并按字节写入它们,这非常不方便。)
因此,即使使用了win1251引号中的普通符号,我们也会悲痛欲绝:
NOTIFY test, E'\x98'
由于我们不想“
手动编码UTF-8字符并按字节写入 ”,因此,如果有任何字符不在
\x20-\x7E
范围内,或者如有必要,则我们将立即同意发送
打包在base64中的消息正文。 一方面,这种打包方法不会增加过多的冗余(4:3系数),另一方面,它是在系统库级别以任何语言实现的,并且将提供最小的额外负载。
但是,即使我们没有“奇怪的”字符,并且消息只适合一个段,也仍然存在一项功能-
转义撇号 :
要在一行中包含撇号,请在其旁边写两个撇号 ,例如:'Joan of Arc'。 请注意,这与双引号(“)不同。
细分识别
下一个任务是,如果消息的大小突然超过此值,则将消息正确“剪切”为允许传输
7999字节的块 。 这样,接收者就可以在不破坏订单或不属于“外来”细分链的情况下收集它。 为此,每个人都需要以某种方式进行标识。
实际上,我们已经知道两个“坐标”-这是
发送过程的
PID和每个通知中
包含的通道的
名称 。 协议本身保证了分段到达的顺序。
写作邻居当连接到数据库的同一时间(也就是说,显然在同一应用程序进程内) 同时有多个处于同一通道的编写器处于活动状态时,我们将不考虑这种情况。 从技术上讲,这可以通过在段头中传递一个附加标识符来支持-但最好在应用程序内“共享”单个PubSub对象。
货柜限额
要从多个部分组装一个完整的容器,我们需要知道完成的时间。 有两种典型的方法:
- 在第一个目标大小中传输目标大小(以字节或段为单位)
- 在每个属性中传递[non] last属性
由于我们毕竟是写PubSub的,所以我们的大多数消息都是简短的,并且保留大量字节用于大小传输是无利可图的。 因此,我们将使用第二种方法,
将段数据的
第一个字符保留为容器的继续/结束标志。
对象转移
为了同时将纯文本字符串和JSON对象作为“消息”进行传输,我们在接收方增加了
一个符号符号用于逆向转换。
由于我们决定在base64中编码“非格式”,因此对于标志,我们可以采用任何不在此集合中的允许字符。
总计,我们为传输的段提供了以下选项:
-- "" !simple string -- " " @{"a":1} -- base64 #<segment> -- base64 $<segment>
如您所见,仅在接收到第一个字符时就进行分析就足够了,以便进一步了解需要做什么。
编写PubSub实现
我们的应用程序将位于Node.js上,因此我们将使用
node-postgres模块与PostgreSQL一起使用。
我们写起始帧首先,让我们创建PubSub作为EventEmitter的继承人,以便能够为订阅特定频道的用户生成事件:
const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) {
我们与渠道合作由于重新订阅频道或取消订阅我们未订阅的内容时,LISTEN / UNLISTEN不会发誓,因此我们不会使任何事情复杂化。
发送和接收消息 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD;
一些测试 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, ' '); psB.publish(chA, {a : 1}); psA.publish(chB, ' 100 '.repeat(100)); });
一切都很简单,因此您可以轻松地在项目中使用的任何其他PL上实现它,以使用异步通知的基础为例: