有关网站开发的外文文献,海南建设银行官网招聘网站,wordpress 缩短网址,怎样做模板网站【README】
1.本文包含了 批处理与流处理的代码示例#xff1b;
批处理#xff1a;把数据 攒在一起#xff08;或攒一段时间或攒一定内存大小#xff09;#xff0c;然后再处理#xff0c;这叫批处理#xff1b;流处理#xff1a;数据每来一个就处理一个#xff1b;…【README】
1.本文包含了 批处理与流处理的代码示例
批处理把数据 攒在一起或攒一段时间或攒一定内存大小然后再处理这叫批处理流处理数据每来一个就处理一个
2.特点
数据处理方式特点批处理1.高延时流处理1.低延时
3.引入flink的maven依赖
dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.14.4/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.14.4/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.14.4/version/dependency/dependencies 【1】flink批处理离线数据数据有限
【1.1】代码
1数据源我们保存在本地文本文件中命名为 hello.txt
hello world
hello flink
how are you
thank you
hello zhangsan
hello lisi
2批处理代码
/*** Description 批处理word count程序离线数据* author xiao tang* version 1.0.0* createTime 2022年04月09日*/
public class WordCount {public static void main(String[] args) throws Exception {// 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据String inputPath D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\hello.txt;DataSourceString dataSource env.readTextFile(inputPath);// 对数据集处理按照空格分词展开转为 (word,1) 二元组统计DataSetTuple2String, Integer resultSet dataSource.flatMap(new MyFlatMapper()).groupBy(0) // 按照第1个位置的word分组.sum(1); // 将第2个位置上的数据求和resultSet.print();}public static class MyFlatMapper implements FlatMapFunctionString, Tuple2String, Integer {Overridepublic void flatMap(String value, CollectorTuple2String, Integer collector) throws Exception {// 按照空格分词String[] words value.split( );// 遍历所有word包装成word 输出Arrays.stream(words).forEach(x-{collector.collect(new Tuple2(x, 1));});}}
}
批处理打印结果 (you,2) (flink,1) (world,1) (hello,4) (lisi,1) (zhangsan,1) (are,1) (thank,1) (how,1) 批处理的结果是最终结果 【2】flink流处理离线数据数据有限
/*** Description 流数据无限数据* author xiao tang* version 1.0.0* createTime 2022年04月09日*/
public class StreamWordCount {public static void main(String[] args) throws Exception {// 流处理执行环境StreamExecutionEnvironment streamEnv StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(2); // 设置并行度// 从文件中读取数据String inputPath D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\hello.txt;DataStreamString dataStream streamEnv.readTextFile(inputPath);// 定义流操作DataStreamTuple2String, Integer resultStream dataStream.flatMap(new WordCount.MyFlatMapper()).keyBy(0).sum(1);// 打印结果resultStream.print();// 执行任务流终止操作streamEnv.execute();}
}
打印结果 2 (world,1) 1 (thank,1) 2 (flink,1) 1 (hello,1) 2 (how,1) 2 (you,1) 1 (hello,2) 2 (you,2) 1 (hello,3) 2 (zhangsan,1) 1 (hello,4) 2 (lisi,1) 1 (are,1) 流处理的结果是一个动态变化的有状态的结果
有状态的意思说白了就是后面的处理结果依赖前面的处理结果如对hello计数为3它是在前面hello计数为2的基础上做的处理 【3】flink流处理在线数据数据无限
我们引入了 netcatnc底层使用socket模拟向某端口写入数据
然后 flink监控该端口的数据并做处理
【3.1】 flink处理类
处理类监听了 nc所在机器的的端口即 192.168.163.201:7777
/*** Description socket文本流词计数* author xiao tang* version 1.0.0* createTime 2022年04月09日*/
public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {// 流处理执行环境StreamExecutionEnvironment streamEnv StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(2); // 设置并行度// 从 flinkjava parametertool 获取参数或有
// ParameterTool parameterTool ParameterTool.fromArgs(args);
// String host parameterTool.get(host);
// int port parameterTool.getInt(port);// 从socket文本流读取数据DataStreamString inputDataStream streamEnv.socketTextStream(192.168.163.201, 7777);// 定义流操作DataStreamTuple2String, Integer resultStream inputDataStream.flatMap(new WordCount.MyFlatMapper()).keyBy(0).sum(1);// 打印结果resultStream.print();// 执行任务流终止操作streamEnv.execute();}
}
演示效果