当前位置: 首页 > news >正文

腾讯云建设网站教程线上推广工作内容

腾讯云建设网站教程,线上推广工作内容,装修品牌排行榜前十名,比一网站建设分布式调度Elastic-job 1. 概述 1.1什么是任务调度 我们可以思考⼀下下⾯业务场景的解决⽅案: 某电商平台需要每天上午10点#xff0c;下午3点#xff0c;晚上8点发放⼀批优惠券某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒某财务系统需要在每天凌晨0:10分结算前…分布式调度Elastic-job 1. 概述 1.1什么是任务调度 我们可以思考⼀下下⾯业务场景的解决⽅案: 某电商平台需要每天上午10点下午3点晚上8点发放⼀批优惠券某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据统计汇总 以上场景就是任务调度所需要解决的问题 任务调度是为了自动完成特定任务在约定的特定时刻去执行任务的过程 我们经常使用Spring中提供的定时任务注解Scheduled 在业务类中⽅法中贴上这个注解 Scheduled(cron 0/20 * * * * ? ) public void doWork(){ //doSomething }然后在启动类上贴上 EnableScheduling 注解 1.2 为什么需要分布式调度 感觉Spring给我们提供的这个注解可以完成任务调度的功能好像已经完美解决问题了为什么还需要 分布式呢? 主要有如下这⼏点原因: 1.单机处理极限原本1分钟内需要处理1万个订单但是现在需要1分钟内处理10万个订单原来⼀个 统计需要1⼩时现在业务⽅需要10分钟就统计出来。你也许会说你也可以多线程、单机多进程处 理。的确多线程并⾏处理可以提⾼单位时间的处理效率但是单机能⼒毕竟有限主要是CPU、内存 和磁盘始终会有单机处理不过来的情况。 2.高可用单机版的定式任务调度只能在⼀台机器上运⾏如果程序或者系统出现异常就会导致功能不 可⽤。虽然可以在单机程序实现的⾜够稳定但始终有机会遇到⾮程序引起的故障⽽这个对于⼀个系 统的核⼼功能来说是不可接受的。 3.防止重复执行: 在单机模式下定时任务是没什么问题的。但当我们部署了多台服务同时⼜每台服务 ⼜有定时任务时若不进⾏合理的控制在同⼀时间只有⼀个定时任务启动执⾏这时定时执⾏的结 果就可能存在混乱和错误了 这个时候就需要分布式的任务调度来实现了。 1.3 Elastic-Job介绍 Elastic-Job是⼀个分布式调度的解决⽅案由当当⽹开源它由两个相互独⽴的⼦项⽬Elastic-job-Lite和 Elastic-Job-Cloud组成使⽤Elastic-Job可以快速实现分布式任务调度。 Elastic-Job的地址 https://shardingsphere.apache.org/elasticjob/ 功能列表: 分布式调度协调 在分布式环境中任务能够按照指定的调度策略执⾏并且能够避免同⼀任务多实例重复执⾏。 丰富的调度策略 基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。 弹性拓容缩容 当集群中增加⼀个实例它应当能够被选举被执⾏任务当集群减少⼀个实例时他所执⾏的任务 能被转移到别的示例中执⾏。 失效转移 某示例在任务执⾏失败后会被转移到其他实例执⾏。 错过执行任务重触发 若因某种原因导致作业错过执⾏⾃动记录错误执⾏的作业并在下次次作业完成后⾃动触发。 ⽀持并行调度 ⽀持任务分⽚任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。 作业分片一致性 当任务被分⽚后保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。 支持作业生命周期操作 可以动态对任务进⾏开启及停⽌操作。 丰富的作业类型 ⽀持Simple、DataFlow、Script三种作业类型 系统架构图 2.Elastic-Job快速入门 2.1 环境搭建 2.1.1 版本02.要求 JDK 要求1.7以上版本 Maven 要求3.0.4及以上版本 Zookeeper 要求采取3.4.6以上版本 2.1.2 Zookeeper安装运行 1. 解压zookeeper-3.4.11.tar.gz, 进入conf目录, 复制zoo_sample.cfg文件, 命名为:zoo.cfg 2. 进入bin目录, 运行zkServer.cmd就可以了. 3. 解压ZooInspector.zip, 运行jar文件zookeeper客户端可视化工具 2.1.3 创建Maven项目 添加如下依赖 dependencygroupIdcom.dangdang/groupIdartifactIdelastic-job-lite-core/artifactIdversion2.1.5/version /dependency2.2 代码实现 2.2.1 任务类 package com.xiaoge;import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob;import java.util.Date;public class MyElasticJob implements SimpleJob {public void execute(ShardingContext shardingContext) {System.out.println(定时任务开始 new Date());} }2.2.2 配置类 package com.xiaoge;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class JobDemo {public static void main(String[] args) {// JobScheduler(注册中心对象 任务配置对象)new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}// 注册中心private static CoordinatorRegistryCenter createRegistryCenter() {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration new ZookeeperConfiguration(localhost:2181, elastic-job-demo);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter(zookeeper地址, 项目名)CoordinatorRegistryCenter regCenter new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}// 定时任务配置private static LiteJobConfiguration createJobConfiguration() {// 定义作业核⼼配置 newBuilder(任务名称, cron表达式, 分片数量)JobCoreConfiguration simpleCoreConfig JobCoreConfiguration.newBuilder(myElasticJob, 0/10 * * * * ?, 1).build();// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()---获取这个类的权限定类名SimpleJobConfiguration simpleJobConfig new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;} }2.2.3 测试 运行单个程序查看是否按照cron表达式的内容进⾏任务的调度 运行多个程序查看是否只会有⼀个实例进⾏任务调度 运行多个程序后把正在进行任务调度的进程关掉查看其它进程是否能继续进⾏任务调度 3.SpringBoot集成Elastic-Job 3.1 添加Maven依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.xiaoge/groupIdartifactIdelastic-job-boot/artifactIdversion1.0-SNAPSHOT/versionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.1.3.RELEASE/version/parentpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdcom.dangdang/groupIdartifactIdelastic-job-lite-spring/artifactIdversion2.1.5/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency/dependencies/project3.2 相关配置 因为配置中心的地址并不是固定的所以我们应该把这个地址信息配置在配置文件中所以在配置⽂件 application.yml中添加配置如下: elasticjob:url: localhost:2181group-name: elastic-job-bootzk注册中心配置类: Bean public CoordinatorRegistryCenter registryCenter(Value(${elasticjob.url}) String zookeeperUrl, Value(${elasticjob.group-name}) String groupName) {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration new ZookeeperConfiguration(zookeeperUrl, groupName);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter(zookeeper地址, 项目名)CoordinatorRegistryCenter regCenter new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter; }任务调度配置类: /*** todo 注意这个方法不能交给 spring 管理, 你要让它是个公共的方法,* 传递不同的jobName(任务名称), cron(cron表达式), shardingTotalCount(分片数量) 生成不同的LiteJobConfiguration, 因为环境不同任务配置不同.* 也有可能别的任务需要这个方法创建* return*/ public LiteJobConfiguration createJobConfiguration(Class? clazz, String cron, Integer shardingTotalCount, String shardingParam) {// 定义作业核⼼配置 newBuilder(任务名称, cron表达式, 分片数量)JobCoreConfiguration.Builder jobBuilder JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingTotalCount);if (!StringUtils.isEmpty(shardingParam)) {// 分片参数jobBuilder jobBuilder.shardingItemParameters(shardingParam);}// SimpleJob配置// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()---获取这个类的权限定类名SimpleJobConfiguration simpleJobConfiguration new SimpleJobConfiguration(jobBuilder.build(), clazz.getCanonicalName());// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();return simpleJobRootConfig; }4.案例需求 需求:数据库中有⼀些列的数据需要对这些数据进行备份操作备份完之后修改数据的状态标记已 经备份了. 4.1 初始化数据 在数据库中导⼊ elastic-job-demo.sql 数据 4.2 集成DruidMyBatis 4.2.1 添加依赖 dependencygroupIdcom.alibaba/groupIdartifactIddruid/artifactIdversion1.1.10/version /dependency dependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion1.2.0/version /dependency !--mysql驱动-- dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactId /dependency4.2.2 添加配置 spring:datasource:url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezoneGMT%2B8driverClassName: com.mysql.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceusername: rootpassword: root4.2.3 添加实体类 package com.xiaoge.domain;import lombok.Data;Data public class FileCustom {//唯⼀标识private Long id;//⽂件名private String name;//⽂件类型private String type;//⽂件内容private String content;//是否已备份private Boolean backedUp false;public FileCustom() {}public FileCustom(Long id, String name, String type, String content) {this.id id;this.name name;this.type type;this.content content;} }4.2.4 添加Mapper处理类 package com.xiaoge.mapper;import com.xiaoge.domain.FileCustom; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update;import java.util.List;Mapper public interface FileCustomMapper {Select(select * from t_file_custom where backedUp 0)ListFileCustom selectAll();Update(update t_file_custom set backedUp #{state} where id #{id})int changeState(Param(id) Long id, Param(state) int state); }4.3 业务功能实现 4.3.1 添加任务类 package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.xiaoge.domain.FileCustom; import com.xiaoge.mapper.FileCustomMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.List; import java.util.concurrent.TimeUnit;Slf4j Component public class FileCustomElasticJob implements SimpleJob {Autowiredprivate FileCustomMapper fileCustomMapper;Overridepublic void execute(ShardingContext shardingContext) {doWork();}private void doWork(){ListFileCustom fileList fileCustomMapper.selectAll();System.out.println(需要备份⽂件个数:fileList.size());for(FileCustom fileCustom:fileList){backUpFile(fileCustom);}}private void backUpFile(FileCustom fileCustom){try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(执⾏⽂件备份fileCustom);fileCustomMapper.changeState(fileCustom.getId(),1);} }4.3.2 添加任务调度配置 在配置类中新增这个Bean /*** todo 注意一个ElasticJob里面不管有多少实例, 只会有一个被调度, 那就是zookeeper选出来的leader* param myElasticJob* param regCenter* return*/ Bean(initMethod init) public SpringJobScheduler initSpringScheduler(ElasticJob myElasticJob, CoordinatorRegistryCenter regCenter) {LiteJobConfiguration simpleJobRootConfig createJobConfiguration(myElasticJob.getClass(), 0/10 * * * * ?, 1);return new SpringJobScheduler(myElasticJob, regCenter, simpleJobRootConfig); }4.4 测试问题 为了高可用我们会对这个项⽬做集群的操作,可以保证其中⼀台挂了,另外⼀台可以继续⼯作.但是在集 群的情况下,调度任务只在⼀台机器上运行如果单个任务调度⽐较耗时耗资源的情况下对这台机器 的消耗还是比较大的 但是这个时候其他机器却是空闲着的.如何合理的利用集群的其他机器且如何让任务执行得更快些呢 这时候Elastic-Job提供了任务调度分片的功能. 5.分片概念 作业分片是指任务的分布式执行需要将⼀个任务拆分为多个独立的任务项然后由分布式的应用实 例分别执行某⼀个或者几个分布项。 例如Elastic-Job快速入门中文件备份的案例现有两台服务器每台服务器分别跑⼀个应用实例。 为了快速执行作业那么可以讲任务分成4片每个应⽤实例都执行两片。作业遍历数据逻辑应为实例 1查找text和image类型⽂件执⾏备份实例2查找radio和vedio类型⽂件执⾏备份。如果由于服务器拓 容应⽤实例数量增加为4则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的 ⽂件。 例如Elastic-Job快速入门中文件备份的案例现有两台服务器每台服务器分别跑⼀个应⽤实例。 为了快速执行作业那么可以讲任务分成4片每个应⽤实例都执行两片。作业遍历数据逻辑应为实例 1查找text和image类型文件执行备份实例2查找radio和vedio类型文件执行备份。如果由于服务器拓 容应⽤实例数量增加为4则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的 文件。 分片项与业务处理解耦 Elastic-Job并不直接提供数据处理的功能,框架只会将分⽚项分配⾄各个运⾏中的作业服务器开发者 需要自行处理分⽚项与真实数据的对应关系 最大限度利用资源 将分片项设置大于服务器的数据最好是⼤于服务器倍数的数量作业将会合理利⽤分布式资源动态 的分配分片项. 例如: 3台服务器分成10片则分片项结果为服务器A0,1,2;服务器B3,4,5;服务器C6,7,8,9.如果 服务器C奔溃则分片项分配结果为服务器A0,1,2,3,4;服务器B5,6,7,8,9.在不丢失分⽚项的情况下最大限度利⽤现有的资源提高吞吐量. 6.案例改造成任务分片 6.1 配置类修改 在任务配置类中增加分片个数以及分片参数. Bean(initMethod init) public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJobfileCustomElasticJob){SpringJobScheduler springJobScheduler new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(FileCustomElasticJob.class,0 0/1 * * *?,4,0text,1image,2radio,3vedio));return springJobScheduler; }6.2 新增作业分片逻辑 package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.xiaoge.domain.FileCustom; import com.xiaoge.mapper.FileCustomMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.List; import java.util.concurrent.TimeUnit;Slf4j Component public class FileCustomElasticJob implements SimpleJob {Autowiredprivate FileCustomMapper fileCustomMapper;Overridepublic void execute(ShardingContext shardingContext) {long threadId Thread.currentThread().getId();log.info(线程ID: {}, 任务的名称: {}, 任务参数: {}, 分片个数: {}, 分片索引号: {}, 分片参数: {},threadId,shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter());doWork(shardingContext.getShardingParameter());}private void doWork(String shardingParameter) {ListFileCustom fileList fileCustomMapper.selectFileCustomByType(shardingParameter);log.info(需要备份⽂件个数{}: {}, shardingParameter, fileList.size());for (FileCustom fileCustom : fileList) {backUpFile(fileCustom);}}private void backUpFile(FileCustom fileCustom) {try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(执⾏⽂件备份 fileCustom);fileCustomMapper.changeState(fileCustom.getId(), 1);} } 6.3 Mapper类修改 package com.xiaoge.mapper;import com.xiaoge.domain.FileCustom; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update;import java.util.List;Mapper public interface FileCustomMapper {Select(select * from t_file_custom where backedUp 0)ListFileCustom selectAll();Select(select * from t_file_custom where backedUp 0 and type #{type})ListFileCustom selectFileCustomByType(Param(type) String type);Update(update t_file_custom set backedUp #{state} where id #{id})int changeState(Param(id) Long id, Param(state) int state); }6.4 测试 只有⼀台机器的情况下任务分片是如何执行的 有多台机器的情况下任务分片是如何执行的 7.Dataflow类型调度任务 Dataflow类型的定时任务需要实现Dataflowjob接⼝该接⼝提供2个⽅法供覆盖分别⽤于抓取 fetchData和处理processData数据我们继续对例⼦进⾏改造。 Dataflow类型的定时任务需要实现Dataflowjob接⼝该接⼝提供2个⽅法供覆盖分别⽤于抓取 fetchData和处理processData数据我们继续对例子进行改造。 7.1 任务类 package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.dataflow.DataflowJob; import com.xiaoge.domain.FileCustom; import com.xiaoge.mapper.FileCustomMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component;import java.util.List; import java.util.concurrent.TimeUnit;/*** TODO 处理大数据量的时候用那个DataFlow这种方式** author a hrefmailto:1330137071qq.comZhang Xiao/a* since*/ Component public class FileDataFlowJob implements DataflowJobFileCustom {Autowiredprivate FileCustomMapper fileCustomMapper;// 抓取数据Overridepublic ListFileCustom fetchData(ShardingContext shardingContext) {System.out.println(开始抓取数据...........);return fileCustomMapper.selectLimit(shardingContext.getShardingParameter(), 2);}// 处理数据Overridepublic void processData(ShardingContext shardingContext, ListFileCustom fileCustomList) {fileCustomList.forEach(fileCustom - {backUpFile(fileCustom);});}private void backUpFile(FileCustom fileCustom) {System.out.println(备份的方法名: fileCustom.getName() 备份的类型: fileCustom.getType());try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(执⾏⽂件备份 fileCustom);fileCustomMapper.changeState(fileCustom.getId(), 1);} } 7.2 配置类 package com.xiaoge.config;import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.JobTypeConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.event.JobEventConfiguration; import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import com.xiaoge.service.FileCustomElasticJob; import com.xiaoge.service.FileDataFlowJob; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;/*** TODO** author a hrefmailto:1330137071qq.comZhang Xiao/a* since*/ Configuration public class ElasticJobConfig {/*** todo 注意一个ElasticJob里面不管有多少实例, 只会有一个被调度, 那就是zookeeper选出来的leader* param myElasticJob* param regCenter* return*/ // Bean(initMethod init) // public SpringJobScheduler testScheduler(ElasticJob myElasticJob, CoordinatorRegistryCenter regCenter) { // LiteJobConfiguration simpleJobRootConfig createJobConfiguration(myElasticJob.getClass(), 0/10 * * * * ?, 1); // return new SpringJobScheduler(myElasticJob, regCenter, simpleJobRootConfig); // }// Bean(initMethod init) // public SpringJobScheduler fileScheduler(FileCustomElasticJob fileCustomElasticJob, CoordinatorRegistryCenter regCenter){ // SpringJobScheduler springJobScheduler new SpringJobScheduler(fileCustomElasticJob,regCenter,createJobConfiguration(fileCustomElasticJob.getClass(),0 0/1 * * * ?,4, 0text,1image,2radio,3vedio, false)); // return springJobScheduler; // }Bean(initMethod init)public SpringJobScheduler fileDataFlowScheduler(FileDataFlowJob fileDataFlowJob, CoordinatorRegistryCenter regCenter){SpringJobScheduler springJobScheduler new SpringJobScheduler(fileDataFlowJob,regCenter,createJobConfiguration(fileDataFlowJob.getClass(),0 0/1 * * * ?,4, 0text,1image,2radio,3vedio, true));return springJobScheduler;}// Bean(initMethod init) // public SpringJobScheduler test1Scheduler(ElasticJob myElasticJob1, CoordinDataRevisionatorRegistryCenter regCenter) { // LiteJobConfiguration simpleJobRootConfig createJobConfiguration(myElasticJob1.getClass(), 0/3 * * * * ?, 1); // return new SpringJobScheduler(myElasticJob1, regCenter, simpleJobRootConfig); // }Beanpublic CoordinatorRegistryCenter registryCenter(Value(${elasticjob.url}) String zookeeperUrl, Value(${elasticjob.group-name}) String groupName) {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration new ZookeeperConfiguration(zookeeperUrl, groupName);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter(zookeeper地址, 项目名)CoordinatorRegistryCenter regCenter new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}/*** todo 注意这个方法不能交给 spring 管理, 你要让它是个公共的方法,* 传递不同的jobName(任务名称), cron(cron表达式), shardingTotalCount(分片数量) 生成不同的LiteJobConfiguration, 因为环境不同任务配置不同.* 也有可能别的任务需要这个方法创建* return*/public LiteJobConfiguration createJobConfiguration(Class? clazz, String cron, Integer shardingTotalCount, String shardingParam, boolean isDataFlow) {// 定义作业核⼼配置 newBuilder(任务名称, cron表达式, 分片数量)JobCoreConfiguration.Builder jobBuilder JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingTotalCount);if (!StringUtils.isEmpty(shardingParam)) {// 分片参数jobBuilder jobBuilder.shardingItemParameters(shardingParam);}JobTypeConfiguration jobConfiguration;if (isDataFlow) {// DataflowJob配置jobConfiguration new DataflowJobConfiguration(jobBuilder.build(), clazz.getCanonicalName(), true);} else {// SimpleJob配置// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()---获取这个类的权限定类名jobConfiguration new SimpleJobConfiguration(jobBuilder.build(), clazz.getCanonicalName());}// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true).build();return simpleJobRootConfig;}} 7.3 测试 8.运维管理 8.1 事件追踪 Elastic-Job-Lite在配置中提供了JobEventConfiguration,⽀持数据库⽅式配置会在数据库中⾃动创建 JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若⼲索引来近路作业的相关信息。 8.1.1 修改Elastic-Job配置类 在ElasticJobConfig配置类中注⼊DataSource Configuration public class ElasticJobConfig {Autowiredprivate DataSource dataSource;...... }在任务配置中增加事件追踪配置 Bean(initMethod init)public SpringJobScheduler fileDataFlowScheduler(FileDataFlowJob fileDataFlowJob, CoordinatorRegistryCenter regCenter){// 日志监控, 它会自动在数据库生成两张表job_execution_log/job_status_trace_log// 配置会在任务执行的时间将任务执行的情况存储到数据源中JobEventConfiguration jobEventConfiguration new JobEventRdbConfiguration(dataSource);SpringJobScheduler springJobScheduler new SpringJobScheduler(fileDataFlowJob,regCenter,createJobConfiguration(fileDataFlowJob.getClass(),0 0/1 * * * ?,4, 0text,1image,2radio,3vedio, true), jobEventConfiguration);return springJobScheduler; }8.1.2 日志信息表 启动后会发现在elastic-job-demo数据库中新增以下两张表 job_execution_log 记录每次作业的执行历史分为两个步骤 1.作业开始执⾏时间想数据库插⼊数据. 2.作业完成执⾏时向数据库更新数据更新is_success,complete_time和failure_cause(如果任务执行失败) job_status_trace_log 记录作业状态变更痕迹表可通过每次作业运行的task_id查询作业状态变化的⽣命轨迹和运行轨迹. 8.2 运维控制台 elastic-job中提供了⼀个elastic-job-lite-console控制台 设计理念 1.本 控制台和Elastic-Job并⽆直接关系是通过读取Elastic-Job的注册中心数据展示作业状态或更新注 册中心数据修改全局配置。 2.控制台只能控制任务本身是否运行但不能控制作业进程的启停因为控制台和作业本身服务器是完 全分布式的控制台并不能控制作业服务器。 主要功能: 1.查看作业以及服务器状态 2.快捷的修改以及删除作业配置 3.启用和禁用作业 4.跨注册中心查看作业 5.查看作业运行轨迹和运行状态 不支持项 1.添加作业因为作业都是在首次运行时自动添加使用控制台添加作业并无必要.直接在作业服务器启 动包含Elasitc-Job的作业进程即可。 8.2.1 搭建步骤 解压缩 elastic-job-lite-console-2.1.5.tar 进⼊bin⽬录并执⾏: bin\start.bat打开浏览器访问 http://localhost:8899 ⽤户名: root 密码: root,进⼊之后界⾯如下: 提供两种⽤户管理员和访客管理员拥有全部操作权限访客仅拥有查看权限。默认管理员账号和密码是root/root,访客⽤户名和密码是guest/guest,通过conf\auth.properties可以修改管理员以及访客⽤ 户名及密码 8.2.2 配置及使用 配置注册中心地址 先启动zookeeper然后再注册中心配置界面点添加 点击提交后然后点连接zookeeper必须处于启动状态 连接成功后在作业纬度下可以显示该命名空间作业名称分⽚数量及该作业的cron表达式等信息 在服务器纬度可以查看到服务器ip,当前运⾏的是实例数作业总数等信息。 添加数据库连接之后可以查看任务的执行结果 然后在作业历史中就可以看到任务执行历史了。 demo下载地址: https://download.csdn.net/download/zsx1314lovezyf/88282573 ​ ​
http://www.sadfv.cn/news/86982/

相关文章:

  • 自助服务系统网站贵州建网站报价
  • 淮南先锋网谷歌seo运营
  • 注册网站域名平台做网站的复式照片
  • 提供微网站建设网站建设做微营销
  • 什么网站做的最好wordpress 调用最新文章
  • 广告行业包括网站建设吗东道设计公司待遇如何
  • 网站设计配色怎么做中国建设银行贷款官网站
  • 新农村建设投诉在哪个网站国外优秀创意的个人网页设计欣赏
  • 网站备案部门网站建设不好
  • 专业的网站建设设计价格资讯门户 wordpress
  • 书店网站策划书优质做网站公司
  • 做网站需要什么电脑配置网站平台都有哪些
  • 如何再网站上做免费广告词wordpress 文章内
  • 一家专门做灯的网站wordpress用户密码表
  • sdcms网站源码上海网站建设公司招人
  • app网站开发流程图大气集团网站模板
  • 自己建立公司网站网站建设三合一
  • 免费私人网站建设平台网站显示建设中
  • 广告营销网络优化工程师主要负责什么工作
  • 营销网站建设模板html编辑器推荐
  • 图书类网站开发的背景福州有名的公司网站设计
  • 公司做网站的流程作图的步骤商城网站的基本功能
  • 公司网站集资网站开发人员犯法么网站建设需要多少内存
  • 网站的开发与建设wordpress getfooter
  • 网站建设报价单 文库青岛网站制作辰星辰
  • 天津制作网站宁波seo关键词优化教程
  • 兼职网站排行h5网站设计欣赏
  • dw可以做h5网站网站建设销售客户疑问
  • 做特卖网站广告策划案优秀案例
  • 服务器网站建设教程视频教程wordpress 文章 标题