随州网站建设,私域流量运营管理,西部数码网站管理助手 伪静态,百度搜索引擎营销如何实现创建生产者实例和构建消息之后#xff0c;就可以开始发送消息了。
发送消息主要有三种模式#xff1a;发后即忘、同步、异步。
发后即忘#xff1a;
就是直接调用 生产者的 send方法发送。
发后即完#xff0c;只管往 kafka中发送消息#xff0c;而不关心消息是否正确…创建生产者实例和构建消息之后就可以开始发送消息了。
发送消息主要有三种模式发后即忘、同步、异步。
发后即忘
就是直接调用 生产者的 send方法发送。
发后即完只管往 kafka中发送消息而不关心消息是否正确到达。
这种发送方式的性能最高可靠性也最差。
producer.send(record);具体代码如下
public class KafkaDemoProducer {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static void main(String[] args) {//属性配置Properties properties getProperties(BROKER_LIST);//生产者初始化KafkaProducerString, String producer new KafkaProducer(properties);ProducerRecordString, String record new ProducerRecord(TOPIC, hello kafka);//发送消息try {producer.send(record);System.out.println(producer.send(record).);} catch (Exception e) {System.out.println(send error. e);}producer.close();}private static Properties getProperties(String brokerList) {Properties properties new Properties();properties.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer);properties.put(bootstrap.servers, brokerList);return properties;}}同步发送
try {producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {log.error(send record get error, e);
}同步发送的方式可靠性最高要么消息发送成功要么发生异常。如果发生异常会catch并处理异常。
同步发送的性能会差一些需要阻塞等待一条消息发送完才能发送下一条。
异步发送
异步发送就是在 send 方法里指定一下 Callback 的回调函数。
消息发送成功后会收到成功的回调。参数 metadata 为发送成功的消息相关的信息
如果发送失败也会收到回调包含失败的异常信息 exception。
producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception ! null) {log.error(send onCompletion error. , exception);} else {log.info(metadata.topic() - metadata.partition() : metadata.offset());}}
});参考资料
《深入理解Kafka 核心设计与实践原理》