
Redis Stream-Redis在5.0版中引入的一种新的抽象数据类型
从概念上讲,Redis Stream是一个列表,您可以在其中添加条目。 每个条目都有一个唯一的标识符。 默认情况下,标识符是自动生成的,并且包含时间戳。 因此,由于Unix tail -f命令读取日志文件并冻结对新数据的预期,因此您可以按时间请求记录范围或在流中接收新数据。 请注意,多个客户端可以同时收听流,因为许多“ tail -f”进程可以同时读取文件,而不会彼此冲突。
为了了解新数据类型的所有优点,让我们简要回顾一下早已存在的Redis结构,该结构部分重复了Redis Stream的功能。
历史游览
Redis pub / sub
Redis Pub / Sub是已经内置在键值存储中的简单消息系统。 但是,为简单起见,您必须支付:
- 如果发布者由于任何原因失败,那么他将失去所有订阅者
- 发布者需要知道所有订阅者的确切地址。
- 如果数据发布的速度快于发布者的发布速度,则发布者可能会使其订阅者超负荷
- 该消息在发布后立即从发布者的缓冲区中删除,无论它传递了多少订阅者以及他们如何快速处理此消息。
- 所有订户将同时收到消息。 订户本身必须以某种方式就如何处理同一消息达成共识。
- 没有内置机制来确认订户对消息的成功处理。 如果订阅者收到消息并在处理过程中掉线,则发布者将不知道该消息。
Redis列表
Redis列表是一种支持锁定读取命令的数据结构。 您可以从列表的开头或结尾添加和阅读消息。 基于此结构,您可以为分布式系统创建一个良好的堆栈或队列,并且在大多数情况下这已足够。 与Redis Pub / Sub的主要区别:
- 邮件传递给一个客户端。 被读取阻止的第一个客户端将首先接收数据。
- Clint必须为每个消息启动读取操作。 列表对客户一无所知。
- 消息将被存储,直到有人计数或明确删除它们为止。 如果将Redis服务器设置为将数据刷新到磁盘,则系统的可靠性将大大提高。
流介绍
将记录添加到流
XADD命令将新记录添加到流中。 一条记录不仅仅是一个字符串,它还包含一个或多个键值对。 因此,每个记录已经结构化,并且类似于CSV文件的结构。
> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0
在上面的示例中,我们向流中添加了两个字段,其名称(键)为“ mystream”:“ sensor-id”和“ temperature”,其值分别为“ 1234”和“ 19.8”。 作为第二个参数,该命令接受将分配给记录的标识符-该标识符唯一地标识流中的每个记录。 但是,在这种情况下,我们传递了*,因为我们希望Redis为我们生成一个新的标识符。 每个新的标识符将增加。 因此,每个新记录将具有相对于先前记录更大的标识符。
ID格式
XADD命令返回的记录标识符由两部分组成:
{millisecondsTime}-{sequenceNumber}
millisecondsTime -Unix时间(以毫秒为单位)(Redis服务器时间)。 但是,如果当前时间等于或小于前一个记录的时间,则使用前一个记录的时间戳。 因此,如果服务器时间返回过去,则新标识符仍将保留增加属性。
sequenceNumber用于在同一毫秒内创建的记录。
sequenceNumber将相对于前一个记录增加1。 由于
sequenceNumber的大小为64位,因此实际上,不应限制在一毫秒内可以生成的记录数。
乍一看,此类标识符的格式可能看起来很奇怪。 一个难以置信的读者可能想知道为什么时间是标识符的一部分。 原因是Redis流通过标识符支持范围请求。 由于标识符与创建记录的时间相关联,因此可以请求时间范围。 当我们继续研究
XRANGE命令时,我们将看一个具体的例子。
如果用户出于某种原因需要指定自己的标识符,例如与某个外部系统相关联的标识符,那么我们可以将其传递给
XADD命令而不是*符号,如下所示:
> XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2
请注意,在这种情况下,您必须自己监视标识符的增加。 在我们的示例中,最小标识符为“ 0-1”,因此团队将不接受其他等于或小于“ 0-1”的标识符。
> XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
流中的记录数
您只需使用
XLEN命令就可以获取流中的记录数。 对于我们的示例,此命令将返回以下值:
> XLEN somestream (integer) 2
范围请求-XRANGE和XREVRANGE
要请求某个范围的数据,我们需要指定两个标识符-范围的开始和结束。 返回的范围将包括所有元素,包括边界。 还有两个特殊标识符“-”和“ +”,分别表示流中的最小(第一条记录)和最大(最后一条记录)标识符。 下面的示例将显示所有流条目。
> XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2"
每个返回的记录都是由两个元素组成的数组:标识符和键值对列表。 我们已经说过,记录标识符与时间有关。 因此,我们可以请求特定时间段的范围。 但是,我们可以在请求中指定完整的标识符,而不是完整的标识符,而只指定Unix时间,省略与
sequenceNumber相关的部分。 标识符的省略部分在范围的开头自动等于零,在范围的结尾自动等于最大值。 以下是如何请求两个毫秒范围的示例。
> XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8"
在此范围内,我们只有一条记录,但是在实际数据集中,返回的结果可能非常庞大。 因此,
XRANGE支持COUNT选项。 通过指定数量,我们可以简单地获取前N条记录。 如果需要获取下N个条目(分页),则可以使用接收到的最后一个标识符,将其
sequenceNumber增加一个,然后再次请求。 让我们在以下示例中对此进行研究。 我们开始使用
XADD添加10个元素(假设mystream流已经填充了10个元素)。 为了开始迭代,每个命令获取2个元素,我们从整个范围开始,但COUNT等于2。
> XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
要继续使用以下两个元素进行迭代,我们需要选择接收到的最后一个标识符1519073279157-0,并向
sequenceNumber添加1。
所得的标识符(在这种情况下为1519073279157-1)现在可以用作下一个
XRANGE调用范围开头的新参数:
> XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4"
依此类推。 由于要搜索的
XRANGE的复杂度是O(log(N)),然后是O(M)返回M个元素,因此每个迭代步骤都很快。 因此,使用
XRANGE,可以有效地
迭代流。
XREVRANGE命令与
XRANGE等效,但以相反的顺序返回元素:
> XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10"
请注意,
XREVRANGE命令以相反的顺序获取开始和停止范围的参数。
使用XREAD读取新记录
通常,有一个任务要订阅流并仅接收新消息。 这个概念可能看起来像是Redis发布/订阅或阻止Redis列表,但是在使用Redis Stream方面存在根本差异:
- 默认情况下,每条新消息都会传递给每个订户。 此行为与阻止Redis列表不同,在Redis列表中,只有一个订阅者才能读取新消息。
- 在Redis Pub / Sub中,所有消息都将被遗忘且从不保存,而在Stream中,所有消息将无限期存储(除非客户端明确要求删除)。
- Redis流允许您区分对一个流中消息的访问。 特定的订户只能看到他的个人消息历史记录。
您可以使用
XREAD命令订阅流并接收新消息。 这比
XRANGE复杂一点,因此我们将首先从更简单的示例开始。
> XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
在上面的示例中,
指定了非阻塞
XREAD形式。 请注意,COUNT选项是可选的。 实际上,唯一需要的命令选项是STREAMS选项,它设置流列表以及相应的最大标识符。 我们写了“ STREAMS mystream 0”-我们想获取标识符大于“ 0-0”的mystream流的所有记录。 从示例中可以看到,该命令返回流的名称,因为我们可以同时预订多个流。 我们可以编写例如“ STREAMS mystream otherstream 0 0”。 请注意,在STREAMS选项之后,我们首先需要提供所有必要流的名称,然后才提供标识符列表。
与
XRANGE相比,这种简单形式的命令没有什么特别的
地方 。 但是,有趣的是,通过指定BLOCK参数,我们可以轻松地将
XREAD变成阻塞命令:
> XREAD BLOCK 0 STREAMS mystream $
在上面的示例中,新的BLOCK选项指定了0毫秒的超时(这意味着无尽的等待)。 此外,没有传递mystream流的常规标识符,而是传递了特殊标识符$。 这个特殊的标识符意味着
XREAD应该使用mystream流中的最大标识符作为标识符。 因此,从我们开始收听的那一刻开始,我们将只收到新消息。 在某种程度上,这类似于Unix tail -f命令。
请注意,使用BLOCK选项时,我们不需要使用特殊标识符$。 我们可以使用流中存在的任何标识符。 如果团队可以立即满足我们的要求,而不会阻止,那么它将这样做,否则将被阻止。
阻止
XREAD还可以一次侦听多个流,只需要指定它们的名称即可。 在这种情况下,该命令将返回数据到达的第一个流的记录。 为此流阻塞的第一个订户将首先接收数据。
消费群体
在某些任务中,我们希望区分订户对同一线程内消息的访问。 一个有用的示例是一个消息队列,其中的工作人员将从流中接收不同的消息,从而使您可以扩展消息处理。
如果我们假设我们有三个订户C1,C2,C3和一个包含消息1、2、3、4、5、6、7的流,那么将发生消息服务,如下图所示:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了获得这种效果,Redis Stream使用了一个称为消费者组的概念。 此概念类似于从流接收数据的伪订户,但实际上由组中的多个订户提供服务,从而提供了某些保证:
- 每个消息都传递给组内的不同订户。
- 在组中,订户由名称标识,名称是区分大小写的字符串。 如果某个订户暂时退出组,则可以使用自己的唯一名称将其还原到组中。
- 每个消费者组都遵循“第一条未读消息”的概念。 当订户请求新消息时,他只能接收从未传送给组内任何订户的消息。
- 有一个命令可以明确确认订户对消息的成功处理。 在调用此命令之前,请求的消息将保持“待处理”状态。
- 在消费者组内,每个订户可以请求已发送给他但尚未处理的消息的历史记录(处于“待处理”状态)
从某种意义上说,组的状态可以表示为:
+----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+
现在是时候了解消费者小组的主要团队了:
- XGROUP用于创建,销毁和管理组。
- XREADGROUP用于通过组读取流。
- XACK-此命令允许订户将消息标记为已成功处理
成立消费群
假设mystream流已经存在。 然后,组创建命令将如下所示:
> XGROUP CREATE mystream mygroup $
OK
创建群组时,我们必须传递一个标识符,群组将从该标识符开始接收消息。 如果我们只想接收所有新消息,则可以使用特殊标识符$(如上面的示例所示)。 如果您指定0而不是特殊标识符,那么该流中的所有消息都可用于该组。
现在已经创建了组,我们可以立即使用
XREADGROUP命令开始阅读消息。 该命令与
XREAD非常相似,并支持可选的BLOCK选项。 但是,有一个强制性的GROUP选项,必须始终使用两个参数来指定该选项:组名和订户名。 也支持COUNT选项。
在阅读流之前,让我们在其中放置一些消息:
> XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0
现在,让我们尝试通过组来阅读此流:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
上面的命令逐字读取如下:
“我,爱丽丝的订户,是mygroup的成员,我想从mystream中读取一条从未传递给任何人的消息。”
订户每次对一个群组执行操作时,都必须指出自己的名字,以便在该群组中唯一地标识自己。 上面的命令中还有一个非常重要的细节-特殊标识符“>”。 这个特殊的标识符过滤消息,仅留下到目前为止尚未传递的消息。
另外,在特殊情况下,您可以指定一个实数标识符,例如0或任何其他有效标识符。 在这种情况下,
XREADGROUP命令将向您返回状态为“待处理”的消息的历史记录,这些消息的历史记录已传递给指定的订户(Alice),但尚未使用
XACK命令进行确认。
我们可以通过立即指定标识符0而不使用
COUNT选项来验证此行为。 我们只看到唯一待处理的消息,即带有苹果的消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
但是,如果我们确认该消息已成功处理,它将不再显示:
> XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set)
现在轮到鲍勃阅读一些东西:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry"
鲍勃(mygroup的成员)要求的信息不超过两条。 由于特殊标识符“>”,该命令仅报告未送达的邮件。 如您所见,“ apple”消息没有显示,因为它已经传递给了Alice,所以Bob收到了“ orange”和“ strawberry”。
因此,Alice,Bob和任何其他组订户可以从同一流中读取不同的消息。 他们还可以阅读其原始消息历史记录或将消息标记为已处理。
有几件事要牢记:
- 一旦订户认为该消息是XREADGROUP命令,该消息便进入“挂起”状态并分配给该特定订户。 其他群组订户将无法阅读此消息。
- 订阅者会在第一次提及时自动创建,因此无需明确创建。
- 使用XREADGROUP,您可以同时从多个不同的流中读取消息,但是,要使其正常工作,必须首先使用XGROUP为每个流创建具有相同名称的组。
崩溃恢复
订户可以从故障中恢复并重新阅读状态为“待处理”的消息列表。 但是,在现实世界中,订户最终可能会失败。 如果订户失败后仍无法恢复,那么他们的悬空消息会如何处理?
消费者组提供了专门用于此类情况的功能-当您需要更改消息的所有者时。
首先,您需要调用
XPENDING命令,该命令显示该组中所有消息的状态为“待处理”。 以最简单的形式,仅使用两个参数调用命令:流的名称和组的名称:
> XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2"
团队打印了整个组和每个订户的未处理消息数。 我们只有Bob带有两条未处理的消息,因为Alice请求的唯一消息是通过
XACK确认的。
我们可以使用更多参数来请求其他信息:
XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id}-标识符范围(可以使用“-”和“ +”)
{count}-投放尝试次数
{consumer-name}-组名
> XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1
现在,我们有每条消息的详细信息:标识符,订户名称,停机时间(以毫秒为单位)以及最后的传递尝试次数。 我们收到了来自Bob的两条消息,它们闲置了74170458毫秒,大约20个小时。
请注意,没有人阻止我们仅使用
XRANGE来检查消息的内容。
> XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
我们只需要在参数中重复两次相同的标识符即可。 现在我们有了一些想法,爱丽丝可以决定鲍勃在闲置20个小时后可能无法恢复,现在是时候请求这些消息并继续处理它们而不是鲍勃了。 为此,我们使用
XCLAIM命令:
XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}
使用此命令,我们可以通过将所有者更改为{consumer}来获取尚未处理的“外来”消息。 但是,我们也可以提供最少的停机时间{min-idle-time}。 这有助于避免两个客户端尝试同时更改同一消息的所有者的情况:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
第一位客户将重置停机时间,并增加交付数量的计数。 因此,第二个客户端将无法请求它。
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
该消息已由爱丽丝成功声明,爱丽丝现在可以处理该消息并确认。
从上面的示例中可以明显看出,成功执行请求将返回消息本身的内容。 但是,这不是必需的。 JUSTID选项可用于仅返回消息标识符。 如果您对消息的详细信息不感兴趣并想提高系统性能,这将很有用。
送货柜台
您在
XPENDING输出中观察到的计数器是每封邮件的传递数量。 这样的计数器以两种方式递增:通过
XCLAIM成功请求消息时或使用
XREADGROUP调用时。
某些消息多次发送是正常的。 最主要的是,结果是所有消息都得到处理。 有时,在处理消息时,由于损坏消息本身或处理消息而导致的问题会导致处理程序代码中的错误。
在这种情况下,可能没有人能够处理此消息。由于我们有一个交付尝试计数器,因此我们可以使用此计数器来检测此类情况。因此,一旦传递计数器达到您指定的数量,则将这样的消息放在另一个流中并向系统管理员发送通知可能更合理。线程状态
XINFO命令用于请求有关流及其组的各种信息。例如,命令的基本形式如下: > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana"
上面的命令显示有关指定流的常规信息。现在是一个稍微复杂一点的示例: > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0
上面的命令显示指定流的所有组的常规信息 > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983
上面的命令显示有关指定流和组的所有订户的信息。如果您忘记了命令语法,只需联系命令以获取帮助: > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help.
流大小限制
许多应用程序不想永远将数据收集到流中。在流中拥有最大数量的消息通常很有用。在其他情况下,当达到指定的流大小时,将所有消息从流传输到另一个持久性存储很有用。您可以使用XADD命令中的MAXLEN参数来限制流的大小: > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3"
使用MAXLEN时,达到指定的长度时,旧记录将自动删除,因此流的大小是恒定的。但是,在这种情况下,修整不会以最有效的方式在Redis内存中发生。可以如下改善这种情况:上面示例中的参数〜表示我们不需要将流的长度限制为特定值。在我们的示例中,它可以是大于或等于1000的任何数字(例如1000、1010或1030)。我们只是明确表示希望我们的流至少存储1000条记录。这使得在Redis内部使用内存的工作效率更高。还有一个单独的XTRIM命令可以执行相同的操作:XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
> XTRIM mystream MAXLEN 10
> XTRIM mystream MAXLEN ~ 10
永久性存储和复制
Redis Stream异步复制到从节点,并保存到AOF(所有数据的快照)和RDB(所有写操作的日志)之类的文件中。还支持使用者组状态复制。因此,如果消息在主节点上处于“挂起”状态,则在从节点上此消息将具有相同的状态。从流中删除单个项目
要删除消息,有一个特殊的XDEL命令。该命令获取流的名称,后跟需要删除的消息的标识符: > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3"
使用此命令时,您需要考虑到实际上不会立即释放内存。零长度流
流与其他Redis数据结构之间的区别在于,当其他数据结构自身中不再包含元素时,副作用是,该数据结构本身将从内存中删除。因此,例如,当ZREM调用删除最后一个项目时,已排序的集合将被完全删除。取而代之的是,允许线程保留在内存中,甚至不包含单个元素。结论
Redis Stream是创建消息代理,消息队列,统一日志和存储历史记录的聊天系统的理想选择。正如Nicklaus Wirth所说,程序是算法加上数据结构,Redis已经为您提供了两者。