当前位置: 首页 > news >正文

北京网站设计技术乐云seo山西专业网站建设价目

北京网站设计技术乐云seo,山西专业网站建设价目,wordpress首页链接新标签打开,学做网站论坛 可以吗【README】 本文主要对 java客户端作为kafka 消费者进行测试#xff0c; 生产者由 kafka客户端扮演#xff1b; 【1】普通消费者 设置消费者组#xff1b; 重置消费者的offset#xff0c; 即每次都从最头开始消费#xff08;默认仅保持7天内数据#xff09; #xf…【README】 本文主要对 java客户端作为kafka 消费者进行测试 生产者由 kafka客户端扮演  【1】普通消费者 设置消费者组 重置消费者的offset 即每次都从最头开始消费默认仅保持7天内数据 类似于 命令行 --from-beginning kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning 小结从头开始消费必须满足2个条件 条件1 必须重新换组 如本文中的消费者组 从 sichuan 更新为 sichuan1 条件2 需要设置offset 修改为 earliest 默认值是 lastest /*** 普通消费者*/ public class MyConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] rd.key() -- rd.value()); }} /* 关闭消费者 */ // consumer.close(); } } 从官网可以找到以上配置值 https://kafka.apache.org/0110/documentation.html#configuration 【2】kafka消费者-手动提交offset  手动提交offset有3种方式 方式1同步手动提交方式2异步手动提交 方式3自定义手动提交策略 0为啥需要手动提交 kafka自动提交是在kafka拉取到数据之后就直接提交这样很容易丢失数据尤其是在需要事物控制的时候。 很多情况下我们需要从kafka成功拉取数据之后对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 所以这时我们就需要进行手动提交kafka的offset下标。这里顺便说下offset具体是什么。 offset指的是kafka的topic中的每个消费组消费的下标。 简单的来说就是一条消息对应一个offset下标每次消费数据的时候如果提交offset那么下次消费就会从提交的offset加一那里开始消费。 比如一个topic中有100条数据我消费了50条并且提交了那么此时的kafka服务端记录提交的offset就是49(offset从0开始)那么下次消费的时候offset就从50开始消费。 1关闭自动提交(默认为true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 第一次启动 consumer 从 90 开始消费 第2次启动相同 consumer 还是从90开始消费 2 如何使用手动提交 kafka提供了手动提交offset的api 方法1commitSync 同步提交 方法2commitAsync 异步提交 两者相同点都会将本次 poll 的一批数据最高的偏移量提交 不同点是 commitSync 阻塞当前线程一直到提交成功 并且会自动失败重试 而 commitAsync 没有失败重试机制 可能提交失败 3同步手动提交offset /*** 手动同步提交offset */ public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] [partition] rd.partition() [offset] rd.offset() rd.key() -- rd.value()); }/* 【同步提交】当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */ // consumer.close(); } } 4异步手动提交offset  /*** 异步手动提交offset */ public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second));/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] [partition] rd.partition() [offset] rd.offset() rd.key() -- rd.value()); }/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */ consumer.commitAsync(new OffsetCommitCallback() {Override public void onComplete(MapTopicPartition, OffsetAndMetadata offsets,Exception exception) {if (exception !null) {System.out.println(异步提交失败);} else {System.out.println(异步提交成功); }}}); } /* 关闭消费者 */ // consumer.close(); } } 5自定义手动提交offset策略 5.0为啥需要自定义 因为异步提交有一些问题如下 先消费数据后提交offset 可能导致数据重复消费 先提交offset 后走业务逻辑可能会丢数据 5.1应用场景 把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面 5.2如何实现需要实现 ConsumerRebalanceListener 来实现。 /*** 自定义手动提交offset策略 */ public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092);/*2.2 关闭自动提交(默认为true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, sichuan1); /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 默认值是 lastest /* 创建消费者 */KafkaConsumerString, String consumer new KafkaConsumer(props); /* 订阅主题 */consumer.subscribe(Arrays.asList(first, second), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(CollectionTopicPartition partitions) { // 在 rebalance方法【前】调用}Overridepublic void onPartitionsAssigned(CollectionTopicPartition partitions) { // 在 rebalance方法【后】调用 /* 分区分配方法 */for (TopicPartition partition : partitions) { /*定位到某个 offset*/consumer.seek(partition, 1); // TODO: 这里需要输入1 }} });/* 循环拉取 */ while(true) {/* 消费消息-获取数据 */ConsumerRecordsString, String consumerRds consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍历 ConsumerRecords*/for(ConsumerRecordString, String rd : consumerRds) {System.out.println([消费者] [partition] rd.partition() [offset] rd.offset() rd.key() -- rd.value()); }/* 【同步提交】当前线程会阻塞直到 offset提交成功 */ consumer.commitSync();} /* 关闭消费者 */ // consumer.close(); } } 补充 消费者rebalance 是什么 消费者 rebalance 什么时候触发 rebalance 如 同一个消费者组下的 某个消费者机器宕机或新增一个消费者机器都会触发 rebalance即重新分配 kafka分区数据与 消费者的对应关系
http://www.sadfv.cn/news/46317/

相关文章:

  • 宣传 网站建设方案模板外贸推广方式有哪些
  • 做百度网站每年的费用多少wordpress支持HTML么
  • 腾讯云怎么备案网站新人0元购物软件
  • 网站建设运营费用包括哪些广州最新新闻事件
  • asp汽车销售公司网站源码 4s店网站源码 汽车网站建设 完整无县电子政务办网站建设工作思路
  • 宁波网站优化如何网站建设有哪些模块
  • 公司网站二维码怎么做的网页设计公司概念
  • 网站制作五个界面公司合法网站域名怎么注册
  • pc端的网站设计方案网络营销方式多样
  • 网站建设开发报价义乌广告设计与制作
  • 泰安网站建设最好wordpress照片库
  • 搭建网站需要什么技能wordpress怎么添加栏目
  • 如何免费做网站赚钱淘宝上网站建设为啥这么便宜
  • 长岭建设局网站dede网站
  • 网站维护与排名WordPress主题niRvana
  • 手机网站推荐几个广西和城乡建设厅网站
  • 自己的网站怎么做跳转深圳网络推广收费标准
  • 播州区住房城乡建设路的网站wordpress文章描述调用修改
  • 朝阳网站建设应聘网站开发的自我介绍
  • 单页网站怎么做关于企业网站建设的必要性
  • 爱建站大全网ui设计师mike个人网站
  • 工程建设公司网站做网站这个工作怎么样
  • 有没有专门做针织衫的网站wordpress显示
  • 开个送快餐网站怎么做办公室装饰
  • wordpress 网站标题设置方法html商业网站模板
  • 电影网站免费建设做钓鱼网站教程视频教程
  • 成都网站seo收费标准怎么做网站的签约编辑
  • 想做找人做网站简述企业网站的基本功能
  • 湖北建设厅网站怎么打不开免费代理浏览网页
  • 网站怎么销售汕头网站建设模板