当前位置: 首页 > news >正文

济南cms建站网站建设对于企业的意义

济南cms建站,网站建设对于企业的意义,深圳免费建站,做网站哪些好状态一致性 一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次#xff0c;但是结果只有一个。 三个级别#xff1a; 最多一次#xff1a;1次或0次#xff0c;有可能丢数据至少一次#xff1a;1次或n次#xff0c;出错可能会重试 输入端只要可以做到数据重…状态一致性 一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次但是结果只有一个。 三个级别 最多一次1次或0次有可能丢数据至少一次1次或n次出错可能会重试 输入端只要可以做到数据重放即在出错后可以重新发送一样的数据 精确一次数据只会发送1次 幂等写入多次重复操作不影响结果有可能出现某个值由于数据重放导致结果回到原先的值然后逐渐恢复。预写日志 先把结果数据作为日志状态保存起来进行检查点保存时也会将这些结果数据一并做持久化存储在收到检查点完成的通知时将所有结果数据一次性写入外部系统 预写日志缺点这种再次确认的方式如果写入成功返回的ack出现故障还是会出现数据重复。两阶段提交2PC数据写入过程和数据提交分为两个过程如果写入过程没有发生异常就将事务进行提交。 算子节点在收到第一个数据时就开启一个事务然后提交数据在下一个检查点到达前都是预写入如果下一个检查点正常再进行最终提交。对外部系统有一定的要求要能够识别事务ID事务的重复提交应该是无效的。即barrier到来时如果结果一致就提交事务否则进行事务回滚 Flink和Kafka连接时的精确一次保证 开启检查点开启事务隔离级别读已提交注意设置kafka超时时间为10分钟 public class Flink02_KafkaToFlink {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//开启检查点env.enableCheckpointing(1000L);//kafka sourceKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092).setGroupId(flinkb).setTopics(topicA)//优先使用消费者组 记录的Offset进行消费如果offset不存在根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定统一使用通用方法.setProperty(isolation.level, read_committed).build();DataStreamSourceString ds env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafkasource);//处理过程//kafka SinkKafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092).setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(first).setValueSerializationSchema(new SimpleStringSchema()).build())//语义//AT_LEAST_ONCE:至少一次表示数据可能重复需要考虑去重操作//EXACTLY_ONCE:精确一次//kafka transaction timeout is larger than broker//kafka超时时间1H//broker超时时间15分钟// .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障.setTransactionalIdPrefix(flink RandomUtils.nextInt(0,100000)) // .setProperty(ProducerConfig.RETRIES_CONFIG,10).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,60*1000*10)//10分钟.build();ds.map(JSON::toJSONString).sinkTo(kafkaSink);//写入到kafka 生产者ds.sinkTo(kafkaSink);try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}} }FlinkSQL1.17 FlinkSQL不同版本的接口仍在变化有变动查看官网。 在官网这个位置可以查看Flink对于以来的一些官方介绍。 Table依赖剖析 三个依赖 1. flink-table-api-java-uber-1.17.2.jar (所有的Java API) 2. flink-table-runtime-1.17.2.jar (包含Table运行时) 3. flink-table-planner-loader-1.17.2.jar (查询计划器即SQL解析器) 静态导包在import后添加static,并在类后面加上*导入全部。主要是为了方便使用下面的 $ 方法否则 $ 方法前面都要添加Expressions的类名前缀 table.where($(vc).isGreaterOrEqual(100)).select($(id),$(vc),$(ts)).execute().print();程序架构 准备环境 流表环境基于流创建表环境表环境从操作层面与流独立底层处理还是流 创建表 基于流将流转换为表连接器表 转换处理 基于Table对象使用API进行处理基于SQL的方式直接写SQL处理 输出 基于Table对象或连接器表输出结果表转换为流基于流的方式输出 流处理中的表 处理的数据对象 关系字段元组的有界集合流处理字段元组的无限序列 对数据的访问 关系可以得到完整的流处理数据是动态的 因此处理过程中的表是动态表必须要持续查询。 流表转换 持续查询 追加查询窗口查询的结果通过追加的方式添加到表的末尾使用toDataStream更新查询窗口查询的结果会对原有的结果进行修改, 使用toChangeLogStream如果不清楚是什么类型直接使用toChangeLogSteam()将表转换为流 public class Flink04_TableToStreamQQ {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);SingleOutputStreamOperatorEvent ds env.socketTextStream(hadoop102, 8888).map(line - {String[] fields line.split(,);return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));});Table table tableEnv.fromDataStream(ds);tableEnv.createTemporaryView(t1, table);//SQLString appendSQL select user, url, ts from t1 where user zhangsan;//需要在查询过程中更新上一次的值String updateSQL select user, count(*) cnt from t1 group by user;Table resultTable tableEnv.sqlQuery(updateSQL);//表转换为流//doesnt support consuming update changes which is produced by node GroupAggregate(groupBy[user], select[user, COUNT(*) AS cnt]) // DataStreamRow rowDs tableEnv.toDataStream(resultTable);//有更新操作时使用toChangelogStream(),它即支持追加也支持更新查询DataStreamRow rowDs tableEnv.toChangelogStream(resultTable);rowDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}} }将动态表转换为流 仅追加流如果表的结果都是追加查询Retract撤回流 包含两类消息添加消息和撤回消息下游需要根据这两类消息进行处理 更新插入流 两种消息更新插入消息带key和删除消息 连接器 DataGen和Print连接器 public class Flink01_DataGenPrint {public static void main(String[] args) {//TableEnvironment tableEnv TableEnvironment.create(EnvironmentSettings.newInstance().build());//1. 准备表环境, 基于流环境创建表环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);//DataGenString createTable create table t1 ( id STRING , vc INT , ts BIGINT ) WITH ( connector datagen , rows-per-second 1 , fields.id.kind random , fields.id.length 6 , fields.vc.kind random , fields.vc.min 100 , fields.vc.max 1000 , fields.ts.kind sequence , fields.ts.start 1000000 , fields.ts.end 100000000 ) ;tableEnv.executeSql(createTable);//Table resultTable tableEnv.sqlQuery(select * from t1 where vc 200);//.execute().print();//printString sinkTable create table t2( id string, vc int, ts bigint ) with ( connector print, print-identifier print );tableEnv.executeSql(sinkTable);tableEnv.executeSql(insert into t2 select id, vc, ts from t1 where vc 200);} }文件连接器 public class Flink02_FileConnector {public static void main(String[] args) {TableEnvironment tableEnvironment TableEnvironment.create(EnvironmentSettings.newInstance().build());//FileSourceString sourceTable create table t1 ( id STRING , vc INT , ts BIGINT, // file.name string not null METADATA, 文件名字由于系统原因无法识别盘符后面的冒号 file.size bigint not null METADATA ) WITH ( connector filesystem , path input/ws.txt , format csv ) ;tableEnvironment.executeSql(sourceTable);//tableEnvironment.sqlQuery( select * from t1 ).execute().print();//转换处理...//File sinkString sinkTable create table t2 ( id STRING , vc INT , ts BIGINT, // file.name string not null METADATA, 文件名字由于系统原因无法识别盘符后面的冒号 file_size bigint ) WITH ( connector filesystem , path output , format json ) ;tableEnvironment.executeSql(sinkTable);tableEnvironment.executeSql(insert into t2 select id, vc, ts, file.size from t1);} }kafka连接器 public class Flink03_KafkaConnector {public static void main(String[] args) {TableEnvironment tableEnvironment TableEnvironment.create(EnvironmentSettings.newInstance().build());//kafka sourceString sourceTable create table t1 ( id STRING , vc INT , ts BIGINT, topic string not null METADATA, partition int not null METADATA, offset bigint not null METADATA ) WITH ( connector kafka , properties.bootstrap.servers hadoop102:9092,hadoop103:9092 , topic topicA, properties.group.id flinksql, value.format csv, scan.startup.mode group-offsets, properties.auto.offset.reset latest ) ;//创建表tableEnvironment.executeSql(sourceTable);//打印查询结果//tableEnvironment.sqlQuery( select * from t1 ).execute().print();//转换处理...//kafka SinkString sinkTable create table t2 ( id STRING , vc INT , ts BIGINT, topic string ) WITH ( connector kafka , properties.bootstrap.servers hadoop102:9092,hadoop103:9092 , topic topicB, sink.delivery-guarantee at-least-once, // properties.transaction.timeout.ms , // sink.transactional-id-prefix xf, // properties.group.id flinksql, value.format json // scan.startup.mode group-offsets, // properties.auto.offset.reset latest ) ;tableEnvironment.executeSql(sinkTable);tableEnvironment.executeSql(insert into t2 select id, vc, ts, topic from t1);} }Jdbc连接器
http://www.sadfv.cn/news/218317/

相关文章:

  • 景洪服装网站建设做网站选用什么域名比较好
  • 网站备案管理北京专业建设网站公司哪家好
  • led行业网站源码网站怎么做悬浮图片
  • 免费的在线学习网站怎么分析网站设计
  • 佛山高端网站制作公司哪家好建服务网站需要多少钱
  • 企业网站优化的弊端各国网站域名
  • 网站开发交易网站海口手机端建站模板
  • 个人网站建设及实现网络架构方法
  • 网站做关键词排名广西水利电力建设集团网站
  • 荷兰网站开发价格有人做网站推广吗
  • 进行网站开发前 需要干什么成都营销型网站设计
  • 网站建设的市场情况嘉兴市建设监理协会网站
  • html5手机网站框架单页淘宝客网站模板
  • 东营做网站优化公司怎样简单做网站
  • 台州网站建设策划网站栏目及内容
  • 黑龙江省建设工程交易中心网站速度啊网站
  • 设计排版优秀网站新手如何通过网络挣钱
  • 做普通网站需要多少钱我是做网站的 怎么才能提高业绩
  • 单页网站搭建吃什么补肾阳虚
  • 公司自有网站工信备案寻找网站开发
  • 做网站前的准备网站建设合同的要素及签订注意事项
  • 专业制作网站的公司建行手机银行app下载
  • 汕头高端模板建站咨询网站开发
  • 个人网站的设计与制作论文手表网站建站
  • 哪个网站找做软件网页二级页面设计
  • 做外贸需要关注的网站有什么好处建设银行贵金属网站
  • wordpress评论特效实用的企业网站优化技巧
  • 电子科技产品网站建设wordpress的tag函数使用教程
  • 制作酒店网站如何运用网站做推广
  • 我想学习做网站国内出名网站建设设计公司