建德网站建设德品牌网,seo查询网站,网站怎么做支付宝支付接口,在线设计签名免费网站前言
本文的主角是Redis Stream#xff0c;它是Redis5.0版本新增加的数据结构#xff0c;主要用于消息队列#xff0c;提供了消息的持久化和主备复制功能#xff0c;可以让任何客户端访问任何时刻的数据#xff0c;并且能记住每一个客户端的访问位置#xff0c;还能保证…前言
本文的主角是Redis Stream它是Redis5.0版本新增加的数据结构主要用于消息队列提供了消息的持久化和主备复制功能可以让任何客户端访问任何时刻的数据并且能记住每一个客户端的访问位置还能保证消息不丢失功能颇为强大。其实Redis本身是有一个Redis发布订阅来实现消息队列的功能但它有个缺点就是消息无法持久化如果出现网络断开、Redis宕机等消息就会被丢弃。所以Redis的发布订阅像是Vue2的Bus或Vue3的Mitt属于后端版的事件总线。此外Redis本身的List和Sorted Set也可以实现但是也有各自的缺点如List没有消息多播功能没有ACK机制无法重复消费等Sorted Set不支持阻塞式获取消息、不允许重复消费、不支持分组。相比之下Redis Stream明显胜出。
一、消息队列相关命令
1.XADD - 添加消息到末尾
1语法格式
XADD key ID field value2参数
key队列名称如果不存在就创建ID消息ID我们使用*表示由redis生成可以自定义但是要自己保证递增性。field value记录。
3示例
127.0.0.1:6379[15] XADD MQ * name Vegeta sex male age 18
1703235642574-0
127.0.0.1:6379[15] XADD MQ * name Bulma sex female age 18
1703235648454-0
127.0.0.1:6379[15]2.XLEN - 获取流包含的元素数量即消息长度
1语法格式
XLEN key2参数
key队列名称。
3示例
127.0.0.1:6379[15] XLEN MQ
(integer) 23.XRANGE - 获取消息列表会自动过滤已经删除的消息
1语法格式
XRANGE key start end [COUNT count]2参数
key队列名start开始值- 表示最小值end结束值 表示最大值count数量
3示例
127.0.0.1:6379[15] XRANGE MQ -
1) 1) 1703235642574-02) 1) name2) Vegeta3) sex4) male5) age6) 18
2) 1) 1703235648454-02) 1) name2) Bulma3) sex4) female5) age6) 184.XREVRANGE - 反向获取消息列表ID从大到小
1语法格式
XREVRANGE key end start [COUNT count]2参数
key队列名end结束值 表示最大值start开始值- 表示最小值count数量
3示例
127.0.0.1:6379[15] XREVRANGE MQ - COUNT 2
1) 1) 1703235648454-02) 1) name2) Bulma3) sex4) female5) age6) 18
2) 1) 1703235642574-02) 1) name2) Vegeta3) sex4) male5) age6) 185.XDEL - 删除消息
1语法格式
XDEL key ID [ID ...]2参数
key队列名称ID消息 ID
3示例
127.0.0.1:6379[15] XADD MQ * name Kakarotto sex male age 18
1703238230846-0
127.0.0.1:6379[15] XADD MQ * name Android18 sex female age 18
1703238306386-0
127.0.0.1:6379[15] XDEL MQ 1703238230846-0
(integer) 1
127.0.0.1:6379[15] XRANGE MQ -
1) 1) 1703235642574-02) 1) name2) Vegeta3) sex4) male5) age6) 18
2) 1) 1703235648454-02) 1) name2) Bulma3) sex4) female5) age6) 18
3) 1) 1703238306386-02) 1) name2) Android183) sex4) female5) age6) 186.XTRIM - 对流进行修剪限制长度
1语法格式
XTRIM key MAXLEN [~] count2参数
key队列名称MAXLENStream中最大消息数量即保留的消息数量[~]若使用了 ~ 符号则表示限制的是消息的大小而非数量。count需要删除的消息数量
3示例
# 限制MQ最多1条消息其余删除
127.0.0.1:6379[15] XTRIM MQ MAXLEN 1
(integer) 2
127.0.0.1:6379[15] XRANGE MQ -
1) 1) 1703238306386-02) 1) name2) Android183) sex4) female5) age6) 187.XREAD - 以阻塞或非阻塞方式读取一个或多个队列的消息
1语法格式
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]2参数
count数量默认值为10milliseconds可选阻塞毫秒数没有设置就是非阻塞模式key队列名id指定读取的起始位置可以是特定的消息ID也可以是$表示最新的消息或者是0表示从最早的消息开始读取。
3示例
127.0.0.1:6379[15] XADD MQ * name Vegeta sex male age 18
1703297838206-0
127.0.0.1:6379[15] XADD MQ * name Bulma sex female age 18
1703297844215-0
# 读取MQ最早的默认条消息
127.0.0.1:6379[15] XREAD STREAMS MQ 0
1) 1) MQ2) 1) 1) 1703238306386-02) 1) name2) Android183) sex4) female5) age6) 182) 1) 1703297838206-02) 1) name2) Vegeta3) sex4) male5) age6) 183) 1) 1703297844215-02) 1) name2) Bulma3) sex4) female5) age6) 18# 读取MQ第二条消息需指定第二条消息的ID
127.0.0.1:6379[15] XREAD STREAMS MQ 1703297838206-0
1) 1) MQ2) 1) 1) 1703297844215-02) 1) name2) Bulma3) sex4) female5) age6) 18# 读取MQ最新的一条消息需开启阻塞阻塞时长为10s。如果10s内未读取到消息则退出阻塞。
【客户端A】127.0.0.1:6379[15] XREAD BLOCK 100000 STREAMS MQ $
1) 1) MQ2) 1) 1) 1703300894359-02) 1) name2) Ranchi3) sex4) male5) age6) 18
(2.04s)
【客户端A】127.0.0.1:6379[15]# 另开一个终端向MQ队列中写入一条消息阻塞读的终端就能接收到消息。
root帅龍之龍:~# redis-cli -h 127.0.0.1 -p 6379 -a 123456
Warning: Using a password with -a or -u option on the command line interface may not be safe.
【客户端B】127.0.0.1:6379 select 15
OK
【客户端B】127.0.0.1:6379[15] XADD MQ * name Ranchi sex male age 18
1703300894359-0
【客户端B】127.0.0.1:6379[15]4注意XREAD存在消息漏读的风险当正在处理一条消息时又有多条消息到达此时读取的是最新那条
二、消费者组相关命令
消费者组将多个消费者划分到一个组中监听同一个队列具有消息分流、消息标示、消息确认的特点。 ·消息分流分流给组内的不同消费者不会重复消费反而加快消费 ·消息标示消费者组会记录最后一个被处理的消息确保每一个消息都会被消费 ·消息确认消费者获取消息后消息处于pending状态然后将其存入pending-list列表当处理完成后通过XACK确认消息将消息标记为已处理然后从pending-list被移除
1.XGROUP CREATE - 创建消费者组
1语法格式
XGROUP CREATE key group id|$2参数
key队列名称如果不存在就创建起始IDgroup消费者组名id起始ID$代表队列中最后一条消息0代表队列中第一条消息
3示例
# 创建一个从队列第一条消息开始消费的消费者组
127.0.0.1:6379[15] XGROUP CREATE MQ mqGroupA 0
OK
# 创建一个从队列最后一条消息开始消费的消费者组
127.0.0.1:6379[15] XGROUP CREATE MQ mqGroupB $
OK2.XGROUP CREATECONSUMER - 在指定的消费者组中添加消费者
1语法格式
XGROUP CREATECONSUMER key group consumer2参数
key队列名称如果不存在就创建group消费者组名consumer消费者名
3示例
127.0.0.1:6379[15] XGROUP CREATECONSUMER MQ mqGroupA consumer1
(integer) 1
127.0.0.1:6379[15] XGROUP CREATECONSUMER MQ mqGroupA consumer2
(integer) 1
127.0.0.1:6379[15] XGROUP CREATECONSUMER MQ mqGroupA consumer3
(integer) 13.XINFO STREAM - 打印流信息
1语法格式
XINFO STREAM key2参数
key队列名称
3示例
127.0.0.1:6379[15] XINFO STREAM MQ1) length2) (integer) 53) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) last-generated-id8) 1703300894359-09) max-deleted-entry-id
10) 1703238230846-0
11) entries-added
12) (integer) 8
13) recorded-first-entry-id
14) 1703238306386-0
15) groups
16) (integer) 2
17) first-entry
18) 1) 1703238306386-02) 1) name2) Android183) sex4) female5) age6) 18
19) last-entry
20) 1) 1703300894359-02) 1) name2) Ranchi13) sex4) male5) age6) 184.XINFO GROUPS - 打印消费者组的信息
1语法格式
XINFO GROUPS key2参数
key队列名称
3示例
127.0.0.1:6379[15] XINFO GROUPS MQ
1) 1) name2) mqGroupA3) consumers4) (integer) 35) pending6) (integer) 07) last-delivered-id8) 0-09) entries-read10) (nil)11) lag12) (integer) 5
2) 1) name2) mqGroupB3) consumers4) (integer) 05) pending6) (integer) 07) last-delivered-id8) 1703300894359-09) entries-read10) (nil)11) lag12) (integer) 05.XGROUP DELCONSUMER - 在指定的消费者组中删除消费者
1语法格式
XGROUP DELCONSUMER key group consumer2参数
key队列名称如果不存在就创建group消费者组名consumer消费者名
3示例
127.0.0.1:6379[15] XGROUP DELCONSUMER MQ mqGroupA consumer3
(integer) 06.XGROUP DESTROY - 删除指定的消费者组
1语法格式
XGROUP DESTROY key group2参数
key队列名称如果不存在就创建group消费者组名
3示例
127.0.0.1:6379[15] XGROUP DESTROY MQ mqGroupB
(integer) 17.XGROUP SETID - 为消费者组设置新的最后递送消息ID
1语法格式
XGROUP SETID key group id|$2参数
key队列名称如果不存在就创建group消费者组名id起始ID$代表队列中最后一条消息0代表队列中第一条消息
3示例
127.0.0.1:6379[15] XGROUP CREATE MQ mqGroupB $
OK
127.0.0.1:6379[15] XGROUP SETID MQ mqGroupB $
OK8.XREADGROUP GROUP - 读取消费者组中的消息
1语法格式
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]2参数
group消费者组名consumer消费者名count读取数量milliseconds阻塞毫秒数key队列名id起始ID代表从下一条未消费的消息开始0代表从pending-list中第一条消息开始其它根据指定id从pending-list中获取已消费但未确认的消息开始
3示例
# 指定消费者组的消费者去读取下一条未消费的消息
127.0.0.1:6379[15] XREADGROUP GROUP mqGroupA consumer1 COUNT 1 STREAMS MQ
1) 1) MQ2) 1) 1) 1703238306386-02) 1) name2) Android183) sex4) female5) age6) 18
127.0.0.1:6379[15]
127.0.0.1:6379[15] XREADGROUP GROUP mqGroupA consumer2 COUNT 1 STREAMS MQ
1) 1) MQ2) 1) 1) 1703297838206-02) 1) name2) Vegeta3) sex4) male5) age6) 18
127.0.0.1:6379[15]4注意若某个消费者消费了某条消息但是并没有处理成功时如消费者进程宕机这条消息可能会丢失因为组内其他消费者不能再次消费到该消息了
9.XPENDING - 显示待处理消息的相关信息
1语法格式
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]2参数
key队列名group消费者组名start开始值-表示最小值end结束值表示最大值count数量
3示例
127.0.0.1:6379[15] XPENDING MQ mqGroupA
1) (integer) 2 # 已读取但未处理的消息数
2) 1703238306386-0 # 起始消息ID
3) 1703297838206-0 # 结束消息ID
4) 1) 1) consumer12) 12) 1) consumer22) 1
127.0.0.1:6379[15] XPENDING MQ mqGroupB
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)10.XACK - 将消息标记为已处理
1语法格式
XACK key group id [id ...]2参数
key队列名group消费者组名id消息ID
3示例
127.0.0.1:6379[15] XACK MQ mqGroupA 1703238306386-0
(integer) 1
127.0.0.1:6379[15] XPENDING MQ mqGroupA
1) (integer) 1
2) 1703297838206-0
3) 1703297838206-0
4) 1) 1) consumer22) 1
127.0.0.1:6379[15]11.XCLAIM - 转移消息的归属权
1语法格式
XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]2参数
key队列名group消费者组名consumer消费者名min-idle-time从被读取到未处理的时间id消息ID
3示例
# 在指定的消费者组中将cosumer2已读取5分钟(300秒300000毫秒)但未处理的1703297838206-0消息转移给consumer1
127.0.0.1:6379[15] XPENDING MQ mqGroupA
1) (integer) 1
2) 1703297838206-0
3) 1703297838206-0
4) 1) 1) consumer22) 1
127.0.0.1:6379[15] XCLAIM MQ mqGroupA consumer1 3600000 1703297838206-0
(empty array) # 转移不成功
127.0.0.1:6379[15] XCLAIM MQ mqGroupA consumer1 300000 1703297838206-0
1) 1) 1703297838206-02) 1) name2) Vegeta3) sex4) male5) age6) 18
127.0.0.1:6379[15] XPENDING MQ mqGroupA
1) (integer) 1
2) 1703297838206-0
3) 1703297838206-0
4) 1) 1) consumer12) 1
127.0.0.1:6379[15] XPENDING MQ mqGroupA - 10
1) 1) 1703297838206-02) consumer13) (integer) 1171095 # IDLE被重置了4) (integer) 2 # 读取次数被14说明某个消费者读取了消息但没有处理这时消费者宕机或重启等就会导致该消息失踪。那么就需要该消息转移给其他的消费者处理就是消息转移。转移除了要指定ID外还需要指定min-idle-time最小空闲时间该值要小于消息从被读取到未处理的时间。
三、消息队列的帮助命令
1语法格式
HELP XXX2参数
XXX命令关键字
3示例
127.0.0.1:6379[15] help XADDXADD key [NOMKSTREAM] [MAXLEN|MINID [|~] threshold [LIMIT count]] *|id field value [field value ...]summary: Appends a new message to a stream. Creates the key if it doesnt exist.since: 5.0.0group: stream127.0.0.1:6379[15] HELP XGROUPXGROUP (null)summary: A container for consumer groups commands.since: 5.0.0group: streamXGROUP CREATE key group id|$ [MKSTREAM] [ENTRIESREAD entries-read]summary: Creates a consumer group.since: 5.0.0group: streamXGROUP CREATECONSUMER key group consumersummary: Creates a consumer in a consumer group.since: 6.2.0group: streamXGROUP DELCONSUMER key group consumersummary: Deletes a consumer from a consumer group.since: 5.0.0group: streamXGROUP DESTROY key groupsummary: Destroys a consumer group.since: 5.0.0group: streamXGROUP HELP (null)summary: Returns helpful text about the different subcommands.since: 5.0.0group: streamXGROUP SETID key group id|$ [ENTRIESREAD entries-read]summary: Sets the last-delivered ID of a consumer group.since: 5.0.0group: stream127.0.0.1:6379[15]