当前位置: 首页 > news >正文

网站建设平台天梯建站网站建投网站沈阳工程信息

网站建设平台天梯建站网站建投网站,沈阳工程信息,店铺推广引流的方法,网络公司 营销型网站DataStream API 主要流程#xff1a; 获取执行环境读取数据源转换操作输出数据Execute触发执行 获取执行环境 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)创建本地环境StreamExecutionEnvironment.createLocalEnvironment()创建远程环境creat…DataStream API 主要流程 获取执行环境读取数据源转换操作输出数据Execute触发执行 获取执行环境 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)创建本地环境StreamExecutionEnvironment.createLocalEnvironment()创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”) 参数1主机号参数2端口号参数3作业jar包的路径 获取数据源 简单数据源 从集合中读取数据env.fromCollection(集合)从元素列表中获取数据env.fromElements()从文件中读取数据env.readTextFIle(路径), 已废弃从端口读取数据env.socketTextStream() 文件数据源kafka数据源DataGen数据源自定义数据源 文件数据源 使用文件数据源前需要先添加相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/versionscopeprovided/scope /dependencypublic class Flink02_FileSource {public static void main(String[] args) throw Exception {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//file sourceFileSource.FileSourceBuilderString fileSourceBuilder FileSource.StringforRecordStreamFormat(new TextLineInputFormat(utf-8), new Path(input/word.txt));FileSourceString fileSource fileSourceBuilder.build();//source 算子DataStreamSourceString ds env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), fileSource);ds.print();env.execute();} }DataGen数据源 主要用于生成模拟数据也需要导入相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/versionscopecompile/scope/dependencypublic class Flink04_DataGenSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return UUID.randomUUID() - value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);DataStreamSourceString dataGenDs env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), dataGenDs);dataGenDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}} }Kafka消费者 消费方式拉取 消费者对象KafkaConsumenr 消费原则 一个主题的一个分区只能被一个消费者组中的一个消费者消费 一个消费者组中的一个消费者可以消费一个主题中的多个分区 消费者相关的参数 key.deserializer 反序列化value.deserializerbootstrap.servers 集群的位置group.id 消费者组id 为何分组方便同一组的消费者进行断点续传auto.commit.interval.ms 自动提交间隔 默认5senable.auto.commit 开启自动提交offset偏移量auto.offset.reset 当offset不存在时offset重置默认是最末尾的位置 ①新的消费者组之前没有消费过没有记录的offset②当前要消费的offset在kafka中已经不存在可能是因为时间久了对应的数据清理掉了 重置策略 earliest: 头能消费到分区中现有的数据latest: 尾只能消费到分区中新来的数据 isolation.level事务隔离级别 读未提交读已提交 消费数据存在的问题 漏消费导致数据丢失重复消费导致数据重复 shell 创建生产者对象kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first public class Flink03_KafkaSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);KafkaSourceString stringKafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092).setGroupId(flink).setTopics(first)//优先使用消费者组记录的Offset进行消费如果offset不存在根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定统一使用通用方法 // .setProperty(isolation.level, read_committed).build();DataStreamSourceString kafkaDS env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), kafkaDS);kafkaDS.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}} }
http://www.sadfv.cn/news/221462/

相关文章:

  • 网站瀑布流怎么做重庆公司章程如何查询下载
  • 微软网站开发技术重庆市建筑协会信息网
  • 更合网站设计制作阿里大数据平台
  • 海宁网站开发我国的课程一般通过
  • 深圳市手机网站建设怎么样管理系统的组成
  • 网站毕业设计一般做几个页面合肥聚名网络科技有限公司
  • 品牌授权网站怎么设置网站的关键字
  • 济南制作网站企业写给初学网站开发们的一封信
  • 哪些网站微信支付平台通过php获取手机网站访客的手机号码
  • 数据分析网站网站服务器权限
  • 手机网站 微信网站企业做网站分哪几种
  • 做积分商城网站seo教程网站优化
  • 自己建网站怎么做影视资源wordpress 搜索 分词
  • 网站开发个人基本情况1000字开发一个小程序游戏要多少钱
  • 网站外链怎么购买网站建设计划方案模板下载
  • 二手物品交易网站开发环境网站制作前言公司
  • 做网站后的总结宿迁司法拍卖房产网
  • 虚拟主机和网站空间网页游戏开服表怎么删
  • 新乡做网站的seo职位信息
  • 手机网站开发视频个人网页制作设计模板
  • 亚马逊品牌网站怎么做平台商城网站建设
  • 抚顺网站设计郑州发布
  • 智慧团建网站注册登录入口手机上怎么做网页
  • 1简述网站建设流程图部分网站为什么网页打不开的原因及解决方法
  • 两个域名同时指向一个网站如何建立自己的公司网站
  • 用自己的电脑建设网站安阳信息网官网
  • 主题网站设计与制作中国软件外包网
  • 沈阳企业网站wordpress如何添加安装导航
  • 网站代理登录域名wordpress 密码加密方式
  • 常平网站仿做中国建设银行湖北省分行网站