当前位置: 首页 > news >正文

长春朝阳学校网站建设临沂网站制作企业

长春朝阳学校网站建设,临沂网站制作企业,深圳万创网怎么样,外贸公司手机网站文章目录 1. Kafka 生产者2. kafaka 命令行操作3. kafka 生产者发送消息流程4. Kafka 生产者的创建5. Kafka 生产者发送消息1. 发送即忘记2. 同步发送3. 异步发送 6. Kafka 消息对象 ProducerRecord 1. Kafka 生产者 不管是把Kafka作为消息队列、消息总线还是数据存储平台总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序它们负责将消息发送到 Kafka 集群中的一个或多个主题topic。生产者可以将消息发送到指定的主题也可以根据分区策略将消息发送到多个分区中。生产者可以以异步或同步方式发送消息并且可以配置消息的可靠性和持久性等属性。在 Kafka 中生产者是消息的源头它们将消息发送到 Kafka 集群中供消费者消费。 2. kafaka 命令行操作 ① 启动 Zookeeper 集群 [rootmaster01 bin]# pwd /root/ch/soft/zk/zk-01/bin [rootmaster01 bin]# ./zkServer.sh start[rootmaster01 bin]# pwd /root/ch/soft/zk/zk-02/bin [rootmaster01 bin]# ./zkServer.sh start[rootmaster01 bin]# pwd /root/ch/soft/zk/zk-03/bin [rootmaster01 bin]# ./zkServer.sh start② 启动 kafka 集群 [rootmaster01 kafka01]# pwd /root/ch/soft/kafka/kafka01 [rootmaster01 kafka01]# bin/kafka-server-start.sh config/server.properties[rootmaster01 kafka02]# pwd /root/ch/soft/kafka/kafka02 [rootmaster01 kafka02]# bin/kafka-server-start.sh config/server.properties[rootmaster01 kafka03]# pwd /root/ch/soft/kafka/kafka03 [rootmaster01 kafka03]# bin/kafka-server-start.sh config/server.properties③ 创建主题 test [rootmaster01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2 --topic test Created topic test. [rootmaster01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test Topic:test PartitionCount:3 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: test Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: test Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0④ 生产者发送消息到主题test [rootmaster01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test hello 你好kafka!⑤ 消费者消费主题test的消息 [rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning hello 你好kafka!3. kafka 生产者发送消息流程 先从创建一个ProducerRecord对象开始其中需要包含目标主题和要发送的内容。另外还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时生产者需要先把键和值对象序列化成字节数组这样才能在网络上传输。 接下来如果没有显式地指定分区那么数据将被传给分区器。分区器通常会基于ProducerRecord对象的键选择一个分区。选好分区以后生产者就知道该往哪个主题和分区发送这条消息了。紧接着该消息会被添加到一个消息批次里这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标broker。 broker在收到这些消息时会返回一个响应。如果消息写入成功就返回一个RecordMetaData对象其中包含了主题和分区信息以及消息在分区中的偏移量。如果消息写入失败则会返回一个错误。生产者在收到错误之后会尝试重新发送消息重试几次之后如果还是失败则会放弃重试并返回错误信息。 4. Kafka 生产者的创建 要向Kafka写入消息首先需要创建一个生产者对象并设置一些属性。Kafka生产者有3个必须设置的属性。 ① bootstrap.servers broker的地址。可以由多个host:port组成生产者用它们来建立初始的Kafka集群连接。它不需要包含所有的broker地址因为生产者在建立初始连接之后可以从给定的broker那里找到其他broker的信息。不过还是建议至少提供两个broker地址因为一旦其中一个停机则生产者仍然可以连接到集群。 ② key.serializer 一个类名用来序列化消息的键。broker 希望接收到的消息的键和值都是字节数组。生产者可以把任意Java对象作为键和值发送给broker但它需要知道如何把这些Java对象转换成字节数组。key.serializer 必须被设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer等如果你只使用常见的几种Java对象类型就没有必要实现自己的序列化器。 需要注意的是必须设置key.serializer这个属性尽管你可能只需要将值发送给Kafka。如果只需要发送值则可以将Void作为键的类型然后将这个属性设置为VoidSerializer。 ③ value.serializer 一个类名用来序列化消息的值。与设置key.serializer属性一样需要将value.serializer设置成可以序列化消息值对象的类。 public class CustomProducer01 {public static void main(String[] args) {// kafka生产者属性配置Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,10.65.132.2:9093);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建kafka生产者 KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);} } 5. Kafka 生产者发送消息 实例化好生产者对象后接下来就可以开始发送消息了。KafkaProducer 的 send() 方法用于向 Kafka 集群发送消息。该方法的语法如下 public interface ProducerK, V extends Closeable {FutureRecordMetadata send(ProducerRecordK, V record);FutureRecordMetadata send(ProducerRecordK, V record, Callback callback); }其中ProducerRecordK, V 表示要发送的消息记录K 和 V 分别表示键和值的类型。send() 方法返回一个 Future 对象表示异步发送消息的结果。 发送消息主要有以下3种方式 ① 发送并忘记 把消息发送给服务器但并不关心它是否成功送达。大多数情况下消息可以成功送达因为Kafka是高可用的而且生产者有自动尝试重发的机制。但是如果发生了不可重试的错误或超时那么消息将会丢失应用程序将不会收到任何信息或异常。 ② 同步发送 一般来说生产者是异步的——我们调用send()方法发送消息它会返回一个Future对象。可以调用get()方法等待Future完成这样就可以在发送下一条消息之前知道当前消息是否发送成功。 ③ 异步发送 调用send()方法并指定一个回调函数当服务器返回响应时这个函数会被触发。 1. 发送即忘记 发送即忘记生产者发送消息后不会等待服务器的响应直接发送下一条消息。它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下这种发送方式没有什么问题不过在某些时候比如发生不可重试异常时会造成消息的丢失。这种发送方式的性能最高可靠性也最差。 public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka!);try{// 发送消息kafkaProducer.send(producerRecord);}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();} }cmd命令行窗口开启 kafka 消息者观察消费者是否接收到消息 [rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning 你好kafka!2. 同步发送 同步发送消息很简单当Kafka返回错误或重试次数达到上限时生产者可以捕获到异常。这里需要考虑性能问题。根据Kafka集群繁忙程度的不同broker可能需要2毫秒或更长的时间来响应请求。如果采用同步发送方式那么发送线程在这段时间内就只能等待什么也不做甚至都不发送其他消息这将导致糟糕的性能。因此同步发送方式通常不会被用在生产环境中但会经常被用在示例代码中。 send() 方法本身就是异步的send() 方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send() 方法之后可以调用 get() 方法来阻塞等待Kafka的响应直到消息发送成功或者发生异常。如果发生异常那么就需要捕获异常并交由外层逻辑处理。 Future 接口源码 public interface FutureV {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }Future接口是Java中用于表示异步计算结果的接口。它定义了一些方法用于查询异步计算是否完成、获取计算结果等操作。 cancel方法用于取消异步计算isCancelled方法用于判断异步计算是否已经被取消isDone方法用于判断异步计算是否已经完成。get方法用于获取异步计算的结果如果计算还没有完成则该方法会阻塞直到计算完成。如果计算被取消则该方法会抛出CancellationException异常。如果计算抛出异常则该方法会抛出ExecutionException异常。get(long timeout, TimeUnit unit)方法与get方法类似但是它会在指定的时间内等待计算完成如果超时则会抛出TimeoutException异常。 Future 表示一个任务的生命周期并提供了相应的方法来判断任务是否已经完成或取消以及获取任务的结果和取消任务等。既然KafkaProducer.send() 方法的返回值是一个Future类型的对象那么完全可以用Java语言层面的技巧来丰富应用的实现比如使用Future中的 getlong timeoutTimeUnit unit方法实现可超时的阻塞。 public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka同步发送!);try{// 发送消息FutureRecordMetadata future kafkaProducer.send(producerRecord);// 获取异步计算的结果如果计算还没有完成则该方法会阻塞直到计算完成RecordMetadata recordMetadata future.get();System.out.println(metadata.topic() recordMetadata.topic());}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();} }[rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning 你好kafka! 你好kafka同步发送!调用Future.get()方法等待Kafka响应。如果消息没有发送成功那么这个方法将抛出一个异常。如果没有发生错误那么我们将得到一个RecordMetadata对象并能从中获取消息的偏移量和其他元数据。 KafkaProducer一般会出现两种错误。一种是可重试错误这种错误可以通过重发消息来解决。例如对于连接错误只要再次建立连接就可以解决。对于“not leader for partition”非分区首领错误只要重新为分区选举首领就可以解决此时元数据也会被刷新。可以通过配置启用KafkaProducer的自动重试机制。如果在多次重试后仍无法解决问题则应用程序会收到重试异常。另一种错误则无法通过重试解决比如“Message size too large”消息太大。对于这种错误KafkaProducer不会进行任何重试而会立即抛出异常。 3. 异步发送 假设一条消息在应用程序和Kafka集群之间往返需要10毫秒。如果在发送完每条消息后都需要等待响应那么发送100条消息将需要1秒。如果只发送消息但不需要等待响应那么发送100条消息所需要的时间就会少很多。大多数时候并不需要等待响应——尽管Kafka会把消息的目标主题、分区信息和偏移量返回给客户端但对客户端应用程序来说可能不是必需的。不过当消息发送失败需要抛出异常、记录错误日志或者把消息写入“错误消息”文件以便日后分析诊断时就需要用到这些信息了。为了能够在异步发送消息时处理异常情况生产者提供了回调机制。 生产者发送消息后不会等待服务器的响应而是通过回调函数来处理服务器的响应。回调函数会在 producer 收到 ack 时调用该方法有两个参数分别是元数据信息RecordMetadata和异常信息Exception如果 Exception 为 null说明消息发送成功如果 Exception 不为 null说明消息发送失败。 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka异步发送带返回值!);try{// 发送消息kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {// 说明消息发送成功if(enull){System.out.println(metadata.topic() recordMetadata.topic());System.out.println(metadata.partition() recordMetadata.partition());}}});}catch (Exception e){e.printStackTrace();}// 关闭资源kafkaProducer.close();} }[rootmaster01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning 你好kafka! 你好kafka同步发送! 你好kafka异步发送带回调函数!Kafka生产者异步发送消息时可以通过指定回调函数来处理发送结果。当消息发送完成后回调函数会被调用以通知应用程序消息发送的结果。具体来说当生产者成功发送消息时回调函数会被传递一个RecordMetadata对象该对象包含了发送消息的相关信息如消息所在的分区、消息在分区中的偏移量等。如果发送消息失败则回调函数会被传递一个非空的Exception对象以指示发送失败的原因。 注意回调的执行将在生产者主线程中进行如果有两条消息被发送给同一个分区则这可以保证它们的回调是按照发送的顺序执行的。这就要求回调的执行要快避免生产者出现延迟或影响其他消息的发送。不建议在回调中执行阻塞操作阻塞操作应该被放在其他线程中执行。 6. Kafka 消息对象 ProducerRecord ① ProducerRecord 成员变量 public class ProducerRecordK, V {// 消息要发送到的主题private final String topic;// 消息要发送到的分区号如果为null则由Kafka自动选择分区private final Integer partition;// 消息的键private final K key;// 消息的值private final V value;// 消息的时间戳如果为null则使用当前时间戳private final Long timestamp;// 消息的头部信息private final Headers headers;// ..... }topic和partition字段分别代表消息要发往的主题和分区号。key是用来指定消息的键它不仅是消息的附加信息还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类而这个key可以让消息再进行二次归类同一个key的消息会被划分到同一个分区中。value是指消息体一般不为空如果为空则表示特定的消息。timestamp是指消息的时间戳它有CreateTime和LogAppendTime两种类型前者表示消息创建的时间后者表示消息追加到日志文件的时间。 ② ProducerRecord 构造函数 public class ProducerRecordK, V {private final String topic;private final Integer partition;private final Headers headers;private final K key;private final V value;private final Long timestamp;public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, IterableHeader headers) {if (topic null)throw new IllegalArgumentException(Topic cannot be null.);if (timestamp ! null timestamp 0)throw new IllegalArgumentException(String.format(Invalid timestamp: %d. Timestamp should always be non-negative or null., timestamp));if (partition ! null partition 0)throw new IllegalArgumentException(String.format(Invalid partition: %d. Partition number should always be non-negative or null., partition));this.topic topic;this.partition partition;this.key key;this.value value;this.timestamp timestamp;this.headers new RecordHeaders(headers);}public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, null);}public ProducerRecord(String topic, Integer partition, K key, V value, IterableHeader headers) {this(topic, partition, null, key, value, headers);}public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);} }
http://www.yutouwan.com/news/93454/

相关文章:

  • 建设一个行业性的网站价格平面设计是什么意思
  • 网站排名易下拉教程手游推广渠道
  • 建立用模板建立网站那些网站专门做游戏辅助的
  • 做壁纸网站厨师培训
  • PHP网站建设计划书烟台网站建设加企汇互联专业
  • 连云港网站建设报价html5video网站
  • 网站换域名怎么做个体工商户营业执照
  • 做超链接网站的代码嘉兴网站建设技术托管
  • 网站内容保护建设介绍网站
  • 建网站 南京优秀购物网站
  • 网站找哪些单位做实名认证怎么给网站做反链
  • 临沂网站建设兼职上海关键词排名推广
  • 网站建设公司哪家好?该如何选择网站建设 不违背
  • 网站域名被重定向wordpress不显示图片
  • 门户网站的意义服务器个人买能干什么
  • 衡阳网站建设要点推广做网站怎么挣钱最快
  • 西安高端网站设计公司中国建设银行开放式网站
  • aspnet网站开发源码百度手机助手应用商店下载
  • node.js做网站开发百度指数的主要功能有
  • 做化学题的网站视频网站开发 博客园
  • 深圳做网站排名哪家专业深圳前十网站建设公司
  • 网站和app的开发成本有域名了也备案了怎么做网站
  • ASP.NET2.0网站开发全程解析钟落潭有没有做网站的
  • 论文收录网站网站设计公司大概多少钱
  • 营销式网站建设个人seo怎么赚钱
  • 吉林省网站建设公司做万词霸屏后网站关键词没有排名
  • 网站开发语言php高端网站定制公司
  • 校园网站建设初探论文舆情分析网站免费
  • 电脑网站打不开了但是有网动漫网站设计理念
  • 免费的十大免费货源网站如何在国外网站做推广