湖北设计公司,深圳网站seo推广,网站建设的栏目规划,学校门户网站什么意思背景
算子的联合列表状态是平时使用的比较少的一种状态#xff0c;本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态
算子联合列表状态
首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…背景
算子的联合列表状态是平时使用的比较少的一种状态本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态
算子联合列表状态
首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主要由这两个方法处理 1初始化方法
public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates stateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState new TreeMap(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2KafkaTopicPartition, Long kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info(Consumer subtask {} restored state: {}.,getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info(Consumer subtask {} has no restore state.,getRuntimeContext().getIndexOfThisSubtask());}}2.开始通知检查点开始的方法
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug(snapshotState() called on closed source);} else {unionOffsetStates.clear();final AbstractFetcher?, ? fetcher this.kafkaFetcher;if (fetcher null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.EntryKafkaTopicPartition, Long subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMapKafkaTopicPartition, Long currentOffsets fetcher.snapshotCurrentState();if (offsetCommitMode OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.EntryKafkaTopicPartition, Long kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}