当前位置: 首页 > news >正文

网站建设 作用一个app安卓下载

网站建设 作用,一个app安卓下载,查权重,北京软件外包公司传送门 分布式定时任务系列1#xff1a;XXL-job安装 分布式定时任务系列2#xff1a;XXL-job使用 分布式定时任务系列3#xff1a;任务执行引擎设计 分布式定时任务系列4#xff1a;任务执行引擎设计续 Java并发编程实战1#xff1a;java中的阻塞队列 引子 这篇文章的…传送门 分布式定时任务系列1XXL-job安装 分布式定时任务系列2XXL-job使用 分布式定时任务系列3任务执行引擎设计 分布式定时任务系列4任务执行引擎设计续 Java并发编程实战1java中的阻塞队列 引子 这篇文章的主要目不是讨论XXL-job的使用而是要通过它的任务线程实现机制来分析java中阻塞队列的应用 而这一切要从上周某天公司一个普通的下午说起。 当时一个同事要添加任务就随口问了一句“阻塞处理策略选啥”我心里一惊以前没注意过这个地方每次都是默认的也就是单机串行。 就是下图这个 这里面有3个选项 众人先是侃侃讨论了半天后面再去翻了源码来查看发现了下面的blockingQueue 阻塞队列 什么是阻塞队列 关于java阻塞队列可以看看java中的阻塞队列。里面例子用到的是有界队列ArrayBlockingQueueXXL-job里面用的是无界队列LinkedBlockingQueue。不论它们的相似及区别点最重要的都是拥有阻塞特性。 要分析XXL-job是怎么使用阻塞队列的就从XXL-job的调度触发机制入手 什么是XXL-job任务 前面在分布式定时任务系列2XXL-job使用里面介绍过XXL-job的使用注册一个任务也很简单只要在业务代码方法中打上XxlJob注解即可下面是官方提供的demo /*** 1、简单任务示例Bean模式*/XxlJob(demoJobHandler)public void demoJobHandler() throws Exception {XxlJobHelper.log(XXL-JOB, Hello World.);for (int i 0; i 5; i) {XxlJobHelper.log(beat at: i);TimeUnit.SECONDS.sleep(2);}// default success} 开发步骤 1、任务开发在Spring Bean实例中开发Job方法 2、注解配置为Job方法添加注解 XxlJob(value自定义jobhandler名称, init JobHandler初始化方法, destroy JobHandler销毁方法)注解value值对应的是调度中心新建任务的JobHandler属性的值。 3、执行日志需要通过 XxlJobHelper.log 打印执行日志 4、任务结果默认任务结果为 成功 状态不需要主动设置如有诉求比如设置任务结果为失败可以通过 XxlJobHelper.handleFail/handleSuccess 自主设置任务结果  任务注册 对于XXL-job来说是怎么注册并管理这些任务的呢 在程序启动的时候xxl-job-core包中的XxlJobSpringExecutor类由于实现了接口SmartInitializingSingleton在Bean实例化完成时会调用afterSingletonsInstantiated()方法 Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}} 在initJobHandlerMethodRepository(applicationContext)方法中现在看一下这个方法的代码该方法会提取所有SpringBean的实例中方法上添加了XxlJob注解的方法并进行注册 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext null) {return;}// init job handler from methodString[] beanDefinitionNames applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean applicationContext.getBean(beanDefinitionName);MapMethod, XxlJob annotatedMethods null; // referred to org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookupXxlJob() {Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error(xxl-job method-jobhandler resolve error for bean[ beanDefinitionName ]., ex);}if (annotatedMethodsnull || annotatedMethods.isEmpty()) {continue;}for (Map.EntryMethod, XxlJob methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod methodXxlJobEntry.getKey();XxlJob xxlJob methodXxlJobEntry.getValue();// registregistJobHandler(xxlJob, bean, executeMethod);}}// ---------------------- job handler repository ----------------------private static ConcurrentMapString, IJobHandler jobHandlerRepository new ConcurrentHashMapString, IJobHandler();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info( xxl-job register jobhandler success, name:{}, jobHandler:{}, name, jobHandler);return jobHandlerRepository.put(name, jobHandler);} 注册的方法就是registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod))会将XxlJob注解标记的方法提取出对应的方法名通过反射得到Method信息并实例为对应的MethodJobHandler最后通过ConcurrentMap来管理其中key是注任务名称value为JobHandler实例。 看到这里不知你有没有一个疑问这里的JobHandler里面并没有阻塞队列BlockingQueue它们是怎么关联起来的呢所以这里就要讨论一下XXL-job的任务执行! XXL-job的任务执行 在后台管理手动触发一下测试任务并把代码打上debug断点 首先会发送后台请求 一路跟踪下去会进入到XxlJobExecutor.runExecutor(TriggerParam triggerParam, String address)方法最终是到ExecutorBizImpl.run(TriggerParam triggerParam)方法 public ReturnTString run(TriggerParam triggerParam) {// load oldjobHandler jobThreadJobThread jobThread XxlJobExecutor.loadJobThread(triggerParam.getJobId());IJobHandler jobHandler jobThread!null?jobThread.getHandler():null;String removeOldReason null;// validjobHandler jobThreadGlueTypeEnum glueTypeEnum GlueTypeEnum.match(triggerParam.getGlueType());if (GlueTypeEnum.BEAN glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThreadif (jobThread!null jobHandler ! newJobHandler) {// change handler, need kill old threadremoveOldReason change jobhandler or glue type, and terminate the old job thread.;jobThread null;jobHandler null;}// valid handlerif (jobHandler null) {jobHandler newJobHandler;if (jobHandler null) {return new ReturnTString(ReturnT.FAIL_CODE, job handler [ triggerParam.getExecutorHandler() ] not found.);}}} else if (GlueTypeEnum.GLUE_GROOVY glueTypeEnum) {// valid old jobThreadif (jobThread ! null !(jobThread.getHandler() instanceof GlueJobHandler ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason change job source or glue type, and terminate the old job thread.;jobThread null;jobHandler null;}// valid handlerif (jobHandler null) {try {IJobHandler originJobHandler GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnTString(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!null glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread ! null !(jobThread.getHandler() instanceof ScriptJobHandler ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason change job source or glue type, and terminate the old job thread.;jobThread null;jobHandler null;}// valid handlerif (jobHandler null) {jobHandler new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnTString(ReturnT.FAIL_CODE, glueType[ triggerParam.getGlueType() ] is not valid.);}// executor block strategyif (jobThread ! null) {ExecutorBlockStrategyEnum blockStrategy ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnTString(ReturnT.FAIL_CODE, block strategy effectExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason block strategy effect ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread null;}} else {// just queue trigger}}// replace thread (new or exists invalid)if (jobThread null) {jobThread XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queueReturnTString pushResult jobThread.pushTriggerQueue(triggerParam);return pushResult;} 任务执行过程中如果发现XXL-job对应的执行线程不存在会创建一个新new Thread实例并绑定前面的JobHandler实例这样对于每一个任务都有一个单独的线程也就将执行线程JobThread跟任务关联起来了 public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread new JobThread(jobId, handler);newJobThread.start();logger.info( xxl-job regist JobThread success, jobId:{}, handler:{}, new Object[]{jobId, handler});JobThread oldJobThread jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, maps put method return the old value!!!if (oldJobThread ! null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public JobThread(int jobId, IJobHandler handler) {this.jobId jobId;this.handler handler;// 初始化阻塞队列this.triggerQueue new LinkedBlockingQueueTriggerParam();this.triggerLogIdSet Collections.synchronizedSet(new HashSetLong());// assign job thread namethis.setName(xxl-job, JobThread-jobId-System.currentTimeMillis());} 而执行任务的最后一步就是将当前任务入队到刚才的执行线程的BlockingQueue中 // push data to queueReturnTString pushResult jobThread.pushTriggerQueue(triggerParam); 而这也就跟文章最前面的代码关联起来了 /*** new trigger to queue** param triggerParam* return*/public ReturnTString pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info( repeate trigger job, logId:{}, triggerParam.getLogId());return new ReturnTString(ReturnT.FAIL_CODE, repeate trigger job, logId: triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;} BlockingQueue的应用 消息入队 至此就可以看出任务触发的大致逻辑了用户在admin手工触发任务之后最终会放入到任务对应的队列之中通过add方法将当前执行参数入队FIFO不阻塞队列如果满了就抛出异常这也是一个比较大的隐患如果触了过于频繁可能会导致OOM而这也就是为什么要设计不同阻塞策略的原因。 消息消费 而这些触发的任务怎么消费的呢其实也是在上面的提到的执行线程JobThread 当实例化一个线程的时候com.xxl.job.core.executor.XxlJobExecutor类会立即启动调用线程的start()方法 启动之后会调用线程的run()方法com.xxl.job.core.thread.JobThread类 在执行线程中会写一个while的无限循环来不停的从阻塞队列中取出任务 循环结束的条件就是上面的阻塞队列如果是覆盖之前调度将会结束循环com.xxl.job.core.biz.impl.ExecutorBizImpl类
http://www.sadfv.cn/news/253373/

相关文章:

  • 中国交通建设集团有限公司是央企无锡优化
  • 山东网站建设价格实惠怎么用wordpress打开网站吗
  • 免费做文字图网站天津网站建设开发维护
  • 嘉兴网站建设制作查询公司的网站备案信息
  • 网站建设播放vr视频国内建站源码
  • 最牛的网站建设恶意网站的防治
  • 绍兴易网网站开发html5建一个网站
  • 商城网站前置审批国外做ppt网站
  • 营销网站案例什么意思asp网站301
  • 网上商城网站系统网站建设教程这篇苏州久远网络
  • 优普南通网站建设wamp个人网站开发来源
  • 网站构建的过程邢台地区网站建设口碑好
  • 广东平台网站建设找哪家能进网站的浏览器
  • dede音乐网站wordpress顶和踩功能
  • 做红k线网站做网站最常用的软件是什么
  • 长春仿站定制模板建站做网站优化推广的好处
  • 微山做网站wordpress 帐号共用
  • 网站被降权后怎么办清远专业网站制作公司
  • 开发公司章程长沙网站seo推广公司哪家好
  • 网站默认数据库地址梵客家装口碑怎么样
  • 网站建设费用摊销多少年秦皇岛市建设路小学网站
  • 做网站加入视频无法播放做跨境网站注意事项
  • 酒泉网站建设费用企业网站关键词优化排名应该怎么做
  • 国家建设部举报网站页面排版布局
  • 成都企业建站系统推广普通话手抄报模板可打印
  • 网站 改版 建议国外做网站用的程序
  • 做网站过程中的自身不足原创小说手机网站制作需要多少钱
  • 自己做的网站出现iis7租用大型服务器多少钱
  • 重庆网站建设快忻宁波建设监理管理协会网站
  • 天津做网站联系方式wordpress怎么上传产品