大理高端网站建设,友点cms,中国核工业集团2024校园招聘,wordpress首页置顶推荐问题1、什么是Flink中的转换算子 在使用 Flink DataStream API 开发流式计算任务时#xff0c;可以将一个或多个 DataStream 转换成新的 DataStream#xff0c;在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。 2、常用的转换算子 Flink提供了功能各异的转换算…1、什么是Flink中的转换算子 在使用 Flink DataStream API 开发流式计算任务时可以将一个或多个 DataStream 转换成新的 DataStream在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑图。 2、常用的转换算子 Flink提供了功能各异的转换算子Map,FlatMap,Filter,KeyBy,Reduce,Window,WindowAll... 通过操作各种转换算子来获取新的DataStream及子类的实例来完成计算需求。 Tips: 下面测试用例基于 Flink1.17.0、java1.8 编写 3、基本转换算子map/ filter/ flatMap
3.1 Map
功能说明
DataStream[T] → DataStream[R] 输入一个元素同时输出一个元素可以对元素的数据类型和内容做转换好比SQL中的UDF函数
代码示例
package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Map {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 Map 算子// 方式1使用 Lambda表达式env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).map(value - value _).print();// 方式2使用 MapFunction实现类/** TODO MapFunctionT, O* 功能说明* 对元素做11映射转换* 泛型说明* T : 输入数据类型* O : 输出数据类型* */MapFunctionString, Integer mapFunction new MapFunctionString, Integer() {Overridepublic Integer map(String value) throws Exception {return value.length();}};env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).map(mapFunction).print();// 3.触发程序执行env.execute();}
}执行结果 3.2 FlatMap
功能说明
DataStream[T] → DataStream[R] 输入一个元素同时产生零个、一个或多个元素好比SQL中的UDTF(1对多)函数 代码示例
package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMap {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 FlatMap 算子// 方式使用 flatMapFunction实现类/** TODO flatMapFunctionT, O* 功能说明* 对输入元素做1:多的转换好比SQL中的UDTF函数* 泛型说明* T : 输入数据类型* O : 输出数据类型* */FlatMapFunctionString, String flatMapFunction new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {for (String s : value.split(_)) {out.collect(s);}}};env.fromElements(刘_备, 张_飞, 关_羽, 赵_云, 马_超, 黄_忠).flatMap(flatMapFunction).print();// 3.触发程序执行env.execute();}
}执行结果 3.3 Filter
功能说明
DataStream[T] → DataStream[T] 为每个元素执行一个逻辑判断并保留那些判断为 true 的元素好比SQL中的where
代码示例
package com.baidu.datastream.transform;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Filter {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.使用 Filter 算子// 方式1使用 Lambda表达式env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).filter(value - value.equals(刘备)).print();// 方式2使用 FilterFunction实现类/** TODO FilterFunctionT, O* 功能说明* 对元素过滤处理* 泛型说明* T : 输入数据类型* */FilterFunctionString filterFunction new FilterFunctionString() {Overridepublic boolean filter(String value) throws Exception {return value.equals(张飞);}};env.fromElements(刘备, 张飞, 关羽, 赵云, 马超, 黄忠).filter(filterFunction).print();// 3.触发程序执行env.execute();}
}
执行结果 4、聚合算子
4.1 KeyBy按键分区
功能说明
DataStream[T] → KeyedStream[T,K] 根据指定的字段(key)将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。
分区规则 分区编号 指定字段(key) 的哈希值 % 分区个数(并行度)
思考 1、哪些 数据类型 不能作为分区的key 数组类型不能作为key 当key的类型为bean类型时bean类必须要重写hashCode方法 代码示例
package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyBy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.使用 KeyBy 算子// 方式1使用 Lambda表达式// TODO key的类型为 StringKeyedStreamString, String stringKeyedStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策).keyBy(value - value.split(_)[0]);stringKeyedStream.print();// TODO key的类型为 bean (需重写hashCode方法)KeyedStreamFlinkUser, FlinkUser userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L), new FlinkUser(2L, x, 110L), new FlinkUser(3L, y, 120L), new FlinkUser(4L, y, 130L), new FlinkUser(5L, z, 140L)).keyBy(user - user);// TODO key的类型为 数组(不支持)
// KeyedStreamString, String[] arrayKeyedStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策)
// .keyBy(value - value.split(_));// 方式2使用 KeySelector实现类/** TODO KeySelectorIN, KEY* 功能说明* 从输入的数据中提取key然后根据 key的hashcode%并行度 进行分区* 注意这里的分区是逻辑分区* 泛型说明* IN 输入数据类型* KEY key的数据类型* 重要提示* 什么类型的数据不能作为key呢* 1.当 POJO 类且没有重写 hashCode() 方法而是依赖依赖于 Object.hashCode() 实现时* 2.任意类型的数组* */KeySelectorFlinkUser, String keySelector new KeySelectorFlinkUser, String() {Overridepublic String getKey(FlinkUser value) throws Exception {return value.name;}};KeyedStreamFlinkUser, String userNameKeyedStream env.fromElements(new FlinkUser(1L, x, 100L), new FlinkUser(2L, x, 110L), new FlinkUser(3L, y, 120L), new FlinkUser(4L, y, 130L), new FlinkUser(5L, z, 140L)).keyBy(keySelector);// max(字段名称) pojo类一定要含有空参构造//userNameKeyedStream.sum(id).print();// 3.触发程序执行env.execute();}
}执行结果 4.2 Reduce
功能说明
KeyedStream[T,K] → DataStream[T] 在相同key的数据流上滚动执行聚合操作。将当前元素与上次一次聚合后得到的值(保存的状态值)组合然后输出新值并将这个值作为状态进行保存。
Reduce函数的弊端 聚合前数据类型 聚合后数据类型不能修改数据类型 不能提供初始值进行聚合操作当只有一个元素时不会触发reduce函数
代码示例
package com.baidu.datastream.transform;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Reduce {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);// 2.使用 Reduce 算子/** TODO ReduceFunctionT* 功能说明* 对相同key中的元素进行聚合操作(依次聚合)* 泛型说明* 输入数据和输出数据的类型* 重要说明* 这种聚合方式不能修改value的数据类型** */ReduceFunctionTuple2String, Integer reduceFunction new ReduceFunctionTuple2String, Integer() {Overridepublic Tuple2String, Integer reduce(Tuple2String, Integer value1, Tuple2String, Integer value2) throws Exception {return new Tuple2(value1.f0, value1.f1 value2.f1);}};// 统计每个国家出现的次数env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {return new Tuple2(value.split(_)[0], 1);}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}}).reduce(reduceFunction).print();// 3.触发程序执行env.execute();}
}运行结果 4.3 sum、min、max、minBy、maxBy
功能说明
KeyedStream[T,K] → DataStream[T] 在相同key的数据流上滚动执行相应聚合操作。
min、minBy的区别 min聚合状态中保存的是第一个元素的非聚合字段 minBy聚合状态中保存的是当前元素的非聚合字段
代码示例
package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SumMinMaxMinByMaxBy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, y, 140L)).keyBy(user - user.name);/** TODO max()、max(num)* 功能说明* 根据指定的字段做聚合操作* 怎样指定聚合字段* 当 value类型为 pojo时通过 max(字段名称) 来指定字段* 当 value类型为 tuple时通过 max(num) 来指定字段* 重点说明* 当 value类型为pojo时必须实现空参构造方法才能提取字段* *///userKeyedStream.max(id).print();//userKeyedStream.min(id).print();//userKeyedStream.sum(id).print();//userKeyedStream.maxBy(id).print();userKeyedStream.minBy(id).print();env.execute();}
}5、物理分区算子
Flink提供了将数据重新分区的方法当任务发生数据倾斜时这个算子会很有用。
5.1 shuffle - 随机分区
功能说明 DataStream[T] → DataStream[T] 将元素随机地均匀分配到下游分区
Tips 因为是完全随机当输入相同时每次执行的结果可能会不同
代码示例
package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Shuffle {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO 问题由于 keyBy 算子导致数据倾斜(key相同导致数据都被同一个并行子任务处理)* 我们可以使用 shuffle 算子将数据均匀的在分配到其他并行子任务中去* 重点提示* shuffle 算子只能操作 DataStream不能操作 KeyedStream* */userKeyedStream.sum(id).shuffle().print();env.execute();}
}运行结果 5.2 rebalance - 轮询分区
功能说明 DataStream[T] → DataStream[T] 使用Round-Robin负载均衡算法将输入的数据平均的分配到下游分区中去。 代码示例
package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Rebalance {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO 问题由于 keyBy 算子导致数据倾斜(key相同导致数据都被同一个并行子任务处理)* 我们可以使用 rebalance 算子将数据均匀的在分配到其他并行子任务中去* 重点提示* rebalance 算子只能操作 DataStream不能操作 KeyedStream* */userKeyedStream.sum(id).rebalance().print();env.execute();}
}运行结果 5.3 rescale - 重缩分区
功能说明 DataStream[T] → DataStream[T] 使用Round-Robin负载均衡算法将以分区为单位将输入的数据平均的分配到下游分区中去。
和rebalance的区别 rebalance将输入数据作为一个整体根据数据输入的顺序随机分发到下游分区(涉及到了网络传输) rescale将以上游分区为单位随机的分配到下游分区中去
使用场景 当source算子为可并发数据源时(如kafka5个分区)设置5个Task来读取分别读取每个分区的数据 此时可以使用rescale来分发到下游实现负载均衡这样可以做到数据只在本地传输而不是网络传输 5.4 global - 全局分区
功能说明 DataStream[T] → DataStream[T] 将元素分发到下游的一个分区中去 5.5 broadcast - 广播分区 功能说明 DataStream[T] → DataStream[T] 将元素广播到下游的每个分区
Tips 数据被广播后会在下游算子的每个分区中都保留一份可以将数据进行重复处理 代码示例
package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Broadcast {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);userKeyedStream.sum(id).broadcast().print();env.execute();}
}运行结果 5.6 自定义分区
功能说明 DataStream[T] → DataStream[T] 使用用户定义的 Partitioner 将元素分发到下游算子的分区中去
代码示例
package com.baidu.datastream.transform;import com.baidu.bean.FlinkUser;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class PartitionCustom {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);KeyedStreamFlinkUser, String userKeyedStream env.fromElements(new FlinkUser(1L, x, 100L),new FlinkUser(2L, x, 110L),new FlinkUser(3L, x, 120L),new FlinkUser(4L, x, 130L),new FlinkUser(5L, x, 140L),new FlinkUser(6L, x, 150L)).keyBy(user - user.name);/** TODO PartitionerK* 功能说明* 自定义分区器根据输入的数据获取分区编号* 泛型说明* K key的数据类型* */PartitionerLong partitioner new PartitionerLong() {Overridepublic int partition(Long key, int numPartitions) {if (key 1L || key 2L) {return 0;} else if (key 3L || key 4L) {return 1;} else {return 2;}}};/** TODO KeySelectorIN, KEY* 功能说明* key提取器根据输入的数据获取key* 泛型说明* IN 输入数据类型* KEY 输出数据类型(key)* */KeySelectorFlinkUser, Long keySelector new KeySelectorFlinkUser, Long() {Overridepublic Long getKey(FlinkUser value) throws Exception {return value.id;}};userKeyedStream.sum(id).partitionCustom(partitioner, keySelector).print();env.execute();}
}运行结果 6、分流 在处理数据的时候经常会将一条流或者一个表根据某些条件拆分成多条流或者多个表
flink中提供了分流的方式1、使用filter算子分流 2、使用侧输出流分流 6.1 使用filter算子分流 - 不推荐
这种分流方式的弊端 需要将原始流复制多份并对每一份做一次判断效率很低 (多次读取多次判断)
代码示例 // 通过 filter 分流public static void ByFilter() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 根据国家将 totalStream 分为三股流DataStreamSourceString totalStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策);SingleOutputStreamOperatorString weiStream totalStream.filter(e - e.contains(魏));SingleOutputStreamOperatorString shuStream totalStream.filter(e - e.contains(蜀));SingleOutputStreamOperatorString wuStream totalStream.filter(e - e.contains(吴));weiStream.print();shuStream.print();wuStream.print();// 3.触发程序执行env.execute();}
6.2 使用侧输出流分流 - 推荐 避免了使用filter算子的弊端指定source读取一次判断一次即可完成分流操作
代码示例 // 通过 侧输入流 分流public static void ByOutputTag() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 根据国家将 totalStream 分为三股流DataStreamSourceString totalStream env.fromElements(蜀_刘备, 蜀_关羽, 魏_曹操, 吴_孙权, 吴_孙坚, 吴_孙策);// 初始化侧输出流OutputTag weiOutputTag new OutputTag(wei, Types.STRING);OutputTag shuOutputTag new OutputTag(shu, Types.STRING);OutputTag wuOutputTag new OutputTag(wu, Types.STRING);// 通过 ProcessFunction向 侧输出流发送数据SingleOutputStreamOperatorString process totalStream.process(new ProcessFunctionString, String() {Overridepublic void processElement(String value, ProcessFunctionString, String.Context ctx, CollectorString out) throws Exception {// 往侧输出流中发送数据if (value.contains(魏)) {ctx.output(weiOutputTag, value);} else if (value.contains(蜀)) {ctx.output(shuOutputTag, value);} else if (value.contains(吴)) {ctx.output(wuOutputTag, value);}}});SideOutputDataStream weiStream process.getSideOutput(weiOutputTag);SideOutputDataStream shuStream process.getSideOutput(shuOutputTag);SideOutputDataStream wuStream process.getSideOutput(wuOutputTag);weiStream.print();shuStream.print();wuStream.print();// 3.触发程序执行env.execute();} 7、合并流