1. Redis数据类型-Stream

消息队列 :是指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递,生产者 产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由服务端给其推送消息。Redis 也支持消息队列功能,在 5.0 版本之前,基于以下两种方式实现:

  • Pub/Sub
  • List

Pub/Sub 发布订阅模式,消息的发送者不会将消息直接发送给特定的接收者,而是通过消息通道广播出去,让订阅该消息主题的订阅者消费到:

image-20240914105933677

Pub/Sub 中的消息无法持久化,如果出现网络断开、宕机等,消息就会被丢弃。而且也没有 Ack机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Redis List 也可以实现消息队列,按照插入顺序排序,可以添加一个元素到列表的头部(左边)或者尾部(右边)。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理:

Redis Stream5.0版本中引入的一种新的数据结构,用于实现简单但功能强大的消息传递模式。以时间序列的方式存储消息,每个消息都有一个唯一的 ID,并且可以被多个消费者订阅和消费。是 Redis 实现消息队列的另外一种模式,支持消息的持久化、支持自动生成全局唯一 1D、支持 Ack确认消息模式、支持消费组模式等,旨在让消息队列更加的稳定和可靠。

其结构图如下:

image-20240914110146830

各部分解释:

  • Message Content:消息内容
  • Consumer group:消费组,通过 XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组会有个游标 Last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。
  • Consumer:消费者,消费组中的消费者
  • Pending_ ids:消费者会有一个状态变量,用于记录被当前消费已读取但未 ack 的消息 Id ,如果客户端没有 ack ,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack 它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

2. 常用命令

Stream 相关所有命令:

命名 描述
XACK 确认消费者已经成功处理从 Stream中获取的消息
XADD 添加消息到队列末尾
XAUTOCLAIM 转移符合指定条件的待处理流条目的所有权
XCLAIM 改变待处理消息的所有权
XDEL 删除消息
XGROUP CREATE 为存储在key 的流创建一个新的消费者组
XGROUP CREATECONSUMER 要在存储在key的流的消费者组中创建一个消费者
XGROUP DELCONSUMER 消费者组中删除一个消费者
XGROUP DESTROY 删除一个已存在的消费者组
XGROUP SETID 为消费者组设置最后传递的ID
XINFO CONSUMERS 返回消费者组中的消费者列表
XINFO GROUPS 返回消费者组列表
XINFO STREAM 存储在的key流的相关信息
XLEN 获取 Stream 中的消息长度
XPENDING 通过消费者组从流中获取数据但不确认这些数据,会产生待处理条目
XRANGE 获取消息列表(可以指定范围)
XREAD 获取消息(阻塞/非阻塞),返回大于指定ID 的消息
XREADGROUP XREAD命令的一个特殊版本,支持消费者组
XREVRANGE XRANGE相比区别在于反向获取,ID从大到小
XSETID 内部命令。它用于主节点来复制流的最后传递的ID
XTRIM 限制 Stream的长度,如果已经超长会进行截取

2.1 XADD

XADD 命令用于向 Stream(流)数据结构末尾添加消息。

语法格式:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

参数说明:

  • key:指定要添加消息的 Stream 的名称。
  • [NOMKSTREAM]:可选参数,用于指定当流不存在时是否报错。默认情况下,如果指定的流不存在,XADD命令会创建。如果使用NOMKSTREAM选项,则流不存在时命令会失败。
  • [MAXLEN|MINID [=|~] threshold [LIMIT count]]:这组选项用于控制流的最大长度或最小消息 ID
    • MAXLEN maxlen:限制 Stream 的最大长度。当长度达到maxlen时,旧的消息会被自动删除。
    • MINID minid:指定最旧的消息ID。当插入新消息时,如果已经存在比minid更旧的消息,则会将这些消息删除。
    • [=|~]:操作符,=表示精确匹配,~表示小于等于(对于MINID而言)或大于等于(对于MAXLEN而言)。
    • [LIMIT count]:当使用MAXLEN~时,指定需要保留的消息数量的最小值。
  • *|ID:消息的ID。使用*表示自动生成一个唯一的ID。如果不使用*,则需要提供一个有效的消息ID,它必须大于流中所有已存在的消息的ID
  • field value [field value ...]:消息的字段和值。可以指定一个或多个字段及其对应的值。

示例,插入消息:

localhost:0>XADD mystream * msg_1 100 msg_2 38
"1719279960591-0"

示例, 插入消息,并限制长度不超过 1000 条:

localhost:0>XADD mystream MAXLEN 1000 * msg_3 100 msg_4 38
"1719279971749-0"

2.2 XRANGE

XRANGE 命令用于获取指定范围内的消息。

命令格式:

XRANGE key start end [COUNT count]

参数说明:

  • key:指定 Streamkey
  • start:指定要检索的消息范围的起始 ID 。可以使用特殊值-来表示最小值。
  • end:指定要检索的消息范围的结束 ID 。可以使用特殊值+来表示最大值。
  • [COUNT count]:可选参数,用于限制返回的消息数量。

注意事项:

  • Stream 的消息 ID 由两部分组成:一个时间戳和一个序列号。时间戳表示消息被添加到 Stream 的时间,而序列号则用于在同一时间戳内区分不同的消息。
  • XRANGE 命令返回的消息是按照它们在 Stream 中的顺序排列的,即按照消息 ID 的顺序。
  • 如果在检索消息时使用了 COUNT 参数,但指定的范围内的消息数量少于 COUNT 指定的数量,那么只会返回范围内的所有消息。

示例,检索所有消息:

localhost:0>XRANGE mystream - +
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"
 2)    1)   "1719279971749-0"
  2)      1)    "msg_3"
   2)    "100"
   3)    "msg_4"
   4)    "38"

示例,检索特定范围内的消息:

localhost:0>XRANGE mystream  1719279960591-0 1719279960600-0
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"

示例,限制返回的消息数量:

localhost:0>XRANGE mystream - + COUNT 1
 1)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4) 

2.3 XREVRANGE

XREVRANGE 命令与 XRANGE 命令类似,但它是按照消息 ID 的递减顺序(用于反向)获取指定范围内的消息。

命令格式:

XREVRANGE key end start [COUNT count]

示例,检索最后两个时间序列的消息:

localhost:0>XREVRANGE mystream + - COUNT 2
 1)    1)   "1719279971749-0"
  2)      1)    "msg_3"
   2)    "100"
   3)    "msg_4"
   4)    "38"

 2)    1)   "1719279960591-0"
  2)      1)    "msg_1"
   2)    "100"
   3)    "msg_2"
   4)    "38"

2.4 XDEL

XDEL 命令用于从 Stream 中删除指定的消息。返回一个整数,表示被成功删除的消息数量。

命令格式:

XDEL key ID [ID ...]

参数说明:

  • key:指定 Streamkey
  • ID:一个或多个要删除的消息的 ID

注意事项:

  • 在使用 XDEL 命令时,你需要确保提供的消息 ID 是存在的,否则命令将不会删除任何消息,并返回0。
  • 可以通过一次 XDEL 命令删除多个消息,只需在命令中提供多个消息 ID 即可。
  • XDEL 命令不会改变 Stream 的其他消息的顺序或 ID

示例,删除消息:

localhost:0>XDEL mystream 1719280747405-0
"1"

2.5 XLEN

XLEN 命令用于获取指定 Stream 中包含的消息数量,即流的长度。如果 Stream 不存在或为空,则返回 0

命令格式:

XLEN key

示例:

localhost:0>XLEN mystream
"1"

2.6 XREAD

XREAD 命令是用于从 Stream 独立消费消息,支持阻塞等待新消息的到来。返回一个数组,其中每个元素都是一个包含 Stream key和消息列表的数组。消息列表是一个包含消息 ID 和消息数据的数组。

命令格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数说明:

  • COUNT count:指定一次读取的最大消息数量。如果未指定,则默认为 1
  • BLOCK milliseconds:用于指定阻塞的时间(以毫秒为单位)。如果指定了此参数,并且 Stream 中没有可消费的消息,客户端将在指定的时间内阻塞等待。如果设置为 0 ,则表示非阻塞模式,即如果没有消息可消费,则立即返回。
  • STREAMS key [key ...]:指定要从中读取消息的 Streamkey 。可以指定一个或多个。
  • ID [ID ...]:对于每个指定的 key ,可以提供一个或多个消息 ID 。这些 ID 用于指定从哪个位置开始读取消息。如果某个 key 后面没有指定 ID ,则默认为从该 Stream 的最新消息开始读取。

示例,非阻塞模式读取最新消息:

XREAD COUNT 1 STREAMS mystream $

示例,阻塞模式,读取最新消息并等待新消息:

XREAD COUNT 1 BLOCK 10000 STREAMS mystream $

2.7 XGROUP CREATE

XGROUP CREATE 命令用于在已存在的流(stream)上创建一个新的消费者组(consumer group)。消费者组允许多个消费者(consumer)协作消费同一个流中的数据,并且每个消费者都可以从自己的位置开始读取流。

命令格式:

XGROUP CREATE <key> <groupname> <id> [MKSTREAM] [MKTABLE] [CREATECONSUMER <consumername>]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <id>:消费者组初始的最后一个条目 ID ,即消费者组开始读取的起始点。可以使用$表示流的最新条目,或者使用0表示流的起始点,或者使用任何其他有效的 ID
  • [MKSTREAM]:可选参数,如果流不存在,则创建它。
  • [MKTABLE]:在 Redis 6.2 及更高版本中引入的可选参数,用于创建与流关联的二级索引表(secondary index table)。这通常用于支持基于特定字段的查询。
  • [CREATECONSUMER <consumername>]:在 Redis 6.2 及更高版本中引入的可选参数,用于在创建消费者组时同时创建一个消费者。

示例,创建一个新的消费者组,从流的最新条目开始读取:

localhost:0>XGROUP CREATE mystream mygroup $ MKSTREAM
"OK"

2.8 XACK

XACK命令用于确消费者已经成功处理了一个或多个消息。这些消息通常是从流(Stream)中读取的,并存储在消费者组的待处理条目列表(Pending Entry ListPEL)中。通过发送 XACK 命令,消费者通知 Redis 服务器它已经完成了一个或多个消息的处理,从而将这些消息从 PEL中移除。

命令格式:

XACK <key> <groupname> <consumername> <ID> [<ID> ...]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • <consumername>:消费者的名称。
  • <ID>:要确认的消息的ID,可以指定一个或多个。

示例,假设消费者已经读取了一些消息,并决定它们已经被成功处理。现在,消费者想要确认这些消息:

XACK mystream mygroup myconsumer 1526569900000-0 1526569900002-0

在这个例子中,消费者确认了两个消息,它们的 ID 分别是 1526569900000-01526569900002-0。一旦消息被确认,它们将从该消费者组的 PEL 中移除,表示这些消息已经被成功处理。注意,即使消息被确认并从 PEL中移除,它们仍然保留在流中,并且可以被其他消费者组或消费者读取。

如果消费者在处理消息时失败,或者需要稍后重试,它可以选择不发送 XACK 命令,这样消息将保持在 PEL中,直到消费者准备好确认它们或它们因超时而被自动从 PEL 中移除(取决于消费者组的配置)。

2.9 XPENDING

XPENDING 命令用于查询消费者组中未确认消息的详细信息。允许你查看哪些消息正在等待被处理,以及哪些消费者拥有这些消息。

命令格式:

XPENDING <key> <groupname> [start end count] [consumername]

参数说明:

  • <key>:流的名称。
  • <groupname>:消费者组的名称。
  • [start end count]:这三个参数是可选的,用于限制查询结果的范围。
  • start:查询的开始消息ID
  • end:查询的结束消息ID
  • count:要返回的消息数量。
  • [consumername]:可选参数,指定要查询的消费者的名称。如果不提供此参数,将返回消费者组中的所有未确认消息。

XPENDING 命令返回一个数组,其中包含以下信息:

  • 总未确认消息数:整数,表示在指定范围内未确认的消息总数。
  • 最小消息ID:字符串,表示在指定范围内未确认消息的最小ID
  • 最大消息ID:字符串,表示在指定范围内未确认消息的最大ID
  • 每个消费者的未确认消息:一个数组,其中每个元素都是一个包含消费者名称和该消费者拥有的未确认消息数的数组。

注意事项:

  • XPENDING 是一个只读命令,它不会修改任何数据。
  • 如果提供了 consumername 参数,则只返回该消费者的未确认消息信息。
  • 如果提供了 [start end count] 参数,则只返回指定范围内的未确认消息信息。
  • 通过 XPENDING 命令,你可以轻松地监控消费者组中的未确认消息,从而确保消息得到及时处理,并在必要时进行故障排除。

示例:

XPENDING mystream mygroup
2) "1526569900000-0"  # 最小消息ID  
3) "1526569900002-0"  # 最大消息ID  
4) 1) 1) "myconsumer" # 消费者名称  
     2) (integer) 2   # 该消费者拥有的未确认消息数

3. 应用场景

Redis Streams 是 Redis 5.0 版本引入的一种新的数据结构,主要用于消息队列,所以可以用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。但是更推荐使用专业的消息队列,比如RabbitMQRocketMQ等,对于简单的应用场景,如果能满足需求,也可以使用Redis Stream

3.1 消息队列

场景:实现一个简单的生产者-消费者队列,用于处理异步任务。

命令XADD, XREAD, XREADGROUP

# 生产者添加消息
XADD mystream * "field1" "value1" "field2" "value2"

# 消费者读取消息
XREAD STREAMS mystream 0

# 消费者组读取消息
XREADGROUP GROUP mygroup CONSUMER myconsumer BLOCK 0 STREAMS mystream 0

3.2 事件日志

场景:记录用户操作事件,如点击、购买等。

命令XADD, XREAD

# 添加事件
XADD userevents * "event" "click" "user_id" "123" "timestamp" "2024-09-14T10:00:00Z"

# 读取事件
XREAD STREAMS userevents 0

3.3 实时分析

场景:实时处理和分析流数据,如监控指标或传感器数据。

命令XADD, XREAD, XGROUP, XREADGROUP

# 添加监控数据
XADD metrics * "metric" "cpu_usage" "value" "75" "timestamp" "2024-09-14T10:00:00Z"

# 创建消费者组
XGROUP CREATE mystream mygroup 0

# 消费者组读取数据
XREADGROUP GROUP mygroup CONSUMER myconsumer BLOCK 0 STREAMS mystream 0

3.4 聊天应用

场景:实现一个聊天应用的消息系统。

命令XADD, XREAD, XREADGROUP

# 用户发送消息
XADD chatroom * "user" "alice" "message" "Hello, Bob!" "timestamp" "2024-09-14T10:00:00Z"

# 用户读取消息
XREAD STREAMS chatroom 0

# 用户组读取消息
XREADGROUP GROUP chatgroup CONSUMER chatuser BLOCK 0 STREAMS chatroom 0

3.5 交易处理

场景:处理金融交易,确保交易顺序和一致性。

命令XADD, XREAD, XGROUP, XREADGROUP

# 添加交易
XADD transactions * "transaction_id" "tx123" "amount" "100" "currency" "USD" "timestamp" "2024-09-14T10:00:00Z"

# 创建消费者组
XGROUP CREATE transactions mygroup 0

# 消费者组处理交易
XREADGROUP GROUP mygroup CONSUMER myconsumer BLOCK 0 STREAMS transactions 0

以上这些用例展示了 Redis Streams 在不同场景下的应用,通过这些命令可以有效地处理消息队列、事件日志、实时分析、聊天应用和交易处理等数据流。