线上运行的CEP中肯定经常遇到规则变更的情况，如果每次变更时都将任务重启、重新发布，是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景，如果规则窗口过长（一两个星期）、状态过大，就会导致重启时间延长，期间就会造成一些想要处理的异常行为不能及时发现。

1. 实现分析
- 外部加载：通常规则引擎会有专门的规则管理模块，提供给用户去创建自己的规则；对于Flink任务来说，需要到外部去加载规则。
- 动态更新：需要提供定时检测规则是否变更的能力。
- 历史状态清理：模式匹配的过程是一系列NFAState的不断变更，如果规则发生变更，需要清理历史状态。
- API：需要对外提供易用的API。
2. 代码实现

首先实现一个用户API。
package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口用户调用API不区分key*/
public interface DynamicPatternFunctionT extends Function, Serializable {/**** 初始化* throws Exception*/public void init() throws Exception;/*** 注入新的pattern* return*/public PatternT,T inject() throws Exception;/*** 一个扫描周期:ms* return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* return*/public boolean isChanged() throws Exception;
}希望上述API的调用方式如下。
//正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction()) 所以需要修改CEP-Lib源码 b.增加injectionPattern函数。
public class CEP {/**** Dynamic injection pattern function * param input* param dynamicPatternFunction* return* param T*/public static T PatternStreamT injectionPattern throws Exception (DataStreamT input,DynamicPatternFunctionT dynamicPatternFunction){return new PatternStream(input, dynamicPatternFunction); }
} 增加PatternStream构造函数因为需要动态更新所以有必要传进去整个函数。
public class PatternStreamT {PatternStream(final DataStreamT inputStream, DynamicPatternFunctionT dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));}
} 修改PatternStreamBuilder.build, 增加调用函数的过程。 final CepOperatorIN, K, OUT operator null;if (patternFunction null ) {operator new CepOperator(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator new CepOperator(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);} 增加对应的CepOperator构造函数。 public CepOperator(final TypeSerializerIN inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,Nullable final EventComparatorIN comparator,Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunctionIN, OUT function,Nullable final OutputTagIN lateDataOutputTag) {super(function);this.inputSerializer Preconditions.checkNotNull(inputSerializer);this.patternFunction patternFunction;this.isProcessingTime isProcessingTime;this.comparator comparator;this.lateDataOutputTag lateDataOutputTag;if (afterMatchSkipStrategy null) {this.afterMatchSkipStrategy AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy afterMatchSkipStrategy;}this.nfaFactory null;} 加载Pattern构造NFA Overridepublic void open() throws Exception {super.open();timerService getInternalTimerService(watermark-callbacks, VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction ! 
null) {patternFunction.init();Pattern pattern patternFunction.inject();afterMatchSkipStrategy pattern.getAfterMatchSkipStrategy();boolean timeoutHandling getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory NFACompiler.compileFactory(pattern, timeoutHandling);long period patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() period, this::onProcessingTime);}}nfa nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context new ContextFunctionImpl();collector new TimestampedCollector(output);cepTimerService new TimerServiceImpl();// metricsthis.numLateRecordsDropped metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);} 状态清理一共分为两块: 匹配状态数据清理、定时器清理 进行状态清理: Overridepublic void processElement(StreamRecordIN element) throws Exception {if (patternFunction ! null) {// 规则版本更新if (needRefresh.value() refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器IterableLong registerTime registerTimeState.get();if (registerTime ! null) {IteratorLong iterator registerTime.iterator();while (iterator.hasNext()) {Long l iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}}
} 上面是在处理每条数据时清除状态和版本。接下来要进行状态和版本的初始化。 Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction ! null) {/*** 两个标识位状态*/refreshFlagState context.getOperatorStateStore().getUnionListState(new ListStateDescriptorInteger(refreshFlagState, Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion new AtomicInteger(0);}needRefresh context.getKeyedStateStore().getState(new ValueStateDescriptorInteger(needRefreshState, Integer.class, 0));}
}
3.测试验证 设置每10s变更一次Pattern。 PatternStream patternStream CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunctionTuple3String, Long, String, Map() {Overridepublic Map select(Map map) throws Exception {map.put(processingTime, System.currentTimeMillis());return map;}}).print();env.execute(SyCep);}public static class TestDynamicPatternFunction implements DynamicPatternFunctionTuple3String, Long, String {public TestDynamicPatternFunction() {this.flag true;}boolean flag;int time 0;Overridepublic void init() throws Exception {flag true;}Overridepublic PatternTuple3String, Long, String, Tuple3String, Long, String inject()throws Exception {// 2种patternif (flag) {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success);}}).times(1).followedBy(middle).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail);}}).times(1).next(end);return pattern;} else {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success2);}}).times(2).next(middle2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail2);}}).times(2).next(end2);return pattern;}}Overridepublic long getPeriod() throws Exception {return 10000;}Overridepublic boolean isChanged() throws Exception {flag !flag ;time getPeriod();System.out.println(change 
pattern : time);return true;}}
打印结果符合预期。

4. 源码地址
感觉有用的话，帮忙点个小星星 ^_^

GitHub - StephenYou520/SyCep: CEP 动态Pattern