模版网站可以做seo吗,十大网站建设公司排名,龙岗区是深圳最差的区,微信招聘网站建设消息整体处理过程#xff0c;这里我们将消息的整体处理阶段分为3个阶段进行分析#xff1a;1、Producer发送消息阶段。
2、Broker处理消息阶段。
3、Consumer消费消息阶段。一、Producer发送消息阶段
1、安全机制保障1#xff0c;发送方式。
1、同步发送
2、异步发送
3、O…消息整体处理过程这里我们将消息的整体处理阶段分为3个阶段进行分析1、Producer发送消息阶段。
2、Broker处理消息阶段。
3、Consumer消费消息阶段。一、Producer发送消息阶段
1、安全机制保障1发送方式。
1、同步发送
2、异步发送
3、Oneway发送Oneway 方式只负责发送请求不等待应答2、安全机制保障2
如果发送消息失败或者超时则重新发送。
发送重试源码如下本质其实就是一个for循环当发送消息发生异常的时候重新循环发送。默认重试3次重试次数修改。3、安全机制保障3
broker提供多master模式即使某台broker宕机了保证消息可以投递到另外一台正常的broker上二、Broker处理消息阶段
1、安全机制1同步/异步 【刷盘】的策略
当消息投递到broker之后会先存到page cache然后根据broker设置的刷盘策略是否立即刷盘
也就是如果刷盘策略为异步broker并不会等待消息落盘成功就会返回【producer成功】还只是保存到了page cache
也就是说当broker所在的服务器突然宕机则会丢失部分页的消息。【这就是异步刷盘带来了的问题】安全机制2提供主从模式同时主从支持同步双写
即使broker设置了【同步刷盘】如果主broker磁盘损坏也是会导致消息丢失。
因此可以给broker指定slave然后将slave设置为同步刷盘策略。此模式下producer每发送一条消息都会等消息投递到【master】和【slave】都落盘成功了
broker才会当作消息投递成功保证休息不丢失。缺点比较慢而且如果单边失败引发其他问题。三、Consumer消费消息阶段
consumer默认提供的是【At least Once】机制
何为【At least Once】就是Consumer先pull消息到本地消费完成后才向服务器返回ack。通常消费消息的ack机制一般分为两种思路
1先提交后消费可以解决重复消费的问题但是会丢失消息
2先消费消费成功后再提交因此Rocketmq默认实现的是思路二由各自consumer业务方保证幂等来解决重复消费问题。消费消息重试机制RocketMQ本身提供了重新消费消息的能力。但是会有重复消费的问题。
重复消费的问题出现原因
RocketMQ是以【consumer groupqueue】来确认消息消费进度通过【gruopoffset】来标记【queue】消费进度
消费成功之后都会返回一个ack消息告之broker更新offset但是RocketMQ并不是按一条一条消息来做ack
而是根据一次拉取批量来做消息ack如一次从broker拉去10条消息就按照10条消息整体做offset为方便理解下面先按照10条来分析
如上一次的offset为101本次拉取了10调消息偏移量从101-110
每一条消息消费成功会按照当前消息最小的offset来更新本地的消费进度怎么理解这句话例如103消息先消费完成但是101还没有消费完成消费失败也算作消费完成这时候更新还是按照101的偏移量来更新本地偏移量直到所有的消息都消费完成110这条消息消费完成的时候才会把偏移量更新为110再通过定时任务将本地偏移量更新到broker假设恰好更新偏移量等定时任务触发。RocketMQ按批次更新进度好处是不需要每一条消息都需要做ack操作提升了效率但是随之产生了2个问题
1、某一条失败导致整体失败然后又重行全量消费一次。
2、但是实际是失败的消息如果处理。问题1:
如果这一批消息中的101消息由于一些原因一直没有消费完成即使其它的9条消息都消费完成了
broker的消费进度依然偏移到101如果此时该consumer宕机或者实例被kill该queue通过负载均衡策略会重新被分配给
其它的consumer这个时候从broker拉去的偏移量为101开始消费但是实际102-109这9条消息已经消费完成
造成102-109这9条消息重复消费解决方案
3.6版本之前RocketMQ没有给出解决方案官方强调业务方【需要自己实现消息幂等】逻辑但是为了避免大量的出现消息重
复消费的问题RocketMQ也做了一些限制如果本地的消息量达到2000之后不会在拉取新的消息也就是即使出现上面的
极端情况也只会造成最多1999条消息重复消费。3.6之后的版本RocketMQ给出了一个解决方案治标不治本在消费端设置了一个消费超时时间
【consumeTimeout 15min】 原理是RocketMQ启动了一个定时任务来检查所有的消息的消费情况在消费开始的时
候会记录消息【消费开始时间】每隔consumeTimeout时间去检查所有消息是不是消费完成了如果还没有消费完
成并且时间超过了consumeTimeout配置的时间就当作【消费成功但是处理失败】也算作消费完成既然消费完成了
自然会把本地消费进度更新到上例中的110再通过定时同步机制将本地进度同步到broker达成本地和broker端一致的效果consumeTimeout支持业务自己配置为什么说治标不治本因为始终还是出现2*consumeTimeout时间比如第一次任务在12点0分101消息从12点1分开始消费到12点30分才会发现超时如果这个时候宕机的消息会出现无法完成确认造成消息重复消费。问题2
既然是按批量来更新消费进度但是那些虽然消费完成但是实际是【处理失败】的消息主动返回【RECONSUME_LATER】和
【抛出异常】的的消息是如何处理的rocketmq在消息消费失败的消息会单独把该消息的msgid、偏移量等信息通过rpc调用通知给broker那broker会把该消息做重新的投递从而做到了消息的重置机制消息的重试后面在分析安全性保障跳转
重复消费跳转