网站论坛建设,全国企业信息公示官网,网络运营需要什么技术,wordpress 开发手册在Flink中状态主要分为三种:
Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态)
这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解
数据源 这里选用Socket作为Source输入,便于…在Flink中状态主要分为三种:
Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态)
这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解
数据源 这里选用Socket作为Source输入,便于测试➜ ~ nc -lk 8888
a
b
c
k
k
k状态算子代码/**
* Description TODO 自定义状态MapFunc
**/
// 状态算子必须要实现对应的算子接口和CheckpointFunction接口
class StateMapFunc implements MapFunctionString, String, CheckpointedFunction{private ListStateString strListState;/*** Param o* return String* Description TODO map方法的正常处理逻辑**/Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals(k) RandomUtils.nextInt(0, 5) 3) {throw new Exception(Task 异常);}// 将数据添加到状态存储器中strListState.add(s);IterableString strings strListState.get();StringBuilder builder new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** Param functionSnapshotContext* return void* Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println(快照生成, checkpointId: functionSnapshotContext.getCheckpointId());}/*** Param functionInitializationContext* return void* Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptorString stateDescriptor new ListStateDescriptor(demo, String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState operatorStateStore.getListState(stateDescriptor);}
}要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.业务代码 public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage(file:///Users/xxx/data/testData/checkpoint);// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSourceString socketSource env.socketTextStream(localhost, 8888);// 将自定义的StateOperator传入SingleOutputStreamOperatorString map socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute(Operator State);}
}具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.