当前位置: 首页 > news >正文

自己网站的关键词怎么改淘宝电商怎么做

自己网站的关键词怎么改,淘宝电商怎么做,凡科快图app,快速做网站线上运行的CEP中肯定经常遇到规则变更的情况#xff0c;如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景#xff0c;如果规则窗口过长#xff08;一两个星期#xff09;#xff0c;状态过大#xff0c;就会导致重启…        线上运行的CEP中肯定经常遇到规则变更的情况如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景如果规则窗口过长一两个星期状态过大就会导致重启时间延长期间就会造成一些想要处理的异常行为不能及时发现。 1.实现分析 外部加载通常规则引擎会有专门的规则管理模块提供用户去创建自己的规则对于Flink任务来说需要到外部去加载规则动态更新需要提供定时去检测规则是否变更历史状态清理在模式匹配中是一系列NFAState 的不断变更如果规则发生变更需要清理历史状态API需要对外提供易用的API 2.代码实现 首先实现一个用户API。 package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口用户调用API不区分key*/ public interface DynamicPatternFunctionT extends Function, Serializable {/**** 初始化* throws Exception*/public void init() throws Exception;/*** 注入新的pattern* return*/public PatternT,T inject() throws Exception;/*** 一个扫描周期:ms* return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* return*/public boolean isChanged() throws Exception; }希望上述API的调用方式如下。 //正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction()) 所以需要修改CEP-Lib源码 b.增加injectionPattern函数。 public class CEP {/**** Dynamic injection pattern function * param input* param dynamicPatternFunction* return* param T*/public static T PatternStreamT injectionPattern throws Exception (DataStreamT input,DynamicPatternFunctionT dynamicPatternFunction){return new PatternStream(input, dynamicPatternFunction); } } 增加PatternStream构造函数因为需要动态更新所以有必要传进去整个函数。 public class PatternStreamT {PatternStream(final DataStreamT inputStream, DynamicPatternFunctionT dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));} } 修改PatternStreamBuilder.build, 增加调用函数的过程。 final CepOperatorIN, K, OUT operator null;if (patternFunction null ) {operator new CepOperator(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator new CepOperator(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);} 增加对应的CepOperator构造函数。 public CepOperator(final TypeSerializerIN inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,Nullable final EventComparatorIN comparator,Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunctionIN, OUT function,Nullable final OutputTagIN lateDataOutputTag) {super(function);this.inputSerializer Preconditions.checkNotNull(inputSerializer);this.patternFunction patternFunction;this.isProcessingTime isProcessingTime;this.comparator comparator;this.lateDataOutputTag lateDataOutputTag;if (afterMatchSkipStrategy null) {this.afterMatchSkipStrategy AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy afterMatchSkipStrategy;}this.nfaFactory null;} 加载Pattern构造NFA Overridepublic void open() throws Exception {super.open();timerService getInternalTimerService(watermark-callbacks, VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction ! null) {patternFunction.init();Pattern pattern patternFunction.inject();afterMatchSkipStrategy pattern.getAfterMatchSkipStrategy();boolean timeoutHandling getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory NFACompiler.compileFactory(pattern, timeoutHandling);long period patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() period, this::onProcessingTime);}}nfa nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context new ContextFunctionImpl();collector new TimestampedCollector(output);cepTimerService new TimerServiceImpl();// metricsthis.numLateRecordsDropped metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);} 状态清理一共分为两块: 匹配状态数据清理、定时器清理 进行状态清理: Overridepublic void processElement(StreamRecordIN element) throws Exception {if (patternFunction ! null) {// 规则版本更新if (needRefresh.value() refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器IterableLong registerTime registerTimeState.get();if (registerTime ! null) {IteratorLong iterator registerTime.iterator();while (iterator.hasNext()) {Long l iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}} } 上面是在处理每条数据时清除状态和版本。接下来要进行状态和版本的初始化。 Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction ! null) {/*** 两个标识位状态*/refreshFlagState context.getOperatorStateStore().getUnionListState(new ListStateDescriptorInteger(refreshFlagState, Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion new AtomicInteger(0);}needRefresh context.getKeyedStateStore().getState(new ValueStateDescriptorInteger(needRefreshState, Integer.class, 0));} } 3.测试验证 设置每10s变更一次Pattern。 PatternStream patternStream CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunctionTuple3String, Long, String, Map() {Overridepublic Map select(Map map) throws Exception {map.put(processingTime, System.currentTimeMillis());return map;}}).print();env.execute(SyCep);}public static class TestDynamicPatternFunction implements DynamicPatternFunctionTuple3String, Long, String {public TestDynamicPatternFunction() {this.flag true;}boolean flag;int time 0;Overridepublic void init() throws Exception {flag true;}Overridepublic PatternTuple3String, Long, String, Tuple3String, Long, String inject()throws Exception {// 2种patternif (flag) {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success);}}).times(1).followedBy(middle).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail);}}).times(1).next(end);return pattern;} else {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success2);}}).times(2).next(middle2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail2);}}).times(2).next(end2);return pattern;}}Overridepublic long getPeriod() throws Exception {return 10000;}Overridepublic boolean isChanged() throws Exception {flag !flag ;time getPeriod();System.out.println(change pattern : time);return true;}} 打印结果符合预期 4.源码地址 感觉有用的话帮忙点个小星星。^_^ GitHub - StephenYou520/SyCep: CEP 动态Pattern
http://www.sadfv.cn/news/241568/

相关文章:

  • 河北网站建设案例采集文章留在网站
  • 计算机应用技术网站建设图片做旧网站
  • 网站建设幻灯片背景图片素材网站规划与开发设计
  • 常德找工作网站广东网站设计公司电话
  • 域名网站有哪些dw制作班级网站
  • 西安网站制作公司哪建站公司网站社区
  • 宝安做网站公司科技感强的网站
  • 新手学做网站代码网站建设公司谁管
  • 浙江省邮电工程建设有限公司网站管理咨询公司简介模板
  • 网站栏目内容和功能阿里云网站建设如何
  • 小白网页制作软件搜索引擎优化通常要注意的问题有
  • 网站建设j基本步骤选择合肥网站建设
  • 专业网站是什么意思wordpress 替换jquery
  • 网站建设设计企业投注类网站怎么做自动软件
  • 洛阳网站设计哪家便宜线上卖货平台有哪些
  • 建设厅网站进不去wordpress手机站主题
  • 推荐武进网站建设wordpress标签页样式
  • 免费注册企业网站建材在哪里做网站好
  • 智能建站代理自己建立旅游的网站建设
  • 沧州做网站推广公司网络营销案例及视频
  • 网站设计师的工作内容网页设计师通常是设计两套ui吗
  • 赤壁网站开发WordPress 先登录
  • 如何让自己网站排名提高wordpress文章直接转html代码
  • 柳州最强的网站建设上海建设学院网站
  • 招聘网站开发文档做网站的公司排名
  • 网站怎么推广比较好深圳市住房和建设局投诉电话
  • 电子商务网站建设市场分析阳江网络推广公司
  • 设计本网站图片大全云计算培训
  • 网站列表页框架布局原则关键词查询工具免费
  • h5做的公司网站手机微网站系统