汉口网站制作设计,淘宝 网站建设教程视频教程,湖南易图做推广送网站,深圳十大高科技企业今天#xff0c;我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章#xff0c;这篇文章很简单#xff0c;就是利用Kafka的API创建一个生产者和消费者#xff0c;生产者不断向Kafka写入消息#xff0c;消费者则不断消费Kafka的消息。下面是具体的实例代码。一、创…今天我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章这篇文章很简单就是利用Kafka的API创建一个生产者和消费者生产者不断向Kafka写入消息消费者则不断消费Kafka的消息。下面是具体的实例代码。一、创建配置类Config这个类很简单只是存放了两个常量一个是话题TOPIC一个是线程数THREADSpackage com.lya.kafka;/*** 配置项* author liuyazhuang**/public class Config {/*** 话题*/public static final String TOPIC wordcount;/*** 线程数*/public static final Integer THREADS 1;}二、编程生产者类ProducerDemo这个类的主要作用就是向Kafka写入相应的消息并且将消息写入wordcount话题。package com.lya.kafka;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/*** 生产者实例* author liuyazhuang**/public class ProducerDemo {public static void main(String[] args) throws Exception {Properties props new Properties();props.put(zk.connect, 192.168.209.121:2181);props.put(metadata.broker.list,192.168.209.121:9092);props.put(serializer.class, kafka.serializer.StringEncoder);props.put(zk.connectiontimeout.ms, 15000);ProducerConfig config new ProducerConfig(props);Producer producer new Producer(config);// 发送业务消息// 读取文件 读取内存数据库 读socket端口for (int i 1; i 100; i) {Thread.sleep(500);producer.send(new KeyedMessage(Config.TOPIC,this number i));}}}三、编写消息者类ConsumerDemo这个类的主要作用就是消费Kafka中wordcount话题的消息。package com.lya.kafka;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.message.MessageAndMetadata;/*** 消费者实例* author liuyazhuang**/public class ConsumerDemo {public static void main(String[] args) {Properties props new Properties();props.put(zookeeper.connect, 192.168.209.121:2181);props.put(group.id, 1111);props.put(auto.offset.reset, smallest);props.put(zk.connectiontimeout.ms, 15000);ConsumerConfig config new ConsumerConfig(props);ConsumerConnector consumer Consumer.createJavaConsumerConnector(config);Map topicCountMap new HashMap();topicCountMap.put(Config.TOPIC, Config.THREADS);Map consumerMap consumer.createMessageStreams(topicCountMap);List streams consumerMap.get(Config.TOPIC);for(final KafkaStream kafkaStream : streams){new Thread(new Runnable() {Overridepublic void run() {for(MessageAndMetadata mm : kafkaStream){String msg new String(mm.message());System.out.println(msg);}}}).start();}}}四、运行实例首先运行消费者类ConsumerDemo运行结果如下没有打印任何信息。此时我们运行生产者类ProducerDemo我们再次打开消费者的控制台查看如下打印出了生产者生产的消息。至此Kafka简单客户端编程实例结束。以上就是本文的全部内容希望对大家的学习有所帮助也希望大家多多支持脚本之家。