PubSub几乎是免费的:PostgreSQL中的NOTIFY功能

如果您的微服务已经使用通用的PostgreSQL数据库存储数据,或者同一服务的多个实例在不同的服务器上使用它,则可以在它们之间相对便宜地获得“ PubSub ”消息 ,而无需集成到Redis架构,RabbitMQ集群或嵌入代码中另一个MQ系统的应用。

为此,我们不会将 消息 写入 数据库表 ,因为这会导致过多的开销,首先要写入传输的内容 ,然后再清除已读取的内容

我们将使用NOTIFY / LISTEN机制发送和接收数据,并收集Node.js的模型实现。



但是以这种方式,必须仔细规避耙子。

协议功能


听着


LISTEN  
使用libpq库的应用程序将LISTEN命令作为常规SQL命令执行,然后必须定期调用PQnotify函数以检查是否有新通知。
如果您不是在编写用于PG的库,而是在编写特定的应用程序,则在大多数情况下,您将无权访问此函数。

但是,如果已经按照处理异步请求通知的建议为您编写了这样的库,那么您将自动在应用程序代码中收到一条消息。 如果没有,则可以简单在连接上定期执行SELECT 1 ,然后将在查询结果中附带一条通知:
在非常老的libpq版本中,只有一种方法可以确保及时接收来自NOTIFY命令的消息-不断发送命令,甚至是空命令,然后在每次PQexec调用之后检查PQnotification。 尽管此方法仍然有效,但由于处理器使用效率低而被认为已过时。
psql而言,它看起来像这样:

 _tmp=# LISTEN test; LISTEN _tmp=# SELECT 1; ?column? ---------- 1 (1 row) Asynchronous notification "test" with payload "abc123" received from server process with PID 63991. 

如果对于已应用的任务,我们可以同意在1秒钟内以这样的间隔在1秒内传递消息,则需要最大的延迟,然后执行请求。 同时,此方法有助于监视连接的“活动性” ,确保没有人通过pg_terminate_backend从服务器端意外断开连接 ,或者确保在没有任何通知客户端的情况下PG不会突然“崩溃”。

通知


 NOTIFY  [ ,  ] 

NOTIFY命令将通知事件以及附加的“消息”行发送到以前在当前LISTEN数据库中执行了具有指定通道名称的通道的所有客户端应用程序。
...
随通知一起发送的“消息”行必须是一个简单的文本常量 。 在标准配置中,其长度应小于8000个字节
也就是说,如果我们的“消息”突然包含与ASCII完全不同的内容,那么我们将不得不对其进行筛选 ,如果它超过了8000字节(不是字符!)的大小, 则将其切成块然后粘贴 。 同时,我们应该节省通道带宽和服务器资源,以处理此类块的传输-也就是说,将尽可能少的服务“绑定”添加到有用的内容中,但不要“扼杀”客户端应用程序,从而迫使其打包gzip -9

在该机制的其他优点中,还可以注意到绑定到消息的“源” ...
...通过检查信令过程PID (在事件数据中指示)是否与会话自己的PID相匹配(可以通过与libpq联系找到它)来避免额外的工作。 如果它们匹配,则会话将收到有关其自身操作的通知,因此可以忽略它。
...并保证交货单:
除了过滤重复通知的后续实例外,NOTIFY还确保来自单个事务的通知始终以发送时的顺序到达。 还可以确保来自不同事务的消息以提交这些事务的顺序到达
我们不会具体组合任何东西,因此我们的每个请求都只会对应一个单独的事务。

但是请记住,如果用于交换的连接上也有应用程序活动,则我们的NOTIFY可能不在我们自由意志的交易之内,因此可能会产生副作用
交易对NOTIFY有重大影响。 首先,如果在事务内部执行了NOTIFY,则仅在这种情况下,在事务提交后才将通知传递给收件人。 这是合理的,因为在事务中断的情况下,将取消其中的所有命令(包括NOTIFY)的操作
因此,最好在显然没有事务或长查询的地方使用连接。

数据库0类别1262的对象0上的AccessExclusiveLock


如果突然您的NOTIFY通知开始变钝并记录了这种锁定的期望,那么您仍然“从短裤中长出来”,现在该考虑“成人” MQ了。

毕竟,通知队列虽然很大(标准版本为8GB),但仍然是有限的。 根据汤姆·莱恩的回答
在插入事务的通知消息时,将保持此锁,此后,事务将提交并释放该锁。
也就是说,没有太多选择可以解决:

  • 发送但不经常
    也就是说,如果是一些计数器,则在更长的时间间隔内汇总发送的指标。
  • 少发送
    例如,从传输的JSON的应用程序键值的角度删除“默认”。
  • 仅发送信号 ,完全不发送内容
    作为一种选择-启动多个通道,每个通道的名称本身已经具有某种应用意义。
  • 仍然从数据库中发货

发送复杂的消息


身体编码


在一般情况下,我们可能希望不仅在消息中传输允许的字符,还传输俄语字母和“任何二进制文件”-因此,使用转换为十六进制表示形式来形成传输的字符串将很方便。 是的,此方法效果很好:

 NOTIFY test, E'\x20\x21' 

 Asynchronous notification "test" with payload " !" received from server process with PID 63991. 

但是,让我们再次转到文档:
您必须确保以这种方式创建的字节序列(尤其是八进制和十六进制表示法)形成有效的服务器编码字符 。 当服务器使用UTF-8编码而不是字节记录时,请使用Unicode特殊序列或第4.1.2.3节中介绍的替代Unicode语法。 (否则,您将必须手动编码UTF-8字符并按字节写入它们,这非常不方便。)
因此,即使使用了win1251引号中的普通符号,我们也会悲痛欲绝:

 NOTIFY test, E'\x98' -- ERROR: invalid byte sequence for encoding "UTF8": 0x98 

由于我们不想“ 手动编码UTF-8字符并按字节写入 ”,因此,如果有任何字符不在\x20-\x7E范围内,或者如有必要,则我们将立即同意发送打包在base64中的消息正文。 一方面,这种打包方法不会增加过多的冗余(4:3系数),另一方面,它是在系统库级别以任何语言实现的,并且将提供最小的额外负载。

但是,即使我们没有“奇怪的”字符,并且消息只适合一个段,也仍然存在一项功能- 转义撇号
要在一行中包含撇号,请在其旁边两个撇号 ,例如:'Joan of Arc'。 请注意,这与双引号(“)不同。

细分识别


下一个任务是,如果消息的大小突然超过此值,则将消息正确“剪切”为允许传输7999字节的块 。 这样,接收者就可以在不破坏订单或不属于“外来”细分链的情况下收集它。 为此,每个人都需要以某种方式进行标识。

实际上,我们已经知道两个“坐标”-这是发送过程PID和每个通知中包含的通道名称 。 协议本身保证了分段到达的顺序。

写作邻居
连接到数据库的同一时间(也就是说,显然在同一应用程序进程内) 同时多个处于同一通道的编写器处于活动状态时,我们将不考虑这种情况。 从技术上讲,这可以通过在段头中传递一个附加标识符来支持-但最好在应用程序内“共享”单个PubSub对象。

货柜限额


要从多个部分组装一个完整的容器,我们需要知道完成的时间。 有两种典型的方法:

  • 在第一个目标大小中传输目标大小(以字节或段为单位)
  • 在每个属性中传递[non] last属性

由于我们毕竟是写PubSub的,所以我们的大多数消息都是简短的,并且保留大量字节用于大小传输是无利可图的。 因此,我们将使用第二种方法, 将段数据第一个字符保留为容器的继续/结束标志。

对象转移


为了同时将纯文本字符串和JSON对象作为“消息”进行传输,我们在接收方增加了一个符号符号用于逆向转换。

由于我们决定在base64中编码“非格式”,因此对于标志,我们可以采用任何不在此集合中的允许字符。

总计,我们为传输的段提供了以下选项:

 -- ""   !simple string -- "  "  @{"a":1} --    base64 #<segment> --    base64 $<segment> 

如您所见,仅在接收到第一个字符时就进行分析就足够了,以便进一步了解需要做什么。

编写PubSub实现


我们的应用程序将位于Node.js上,因此我们将使用node-postgres模块与PostgreSQL一起使用。

我们写起始帧
首先,让我们创建PubSub作为EventEmitter的继承人,以便能够为订阅特定频道的用户生成事件:

 const util = require('util'); const EventEmitter = require('events').EventEmitter; const PubSub = function(connection, interval, skipSelf) { //     this.connection = connection; //        this.connection.on('notification', p._onmessage.bind(this)); //         this.skipSelf = skipSelf; //  "" setInterval(() => { this.connection.query('SELECT 1'); }, interval); //     ""  this.slices = {}; }; util.inherits(PubSub, EventEmitter); const p = PubSub.prototype; 

我们与渠道合作
由于重新订阅频道或取消订阅我们未订阅的内容时,LISTEN / UNLISTEN不会发誓,因此我们不会使任何事情复杂化。

 //     - "",      //     -        const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`; p.subscribe = function(channel) { this.connection.query(`LISTEN ${quot(channel)}`); return this; }; p.unsubscribe = function(channel) { this.connection.query(`UNLISTEN ${quot(channel)}`); return this; }; 

发送和接收消息
 const PAYLOAD_LIMIT = 8000 - 1; const PAYLOAD_FL_STR = '!'; const PAYLOAD_FL_OBJ = '@'; const PAYLOAD_FL_SEQ = '#'; const PAYLOAD_FL_FIN = '$'; const PAYLOAD_SZ_HEAD = 1; const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD; //  ""  const reASCII = /^[\x20-\x7E]*$/; //  p.publish = function(channel, payload) { let query = `NOTIFY ${quot(channel)}`; if (payload !== null && payload !== undefined) { //    -    let str = typeof payload == 'string' ? PAYLOAD_FL_STR + payload : PAYLOAD_FL_OBJ + JSON.stringify(payload); if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) { //   base64- const b64 = Buffer.from(str).toString('base64'); for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) { let fin = pos + PAYLOAD_SZ_DATA; let seg = fin >= len ? PAYLOAD_FL_FIN + b64.slice(pos) : PAYLOAD_FL_SEQ + b64.slice(pos, fin); this.connection.query(`${query}, '${seg}'`); } } else { //        ? //     str = str.replace(/'/g, "''"); this.connection.query(`${query}, '${str}'`); } } else { //       this.connection.query(query); } return this; }; //    p._onmessage = function(msg) { const {processId, channel, payload} = msg; //  "" if (processId == this.connection.processID && this.skipSelf) { return; } // ""  const id = `${processId}:${channel}`; let rv; //   let fl = payload.charAt(0); if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) { // base64 const str = payload.slice(PAYLOAD_SZ_HEAD); const slices = this.slices; let b64; if (fl == PAYLOAD_FL_FIN) { //   if (slices[id]) { slices[id].push(str); b64 = slices[id].join(''); delete slices[id]; } else { b64 = str; } } else { //     if (slices[id]) { slices[id].push(str); } else { slices[id] = [str]; } } if (b64) { rv = Buffer.from(b64, 'base64').toString(); fl = rv.charAt(0); } } else { //  / rv = payload; } if (rv !== undefined) { //   '' let res = { processId , channel }; if (rv) { //       let data = rv.slice(1); res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data; } this.emit(channel, res); } }; 

一些测试
 const pg = require('pg'); const pgsql = new pg.Client({ host : 'example-db' , port : 5432 , user : 'postgres' , password : 'postgres' , database : '_tmp' }); pgsql.connect(err => { let psA = new PubSub(pgsql, 1000); let psB = new PubSub(pgsql, 1000); let chA = 'channel:A'; let chB = 'channel:B'; psA.subscribe(chA); psB.subscribe(chB); psA.on(chA, (msg) => { console.log('A:rcv', msg); }); psB.on(chB, (msg) => { console.log('B:rcv', msg); }); psB.publish(chA); psB.publish(chA, 'simple string'); psB.publish(chA, '  '); psB.publish(chA, {a : 1}); psA.publish(chB, '   100  '.repeat(100)); }); 

一切都很简单,因此您可以轻松地在项目中使用的任何其他PL上实现它,以使用异步通知的基础为例:

Source: https://habr.com/ru/post/zh-CN484978/


All Articles