网站页面建设,莱阳市规划建设局网站,汽车营销活动策划方案,网站发外链的好处位移提交#xff1a;
Kafka的每条消息都有唯一的 offset#xff0c; 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。
消费者每次在 poll() 拉取消息#xff0c;它要返回的是还没有消费过的消息集#xff0c;
因此#xff0c;需要记录上一次消费时的消费位…位移提交
Kafka的每条消息都有唯一的 offset 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。
消费者每次在 poll() 拉取消息它要返回的是还没有消费过的消息集
因此需要记录上一次消费时的消费位移并且持久化。
消费者在消费完消息之后需要执行消费位移的提交。
自动位移提交
Kafka默认的消费位移的提交方式是 自动提交。
自动提交由消费者客户端参数 enable.auto.commit 配置默认值是 true。
默认的自动提交是定期提交提交的周期由 auto.commit.interval.ms 配置默认是 5s。
自动位移提交有可能会重复消费和消息丢失。
假设刚刚提交完一次消费位移然后拉取一批消息进行消费在下一次自动提交消费位移之前消费者崩溃了那又得从上一次位移提交的地方重新开始消费这样就会重复消费。
手动位移提交:
手动位移提交由消费者客户端参数 enable.auto.commit 配置 设置为 false 就是手动位移提交。
手动位移提交可以分为 同步提交、异步提交。
commitSync() 同步提交
同步提交会阻塞消费者线程直到位移提交完成。
示例代码
public class OffsetCommitSync {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static final String GROUP_ID group.demo;public static void main(String[] args) {Properties props initConfig();KafkaConsumerString, String consumer new KafkaConsumer(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecordsString, String records consumer.poll(1000);for (ConsumerRecordString, String record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println(手动提交位移成功.);}}public static Properties initConfig() {Properties props new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);//不自动提交采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}
commitAsync() 异步提交
异步提交在执行的时候消费者线程不会被阻塞可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。
异步提交将 consumer.commitSync(); 换成 commitAsync。
如果还需要回调就用 OffsetCommitCallback对象作为参数。
示例如下
public class OffsetCommitAsyncCallback {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static final String GROUP_ID group.demo;public static void main(String[] args) {Properties props initConfig();KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecordsString, String records consumer.poll(1000);for (ConsumerRecordString, String record : records) {//do something}//异步回调如果不需要回调就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets,Exception exception) {if (exception null) {System.out.println(offsets);} else {log.error(fail to commit offsets {}, offsets, exception);}}});}}public static Properties initConfig() {Properties props new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}
参考资料
《深入理解kafka核心设计与实践原理》