当前位置: 首页 > news >正文

txt电子书下载网站推荐wordpress添加自定义模板

txt电子书下载网站推荐,wordpress添加自定义模板,龙岗营销型网站建设,自己做视频会员网站【README】 本文使用的kafka是最新的 kafka3.0.0#xff1b;本文kafka集群有3个节点分别是 centos201, centos202, centos203 #xff1b; brokerid 分别为 1,2#xff0c;3#xff1b;本文主要用于测试 再均衡监听器#xff1b;当有新消费者加入时#xff0c;会发生分区…【README】 本文使用的kafka是最新的 kafka3.0.0本文kafka集群有3个节点分别是 centos201, centos202, centos203 brokerid 分别为 1,23本文主要用于测试 再均衡监听器当有新消费者加入时会发生分区再均衡再均衡前就会调用再均衡监听器的 onPartitionsRevoked()方法本文的测试主题 hello12有3个分区每个分区2个副本【1】再均衡监听器 1应用场景  在消费者退出或进行分区再均衡前会做一些清理工作如提交偏移量或关闭数据库连接这些工作可以通过监听器来实现 2再均衡监听器 ConsumerRebalanceListener 实现 该监听器即可有3个方法 onPartitionsRevoked在分区均衡开始【前】和消费者停止读取消息【后】被调用onPartitionsAssigned分区再均衡【后】和消费者开始读取消息【前】被调用 onPartitionsLost分区宕机时调用本文不涉及 /*** Description 消费者分区再均衡监听器实现类* author xiao tang* version 1.0.0* createTime 2021年12月11日*/ public class ConsumerBalanceListenerImpl implements ConsumerRebalanceListener {/** 消费者 */private Consumer consumer;/** 主题分区偏移量数组 */private MyTopicPartitionOffset[] topicPartitionOffsetArr;/*** description 构造器* param consumer 消费者* param curOffsets 当前偏移量* author xiao tang* date 2021/12/11*/public ConsumerBalanceListenerImpl(Consumer consumer, MyTopicPartitionOffset[] topicPartitionOffsetArr) {this.consumer consumer;this.topicPartitionOffsetArr topicPartitionOffsetArr;}/** * description 在分区均衡开始【前】和消费者停止读取消息【后】被调用* param partitions 分区列表分区号从0开始计数* author xiao tang* date 2021/12/12 */Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) {System.out.println( 分区再均衡触发onPartitionsRevoked()方法);// 提交偏移量回调或记录错误日志OffsetCommitCallback offsetCommitCallback new OffsetCommitCallbackImpl();// 打印日志Arrays.stream(topicPartitionOffsetArr).filter(x-x.partition()-1).forEach(x-System.out.printf(提交偏移量信息partition【%d】offset【%s】\n, x.partition(), x.offset()));// 把数组转为主题分区与偏移量映射并提交最后一次处理的偏移量 可以异步也可以同步// 同步提交一直重试或报超时异常// 异步提交传入提交回调失败自行处理consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), offsetCommitCallback);// 停止程序的原因在于做实验下次从本次提交的偏移量开始消费throw new RuntimeException(发生分区再均衡程序结束);}/*** description 分区再均衡【后】和消费者开始读取消息【前】被调用* param partitions 主题分区列表* author xiao tang* date 2021/12/12*/Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) {// do sth}Overridepublic void onPartitionsLost(CollectionTopicPartition partitions) {ConsumerRebalanceListener.super.onPartitionsLost(partitions);} }为了测试分区再均衡监听器中onPartitionsRevoked() 方法提交最后已消费消息的偏移量后就抛出运行时异常结束运行让其他消费者消费以便查看监听器是否成功提交偏移量 3消费者工具   /*** Description 消费者工具* author xiao tang* version 1.0.0* createTime 2021年12月12日*/ public enum MyConsumerUtils {/** 单例 */INSTANCE;/*** description 获取主题分区与偏移量映射* param topicPartitionOffsetArr 主题分区与偏移量数组* return 主题分区与偏移量映射* author xiao tang* date 2021/12/12*/public static MapTopicPartition, OffsetAndMetadata getTopicPartitionOffsetMetadataMap(MyTopicPartitionOffset[] topicPartitionOffsetArr) {// 主题分区与偏移量映射MapTopicPartition, OffsetAndMetadata topicPartitionOffsetMetadataMap new HashMap(topicPartitionOffsetArr.length);// 分区号大于-1才是消费者接收的分区Arrays.stream(topicPartitionOffsetArr).filter(x-x.partition()-1).forEach(x - {topicPartitionOffsetMetadataMap.put(new TopicPartition(x.topic(), x.partition()), new OffsetAndMetadata(x.offset(), no metadata));});return topicPartitionOffsetMetadataMap;} } 【2】生产者  /*** Description 生产者* author xiao tang* version 1.0.0* createTime 2021年12月03日*/ public class MyProducer {/** 主题 */public final static String TOPIC_NAME hello12;public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);/*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, all);/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小一次发送多少数据当数据大于16k生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待时间 等待时间超过1毫秒即便数据没有大于16k 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超时时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 缓冲区大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化类 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 设置压缩算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);/** 设置拦截器 */ // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 设置阻塞超时时间 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props);/* 10.发送数据 */int order 1;for (int i 0; i 10000; i) {for (int j 0; j 3; j) {FutureRecordMetadata future producer.send(new ProducerRecordString, String(TOPIC_NAME, j, , String.format([%s] , order) DataFactory.INSTANCE.genChar(5)));try {System.out.println([生产者] 分区【 future.get().partition() 】-offset【 future.get().offset() 】);} catch (Exception e) {}}}/* 11.关闭资源 */producer.close();System.out.println(kafka生产者写入数据完成);} } 【3】消费者 【3.1】带有均衡监听器的消费者1 /*** Description 带有均衡监听器的消费者* author xiao tang* version 1.0.0* createTime 2021年12月11日*/ public class MyConsumerWithRebalanceListener {public static void main(String[] args) {// 创建消费者配置信息Properties props new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME G1);// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);int partitionSize consumer.partitionsFor(MyProducer.TOPIC_NAME).size();// 创建分区偏移量数组并初始化 仅考虑一个topic的情况MyTopicPartitionOffset[] topicPartitionOffsetArr new MyTopicPartitionOffset[partitionSize];IntStream.range(0, partitionSize).forEach(x - topicPartitionOffsetArr[x] new MyTopicPartitionOffset());// 订阅主题 【传入分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME), new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr));// 循环拉取try {while (!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() 带均衡监听器消费者等待消费消息);TimeUtils.sleep(1000);// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);System.out.println(poll begin {);for (ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-WithRebalanceListener-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());// 提交的偏移量是 当前消息偏移量加1topicPartitionOffsetArr[rd.partition()].setAll(rd.topic(), rd.partition(), rd.offset() 1);}System.out.println(poll end } );// 【异步提交每个分区的偏移量】consumer.commitAsync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr), new OffsetCommitCallbackImpl());}} finally {try {// 【同步提交】 因为错误时同步提交会一直重试直到提交成功或发生无法恢复的错误consumer.commitSync(MyConsumerUtils.getTopicPartitionOffsetMetadataMap(topicPartitionOffsetArr));} finally {// 记得关闭消费者consumer.close();}}} } 【3.2】 不带均衡监听器的消费者2 测试用 即一个普通消费者 /*** Description 不带均衡监听器的消费者* author xiao tang* version 1.0.0* createTime 2021年12月11日*/ public class MyConsumerNoRebalanceListener {public static void main(String[] args) {// 创建消费者配置信息Properties props new Properties();// 属性配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME G1);// 关闭自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 设置消费消息的位置消费最新消息props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 设置分区策略 (默认值-RangeAssignor)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());// 创建消费者KafkaConsumerString, String consumer new KafkaConsumer(props);// 订阅主题 【没有分区再均衡监听器】consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));// 循环拉取try {while(!Thread.interrupted()) {System.out.println(DateUtils.getNowTimestamp() 没有均衡监听器的消费者等待消费消息);TimeUtils.sleep(1000);// 消费消息ConsumerRecordsString, String consumerRds consumer.poll(100);for(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者-NoRebalanceListener-分区【 rd.partition() 】offset【 rd.offset() 】 值 rd.value());}// 【异步提交】consumer.commitAsync(new OffsetCommitCallbackImpl());if (!consumerRds.isEmpty()) break;}} finally {try {// 【同步提交】 因为错误时同步提交会一直重试直到提交成功或发生无法恢复的错误consumer.commitSync();} finally {// 记得关闭消费者consumer.close();System.out.println(消费者关闭);}}} } 我们可以发现一旦消费者2消费了消息消息不为空就停止消费 以便我们查看消费者2接收消息的偏移量是不是 消费者1的监听器在发生分区再均衡前提交的偏移量1 【4】测试 【4.1】测试步骤 step1 运行生产者发送消息到kafka step2 运行 带有均衡监听器的消费者1 MyConsumerWithRebalanceListener 消费消息 在消费者订阅主题时传入再均衡监听器 // 订阅主题 【传入分区再均衡监听器】 consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME) , new ConsumerBalanceListenerImpl(consumer, topicPartitionOffsetArr)); step3运行 不带均衡监听器的消费者2 MyConsumerNoRebalanceListener消费消息 一旦消费者2加入消费者组就会发生分区再均衡消费者1的某些分区所有权会转给消费者2触发消费者1 的 监听器 ConsumerBalanceListenerImpl 的 onPartitionsRevoked() 方法 然后 onPartitionsRevoked()方法提交 消费者1处理的消息的偏移量后就原地抛出异常停止运行 【4.2】测试结果分析 1消费者1带分区再均衡监听器的监听器最后提交的偏移量日志如下 2021-12-12 10:23:30 带均衡监听器消费者等待消费消息 分区再均衡触发onPartitionsRevoked()方法 提交偏移量信息partition【0】offset【1296】 提交偏移量信息partition【1】offset【1269】 提交偏移量信息partition【2】offset【1269】 2消费者2接收到的起始消息的偏移量日志如下全部 2021-12-12 10:23:27 没有均衡监听器的消费者等待消费消息 2021-12-12 10:23:32 没有均衡监听器的消费者等待消费消息消费者-NoRebalanceListener-分区【0】offset【1296】值[589]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1297】值[592]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1298】值[595]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1299】值[598]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1300】值[601]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1301】值[604]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1302】值[607]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1303】值[610]  ABCDE 消费者-NoRebalanceListener-分区【0】offset【1304】值[613]  ABCDE消费者-NoRebalanceListener-分区【2】offset【1269】值[510]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1270】值[513]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1271】值[516]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1272】值[519]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1273】值[522]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1274】值[525]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1275】值[528]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1276】值[531]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1277】值[534]  ABCDE消费者-NoRebalanceListener-分区【1】offset【1269】值[509]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1270】值[512]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1271】值[515]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1272】值[518]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1273】值[521]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1274】值[524]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1275】值[527]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1276】值[530]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1277】值[533]  ABCDE 消费者关闭 即 监听器提交的偏移量为 partition【0】offset【1296】 partition【1】offset【1269】 partition【2】offset【1269】 而普通消费者接收消息的起始偏移量为 消费者-NoRebalanceListener-分区【0】offset【1296】值[589]  ABCDE 消费者-NoRebalanceListener-分区【2】offset【1269】值[510]  ABCDE 消费者-NoRebalanceListener-分区【1】offset【1269】值[509]  ABCDE 所以偏移量是可以对上的即再均衡监听器在发生分区再均衡前提交的消息偏移量后 其他消费者可以接收该偏移量指定的消息 所以本次再均衡监听器测试是成功的 【注意】 注意1监听器提交的偏移量是接收消息的当前偏移量1注意要加1非常重要即加1后的偏移量作为其他消费者轮序消费的起始位置 代码偏移量1参见  MyConsumerWithRebalanceListener 的 接收消息的循环中的代码如下// 提交的偏移量是 当前消息偏移量加1 topicPartitionOffsetArr[rd.partition()].setAll( rd.topic(), rd.partition(), rd.offset() 1); 注意2本文代码参考了 《kafka权威指南》 page63但书中代码有问题 在每次for循环中创建 TopicPartition对象和 OffsetAndMetadata对象我觉得这是没有必要的因为只有每个分区的最后一次消息的 topicpartitionoffset是有用的但它每次循环都创建了对象而且把 currentOffsets 放在了while循环外面这样肯定会造成 oom本文仅记录了 topicpartitionoffset而没有创建对象这是本文的优化点当然了原作者写的是参考代码可以理解但业务生产代码绝对不能这样写 【References】 kakfa权威指南
http://www.sadfv.cn/news/320862/

相关文章:

  • 洛阳建设信息网站腾讯广告联盟官网
  • 如何制作网站二维码企术建站
  • 个人做的网站不能做淘客宝格丽官网
  • 建设工程招标投标网一键优化清理
  • 怎么把网站提交WordPress百度快照
  • 深圳做网站 汉狮网络华为云网站建设怎么设置选择项
  • 韩国购物网站有哪些公司网站备案资料
  • 网站开发需要学shenme网站改版301怎么做
  • seo整站优化多少钱wordpress 推荐 配置
  • 烟台做网站天津网站建设设计开发公司
  • 做平台的网站有哪些如何写营销策划方案
  • 重庆光龙网站建设关键词排名技巧
  • 设计最好的网站怎么开网店详细步骤教程
  • 男女做污污的网站wordpress升级需要ftp
  • 台州网站建设技术支持网站登录注册怎么做
  • 图片网站该如何做seo优化事件营销的案例
  • 广州网站建设公司哪家比较好网页设计培训教程
  • 用dw制作网站模板下载地址html5网页代码大全
  • 北京网站建设 网络安全有教做鱼骨图的网站吗
  • 做软件开发视频网站医院网站建设好处
  • 知名网站开发哪家好网站项目需求分析
  • 网站开发与建设会计分录湖北网站科技建设
  • 镇江网站建设设计品牌推广的具体方法
  • 公司网站怎么选销售新网站推广策略
  • 网站开发实训基本要求医院网站的建设
  • 金融网站建设案例药品网站如何建设
  • 用python怎么做网站大连搜狗
  • 央企网站群建设营业推广的目标通常是
  • 外贸网站建设的重要性好的高端网站
  • 松江建设管理中心网站电子商务网站建设评估工具