网站建设平台天梯建站网站建投网站,沈阳工程信息,店铺推广引流的方法,网络公司 营销型网站DataStream API
主要流程#xff1a; 获取执行环境读取数据源转换操作输出数据Execute触发执行 获取执行环境 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)创建本地环境StreamExecutionEnvironment.createLocalEnvironment()创建远程环境creat…DataStream API
主要流程 获取执行环境读取数据源转换操作输出数据Execute触发执行 获取执行环境 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)创建本地环境StreamExecutionEnvironment.createLocalEnvironment()创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”) 参数1主机号参数2端口号参数3作业jar包的路径 获取数据源 简单数据源 从集合中读取数据env.fromCollection(集合)从元素列表中获取数据env.fromElements()从文件中读取数据env.readTextFIle(路径), 已废弃从端口读取数据env.socketTextStream() 文件数据源kafka数据源DataGen数据源自定义数据源
文件数据源
使用文件数据源前需要先添加相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/versionscopeprovided/scope
/dependencypublic class Flink02_FileSource {public static void main(String[] args) throw Exception {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//file sourceFileSource.FileSourceBuilderString fileSourceBuilder FileSource.StringforRecordStreamFormat(new TextLineInputFormat(utf-8), new Path(input/word.txt));FileSourceString fileSource fileSourceBuilder.build();//source 算子DataStreamSourceString ds env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), fileSource);ds.print();env.execute();}
}DataGen数据源
主要用于生成模拟数据也需要导入相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/versionscopecompile/scope/dependencypublic class Flink04_DataGenSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return UUID.randomUUID() - value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);DataStreamSourceString dataGenDs env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), dataGenDs);dataGenDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}Kafka消费者 消费方式拉取 消费者对象KafkaConsumenr 消费原则 一个主题的一个分区只能被一个消费者组中的一个消费者消费 一个消费者组中的一个消费者可以消费一个主题中的多个分区 消费者相关的参数 key.deserializer 反序列化value.deserializerbootstrap.servers 集群的位置group.id 消费者组id 为何分组方便同一组的消费者进行断点续传auto.commit.interval.ms 自动提交间隔 默认5senable.auto.commit 开启自动提交offset偏移量auto.offset.reset 当offset不存在时offset重置默认是最末尾的位置 ①新的消费者组之前没有消费过没有记录的offset②当前要消费的offset在kafka中已经不存在可能是因为时间久了对应的数据清理掉了 重置策略 earliest: 头能消费到分区中现有的数据latest: 尾只能消费到分区中新来的数据 isolation.level事务隔离级别 读未提交读已提交 消费数据存在的问题 漏消费导致数据丢失重复消费导致数据重复 shell 创建生产者对象kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
public class Flink03_KafkaSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);KafkaSourceString stringKafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092).setGroupId(flink).setTopics(first)//优先使用消费者组记录的Offset进行消费如果offset不存在根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定统一使用通用方法
// .setProperty(isolation.level, read_committed).build();DataStreamSourceString kafkaDS env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), kafkaDS);kafkaDS.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}