Stream 是 Redis 5.0 引入的一种新数据类型,它以更抽象的方式模拟了日志数据结构,但日志的本质仍然存在:就像一个只读追加模式打开的文件,Redis Streams 主要是一个只读追加的数据结构。至少从概念上讲,因为 Redis Streams 是一种内存中的抽象数据类型,它们实现了更强大的操作,以克服日志文件本身的限制。
尽管 Redis Streams 的数据结构本身相对简单,使其成为 Redis 最复杂的数据类型之一的原因在于它实现了额外的、非强制性的功能:一系列阻塞操作,允许消费者等待生产者添加到 Stream 中的新数据,此外还有称为消费者组的概念。
消费者组最初是由流行的消息系统 Kafka(TM)引入的。Redis 以完全不同的方式重新实现了类似的想法,但目标是一样的:允许一组客户端合作消费同一消息流的不同部分。
为了理解 Redis 流是什么以及如何使用它们,我们将忽略所有高级功能,而是专注于数据结构本身,以及用于操作和访问它的命令。这基本上是与列表、集合、有序集合等其他大多数 Redis 数据类型共通的部分。然而,请注意,列表还具有一个可选的更复杂的阻塞 API,由 BLPOP 等命令导出。因此,从这个角度来看,流与列表并没有太大不同,只是额外的 API 更复杂且更强大。
由于流是一种只追加的数据结构,基本的写入命令 XADD 会将新的条目追加到指定的流中。流条目不仅仅是一个字符串,而是由一个或多个字段-值对组成。这样,流中的每个条目都是结构化的,就像以 CSV 格式写入的只追加文件,其中每行包含多个分隔的字段。
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
上述对 XADD 命令的调用将在键mystream的流中添加一个条目sensor-id: 1234, temperature: 19.8,使用一个自动生成的条目 ID,该 ID 是命令返回的,具体为1518951480106-0。它将第一个参数设置为键名mystream,第二个参数是用于标识流中每个条目的条目 ID。
然而,在这种情况下,我们传递了*,因为我们希望服务器为我们生成一个新的 ID。每个新 ID 都会单调递增,也就是说,每次添加的新条目将具有比所有过去条目更高的 ID。服务器自动生成 ID 几乎总是你想要的,明确指定 ID 的情况非常罕见。我们稍后会更多地讨论这一点。流条目具有 ID 的事实与日志文件相似,在日志文件中,可以使用行号或文件内的字节偏移来标识特定条目。回到我们的 XADD 示例,在键名和 ID 之后,接下来的参数是组成我们流条目的字段-值对。
可以仅使用 XLEN 命令获取流中的条目数量:
> XLEN mystream
(integer) 1
XADD 命令返回的条目 ID 由两部分组成,用于唯一标识给定流中的每个条目
<millisecondsTime>-<sequenceNumber>
时间毫秒部分实际上是生成流 ID 的本地 Redis 节点的本地时间。然而,如果当前毫秒时间比前一个条目时间小,那么前一个条目时间将被使用,因此即使时钟倒退,ID 的单调递增属性仍然成立。序列号用于在同一毫秒创建的条目。由于序列号是 64 位宽的,实际上在同一毫秒内生成的条目数量没有限制。
这样的 ID 格式乍一看可能显得有些奇怪,温柔的读者可能会好奇为什么时间会包含在 ID 中。原因是 Redis 流支持基于 ID 的范围查询。因为 ID 与条目生成的时间相关,这使得几乎免费地查询时间范围成为可能。我们将在覆盖 XRANGE 命令时看到这一点。
如果用户出于某些原因需要增量 ID,这些 ID 与时间无关,而是与另一个外部系统 ID 相关,就像之前已经观察到的那样,XADD 命令可以接受一个显式的 ID,而不是使用*通配符 ID 来触发自动生成,如下例所示:
> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2
请注意,在这种情况下,最小 ID 为 0-1,并且命令不会接受等于或小于之前 ID 的 ID:
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
现在我们终于可以通过 XADD 向流中追加条目了。然而,虽然向流中追加数据是显而易见的,但如何查询流以提取数据却不是那么显而易见。如果我们继续使用日志文件的类比,一种显而易见的方法是模仿我们通常使用 Unix 命令tail -f的做法,即开始监听以获取追加到流中的新消息。
请注意,与 Redis 的阻塞列表操作不同,在 BLPOP 这种弹出风格的操作中,给定的元素只会到达一个阻塞的客户端,而在流中,我们希望多个消费者能够看到追加到流中的新消息,就像多个tail -f进程能够看到添加到日志中的内容一样。使用传统的术语,我们希望流能够将消息广播给多个客户端。
然而,这只是可能的访问模式之一。我们也可以以完全不同的方式看待一个流:不将其视为消息系统,而是视为时间序列存储。在这种情况下,也许将新消息追加进来也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标逐个检查历史记录。这肯定也是一种有用的访问模式。
最后,如果我们从消费者的视角来看一个流,我们可能还想以另一种方式访问这个流,即将其视为可以被多个消费者分割的消息流,这些消费者处理这些消息,这样不同的消费者组只能看到单个流中到达的一小部分消息。这样,可以在不同的消费者之间扩展消息处理,而无需单个消费者处理所有消息:每个消费者只需处理不同的消息。这基本上就是 Kafka(TM)通过消费者组实现的方式。通过消费者组读取消息是另一种从 Redis 流中读取的有趣模式。
Redis 流支持上述描述的三种查询模式,通过不同的命令实现。接下来的部分将展示所有这些模式,从最简单且更直接使用的范围查询开始。
要按范围查询流,我们只需要指定两个 ID:start 和 end。返回的范围将包括具有 start 或 end 作为 ID 的元素,因此范围是包含的。两个特殊的 ID-和+分别表示可能的最小 ID 和最大 ID。
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
每个返回的条目是一个包含两个项目的数组:ID 和字段值对列表。我们已经说过,条目的 ID 与时间有关,因为 ID 中-字符左侧的部分是创建流条目时本地节点的毫秒级 Unix 时间戳(然而请注意,流是通过完全指定的 XADD 命令进行复制的,因此副本将具有与主节点相同的 ID)。这意味着我可以使用 XRANGE 查询时间范围。
然而,为了做到这一点,我可能希望省略 ID 的序列部分:如果省略,范围的起始部分将假设为 0,而结束部分将假设为可用的最大序列号。这样,使用仅仅两个毫秒级的 Unix 时间戳进行查询,我们就可以获取该时间范围内的所有条目,以包括的方式。
例如,如果我想查询一个两毫秒的时期,可以使用:
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
在这个范围内我只有一个条目,但在实际数据集中,我可以查询小时范围,或者在仅仅两毫秒内就有许多条目,返回的结果可能非常大。因此,XRANGE 支持在末尾添加一个可选的 COUNT 选项。通过指定一个 count,我可以只获取前 N 个条目。
如果我想获取更多,可以使用返回的最后一个 ID,将序列部分加一,然后再次查询。让我们在以下示例中看看这个。我们开始使用 XADD 添加 10 个条目(我将不展示这部分,假设流mystream已经填充了 10 个条目)。为了开始我的迭代,每次命令获取 2 个条目,我从整个范围开始,但 count 设置为 2。
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
为了继续迭代下一个两个项目,我必须选择上次返回的最后一个 ID,即1519073279157-0,并在 ID 的序列号部分加 1。请注意,序列号是 64 位的,因此无需检查溢出。在这种情况下,结果 ID1519073279157-1现在可以作为下一个 XRANGE 调用的新起始参数使用:
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
并且依次类推。由于 XRANGE 的复杂度为 O(log(N))以查找,然后 O(M)以返回 M 个元素,当元素数量较少时,该命令具有对数时间复杂度,这意味着迭代的每一步都很快速。因此,XRANGE 也是实际上的流迭代器,不需要使用 XSCAN 命令。
XREVRANGE 命令与 XRANGE 类似,但返回的元素顺序相反,因此 XREVRANGE 的实际用途之一是检查 Stream 中的最后一个元素:
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
“请注意,
XREVRANGE命令的start和stop参数顺序是相反的。
当不想通过范围访问流中的项时,通常我们希望订阅新项到达流的通知。这个概念可能与 Redis 的 Pub/Sub 类似,你订阅一个频道,或者与 Redis 的阻塞列表类似,你等待一个键获取新元素来获取,但处理流的方式存在根本差异:
用于监听新消息到达流中的命令称为 XREAD。它比 XRANGE 稍微复杂一些,所以我们先展示简单的形式,之后再提供整个命令的布局。
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
上述是 XREAD 的非阻塞形式。需要注意的是,COUNT 选项并不是必需的,实际上,该命令唯一的必需选项是 STREAMS 选项,它指定了一组键以及每个流由调用的消费者已经看到的最大 ID,因此该命令只会将客户端提供给 ID 大于我们指定的那些消息。
在上述命令中,我们写了STREAMS mystream 0,所以我们想要获取 Streammystream中所有 ID 大于0-0的消息。如上面的示例所示,该命令会返回键名,因为实际上可以用这个命令读取多个键对应的不同 Stream。例如,我可以写:STREAMS mystream otherstream 0 0。请注意,在 STREAMS 选项之后,我们需要提供键名,之后再提供 ID。因此,STREAMS 选项必须始终是最后一个。
除了 XREAD 可以同时访问多个流,以及我们可以指定最后一个我们拥有的 ID 以仅获取更新的消息外,以这种简单形式该命令与 XRANGE 并没有做太多不同的事情。然而,有趣的部分在于,通过指定 BLOCK 参数,我们可以轻松地将 XREAD 转换为阻塞命令:
> XREAD BLOCK 0 STREAMS mystream $
“请注意,在上述示例中,除了移除 COUNT 之外,我还指定了新的 BLOCK 选项,并设置了超时时间为 0 毫秒(这意味着永不超时)。此外,我传递了一个特殊的 ID
$而不是普通的流 IDmystream。这个特殊的 ID 表示 XREAD 应该使用流mystream中已存储的最大 ID 作为最后一个 ID,这样我们只会收到新消息,从我们开始监听的时间开始。这在某种程度上类似于 Unix 命令tail -f。
注意,当使用 BLOCK 选项时,我们不需要使用特殊的 ID。我们可以使用任何有效的 ID。如果命令能够立即满足我们的请求而不需要阻塞,它将这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们将从 ID开始,然后在之后使用上次接收的消息的 ID 来进行下一次调用,依此类推。
XREAD 的阻塞形式也可以监听多个 Streams,只需指定多个键名即可。如果请求可以同步处理,因为至少有一个 Stream 中的元素大于我们指定的相应 ID,那么它将返回结果。否则,该命令将阻塞,并返回第一个有新数据的 Stream 的项目(根据指定的 ID)。
与阻塞列表操作类似,阻塞流读取从等待数据的客户端角度来看是公平的,因为语义是 FIFO 风格。当有新项可用时,最先阻塞等待给定流的客户端将最先被解除阻塞。
XREAD 没有其他选项,只有 COUNT 和 BLOCK,因此它是一个非常基础的命令,主要用于将消费者附加到一个或多个流中。然而,使用消费者组 API 可以提供更多强大的消费流的功能,不过通过消费者组进行读取是通过一个不同的命令 XREADGROUP 实现的,这部分内容将在本指南的下一节中介绍。
当任务是让不同的客户端消费同一个流时,XREAD 已经提供了一种将消息分发给 N 个客户端的方法,甚至还可以使用副本以提供更多的读取可扩展性。但在某些问题中,我们想要做的不仅仅是将同一个消息流提供给许多客户端,而是将同一个流中的不同子集消息提供给许多客户端。这种情况下一个明显的应用场景是处理速度较慢的消息:能够有 N 个不同的工人接收流的不同部分,允许我们通过将不同的消息路由到准备处理更多工作的工人来扩展消息处理能力。
在实际操作中,如果我们假设存在三个消费者 C1、C2、C3,以及一个包含消息 1、2、3、4、5、6、7 的流,那么我们希望的消息服务流程如下图所示:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了获得这种效果,Redis 使用了一个称为消费者组的概念。需要注意的是,从实现角度来看,Redis 消费者组与 Kafka(TM)消费者组没有任何关系,但它们在实现的概念上是相似的,因此我决定不改变术语,与最初推广这一概念的软件产品保持一致。
消费者组就像一个伪消费者,从流中获取数据,并实际上服务于多个消费者,提供一定的保证:
在某种意义上,消费者组可以被想象为关于一个流的一些状态:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
从这个角度来看,消费者组的功能非常简单易懂:它只是为消费者提供它们待处理的消息历史,并且当消费者请求新消息时,只会提供消息 ID 大于last_delivered_id的消息。同时,如果将消费者组视为 Redis 流的辅助数据结构,很明显一个流可以有多个消费者组,每个消费者组有不同的消费者。实际上,同一个流甚至可以通过 XREAD 直接读取,而不同的消费者组则通过 XREADGROUP 进行读取。
现在让我们放大来看一下基本的消费者组命令,如下所示:
XGROUP 用于创建、销毁和管理消费者组。XREADGROUP 用于通过消费者组从流中读取数据。XACK 是允许消费者标记待处理消息已正确处理的命令。假设我已经有一个名为mystream的 stream 类型键存在,为了创建一个消费者组,我只需要执行以下操作:
> XGROUP CREATE mystream mygroup $
OK
如上命令创建消费者组时,我们需要指定一个 ID,例如示例中的。这是必要的,因为消费者组除了其他状态外,还需要知道在第一个消费者连接时要提供哪个消息,也就是说,当组刚刚创建时,当前最后的消息 ID 是什么?如果我们像示例中那样指定,那么只有从现在起新到达的消息才会被组中的消费者消费。如果我们指定0,那么消费者组将从历史记录中消费所有消息。当然,你可以指定任何其他有效的 ID。你知道的是,消费者组将从你指定的 ID 之后的消息开始提供。因为意味着流中的当前最大 ID,指定的效果将是只消费新消息。
XGROUP CREATE也可以在不存在时自动创建流,使用可选的MKSTREAM子命令作为最后一个参数:
> XGROUP CREATE newstream mygroup $ MKSTREAM
OK
现在我们已经创建了消费者组,可以通过消费者组立即尝试读取消息,使用 XREADGROUP 命令。我们将从名为 Alice 和 Bob 的消费者读取消息,以查看系统将如何向 Alice 和 Bob 返回不同的消息。
XREADGROUP 与 XREAD 非常相似,提供了相同的 BLOCK 选项,否则它是一个同步命令。然而,必须始终指定一个强制选项,即 GROUP,并包含两个参数:消费者组的名称和尝试读取的消费者名称。COUNT 选项也受到支持,并且与 XREAD 中的一致。
在从流中读取之前,让我们往里面放入一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
“注意:这里“message”是字段名称,“fruit”是关联值,记住流项是小字典。
是时候使用消费者组来尝试读取一些内容了:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
XREADGROUP 回复与 XREAD 回复类似。然而,请注意上面提供的GROUP <group-name> <consumer-name>,它表示我想使用消费者组mygroup从流中读取,并且我是消费者Alice。每次消费者使用消费者组执行操作时,都必须唯一地指定该组中标识此消费者的名称。
在上面的命令行中还有一个非常重要的细节,在强制性的 STREAMS 选项之后,用于键mystream的 ID 是特殊的 ID>。这个特殊的 ID 仅在消费者组的上下文中有效,并且它的含义是:迄今为止从未被其他消费者接收的消息。
这几乎总是你想要的,然而你也可以指定一个实际的 ID,例如0或其他任何有效的 ID。在这种情况下,我们请求 XREADGROUP 只提供我们已挂起的消息的历史记录,因此在这种情况下,我们永远不会看到组中的新消息。所以基本上,XREADGROUP 会根据我们指定的 ID 表现出以下行为:
>,则命令将返回至今为止从未被其他消费者接收的新消息,并且作为副作用,将更新消费者组的最后 ID。我们可以立即通过指定 ID 为 0 来测试这种行为,而不使用 COUNT 选项:我们只会看到唯一的待处理消息,即关于苹果的那个消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
然而,如果我们确认消息已处理,它将不再包含在待处理消息历史中,因此系统将不再报告任何内容:
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
如果你还不了解 XACK 的工作原理,没关系,这个概念就是已处理的消息将不再包含在我们可以访问的历史记录中。 现在轮到 Bob 读了:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"
Bob 请求最多两条消息,并且通过相同的组mygroup进行阅读。因此发生的情况是 Redis 只报告新消息。如你所见,“apple”消息没有被送达,因为它已经送达给了 Alice,所以 Bob 获取到了“orange”和“strawberry”,依次类推。
这样,Alice、Bob 以及其他组内的任何其他消费者都能够从同一个流中读取不同的消息,读取他们尚未处理的消息历史,或者标记消息为已处理。这允许创建不同的拓扑结构和语义来消费流中的消息。
需要注意一些事项:
XREADGROUP 是一个写命令,因为即使它从流中读取数据,消费者组也会作为读取的副作用被修改,因此它只能在主实例上调用。以下是一个使用消费者组编写的 Ruby 语言消费者实现示例。这段 Ruby 代码被编写得非常清晰,几乎任何其他语言的有经验的程序员都能读懂,即使他们不懂 Ruby:
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
whiletrue
# Pick the ID based on the iteration: the first time we want to
# read our pending messages, incase we crashed and are recovering.
# Once we consumed our history, we can start getting new messages.
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# If we receive an empty reply, it means we were consuming our history
# and that the history is now empty. Let's start to consume new messages.
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# Process the message
process_message(id,fields)
# Acknowledge the message as processed
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end
如你所见,这里的思路是开始消费历史记录,也就是我们待处理的消息列表。这很有用,因为消费者可能之前已经崩溃了,所以在重启时,我们希望重新读取之前交付但未确认的消息。这样我们就可以多次处理一条消息或仅处理一次(至少在消费者故障的情况下是这样,但 Redis 的持久化和复制也有其限制,详见关于此主题的特定部分)。
一旦历史记录被消费完毕,我们得到一个空的消息列表,就可以切换使用>特殊 ID 来消费新消息了。
上述示例允许我们编写参与同一消费者组的消费者,每个消费者处理一部分消息,并在失败后通过重新读取仅交付给它们的待处理消息来恢复。然而,在现实世界中,消费者可能会永久失败且无法恢复。那么,因任何原因停止后从未恢复的消费者的待处理消息会发生什么?
Redis 消费者组提供了一个功能,用于声明给定消费者待处理的消息,从而使这些消息的所有权发生转移,并重新分配给不同的消费者。该功能非常明确:消费者需要检查待处理消息列表,并使用特殊命令声明特定的消息,否则服务器将永远将这些待处理消息分配给旧的消费者。这样一来,不同的应用程序可以选择是否使用此功能以及如何使用它。
这个过程的第一步只是一个提供消费者组中待处理条目可观察性的命令,这个命令叫做 XPENDING。这是一个只读命令,调用它是安全的,不会改变任何消息的所有权。最简单的形式下,这个命令只需要两个参数,即流的名字和消费者组的名字。
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
当以这种方式调用时,该命令仅输出消费者组中待处理的消息总数(在这种情况下为 2 条消息)、待处理消息中的最低和最高消息 ID,以及最后列出的消费者及其待处理的消息数。因为爱丽丝仅使用 XACK 确认了唯一一条消息,所以我们只有鲍勃,他有两条待处理的消息。
我们可以通过向 XPENDING 命令提供更多参数来获取更多详细信息,因为该命令的完整签名如下:
XPENDING <key> <groupname> [<start-id> <end-id> <count> [<consumer-name>]]
通过提供起始和结束 ID(可以是 XRANGE 中的-和+)以及一个计数来控制命令返回的信息量,我们可以更多地了解待处理的消息。可选的最终参数是消费者名称,如果我们要限制输出仅限于某个特定消费者待处理的消息,则会使用该参数,但在接下来的示例中我们将不使用此功能。
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
现在我们有了每条消息的详细信息:消息 ID、消费者名称、空闲时间(以毫秒为单位),即从上次消息被某个消费者接收以来过去了多少毫秒,最后是消息被接收的次数。我们有两条来自 Bob 的消息,它们的空闲时间约为 20 小时,共 74170458 毫秒。
请注意,我们完全可以使用 XRANGE 来检查第一条消息的内容。
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
我们在 arguments 中只需要重复使用同一个 ID 两次。现在我们有了些想法,Alice 可能会决定,在 Bob 20 小时不处理消息后,他很可能无法及时恢复,是时候去声明这些消息并在 Bob 的位置上继续处理了。为此,我们使用 XCLAIM 命令。
这个命令在完整形式下非常复杂且有很多选项,因为它用于复制消费者组的变化,但我们只需要通常使用的参数。在这种情况下,我们只需这样调用它:
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
基本上我们说,对于这个特定的键和组,我希望指定的消息 ID 将变更所有权,并被分配给指定的消费者名称<consumer>。我们还提供了一个最小空闲时间,因此只有在提到的消息的空闲时间大于指定的空闲时间时,该操作才会生效。这很有用,因为也许有两个客户端同时尝试获取同一条消息:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
然而,声明一条消息会作为副作用重置其空闲时间,并增加其交付次数计数器,因此第二个客户端将无法声明该消息。这样可以避免简单的重复处理消息(尽管在一般情况下你无法获得恰好一次处理)。 这是命令执行的结果:
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
消息已被艾丽丝成功领取,她现在可以处理该消息并确认接收,即使原始消费者尚未恢复,也可以继续进行。
从上面的例子可以看出,在成功抢占某个消息之后,XCLAIM 命令还会返回该消息。但这并不是强制性的。可以使用 JUSTID 选项仅返回成功抢占的消息的 ID。这在你希望减少客户端和服务器之间的带宽使用量,同时降低命令的性能,并且你不关心该消息(因为你的消费者是通过定期重新扫描待处理消息的历史来实现的)时非常有用。
声明也可以通过一个单独的过程来实现:一个仅检查待处理消息列表,并将空闲消息分配给看起来活跃的消费者的进程。活跃的消费者可以通过 Redis 流的可观测性功能之一来获取。这将在下一节中讨论。
您在 XPENDING 输出中观察到的计数器是每条消息的交付次数。这种计数器通过两种方式增加:当通过 XCLAIM 成功认领一条消息时,或者使用 XREADGROUP 调用以访问待处理消息的历史记录时。
当出现故障时,消息被重复投递是正常的,最终它们通常会被处理。但是有时在处理某个特定消息时会遇到问题,因为该消息被损坏或以某种方式触发了处理代码中的 bug。在这种情况下,消费者会不断无法处理这个特定的消息。由于我们有投递尝试次数的计数器,可以使用该计数器来检测那些由于某些原因完全无法处理的消息。一旦投递计数器达到你选择的某个较大数值,将这些消息放入另一个流并通知系统管理员可能是更明智的做法。这基本上是 Redis 流实现死信队列概念的方式。
缺乏可观测性的消息系统非常难以使用。不知道谁在消费消息、哪些消息在等待、给定流中的活跃消费者组是什么,这一切都变得不透明。因此,Redis 流和消费者组有不同的方式来观察正在发生的事情。我们已经介绍了 XPENDING,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和交付次数。
然而,我们可能还想做更多,XINFO 命令是一个可观测性接口,可以通过子命令来获取关于流或消费者组的信息。
此命令使用子命令来显示关于流及其消费者组的不同信息。例如,XINFO STREAM 报告关于流本身的信息。
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1526569495631-0
2) 1) "message"
2) "apple"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
输出显示了流的内部编码信息,并且还显示了流中的第一条和最后一条消息。还可以获取与此流值关联的消费者组数量。我们可以进一步查询更多关于消费者组的信息。
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
7) last-delivered-id
8) "1588152489012-0"
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
7) last-delivered-id
8) "1588152498034-0"
如您在本输出和上一个输出中所见,XINFO 命令输出了一系列键值对。由于它是一个可观测性命令,这使得人类用户可以立即理解报告了哪些信息,并允许命令在未来通过添加更多字段来报告更多信息而不破坏与旧客户端的兼容性。而像 XPENDING 这样的必须更高效使用带宽的命令,则只是报告信息而不包含字段名称。
上述示例中使用 GROUPS 子命令的输出应该清晰地显示字段名称。我们可以通过检查此类组中注册的消费者来更详细地查看特定消费者组的状态。
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
如果您忘记了命令的语法,可以直接询问该命令本身获取帮助:
> XINFO HELP
1) XINFO <subcommand> arg arg ... arg. Subcommands are:
2) CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.
3) GROUPS <key> -- Show the stream consumer groups.
4) STREAM <key> -- Show information about the stream.
5) HELP -- Print this help.
Redis 流中的消费者组在某些方面可能类似于 Kafka(商标)基于分区的消费者组,但是请注意,Redis 流实际上是非常不同的。分区仅仅是逻辑上的,消息只是被放入一个单一的 Redis 键中,因此不同客户端的处理方式取决于哪个客户端准备好处理新消息,而不是客户端从哪个分区读取。例如,如果某个消费者 C3 在某个点永久失败,Redis 将继续为 C1 和 C2 服务所有新到达的消息,就好像现在只有两个逻辑分区一样。
同样地,如果某个消费者处理消息的速度远快于其他消费者,那么在相同的时间单位内,这个消费者将收到更多消息。这是因为 Redis 会显式地跟踪所有未确认的消息,并记住每个消费者收到了哪些消息以及第一个未交付的消息的 ID。
然而,这也意味着在 Redis 中,如果你想将同一个流中的消息拆分到多个 Redis 实例中,你必须使用多个键,并且需要使用某种分片系统,如 Redis Cluster 或其他特定应用的分片系统。单个 Redis 流不会自动拆分到多个实例中。
我们可以这么说:从架构上讲,以下内容是正确的:
所以基本上来说,Kafka 的分区更类似于使用 N 个不同的 Redis 键。而 Redis 消费者组则是一个服务器端的消息负载均衡系统,将给定流中的消息分发到 N 个不同的消费者。
许多应用程序并不希望永远将数据收集到流中。有时,希望流中最多包含一定数量的项目;其他时候,一旦达到某个大小,将数据从 Redis 移动到一个不是内存中且速度较慢但适合存储几十年历史的存储中会很有用。Redis 流对此有一些支持。其中一个就是 XADD 命令的 MAXLEN 选项。这个选项非常简单易用:
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
使用 MAXLEN 时,当达到指定长度时,旧条目会自动被移除,从而使流保持恒定大小。目前尚无选项可以让流仅保留不超过给定时间的项,因为这样的命令为了保持一致性,可能会在移除条目时潜在地阻塞很长时间。例如,假设发生了插入高峰,然后是一段长时间的暂停,之后又进行了插入,所有这些操作都具有相同的最大时间。流会阻塞以移除在暂停期间变得过旧的数据。
因此,用户需要进行一些规划并了解所需的最大流长度。此外,虽然流的长度与使用的内存成正比,但按时间进行修剪则更难控制和预测:这取决于插入率,这是一个随时间经常变化的变量(当它不变化时,仅按大小进行修剪就很简单)。
然而,使用 MAXLEN 截断可能会很昂贵:流是以宏节点的形式表示在.radix 树中,以非常节省内存。修改包含几十个元素的单个宏节点并不是最优的。因此,可以使用该命令的以下特殊形式:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
~选项在 MAXLEN 选项和实际计数之间意味着,我并不真的需要这个正好是 1000 项。可以是 1000 或 1010 或 1030,只要至少保存 1000 项即可。有了这个选项,只有当我们能够移除一个完整的节点时才会进行修剪。这使得效率大大提高,通常这也是你想要的。
还有 XTRIM 命令,它执行的功能与上面的 MAXLEN 选项非常相似,唯一的区别是它可以独立运行:
> XTRIM mystream MAXLEN 10
或者,对于 XADD 选项:
> XTRIM mystream MAXLEN ~ 10
然而,XTRIM 设计为接受不同的修剪策略,即使目前只有 MAXLEN 实现。
由于 XTRIM 是一个显式命令,用户需要了解不同修剪策略的潜在缺点。因此,以后实现按时间修剪是有可能的。
XTRIM 可能稍后会学习另一种有用的淘汰策略,即根据 ID 范围进行淘汰,以便在需要将数据从 Redis 移动到其他存储系统时,可以更容易地使用 XRANGE 和 XTRIM。
您可能已经注意到,在 Redis API 中有几种特殊的 ID。这里简单回顾一下,以便将来更好地理解它们。
前两个特殊 ID 是-和+,并在 XRANGE 命令的范围查询中使用。这两个 ID 分别表示最小的 ID(基本上是0-1)和最大的 ID(基本上是18446744073709551615-18446744073709551615)。正如你所见,使用-和+来代替这些数字会更整洁。
然后有一些 API,我们想要获取流中最大 ID 的项的 ID。这就是的含义。例如,如果我只想使用 XREADGROUP 获取新的条目,我可以使用这样的 ID 来表示我已经有了所有现有的条目,但没有将来的新的条目。同样地,在创建或设置消费者组的 ID 时,我可以将最后一个已交付的项设置为,以便仅将新的条目交付给使用该组的消费者。
如你所见,并不等于+,它们是两回事。+是每个流中可能的最大 ID,而则是特定流中给定条目中的最大 ID。此外,API 通常只会理解+或
另一个特殊的 ID 是>,它仅与消费者组相关,并且仅在使用 XREADGROUP 命令时才有特殊含义。这种特殊的 ID 表示我们只想获取迄今为止其他消费者从未接收过的条目。换句话说,>ID 是消费者组中最后接收的 ID。
最后,特殊的 ID * 只能与 XADD 命令一起使用,表示为我们自动选择一个新条目的 ID。
所以我们有 -、+、$、> 和 *,每个都有不同的含义,并且大多数情况下可以在不同的上下文中使用。
流,和其他任何 Redis 数据结构一样,会异步复制到从节点,并持久化到 AOF 和 RDB 文件中。不过可能不太明显的是,消费者组的完整状态也会被传播到 AOF、RDB 和从节点,因此如果消息在主节点上处于等待状态,从节点也会有相同的信息。同样,在重启后,AOF 会恢复消费者组的状态。
然而请注意,Redis 流和消费者组是使用 Redis 的默认复制进行持久化和复制的,因此:
所以在使用 Redis 流和消费者组设计应用程序时,请确保理解应用程序在故障情况下的语义属性,并相应地进行配置,评估其是否适用于您的使用场景。
Streams 还有一个特殊命令,可以通过 ID 从中间删除流中的项。对于只读追加的数据结构来说,这可能看起来像一个奇怪的功能,但实际上对于涉及隐私法规等应用来说非常有用。
该命令称为 XDEL,只需要提供流的名称和要删除的 ID:
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"
然而,在当前的实现中,内存只有在宏节点完全为空时才会真正回收,因此你不应该滥用这个功能。
流与其他 Redis 数据结构的一个区别在于,当其他数据结构不再有元素时,作为调用移除元素命令的副作用,该键本身将会被移除。例如,当 ZREM 命令移除有序集合中的最后一个元素时,有序集合将被完全移除。而流则允许在使用 MAXLEN 选项且计数为零(XADD 和 XTRIM 命令)或调用了 XDEL 的情况下保持为空。
这种不对称性存在是因为 Streams 可能关联有消费者组,我们不希望因为 Stream 中没有项了就丢失消费者组定义的状态。目前即使没有关联的消费者组,Stream 也不会被删除,但这在未来可能会发生变化。
非阻塞流命令,如 XRANGE、XREAD 或 XREADGROUP(不带 BLOCK 选项)会像其他任何 Redis 命令一样同步处理,因此讨论这些命令的延迟是没有意义的:更有趣的是检查 Redis 文档中这些命令的时间复杂度。可以说,流命令在提取范围时至少与有序集合命令一样快,而 XADD 非常快,在使用管道时,它可以在普通机器上每秒轻松插入五十万到一百万项。
然而,如果我们要理解消息在消费者组中的阻塞消费者处理延迟的时间,延迟成为一个有趣的参数。从消息通过 XADD 生成的那一刻起,到消费者通过 XREADGROUP 返回获取到消息的那一刻。
在提供测试结果之前,了解 Redis 是如何路由流消息的(更一般地说,是如何管理任何阻塞操作等待数据的)是非常有趣的。
signalKeyAsReady()函数。该函数会将该键放入一个需要处理的键列表中,因为这些键可能有新的数据供被阻塞的消费者使用。请注意,这些已准备好处理的键会在稍后处理,因此在同一事件循环周期内,该键可能会收到其他写操作。如你所见,基本上,在返回事件循环之前,调用 XADD 的客户端以及被阻塞以消费消息的客户端,其回复都会在输出缓冲区中,因此调用 XADD 的客户端会同时收到 Redis 的回复,消费者也会收到新的消息。
这种模型是推送基于的,因为通过调用 XADD 直接将数据添加到消费者缓冲区中,所以延迟通常比较可预测。
为了检查这些延迟特性,我们使用多个 Ruby 程序实例进行测试,这些程序实例向 Streams 中推送消息,并在消息中包含计算机的毫秒时间作为额外字段。另一些 Ruby 程序从消费者组读取消息并处理它们。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。
这些程序并未进行优化,并在运行 Redis 的小型双核实例上执行,以尝试提供在非最优条件下可以预期的延迟数据。消息以每秒 10,000 条的速度生成,十个同时运行的消费者从同一个 Redis 流和消费者组中消费并确认消息。
获得的结果:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
所以 99.9% 的请求延迟 <= 2 毫秒,而剩下的异常值也仍然非常接近平均值。向流中添加几百万条未确认的消息并不会改变基准测试的核心,大多数查询仍然以非常短的延迟处理。
几点说明:
COUNT参数被设置为 10000。这会增加很多延迟,但这是为了允许处理速度较慢的消费者能够跟上消息流。因此,你可以预期实际世界的延迟要小得多。