当前位置: 首页 > news >正文

大理高端网站建设友点cms

大理高端网站建设,友点cms,中国核工业集团2024校园招聘,wordpress首页置顶推荐问题1、什么是Flink中的转换算子 在使用 Flink DataStream API 开发流式计算任务时#xff0c;可以将一个或多个 DataStream 转换成新的 DataStream#xff0c;在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。 2、常用的转换算子 Flink提供了功能各异的转换算…1、什么是Flink中的转换算子 在使用 Flink DataStream API 开发流式计算任务时可以将一个或多个 DataStream 转换成新的 DataStream在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。 2、常用的转换算子  Flink提供了功能各异的转换算子Map,FlatMap,Filter,KeyBy,Reduce,Window,WindowAll... 通过操作各种转换算子来获取新的DataStream及子类的实例来完成计算需求。 Tips: 下面测试用例基于 Flink1.17.0、java1.8 编写 3、基本转换算子map/ filter/ flatMap 3.1 Map 功能说明 DataStream[T] → DataStream[R] 输入一个元素同时输出一个元素可以对元素的数据类型和内容做转换好比SQL中的UDF函数 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Map {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 Map 算子// 方式1使用 Lambda表达式env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).map(value - value _).print();// 方式2使用 MapFunction实现类/** TODO MapFunctionT, O* 功能说明* 对元素做11映射转换* 泛型说明* T : 输入数据类型* O : 输出数据类型* */MapFunctionString, Integer mapFunction new MapFunctionString, Integer() {Overridepublic Integer map(String value) throws Exception {return value.length();}};env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).map(mapFunction).print();// 3.触发程序执行env.execute();} }执行结果 3.2 FlatMap  功能说明 DataStream[T] → DataStream[R] 输入一个元素同时产生零个、一个或多个元素好比SQL中的UDTF(1对多)函数 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class FlatMap {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 FlatMap 算子// 方式使用 flatMapFunction实现类/** TODO flatMapFunctionT, O* 功能说明* 对输入元素做1:多的转换好比SQL中的UDTF函数* 泛型说明* T : 输入数据类型* O : 输出数据类型* */FlatMapFunctionString, String flatMapFunction new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {for (String s : value.split(_)) {out.collect(s);}}};env.fromElements(刘_备, 张_飞, 关_羽, 赵_云, 马_超, 黄_忠).flatMap(flatMapFunction).print();// 3.触发程序执行env.execute();} }执行结果 3.3 Filter 功能说明 DataStream[T] → DataStream[T] 为每个元素执行一个逻辑判断并保留那些判断为 true 的元素好比SQL中的where 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Filter {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 Filter 算子// 方式1使用 Lambda表达式env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).filter(value - value.equals(刘备)).print();// 方式2使用 FilterFunction实现类/** TODO FilterFunctionT, O* 功能说明* 对元素过滤处理* 泛型说明* T : 输入数据类型* */FilterFunctionString filterFunction new FilterFunctionString() {Overridepublic boolean filter(String value) throws Exception {return value.equals(张飞);}};env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).filter(filterFunction).print();// 3.触发程序执行env.execute();} } 执行结果 4、聚合算子 4.1 KeyBy按键分区 功能说明 DataStream[T] → KeyedStream[T,K] 根据指定的字段(key)将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。 分区规则           分区编号   指定字段(key) 的哈希值 % 分区个数(并行度)    思考 1、哪些 数据类型 不能作为分区的key 数组类型不能作为key      当key的类型为bean类型时bean类必须要重写hashCode方法 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyBy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.使用 KeyBy 算子// 方式1使用 Lambda表达式// TODO key的类型为 StringKeyedStreamString, String stringKeyedStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策).keyBy(value - value.split(_)[0]);stringKeyedStream.print();// TODO key的类型为 bean (需重写hashCode方法)KeyedStreamFlinkUser, FlinkUser userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L), new FlinkUser(2L, x, 110L), new FlinkUser(3L, y, 120L), new FlinkUser(4L, y, 130L), new FlinkUser(5L, z, 140L)).keyBy(user - user);// TODO key的类型为 数组(不支持) // KeyedStreamString, String[] arrayKeyedStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策) // .keyBy(value - value.split(_));// 方式2使用 KeySelector实现类/** TODO KeySelectorIN, KEY* 功能说明* 从输入的数据中提取key然后根据 key的hashcode%并行度 进行分区* 注意这里的分区是逻辑分区* 泛型说明* IN 输入数据类型* KEY key的数据类型* 重要提示* 什么类型的数据不能作为key呢* 1.当 POJO 类且没有重写 hashCode() 方法而是依赖依赖于 Object.hashCode() 实现时* 2.任意类型的数组* */KeySelectorFlinkUser, String keySelector new KeySelectorFlinkUser, String() {Overridepublic String getKey(FlinkUser value) throws Exception {return value.name;}};KeyedStreamFlinkUser, String userNameKeyedStream env.fromElements(new FlinkUser(1L, x, 100L), new FlinkUser(2L, x, 110L), new FlinkUser(3L, y, 120L), new FlinkUser(4L, y, 130L), new FlinkUser(5L, z, 140L)).keyBy(keySelector);// max(字段名称) pojo类一定要含有空参构造//userNameKeyedStream.sum(id).print();// 3.触发程序执行env.execute();} }执行结果 4.2 Reduce 功能说明 KeyedStream[T,K] → DataStream[T] 在相同key的数据流上滚动执行聚合操作。将当前元素与上次一次聚合后得到的值(保存的状态值)组合然后输出新值并将这个值作为状态进行保存。 Reduce函数的弊端         聚合前数据类型 聚合后数据类型不能修改数据类型         不能提供初始值进行聚合操作当只有一个元素时不会触发reduce函数 代码示例 package com.baidu.datastream.transform;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Reduce {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 2.使用 Reduce 算子/** TODO ReduceFunctionT* 功能说明* 对相同key中的元素进行聚合操作(依次聚合)* 泛型说明* 输入数据和输出数据的类型* 重要说明* 这种聚合方式不能修改value的数据类型** */ReduceFunctionTuple2String, Integer reduceFunction new ReduceFunctionTuple2String, Integer() {Overridepublic Tuple2String, Integer reduce(Tuple2String, Integer value1, Tuple2String, Integer value2) throws Exception {return new Tuple2(value1.f0, value1.f1 value2.f1);}};// 统计每个国家出现的次数env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return new Tuple2(value.split(_)[0], 1);}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}}).reduce(reduceFunction).print();// 3.触发程序执行env.execute();} }运行结果 4.3 sum、min、max、minBy、maxBy 功能说明 KeyedStream[T,K] → DataStream[T] 在相同key的数据流上滚动执行相应聚合操作。 min、minBy的区别              min聚合状态中保存的是第一个元素的非聚合字段          minBy聚合状态中保存的是当前元素的非聚合字段 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SumMinMaxMinByMaxBy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, y, 140L)).keyBy(user - user.name);/** TODO max()、max(num)* 功能说明* 根据指定的字段做聚合操作* 怎样指定聚合字段* 当 value类型为 pojo时通过 max(字段名称) 来指定字段* 当 value类型为 tuple时通过 max(num) 来指定字段* 重点说明* 当 value类型为pojo时必须实现空参构造方法才能提取字段* *///userKeyedStream.max(id).print();//userKeyedStream.min(id).print();//userKeyedStream.sum(id).print();//userKeyedStream.maxBy(id).print();userKeyedStream.minBy(id).print();env.execute();} }5、物理分区算子 Flink提供了将数据重新分区的方法当任务发生数据倾斜时这个算子会很有用。 5.1 shuffle - 随机分区 功能说明 DataStream[T] → DataStream[T]         将元素随机地均匀分配到下游分区 Tips         因为是完全随机当输入相同时每次执行的结果可能会不同 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Shuffle {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO 问题由于 keyBy 算子导致数据倾斜(key相同导致数据都被同一个并行子任务处理)* 我们可以使用 shuffle 算子将数据均匀的在分配到其他并行子任务中去* 重点提示* shuffle 算子只能操作 DataStream不能操作 KeyedStream* */userKeyedStream.sum(id).shuffle().print();env.execute();} }运行结果 5.2 rebalance - 轮询分区 功能说明 DataStream[T] → DataStream[T]         使用Round-Robin负载均衡算法将输入的数据平均的分配到下游分区中去。    代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Rebalance {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO 问题由于 keyBy 算子导致数据倾斜(key相同导致数据都被同一个并行子任务处理)* 我们可以使用 rebalance 算子将数据均匀的在分配到其他并行子任务中去* 重点提示* rebalance 算子只能操作 DataStream不能操作 KeyedStream* */userKeyedStream.sum(id).rebalance().print();env.execute();} }运行结果 5.3 rescale - 重缩分区 功能说明 DataStream[T] → DataStream[T]         使用Round-Robin负载均衡算法将以分区为单位将输入的数据平均的分配到下游分区中去。 和rebalance的区别 rebalance将输入数据作为一个整体根据数据输入的顺序随机分发到下游分区(涉及到了网络传输)           rescale将以上游分区为单位随机的分配到下游分区中去 使用场景     当source算子为可并发数据源时(如kafka5个分区)设置5个Task来读取分别读取每个分区的数据     此时可以使用rescale来分发到下游实现负载均衡这样可以做到数据只在本地传输而不是网络传输 5.4 global - 全局分区 功能说明 DataStream[T] → DataStream[T]         将元素分发到下游的一个分区中去  5.5 broadcast - 广播分区 功能说明 DataStream[T] → DataStream[T]         将元素广播到下游的每个分区  Tips         数据被广播后会在下游算子的每个分区中都保留一份可以将数据进行重复处理 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Broadcast {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);userKeyedStream.sum(id).broadcast().print();env.execute();} }运行结果 5.6  自定义分区 功能说明 DataStream[T] → DataStream[T]         使用用户定义的 Partitioner 将元素分发到下游算子的分区中去 代码示例 package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class PartitionCustom {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO PartitionerK* 功能说明* 自定义分区器根据输入的数据获取分区编号* 泛型说明* K key的数据类型* */PartitionerLong partitioner new PartitionerLong() {Overridepublic int partition(Long key, int numPartitions) {if (key 1L || key 2L) {return 0;} else if (key 3L || key 4L) {return 1;} else {return 2;}}};/** TODO KeySelectorIN, KEY* 功能说明* key提取器根据输入的数据获取key* 泛型说明* IN 输入数据类型* KEY 输出数据类型(key)* */KeySelectorFlinkUser, Long keySelector new KeySelectorFlinkUser, Long() {Overridepublic Long getKey(FlinkUser value) throws Exception {return value.id;}};userKeyedStream.sum(id).partitionCustom(partitioner, keySelector).print();env.execute();} }运行结果 6、分流 在处理数据的时候经常会将一条流或者一个表根据某些条件拆分成多条流或者多个表 flink中提供了分流的方式1、使用filter算子分流   2、使用侧输出流分流 6.1 使用filter算子分流 - 不推荐 这种分流方式的弊端 需要将原始流复制多份并对每一份做一次判断效率很低 (多次读取多次判断) 代码示例 // 通过 filter 分流public static void ByFilter() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 根据国家将 totalStream 分为三股流DataStreamSourceString totalStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策);SingleOutputStreamOperatorString weiStream totalStream.filter(e - e.contains(魏));SingleOutputStreamOperatorString shuStream totalStream.filter(e - e.contains(蜀));SingleOutputStreamOperatorString wuStream totalStream.filter(e - e.contains(吴));weiStream.print();shuStream.print();wuStream.print();// 3.触发程序执行env.execute();} 6.2 使用侧输出流分流 - 推荐 避免了使用filter算子的弊端指定source读取一次判断一次即可完成分流操作 代码示例 // 通过 侧输入流 分流public static void ByOutputTag() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 根据国家将 totalStream 分为三股流DataStreamSourceString totalStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策);// 初始化侧输出流OutputTag weiOutputTag new OutputTag(wei, Types.STRING);OutputTag shuOutputTag new OutputTag(shu, Types.STRING);OutputTag wuOutputTag new OutputTag(wu, Types.STRING);// 通过 ProcessFunction向 侧输出流发送数据SingleOutputStreamOperatorString process totalStream.process(new ProcessFunctionString, String() {Overridepublic void processElement(String value, ProcessFunctionString, String.Context ctx, CollectorString out) throws Exception {// 往侧输出流中发送数据if (value.contains(魏)) {ctx.output(weiOutputTag, value);} else if (value.contains(蜀)) {ctx.output(shuOutputTag, value);} else if (value.contains(吴)) {ctx.output(wuOutputTag, value);}}});SideOutputDataStream weiStream process.getSideOutput(weiOutputTag);SideOutputDataStream shuStream process.getSideOutput(shuOutputTag);SideOutputDataStream wuStream process.getSideOutput(wuOutputTag);weiStream.print();shuStream.print();wuStream.print();// 3.触发程序执行env.execute();} 7、合并流
http://www.sadfv.cn/news/183940/

相关文章:

  • 怎么进行网站建设自媒体平台源码
  • 政网站首页怎么做试广告平台源码
  • acm网站免费做腾讯服务商平台
  • 做pc端网站哪家好东莞建网站找哪里
  • 网站建站视频魏县手机网站建设
  • 如何成为游戏代理合肥正规的seo公司
  • 开平网站开发建站之星网站模版商城
  • 网站建站哪个品牌好自适应网站建设都找全网天下
  • 百度做个公司网站要多少钱电子产品在哪些网站做调研
  • 莆田外贸网站建设有哪些目前最好用的网络管理软件
  • erp软件多少钱什么叫seo网站推广
  • 城子河网站建设苏州h5网站建设
  • 企业建站wordpress网页设计素材和制作教程
  • 快速免费做网站wordpress素锦下载
  • 旅游网站排名前5位的在手机上做网站
  • 怎么优化网站打开速度物联网app开发平台
  • 宁波 商城网站建设wordpress5.0漏洞
  • 深圳市住房和建设局网站住房免费的虚拟电脑app
  • 浙江网站建设设计广西建设网站网址多少
  • 网站促销活动策划山东百度推广总代理
  • 常用网站建设技术有哪些移动crm系统客户端
  • 企业网站 源代码网站开发技术 创新点
  • 网站要怎么样做排名才上得去wordpress5.0改进
  • 网站赚钱吗制作公司网页要多长时间
  • 哪里可以做游戏视频网站广西柳州网站建设
  • 漳州建设银行网站wordpress免费简约主题
  • 企业网站开发文档电商类网站模板
  • 上饶市建设局有什么网站wordpress 页面二维码
  • 广东十大网站建设排名常用于做网站的软件
  • wordpress post属性优化大师的使用方法