当前位置: 首页 > news >正文

邯郸哪有做网站的公司重庆公司网站建设价格

邯郸哪有做网站的公司,重庆公司网站建设价格,wordpress参考文献,深圳定制网站建设目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、lat…目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、latest() 5.3、timestamp() 6、Kafka分区扩容了该怎么办 —— 动态分区检查 7、在加载KafkaSource时提取事件时间添加水位线 7.1、使用内置的单调递增的水位线生成器 kafka timestamp 为事件时间 7.2、使用内置的单调递增的水位线生成器 kafka 消息中的 ID字段 为事件时间 1、添加pom依赖 我们可以使用Flink官方提供连接Kafka的工具flink-connector-kafka 该工具实现了一个消费者FlinkKafkaConsumer可以用它来读取kafka的数据 如果想使用这个通用的Kafka连接工具需要引入jar依赖 !-- 引入 kafka连接器依赖-- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion1.17.0/version /dependency 2、API使用说明 官网链接Apache Kafka 连接器 语法说明:  // 1.初始化 KafkaSource 实例 KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点) .setTopics(input-topic) // 必填指定要消费的topic.setGroupId(my-group) // 必填指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填指定反序列化器(用来解析kafka消息数据转换为flink数据类型).setStartingOffsets(OffsetsInitializer.earliest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build(); // 2.通过 fromSource KafkaSource 获取 DataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source); 3、这是一个完整的入门案例 开发语言java1.8 flink版本flink1.17.0 public class ReadKafka {public static void main(String[] args) throws Exception {newAPI();}public static void newAPI() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(worker01:9092) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics(20230810) // 必填指定要消费的topic.setGroupId(FlinkConsumer) // 必填指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填指定反序列化器(用来解析kafka消息数据).setStartingOffsets(OffsetsInitializer.earliest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build();env.fromSource(source,WatermarkStrategy.noWatermarks(),Kafka Source).print();// 3.触发程序执行env.execute();} }4、Kafka消息应该如何解析 代码中需要提供一个反序列化器Deserializer来对 Kafka 的消息进行解析 反序列化器的功能 将Kafka ConsumerRecords转换为Flink处理的数据类型(Java/Scala对象) 反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器类型)) 指定 下面介绍两种常用Kafka消息解析器 KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) 1、返回完整的Kafka消息将JSON字符串反序列化为ObjectNode对象 2、可以选择是否返回Kafak消息的Metadata信息true-返回false-不返回 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 1、只返回Kafka消息中的value部分  4.1、只获取Kafka消息的value部分 4.2、获取完整Kafka消息(key、value、Metadata) kafak消息格式 key   {nation:蜀国} value  {ID:整数} public static void ParseMessageJSONKeyValue() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics(9527) // 必填指定要消费的topic.setGroupId(FlinkConsumer) // 必填指定消费者的groupid(不存在时会自动创建)// 必填指定反序列化器(将kafak消息解析为ObjectNodejson对象).setDeserializer(KafkaRecordDeserializationSchema.of(// includeMetadata (true:返回Kafak元数据信息 false:不返回)new JSONKeyValueDeserializationSchema(true))).setStartingOffsets(OffsetsInitializer.latest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build();env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source).print();// 3.触发程序执行env.execute();}运行结果     常见报错  Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic 9527, partition 0, leaderEpoch 0, offset 1064, CreateTime 1691668775938, serialized key size 4, serialized value size 9, headers RecordHeaders(headers [], isReadOnly false), key [B5e9eaab8, value [B67390400).at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)... 14 more Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token xxxx: was expecting (JSON String, Number, Array, Object or token null, true or false)at [Source: (byte[])xxxx; line: 1, column: 5] 报错原因 出现这个报错一般是使用flink读取fafka时使用JSONKeyValueDeserializationSchema 来解析消息时kafka消息中的key 或者 value 内容不符合json格式而造成的解析错误 例如下面这个格式就会造成解析错误  key1000value你好 那应该怎么解决呢 1、如果有权限修改Kafka消息格式可以将Kafka消息keyvalue内容修改为Json格式 2、如果没有权限修改Kafka消息格式(比如线上环境修改比较困难)可以重新实现 JSONKeyValueDeserializationSchema类根据所需格式来解析Kafka消息(可以参考源码) 4.3、自定义Kafka消息解析器 生产中对Kafka消息及解析的格式总是各种各样的当flink预定义的解析器满足不了业务需求时可以通过自定义kafka消息解析器来完成业务的支持 例如当使用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时只返回了 offset、topic、partition 三个字段信息现在需要kafka生产者写入数据时的timestamp就可以通过自定义kafka消息解析器来完成 代码示例 // TODO 自定义Kafka消息解析器在 metadata 中增加 timestamp字段 public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchemaObjectNode{private static final long serialVersionUID 1509391548173891955L;private final boolean includeMetadata;private ObjectMapper mapper;public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {this.includeMetadata includeMetadata;}Overridepublic void open(DeserializationSchema.InitializationContext context) throws Exception {mapper JacksonMapperFactory.createObjectMapper();}Overridepublic ObjectNode deserialize(ConsumerRecordbyte[], byte[] record) throws Exception {ObjectNode node mapper.createObjectNode();if (record.key() ! null) {node.set(key, mapper.readValue(record.key(), JsonNode.class));}if (record.value() ! null) {node.set(value, mapper.readValue(record.value(), JsonNode.class));}if (includeMetadata) {node.putObject(metadata).put(offset, record.offset()).put(topic, record.topic()).put(partition, record.partition())// 添加 timestamp 字段.put(timestamp,record.timestamp());}return node;}Overridepublic boolean isEndOfStream(ObjectNode nextElement) {return false;}Overridepublic TypeInformationObjectNode getProducedType() {return getForClass(ObjectNode.class);}}运行结果 5、起始消费位点应该如何设置 起始消费位点说明 起始消费位点是指 启动flink任务时应该从哪个位置开始读取Kafka的消息    下面介绍下常用的三个设置     OffsetsInitializer.earliest()  从最早位点开始消 这里的最早指的是Kafka消息保存的时长(默认为7天生成环境各公司略有不同) 该这设置为默认设置当不指定OffsetsInitializer.xxx时默认为earliest()  OffsetsInitializer.latest()    从最末尾位点开始消费 这里的最末尾指的是flink任务启动时间点之后生产的消息 OffsetsInitializer.timestamp(时间戳) 从时间戳大于等于指定时间戳毫秒的数据开始消费 下面用案例说明下三种设置的效果kafak生成10条数据如下 5.1、earliest() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最早位置开始消费该设置为默认设置.setStartingOffsets(OffsetsInitializer.earliest()).build(); 运行结果 5.2、latest() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build(); 运行结果 5.3、timestamp() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从指定时间戳后开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L)).build(); 运行结果 6、Kafka分区扩容了该怎么办 —— 动态分区检查 在flink1.13的时候如果Kafka分区扩容了只有通过重启flink任务才能消费到新增分区的数据小编就曾遇到过上游业务部门的kafka分区扩容了并没有通知下游使用方导致实时指标异常甚至丢失了数据。 在flink1.17的时候可以通过开启动态分区检查来实现不用重启flink任务就能消费到新增分区的数据 开启分区检查(默认不开启) KafkaSource.builder().setProperty(partition.discovery.interval.ms, 10000); // 每 10 秒检查一次新分区 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest())// 开启动态分区检查默认不开启.setProperty(partition.discovery.interval.ms, 10000) // 每 10 秒检查一次新分区.build(); 7、在加载KafkaSource时提取事件时间添加水位线 可以在 fromSource(source,WatermarkStrategy,sourceName) 时提取事件时间和制定水位线生成策略 注意当不指定事件时间提取器时Kafka Source 使用 Kafka 消息中的时间戳作为事件时间 7.1、使用内置的单调递增的水位线生成器 kafka timestamp 为事件时间 代码示例 // 在读取Kafka消息时提取事件时间插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器默认使用 kafka的timestamp作为事件时间WatermarkStrategy.forMonotonousTimestamps(),Kafka Source)// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunctionObjectNode, String() {Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunctionObjectNode, String.Context ctx, CollectorString out) throws Exception {// 当前处理时间long currentProcessingTime ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark ctx.timerService().currentWatermark();StringBuffer record new StringBuffer();record.append(\n);record.append(kafkaJson \n);record.append(currentProcessingTime currentProcessingTime \n);record.append(currentWatermark currentWatermark \n);record.append(kafka-ID Long.parseLong(kafkaJson.get(value).get(ID).toString()) \n);record.append(kafka-timestamp Long.parseLong(kafkaJson.get(metadata).get(timestamp).toString()) \n);out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}运行结果 7.2、使用内置的单调递增的水位线生成器 kafka 消息中的 ID字段 为事件时间 代码示例 // 在读取Kafka消息时提取事件时间插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器使用 kafka消息中的ID字段作为事件时间WatermarkStrategy.ObjectNodeforMonotonousTimestamps()// 提取 Kafka消息中的 ID字段作为 事件时间.withTimestampAssigner((json, timestamp) - Long.parseLong(json.get(value).get(ID).toString())),Kafka Source)// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunctionObjectNode, String() {Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunctionObjectNode, String.Context ctx, CollectorString out) throws Exception {// 当前处理时间long currentProcessingTime ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark ctx.timerService().currentWatermark();StringBuffer record new StringBuffer();record.append(\n);record.append(kafkaJson \n);record.append(currentProcessingTime currentProcessingTime \n);record.append(currentWatermark currentWatermark \n);record.append(kafka-ID Long.parseLong(kafkaJson.get(value).get(ID).toString()) \n);record.append(kafka-timestamp Long.parseLong(kafkaJson.get(metadata).get(timestamp).toString()) \n);out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}运行结果
http://www.yutouwan.com/news/172962/

相关文章:

  • 合作网站登录制作化妆品购物网站排名
  • 8免费建站网站李继红跪舔坊网站建设
  • 商城网站入驻系统wordpress qq微信登陆
  • 巴中城市建设投资有限公司网站珠海营销网站建设
  • 如何建立一个好的网站网站打开慢 可以只换空间不换域名吗
  • 济南网站APPwordpress 编写手机主题
  • 做哪类网站比较赚钱wordpress 调用二级分类
  • 佛山网站建设开发团队十大微商推广平台
  • 郑州模板网站制作工程造价信息月刊
  • 网站页脚需要放什么用那个程序做网站收录好
  • 中国建设部官方网站鲁班奖公司网站程序
  • 营销网站域名设计网上智慧团建网站
  • 天猫网站做链接怎么做深圳网站建设哪家好
  • 如何做淘宝联盟网站的推广seo网页的基础知识
  • 网站开发工程师题南城网站建设公司策划
  • 自己做的网站上出现乱码怎么修改建设网站怎样赚钱
  • 聊城手机网站建设多少钱一般网站宽度
  • 免费自动建站国内网站免费服务器
  • 做网站能用微软住房和建设部执业资格注册中心网站
  • 小型网站建设公司wordpress 仿小米主题下载
  • 网站建设人员需求分析有哪些专门做减肥内容的网站
  • 天津网站建设 Wordpress毕设做网站些什么比较简单
  • 台州知名的网站建设outlook企业邮箱怎么申请
  • 星月教你做网站回顾文档windows 2003建设网站
  • 公司门户网站怎么做协会网站建设方案
  • 网站推广哪个平台好手机版文章网站源码
  • 无锡市住房和城乡建设局网站县级网站建设
  • 三网合一网站怎么做一个新手怎么做电商运营
  • 南通网站建设招聘微信网站链接网站建设
  • 合肥建设集团信息网站徐州睢宁建设网站