北京网站设计技术乐云seo,山西专业网站建设价目,wordpress首页链接新标签打开,学做网站论坛 可以吗【README】
本文主要对 java客户端作为kafka 消费者进行测试#xff0c; 生产者由 kafka客户端扮演#xff1b; 【1】普通消费者
设置消费者组#xff1b;
重置消费者的offset#xff0c; 即每次都从最头开始消费#xff08;默认仅保持7天内数据#xff09; #xf…【README】
本文主要对 java客户端作为kafka 消费者进行测试 生产者由 kafka客户端扮演 【1】普通消费者
设置消费者组
重置消费者的offset 即每次都从最头开始消费默认仅保持7天内数据
类似于 命令行 --from-beginning
kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning
小结从头开始消费必须满足2个条件
条件1 必须重新换组 如本文中的消费者组 从 sichuan 更新为 sichuan1
条件2 需要设置offset 修改为 earliest 默认值是 lastest /*** 普通消费者*/
public class MyConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] rd.key() -- rd.value()); }} /* 关闭消费者 */
// consumer.close(); }
} 从官网可以找到以上配置值 https://kafka.apache.org/0110/documentation.html#configuration 【2】kafka消费者-手动提交offset
手动提交offset有3种方式
方式1同步手动提交方式2异步手动提交 方式3自定义手动提交策略
0为啥需要手动提交
kafka自动提交是在kafka拉取到数据之后就直接提交这样很容易丢失数据尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 所以这时我们就需要进行手动提交kafka的offset下标。这里顺便说下offset具体是什么。
offset指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标每次消费数据的时候如果提交offset那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据我消费了50条并且提交了那么此时的kafka服务端记录提交的offset就是49(offset从0开始)那么下次消费的时候offset就从50开始消费。
1关闭自动提交(默认为true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
第一次启动 consumer 从 90 开始消费 第2次启动相同 consumer 还是从90开始消费
2 如何使用手动提交
kafka提供了手动提交offset的api
方法1commitSync 同步提交
方法2commitAsync 异步提交
两者相同点都会将本次 poll 的一批数据最高的偏移量提交
不同点是 commitSync 阻塞当前线程一直到提交成功 并且会自动失败重试
而 commitAsync 没有失败重试机制 可能提交失败 3同步手动提交offset
/*** 手动同步提交offset */
public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] [partition] rd.partition() [offset] rd.offset() rd.key() -- rd.value()); }/* 【同步提交】当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */
// consumer.close(); }
}
4异步手动提交offset
/*** 异步手动提交offset */
public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] [partition] rd.partition() [offset] rd.offset() rd.key() -- rd.value()); }/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */ consumer.commitAsync(new OffsetCommitCallback() {Override public void onComplete(MapTopicPartition, OffsetAndMetadata offsets,Exception exception) {if (exception !null) {System.out.println(异步提交失败);} else {System.out.println(异步提交成功); }}}); } /* 关闭消费者 */
// consumer.close(); }
}
5自定义手动提交offset策略
5.0为啥需要自定义
因为异步提交有一些问题如下
先消费数据后提交offset 可能导致数据重复消费
先提交offset 后走业务逻辑可能会丢数据
5.1应用场景
把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面 5.2如何实现需要实现 ConsumerRebalanceListener 来实现。
/*** 自定义手动提交offset策略 */
public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) { // 在 rebalance方法【前】调用}Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) { // 在 rebalance方法【后】调用 /* 分区分配方法 */for (TopicPartition partition : partitions) { /*定位到某个 offset*/consumer.seek(partition, 1); // TODO: 这里需要输入1 }} });/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] [partition] rd.partition() [offset] rd.offset() rd.key() -- rd.value()); }/* 【同步提交】当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */
// consumer.close(); }
} 补充 消费者rebalance 是什么
消费者 rebalance 什么时候触发 rebalance 如 同一个消费者组下的 某个消费者机器宕机或新增一个消费者机器都会触发 rebalance即重新分配 kafka分区数据与 消费者的对应关系