当前位置: 首页 > news >正文

动态ip做网站网站建设实现的目标

动态ip做网站,网站建设实现的目标,小程序是什么东西,app界面设计模板免费下载默认情况下#xff0c;一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.internals.Abstrac…默认情况下一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。 一、自定义PartitionAssignor. package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; import org.apache.kafka.common.TopicPartition;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;public class BroadcastAssignor extends AbstractPartitionAssignor {Overridepublic String name() {return broadcast;}private MapString, ListString consumersPerTopic(MapString, Subscription consumerMetadata) {MapString, ListString res new HashMap();for (Map.EntryString, Subscription subscriptionEntry : consumerMetadata.entrySet()) {String consumerId subscriptionEntry.getKey();for (String topic : subscriptionEntry.getValue().topics())put(res, topic, consumerId);}return res;}Overridepublic MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {MapString, ListString consumersPerTopic consumersPerTopic(subscriptions);MapString, ListTopicPartition assignment new HashMap();subscriptions.keySet().forEach(memberId -assignment.put(memberId, new ArrayList()));consumersPerTopic.entrySet().forEach(topicEntry-{String topic topicEntry.getKey();ListString members topicEntry.getValue();Integer numPartitionsForTopic partitionsPerTopic.get(topic);if (numPartitionsForTopic null || members.isEmpty())return;ListTopicPartition partitions AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);if (!partitions.isEmpty()) {members.forEach(memberId -assignment.get(memberId).addAll(partitions));}});return assignment;} }二、定义两个消费者给其配置上述PartitionAssignor. package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.time.temporal.TemporalUnit; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit;public class KafkaTest19 {private static Properties getProperties(){Properties propertiesnew Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,xx.xx.xx.xx:9092);properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,testGroup2023);properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumerString,String myConsumernew KafkaConsumerString, String(getProperties());String topicstudy2023;myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecordsString,String consumerRecordsmyConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println(record offset is: record.offset());}}} } package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.time.temporal.TemporalUnit; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit;public class KafkaTest20 {private static Properties getProperties(){Properties propertiesnew Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,xx.xx.xx.xx:9092);properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,testGroup2023);properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,BroadcastAssignor.class.getName());return properties;}public static void main(String[] args) {KafkaConsumerString,String myConsumernew KafkaConsumerString, String(getProperties());String topicstudy2023;myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecordsString,String consumerRecordsmyConsumer.poll(Duration.ofMillis(5000));for(ConsumerRecord record: consumerRecords){System.out.println(record.value());System.out.println(record offset is: record.offset());}}} } 在kafka创建只有一个分区的topic study2023 创建一个生产者往study2023这个 topic发送消息 package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;public class KafkaTest01 {public static void main(String[] args) {Properties properties new Properties();properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,xx.xx.xx.xx:9092);KafkaProducerString,String kafkaProducernew KafkaProducerString, String(properties);ProducerRecordString,String producerRecordnew ProducerRecord(study2023,0,fff,hello sister,now is: new Date());FutureRecordMetadata future kafkaProducer.send(producerRecord);long offset 0;try {offset future.get().offset();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(offset);kafkaProducer.close();} } 分别运行生产者和消费者可以看到相同消费者组里两个消费者可以消费study2023这个topic的同一个分区的数据
http://www.yutouwan.com/news/217856/

相关文章:

  • 品牌网站建设网站网站建设公司介绍ppt
  • 成都网站建设公司电话邵阳做网站
  • 互动科技网站建设网站建设的介绍
  • 网站关键词优化方案分为几个步骤店面设计绘画
  • 国内最有趣的25个网站丽江建网站
  • 做网站需要什么手续网站项目需求表
  • 网站建设绩效考核方案上海工商一网通办
  • 网站建设评价江苏网站建设流程
  • 赤峰网站设计专业网站设计制作价格
  • 河南平台网站建设做免费网站安全吗
  • 推广型网站开发网址wordpress主题需要ftp
  • 上饶网站开发wordpress 文章翻页
  • 华为公司网站建设案例分析门户网站建设情况调研报告
  • 龙华网站设计世界500强企业有哪些
  • 做网站要用到什么软件专做女裤有哪些网站
  • 克拉玛依网站建设公司项目管理软件 project教程
  • 五路居网站建设上海城隍庙小吃推荐
  • 做sohu最好的推广网站石家庄网站制作找谁
  • 网站建设与网页设计教程大连网站推广优化
  • 上海网站建设建站房地产销售话术
  • 广州车陂网站建设公司泌阳专业网站建设
  • 旅游网站的功能温州市建设工程信息网
  • 织梦贷款网站源码网站搭建视频
  • 网站域名登记证明文件建设部网站最新消息
  • dede网站搬家教程什么网站专门做自由行的
  • 网站备案要关站吗多个域名指向同一个网站 备案
  • 内江做网站哪里便宜网站建设与管理 情况总结
  • 江苏南京建设厅网站音乐制作软件
  • 网站系统源代码郑州市做网站
  • 网站建设财务计划与预测软件开发学院