门户网站建设模板下载,南昌seo方案,域名有了怎么做网站,dede替换网站模板文章目录streamStream基本概念消息id消息内容增删查改消息生产添加消息 xadd查看消息长度 xlen限制stream最大长度1.xadd 中添加**maxlen**:2.xtrim查询消息 xrange正向排序#xff1a;消费id从小到大排反向查询#xff1a;消费id从大到小排删除消息消息消费独立消费 xread消…
文章目录streamStream基本概念消息id消息内容增删查改消息生产添加消息 xadd查看消息长度 xlen限制stream最大长度1.xadd 中添加**maxlen**:2.xtrim查询消息 xrange正向排序消费id从小到大排反向查询消费id从大到小排删除消息消息消费独立消费 xread消费组stream中出现很多特殊Ids解释创建消费组消息消费查看stream信息场景问题stream
Stream基本概念
Redis 5.0 被作者 Antirez 突然发布出来增加了很多新的特色功能其最大的新特性就是多出了一个数据结构 Stream它是一个新的强大的支持多播的可持久化消息队列作者坦言 Redis Stream 极大地借鉴了 Kafka 的设计。
Redis Stream 的结构如图 它有一个消息链表将所有加入的消息都串起来每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的Redis 重启后内容还在。
每个 Stream 都有唯一的名称它就是 Redis 的 Key在首次使用 xadd 执行追加消息时自动创建。
每个 Stream 都可以挂多个消费组Consumer Group每个消费组会有个游标 last_delivered_id 在 Stream 数组之上往前移动表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称消费组不会自动创建它需要单独的指令 xgroup create 进行创建需要指定从 Stream 的某个消息 ID 开始消费这个 ID 用来初始化 last_delivered_id 变量。
每个消费组的状态都是独立的相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。
同一个消费组可以挂接多个消费者Consumer这些消费者之间是竞争关系任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。
消费者内部会有一个状态变量 pending_ids它记录了当前已经被客户端读取但是还没有 ack 的消息。如果客户端没有 ack这个变量里面的消息 ID 就会越来越多一旦某个消息被 ack它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL也就是 Pending Entries List这是一个核心的数据结构它用来确保客户端至少消费了消息一次而不会在网络传输的中途丢失了而没被处理。
消息id
消息 ID 的形式是 TimestampInMillis-sequence例如 1527846880572-5它表示当前的消息再毫秒时间戳 1527846880572 时产生并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成也可以由客户端自己指定但是形式必须是 “整数-整数”而且后面加入的消息的 ID 必须要大于前面的消息 ID。
消息内容
消息内容就是键值对形如 hash 结构的键值对这没什么特别之处。
增删查改
增删改查指令说明如下
1xadd向 Stream 追加消息。
2xdel向 Stream 中删除消息这里的删除仅仅是设置标志位不影响消息总长度。
3xrange获取 Stream 中的消息列表会自动过滤已经删除的消息。
4xlen获取 Stream 消息长度。
5del删除整个 Stream 消息列表的所有消息。
消息生产
添加消息 xadd
语法xadd stream_name Id field value(field value) 记住stream中存储的消息必须是kv类型不允许是单个String如下操作都是键值对存储的 使用
xadd news_live * 2020-08-21 10:20 国航CA1704航班遇颠簸下降千米业内人士解释正常操作 xadd news_live * 2020-08-21 04:12 5G成电信业务增长“动力担当” *:表示由redis自己生成keykey是由当前时间戳(ms)-0格式
查看消息长度 xlen
语法*xlen stream_name * 使用xlen news_live 添加成功后会显示字符串1597979205554-0改字符串表示时间戳(毫秒)计数有点类似于雪花算法
限制stream最大长度
1.xadd 中添加maxlen:
最多存储消息数依据FIFO原则自动删除超过最长长度的消息
语法xadd stream_name maxlen n id field value(field,value) 记住每次生成消息都需要带这个参数如果maxlen不带等于没有限制支持动态改变maxlen 使用 图片最多存储3条每次添加xlen一直是3条
疑问 1.如果我不知道消息具体的大小我又如何利用maxlen达到自动删除
解决在MAXLEN选项个实际技术之间的~参数意味着:我并不真的需要这恰好1000个项目,它可以是1000或1010或1030只需确保至少保存1000个项目。使用此参数仅在我们可以删除整个节点时执行修剪。这使它更有效率通常是你想要的。
2.每次创建消息的时候都需要带上比较麻烦有没有什么更好的办法
maxlen当然有了xtrim
2.xtrim
XTRIM命令它执行与上面的MAXLEN选项非常相似的操作但是此命令不需要添加任何内容可以以独立方式对任何Stream运行。 XTRIM mystream MAXLEN 10 XTRIM mystream MAXLEN ~ 10
查询消息 xrange
查询是生产者查询自己生产的消息和消费者的消费不是一回事
正向排序消费id从小到大排
1.查询所有消息xrange stream_name - 使用xrange news_live - 2.指定起始id查询xrange news_live 1597980701728-0 查询的消息id 起始id 3.指定最大id查询xrange news_live 1597980701728-0 查询的消息id 结束id
反向查询消费id从大到小排
1.查询所有消息srevrange stream_name - 2.指定起始id查询 xrevrange news 2 消息id 2 倒排 3.指定最大id查询 xrevrange news 2 - 消息id 2 倒排
练习命令
删除消息
语法xdel stream_name id
消息消费
独立消费 xread
类似于List生产者往list中写数据消费者从list中读数据只能有一个消费者
1.头部读取 0-0 语法xread count n stream stream_name 0-0
记住0-0表示从头开启读取数据这里数据消费记录不会保存每次都是从头开始如果接着消费必须自己制定其实id
制定起始id读取xread count n stream stream_name id
2.尾部读取最新消息 $
从尾部读取最新的一条消息
语法 1.xread count n streams stream_name $ 此时默认不返回任何消息 用途
2.xread block time count n streams stream_name $ time为ms如果time0表示一直阻塞 切记客户端如果想要使用 xread 进行顺序消费那么一定要记住当前消费到了那里也就是返回的消息 ID。下次继续调用 xread 时将上次返回的最后一个消息 ID 作为参数传递过去就可以继续消费后续的消息。
block 0 表示永远阻塞直到消息到来block 1000 表示阻塞 1s如果 1s 内没有任何消息到来就返回 nil。
消费组 stream中出现很多特殊Ids解释
“-”xrange中使用等同于 小于某个id作用 标识最小id“”xrange中使用等同于 大于某个id作用 标识最大id“$”在给定Stream中已经包含的最大ID在xread、xcreategroup中标识消费着只能消费最新消息“”在消费者组的上下文中使用在xread、xreadgroup总标识消费未消费过的消息“*”只能与XADD命令一起使用,意味新创建消息自动由redis生成ID。“0-0”表示id从0开始读取xcreategroup标识消费组从id为0开始读取“0”表示id从0开始读取xreadgroup标识消费组消费所有stream中的数据
创建消费组
语法 xgroup create stream_name group_name last_delivered_id
解释last_delivered_id为0-0 表示从头开始消费last_delivered_id为 $ 表示从尾部开始消费只接收新消息当前stream消息全部忽略
使用xgroup create news goup1 0-0 注意xread、xreadgroup中都可以加上 block指令标识阻塞等待直到接受到新的消息或者等待超时
消息消费
语法xreadgroup group group_name consumer_name count n streams news ids 注意ids不理解可以看上面的stream中出现很多特殊Ids解释没有加count标识消费所有的消息
解释 1.xreadgroup group group_name consumer_name count n streams stream_name 消费组开始消费未消费的数据
2.xreadgroup group group_name consumer_name count n streams stream_name 0 表示id从0开始消费意味着消息stream中保留的所有消息包括已经消费过的
使用 1.xreadgroup group goup1 consumer1 streams news 2.xreadgroup count 0 group gb c1 streams news 0使用注意
xgroup create news gb 0-0
xreadgroup count 0 group gb c1 streams news 0
xreadgroup count 0 group gb c1 streams news
xreadgroup count 0 group gb c1 streams news 0第一次xreadgroup 0无法查询到数据知道 消费完以后xreadgroup 0才能获取到数据说明该指令获取消费组中消费者为c1已经消息过的数据如果把消费者c1改成其他值返回无数据。
使用注意 1.xcreategroup指定ids如果为$如果xreadgroup ids利用、0未消费数据、无法读取之前历史只能读取最新数据 2.xcreategroup指定ids为0-0则xreadgroup ids利用、0等都可使用
查看stream信息
1.xinfo stream stream_name
2.xinfo groups stream_name 场景问题
1.Stream消息太多时怎么办
2.ack作用如果忘记ack会怎样
3.PEL 如何避免消息丢失?
引用博客
原理篇挑战 KafkaRedis5.0 重量级特性 Stream 尝鲜
使用手册篇不是特别全的手册
翻译官方文档篇翻译的比较硬核不过也不错
命令行redis5.0的Stream实现消息队列springbootjedis简单例子
pending、消息转移、死信队列
结合应用场景联系篇直播场景下使用