什么是网站推广策略,网站专题页面设计规范,wordpress信息发布付费,专业医疗建站在NameServer与Broker启动之后#xff0c;我们就可以来创建Producer进行生产消息#xff0c;客户端常用的生产类是DefaultMQProducer#xff0c;我们启动消费者其实就是调用该类的start方法。 初始化逻辑
通过构建一起DefaultMQProducer类来实现初始化#xff0c;查看源码…在NameServer与Broker启动之后我们就可以来创建Producer进行生产消息客户端常用的生产类是DefaultMQProducer我们启动消费者其实就是调用该类的start方法。 初始化逻辑
通过构建一起DefaultMQProducer类来实现初始化查看源码我们可以得到DefaultMQProducer类的构造方法存在很多类型但是最终会去执行最后一个构造方法。 public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {// 命名空间this.namespace namespace;// 生产者组this.producerGroup producerGroup;// 创建DefaultMQProducerImpl实例负责发送消息defaultMQProducerImpl new DefaultMQProducerImpl(this, rpcHook);
}该构造函数主要就是用来指定命名空间生产者组以及用来处理消息的发送的DefaultMQProducerImpl类该类里面包含消息处理的所有方法。 public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {this.defaultMQProducer defaultMQProducer;this.rpcHook rpcHook;// 异步发送消息的线程池队列this.asyncSenderThreadPoolQueue new LinkedBlockingQueueRunnable(50000);// 初始化线程this.defaultAsyncSenderExecutor new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(),1000 * 60,TimeUnit.MILLISECONDS,this.asyncSenderThreadPoolQueue,new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, AsyncSenderExecutor_ this.threadIndex.incrementAndGet());}});
}主要就是初始化一个异步发送消息的线程池。
启动逻辑
生产者的启动逻辑调用start方法其实就是去调用DefaultMQProducerImpl的start方法。 public void start(final boolean startFactory) throws MQClientException {// 根据serviceState的状态来判断是否重复启动switch (this.serviceState) {case CREATE_JUST:this.serviceState ServiceState.START_FAILED;// 检查配置信息this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 客户端核心管理组件的初始化mQClientFactorythis.mQClientFactory MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 往核心组件中注册一个Producer往hashMap插入数据boolean registerOK mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);// 如果存在就报错if (!registerOK) {this.serviceState ServiceState.CREATE_JUST;throw new MQClientException(The producer group[ this.defaultMQProducer.getProducerGroup() ] has been created before, specify another name please. FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 注册topicPublishInfoTablethis.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 启动客户端通讯实例mQClientFactory.start();}log.info(the producer [{}] start OK. sendMessageWithVIPChannel{}, this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException(The producer service state not OK, maybe started once, this.serviceState FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();RequestFutureHolder.getInstance().startScheduledTask(this);
}首先使用checkConfig去检查配置信息比如生产者组等是否符合规范并通过getOrCreateMQClientInstance方法根据clientId获取mQClientFactory的实例。
将生产者注册到MQClientInstance实例的ProducerTable当中同时注册注册topicPublishInfoTable对象。
启动MQClientInstance实例调用其start方法用于初始化netty服务定时任务拉取消息服务等。
最后主动调用sendHeartbeatToAllBrokerWithLock方法发送心跳信息给所有Broker并且移除超时的request请求执行异常回调。
MQClientInstance的启动
MQClientInstance类封装了RockerMQ底层通讯处理的APIProducer与Consumer类都会使用到这个类它是ProducerConsumer与NameServerBroker打交道的网络通道这个类可以理解成一个工厂是对消费者与生产者以及控制台三者的一个合集内部封装了netty客户端消息的生成消费和负载均衡的实现类等。
MQClientInstance类的初始化
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig clientConfig;this.instanceIndex instanceIndex;// 创建netty客户端配置类this.nettyClientConfig new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客户端请求处理器this.clientRemotingProcessor new ClientRemotingProcessor(this);// 创建客户端远程通信API实现类的实例this.mQClientAPIImpl new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);// 更新nameServer的地址if (this.clientConfig.getNamesrvAddr() ! null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());log.info(user specified name server address: {}, this.clientConfig.getNamesrvAddr());}// 客户端IDthis.clientId clientId;// mq的admin控制台操作实现this.mQAdminImpl new MQAdminImpl(this);// push模式下拉取消息服务this.pullMessageService new PullMessageService(this);// 负载均衡服务器this.rebalanceService new RebalanceService(this);this.defaultMQProducer new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);// 消费者统计管理器this.consumerStatsManager new ConsumerStatsManager(this.scheduledExecutorService);log.info(Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{},this.instanceIndex,this.clientId,this.clientConfig,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}初始化MQClientInstance类的时候会初始化netty客户端以及各种服务实例等。
MQClientInstance类的启动 在调用MQClientInstance的start方法时会启动很多相关的类比如初始化netty客户端等接下来我们就一起来分析。
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:// 状态的二次校验this.serviceState ServiceState.START_FAILED;// If not specified,looking address from name serverif (null this.clientConfig.getNamesrvAddr()) {// 获取nameServer的地址this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel// 启动mQClientAPIImpl,建立客户端与服务端的channelthis.mQClientAPIImpl.start();// Start various schedule tasks// 启动定时任务this.startScheduledTask();// Start pull service// 启动拉消息服务// 发送消息this.pullMessageService.start();// Start rebalance service// 负载均衡 rebalanceService为线程类start方法执行run方法// 负载均衡服务消费者与生产者建立关系this.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info(the client factory [{}] start OK, this.clientId);this.serviceState ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException(The Factory object[ this.getClientId() ] has been created before, and failed., null);default:break;}}
}首先会去启动一个netty的客户端在前面分析的NettyRemotingServer类里面有过类似的启动逻辑这里就不再重复感兴趣的客户去看一下。
调用this.startScheduledTask()启动很多的定时任务获取nameServer地址的更新topic路由的清除取消Broker以及发送心跳信息到所有Broker持久化消费者进度广播消息持久化到本地集群消息推送到Broker端尝试调整消费线程池的线程数量(目前还未实现该方法)。
注意该类为生产者和消费者公用的类所以作用于每一个Producer与Consumer。 private void startScheduledTask() {if (null this.clientConfig.getNamesrvAddr()) {// 定时获取nameServer的地址this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error(ScheduledTask fetchNameServerAddr exception, e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}// 更新topic路由this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error(ScheduledTask updateTopicRouteInfoFromNameServer exception, e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);// 清理无效的Broker以及发送心跳信息给所有的Brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {MQClientInstance.this.cleanOfflineBroker();MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error(ScheduledTask sendHeartbeatToAllBroker exception, e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);// 持久化消费者偏移量即消费进度 广播消息持久化到本地集群消息推送到Broker端this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error(ScheduledTask persistAllConsumerOffset exception, e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 尝试调整push模式的消费线程池的线程数量没有实现逻辑this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error(ScheduledTask adjustThreadPool exception, e);}}}, 1, 1, TimeUnit.MINUTES);
}更新topic路由信息
每隔30s从nameServer内部拉取并更新topic路由消息。
public void updateTopicRouteInfoFromNameServer() {// 使用set集合就是为了有效的去重SetString topicList new HashSetString();// 从ConsumerTable中获取{IteratorEntryString, MQConsumerInner it this.consumerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQConsumerInner entry it.next();MQConsumerInner impl entry.getValue();if (impl ! null) {SetSubscriptionData subList impl.subscriptions();if (subList ! null) {for (SubscriptionData subData : subList) {topicList.add(subData.getTopic());}}}}}// 从producerTable中获取{IteratorEntryString, MQProducerInner it this.producerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQProducerInner entry it.next();MQProducerInner impl entry.getValue();if (impl ! null) {SetString lst impl.getPublishTopicList();topicList.addAll(lst);}}}// 比较更新本地的路由信息for (String topic : topicList) {/*** 从nameSerer拉取到topic路由信息之后调用topicRouteDataIsChange方法与本地* 的旧topic路由信息比较看是否更改比较的数据包括topic的队列信息queueDatas* topic的broker信息brokerDatas顺序topic配置orderTopicConf* 消费过滤信息filterServerTable。* 当判断需要更新的时候会更新本地的topic缓存包括* 1. 更新brokerName到brokerAddr的地址的映射关系即brokerAddrTable* 2. 更新生产者producerTable集合更新MQProducerInner的topicPublishInfoTable属性* 3. 更新消费者的consumerTable集合更新MQConsumerInner的rebalanceImpl.topicSubscribeInfoTable属性* 4. 更新topicRouteTable集合更新本地topic路由信息。*/this.updateTopicRouteInfoFromNameServer(topic);}
}主要逻辑就是从consumerTable以及producerTable中获取配置的所有topic集合包括consumer订阅的topic集合以及Producer中topicPublishTable集合中的数据。
从nameSerer拉取到topic路由信息之后调用topicRouteDataIsChange方法与本地的旧topic路由信息比较看是否更改比较的数据包括topic的队列信息queueDatastopic的broker信息brokerDatas顺序topic配置orderTopicConf消费过滤信息filterServerTable。
清理无效的Broker
调用cleanOfflineBroker()方法区清除下线的Broker。
private void cleanOfflineBroker() {try {// 加锁if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))try {ConcurrentHashMapString, HashMapLong, String updatedTable new ConcurrentHashMapString, HashMapLong, String(this.brokerAddrTable.size(), 1);IteratorEntryString, HashMapLong, String itBrokerTable this.brokerAddrTable.entrySet().iterator();// 遍历brokerAddrTablewhile (itBrokerTable.hasNext()) {EntryString, HashMapLong, String entry itBrokerTable.next();String brokerName entry.getKey();HashMapLong, String oneTable entry.getValue();HashMapLong, String cloneAddrTable new HashMapLong, String(oneTable.size(), 1);cloneAddrTable.putAll(oneTable);IteratorEntryLong, String it cloneAddrTable.entrySet().iterator();while (it.hasNext()) {EntryLong, String ee it.next();String addr ee.getValue();// 判断broker地址是否存在于topicRouteTable的任意一个topic的路由信息中// 如果不存在则直接移除该broker地址if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {it.remove();log.info(the broker addr[{} {}] is offline, remove it, brokerName, addr);}}// 集合为空就移除if (cloneAddrTable.isEmpty()) {itBrokerTable.remove();log.info(the broker[{}] names host is offline, remove it, brokerName);} else {// 否则更新剩下的updatedTable.put(brokerName, cloneAddrTable);}}// 不为空直接更新if (!updatedTable.isEmpty()) {this.brokerAddrTable.putAll(updatedTable);}} finally {this.lockNamesrv.unlock();}} catch (InterruptedException e) {log.warn(cleanOfflineBroker Exception, e);}
}该方法间隔30s就会被执行遍历并更新brokerAddrTable集合其主要的步骤就是获取每一个人对象地址首先去本地topicRouteTable是否存在存在即保留不存在就表名Broker以下线需要被清除如果brokerAddrTable中的value集合也是空则会直接删除键值对。
发送心跳信息给Broker
该方法每隔30s向所有的Broker发送心跳包的定时任务方法客户的consumer和producer都是通过该定时任务发送心跳数据包的。在其他地方也会主动调用一次该方法例如DefaultMQProducerImpl、DefaultMQPushConsumerImpl等类的start方法的结尾都会主动调用一次该方法。
public void sendHeartbeatToAllBrokerWithLock() {// 加锁if (this.lockHeartbeat.tryLock()) {try {// 发送心跳包给所有brokerthis.sendHeartbeatToAllBroker();// 上传过滤类到Broker对应的所有Filtersrvpush模式消费使用this.uploadFilterClassSource();} catch (final Exception e) {log.error(sendHeartbeatToAllBroker exception, e);} finally {this.lockHeartbeat.unlock();}} else {log.warn(lock heartBeat, but failed. [{}], this.clientId);}
}首先会通过prepareHeartbeatData方法准备心跳包的数据进行发送然后会遍历brokerAddrTable的broker地址进行循环发送心跳包。 注意如果单单启动了生产者这时候我们要去判断是否为Master节点由于生产者只能与Master发送消息数据而当我们启动消费者此时我们就需要向所有的broker发送心跳包。
持久化消费偏移量
Override
public void persistConsumerOffset() {try {this.makeSureStateOK();SetMessageQueue mqs new HashSetMessageQueue();SetMessageQueue allocateMq this.rebalanceImpl.getProcessQueueTable().keySet();mqs.addAll(allocateMq);// 持久化的方法this.offsetStore.persistAll(mqs);} catch (Exception e) {log.error(group: this.defaultMQPushConsumer.getConsumerGroup() persistConsumerOffset exception, e);}
}调用persistAllConsumerOffset()进行持久化而持久化的功能时消费端所有的。通过调用DefaultMQPushConsumer的persistAll()方法如果你是广播就会执行LocalFileOffsetStore的对应方法如果你是集群就会执行RemoteBrokerOffsetStore的对应方法。
发送消息的逻辑在Producer的消息发送逻辑里面下一章分析。
负载均衡也是相对于消费端消费端在处理消息的时候使用的消费模式后期去消费端里面详解。
Override
public void run() {log.info(this.getServiceName() service started);while (!this.isStopped()) {this.waitForRunning(waitInterval);// 执行该方法进行负载均衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() service end);
}