王晴儿网站建设方案,wordpress iis 中文,开源门户网站源码,现代建设中国公司网站【README】
本文演示了当有新消费者加入组后#xff0c;其他消费者接收分区情况#xff1b;本文还模拟了 broker 宕机的情况#xff1b;本文使用的是最新的 kafka3.0.0 #xff1b;本文测试案例#xff0c;来源于 消费者接收分区的5种模型#xff0c;建议先看模型#…【README】
本文演示了当有新消费者加入组后其他消费者接收分区情况本文还模拟了 broker 宕机的情况本文使用的是最新的 kafka3.0.0 本文测试案例来源于 消费者接收分区的5种模型建议先看模型refer2 https://blog.csdn.net/PacosonSWJTU/article/details/121853461https://blog.csdn.net/PacosonSWJTU/article/details/121853461【1】kafka测试环境准备
1kafka集群
3个broker分别为 centos201, centos202, centos203 id分别为 12,3 topic 3个分区2个副本2生产者代码
public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);/*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, all);/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 0);/*5.批次大小一次发送多少数据当数据大于16k生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);/*6.等待时间 等待时间超过1毫秒即便数据没有大于16k 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 超时时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);/*7. RecordAccumulator 缓冲区大小*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);/*8. key, value 的序列化类 */props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/** 设置压缩算法 */props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);/** 设置拦截器 */
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));/** 设置阻塞超时时间 */props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);/* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props);/* 10.发送数据 */int order 1;for (int i 0; i 100000; i) {for (int j 0; j 3; j) {FutureRecordMetadata future producer.send(new ProducerRecordString, String(hello10,j, , String.format([%s] , order) DataFactory.INSTANCE.genChar(5)));try {System.out.println([生产者] 分区【 future.get().partition() 】-offset【 future.get().offset() 】);} catch (Exception e) {}}try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}/* 11.关闭资源 */producer.close();System.out.println(kafka生产者写入数据完成);}
}
生产者会向每个分区发送1条消息发送完成后睡眠500ms 共计循环 10w次 共计5w秒计划耗时 10小时这里其他同学可以自行设置为其他值
34个消费者编号为1,2,3,4
public class MyConsumer1 {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092,centos202:9092,centos203:9092);/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的间隔时间*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, hello10G1); // group.id/*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest); // 默认值是 lastest/*2.7 关闭自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(hello10));/* 指定消费者的每个分区从偏移量1开始读取下面的poll方法就会从位置1开始消费消息 */
// for (TopicPartition partition : consumer.assignment()) {
// consumer.seek(partition, 1);
// }// 消费消息try {// 死循环while(!Thread.interrupted()) {try {System.out.println(DateUtils.getNowTimestamp() 消费者1-等待消费消息);TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 消费消息-获取数据ConsumerRecordsString, String consumerRds consumer.poll(100);// 遍历 ConsumerRecordsfor(ConsumerRecordString, String rd : consumerRds) {System.out.println(消费者1-分区【 rd.partition() 】offset【 rd.offset() 】 - DateUtils.getNowTimestamp() rd.key() -- rd.value());}consumer.commitSync(); // 同步提交}} finally {// 记得关闭消费者consumer.close();}}
}这样的消费者有4个分别编号为 消费者 12,3,4 我的意思是4个不同的消费者类以便打印日志标识
我的消费者消费的是 lastest 最新产生的消费这里可以自行设置为其他值如 earlies
4添加日志配置不打印 debug日志因为kafka消费者debug日志很多
新建 logback.xml 设置仅打印info以上级别日志
?xml version1.0 encodingUTF-8?
configurationlogger nameorg.apache.kafka.clients levelinfo /
/configuration
5为了直观展示消费详情我会用命令行启动4个不同消费者而用idea启动生产者但编译都通过maven 【2】kafka测试
【2.1】测试1当有新消费者加入后整个消费者组成员接收分区情况
写在前面 文末会po出命令行启动消费者的命令及参数
消费者接收分区消息模型参见
step0启动 生产者发送消息到kafka
step2命令行启动消费者1消息消费日志如下 消费者1接收了3个分区消息 step2命令行启动消费者2群组消费日志如下 消费者1接收了个分区2消息 消费者2接收了分区0和分区2的消息 step3命令行继续启动消费者3群组消费日志如下 消费者1接收了个分区2消息 消费者2接收了分区0的消息 消费者3接收了分区1的消息 step4命令行继续启动消费者4 日志如下 消费者1接收了个分区2消息 消费者2接收了分区0的消息 消费者3接收了分区1的消息 消费者4空闲 【2】 模拟kafka broker 宕机
写在前面模拟宕机前先查看 topic 详情 图1
step1 停止掉 201 broker的服务
情况1topic的分区没有受影响但leader 副本选举为3比较本图和图1看差别 情况2所有消费者全部阻塞直到超时全部抛出异常 等待 kafka集群的控制器首领副本选择完成后又可以接收消费者请求
补充1 这里有一小段时间延时即当有broker宕机后需要重新选举控制器首领副本等而且会发生分区再均衡step2重启 201消费日志如下 消费者1接收了个分区1消息 消费者2接收了分区2的消息消费者3空闲 消费者4接收了分区0的消息 之所以 消费者3空闲消费者4忙碌是因为 broker 动态上下线导致了分区再均衡使得分区所有权从消费者A转到消费者B201宕机前是消费者3忙碌消费者4空闲
【小结】
1要保证kafka消息可靠性需要 生产者broker消费者3方的全力配合
2本文这里仅记录了一部分 kafka集群异常的情况 【附录】
命令行启动消费者命令及参数仅供参考因为路径肯定不一样
其实这是拷贝idea的执行日志里的命令如下 java -classpath D:\Java\jdk1.8.0_172\jre\lib\charsets.jar;D:\Java\jdk1.8.0_172\jre\lib\deploy.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_172\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_172\jre\lib\javaws.jar;D:\Java\jdk1.8.0_172\jre\lib\jce.jar;D:\Java\jdk1.8.0_172\jre\lib\jfr.jar;D:\Java\jdk1.8.0_172\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_172\jre\lib\jsse.jar;D:\Java\jdk1.8.0_172\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_172\jre\lib\plugin.jar;D:\Java\jdk1.8.0_172\jre\lib\resources.jar;D:\Java\jdk1.8.0_172\jre\lib\rt.jar;D:\workbench_idea\study4vw\vwstudy22\target\classes;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.5.4\spring-boot-starter-web-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter\2.5.4\spring-boot-starter-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot\2.5.4\spring-boot-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.5.4\spring-boot-autoconfigure-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.5.4\spring-boot-starter-logging-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\ch\qos\logback\logback-classic\1.2.5\logback-classic-1.2.5.jar;D:\software_cluster\mvn_repo\.m2\repository\ch\qos\logback\logback-core\1.2.5\logback-core-1.2.5.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;D:\software_cluster\mvn_repo\.m2\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.5.4\spring-boot-starter-json-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.12.4\jackson-databind-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.12.4\jackson-annotations-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.12.4\jackson-core-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.12.4\jackson-datatype-jdk8-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.12.4\jackson-datatype-jsr310-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.12.4\jackson-module-parameter-names-2.12.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.5.4\spring-boot-starter-tomcat-2.5.4.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.52\tomcat-embed-core-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.52\tomcat-embed-el-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.52\tomcat-embed-websocket-9.0.52.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-web\5.3.9\spring-web-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-webmvc\5.3.9\spring-webmvc-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;D:\software_cluster\mvn_repo\.m2\repository\org\apache\kafka\kafka-clients\3.0.0\kafka-clients-3.0.0.jar;D:\software_cluster\mvn_repo\.m2\repository\com\github\luben\zstd-jni\1.5.0-2\zstd-jni-1.5.0-2.jar;D:\software_cluster\mvn_repo\.m2\repository\org\lz4\lz4-java\1.7.1\lz4-java-1.7.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.1\snappy-java-1.1.8.1.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\software_cluster\mvn_repo\.m2\repository\org\slf4j\slf4j-simple\1.7.25\slf4j-simple-1.7.25.jar kafka.consumer.MyConsumer2