当前位置: 首页 > news >正文

网站论坛建设全国企业信息公示官网

网站论坛建设,全国企业信息公示官网,网络运营需要什么技术,wordpress 开发手册在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于测试➜ ~ nc -lk 8888 a b c k k k状态算子代码/** * Description TODO 自定义状态MapFunc **/ // 状态算子必须要实现对应的算子接口和CheckpointFunction接口 class StateMapFunc implements MapFunctionString, String, CheckpointedFunction{private ListStateString strListState;/*** Param o* return String* Description TODO map方法的正常处理逻辑**/Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals(k) RandomUtils.nextInt(0, 5) 3) {throw new Exception(Task 异常);}// 将数据添加到状态存储器中strListState.add(s);IterableString strings strListState.get();StringBuilder builder new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** Param functionSnapshotContext* return void* Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println(快照生成, checkpointId: functionSnapshotContext.getCheckpointId());}/*** Param functionInitializationContext* return void* Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptorString stateDescriptor new ListStateDescriptor(demo, String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState operatorStateStore.getListState(stateDescriptor);} }要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.业务代码 public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage(file:///Users/xxx/data/testData/checkpoint);// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSourceString socketSource env.socketTextStream(localhost, 8888);// 将自定义的StateOperator传入SingleOutputStreamOperatorString map socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute(Operator State);} }具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.
http://www.yutouwan.com/news/195632/

相关文章:

  • 深圳龙华公司哈尔滨网站建设优化公司
  • 可以做外贸的网站有哪些小型个人网站制作
  • 网站搜索排名优化怎么做网上买东西
  • 建一个网站的费用鞍山网上推广怎么弄?
  • 网站开发研究现状wordpress是是什么技术
  • 涡阳做网站郑州影视公司招聘
  • 网站建设怎样中英文后端开发和前端开发的区别
  • 网站设计制作哪种快一建论坛建工教育网
  • 企业网站排名技巧瑜伽wordpress模板
  • 泸州市住房和城乡建设局网站网站建设设计岗位职责
  • 商务网站系统中支付功能怎么做深圳网络营销网站建设
  • 实力网站优化公司首选广东网
  • 怎么做阿里巴巴外贸网站婚庆公司招聘
  • 山东网站建设运营商业网站建设与运营
  • 现在建设校园网站用什么软件wordpress 列表 展开收缩
  • 冠县网站建设惠州品牌网站建设公司哪里有
  • 西宁网站建设君博解决软文世界官网
  • 网站建设人员分布discuz转wordpress
  • wordpress怎么做404页面跳转北京seo关键词排名优化
  • 做网站需要哪些软件深圳网站建设公司jm3q
  • 江苏国龙翔建设网站.删除wordpress用户组
  • 动漫制作专业有哪些职业岗位青岛推广优化
  • jquery 素材的网站网站如何申请微信支付
  • 如何创建微信小程序商店海南百度推广seo
  • 网站的域名是什么公司logo设计图片欣赏
  • 找人做设计的网站国内营销策划咨询公司
  • 网站的备案编号wordpress设置网址导航
  • 广西网站建设seo优化亚马逊查关键词排名工具
  • 域名通过了才可以做网站吗seo网络推广是干嘛的
  • 营销网站设计与规划方案乐安网站建设