公司电子商务网站建设策划书,淮安做网站 卓越凯欣,seo搜索引擎优化主要做什么,网站建设会犯法吗RocketMQ学习笔记
消息中间件应用场景
应用解耦削峰填谷数据分发
常见的消息中间件
ActiveMQ#xff1a;Apache出品#xff0c;比较老的一个开源的消息中间件#xff0c;以前在中小企业应用广泛Kafka#xff1a;Apache软件基金会开发的一个开源流处理平台#xff0c;由…RocketMQ学习笔记
消息中间件应用场景
应用解耦削峰填谷数据分发
常见的消息中间件
ActiveMQApache出品比较老的一个开源的消息中间件以前在中小企业应用广泛KafkaApache软件基金会开发的一个开源流处理平台由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。RabbitMQ基于erlang语言开发的消息中间件RabbitMQ最初起源于金融系统用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。适用于对数据的一致性、稳定性和可靠性要求比较高的场景RocketMQ高性能、低延时和高可靠性等特性
消息中间件对比
KafkaRocketMQRabbitMQ定位日志消息、监控数据非日志的可靠消息传输非日志的可靠消息传输可用性非常高、分布式、主从非常高、分布式、主从高、主从、采用镜像模式实现数据量大时可能有性能问题消息可靠性异步刷盘容易丢数据同步刷盘、异步刷盘同步刷盘单机吞吐量百万级十万级万级堆积能力非常好非常好一般顺序消费支持一台broker宕机后消息会乱序支持顺序消费场景下消费失败时消费队列将会暂停支持如果一个消费失败此消息的顺序会被打乱定时消息不支持支持支持事务消息不支持支持不支持消息重试不支持支持支持死信队列不支持支持支持访问权限无无类似数据库配置用户名和密码
核心概念
消息生产者Producer往RocketMQ发送消息的应用程序消息消费者Consumer从RocketMQ拉取消息并根据消息执行业务的应用程序代理服务器Broker实际和消费生产者、消息消费者进行交互的程序主要进行消息的存储、消息的推送一般从性能考虑会对消息进行集群命名服务NameServer代理服务器Broker在启动的时候注册信息到NameServer中消息生产者和消息消费者启动的时候从NameServer拉取Broker的IP和端口消息生产者应用程序和消息消费者的应用程序会和Broker建立长链接基于Netty发送消息和消费消息都是基于长链接的通道主题Topic划分不同类型的消息比如订单消息发送到OrderTopic会员消息发送到MemberTopic消息队列MessageQueue在Topic内部专门进行消息的存储的地方最底层存储的数据结构默认一个Topic中有4个MessageQueue消息Message在消息生产者给RocketMQ发送消息的时候需要将传递的参数封装到Message对象中通过网络传输到RocketMQ消息会存储在MessageQueue中标签Tag可以在发送的时候给消息添加标签Tag消费者可以通过标签Tag进行过滤
简单使用
引入jar包
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.9.5/version
/dependency生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SimpleProducer {public static void main(String[] args) throws Exception{//定义消费的生产者对象DefaultMQProducer producer new DefaultMQProducer(helloProducerGroup);//定义nameServer地址producer.setNamesrvAddr(127.0.0.1:9876);//自动连接从nameServer拉取broker地址并且建立连接producer.start();//定义消息发送的目的地TopicString topic helloTopic;for(int i0;i10;i){//定义消息Message message new Message(topic,(helloTopic的消息i).getBytes());//发送消息SendResult result producer.send(message);//输出消息储存的结果System.out.println(消息存储的状态:result.getSendStatus());System.out.println(消息存储的消息ID:result.getMsgId());}//关闭连接producer.shutdown();}
}消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class SimpleConsumer {public static void main(String[] args) throws Exception {//定义消息的消费者DefaultMQPushConsumer consumer new DefaultMQPushConsumer(helloConsumerGroup);//定义nameServer地址consumer.setNamesrvAddr(127.0.0.1:9876);//定义消费的主题String topic helloTopic;//监听该主题消息consumer.subscribe(topic,*);//设置消息监听器服务器把消息推送给我们消费消息consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) - {for(MessageExt messageExt:list){System.out.println(处理的线程Thread.currentThread()消息内容new String(messageExt.getBody()));}//告诉消息中间件消息处理的情况return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}