xxl-job本质上是一个开源的springboot项目,修改源码会比较容易

优缺点

优点:
部署比较简单
有可视化操作界面,调试比较方便
缺点:
日志读写操作频繁,表中的数据很容易堆积的特别多,可以改源码解决
无官方提供的sdk,需要自己封装rest api(官方提供了一个类,方便封装api XxlJobRemotingUtil)

下载使用

可下载源码 https://gitee.com/xuxueli0323/xxl-job

下载并打开后的项目结构
image.png
主要关注的
admin——用于部署的任务调度中心,修改好配置文件直接启动即可
core——xxljob的核心源码
executor-samples——对接xxljob的实例,有springboot版和非框架版
table_xxl_job.sql——xxljob启动时需要的表结构sql

控制台搭建

将xxl-job-admin配置文件修改好之后可以直接启动
值得 注意的一点 是 accessToken,这里要与所有对接的服务保持一致
image.png

默认初始的账号密码是admin/123456,想要改密码可以直接改数据库

进入界面首先要根据项目新建执行器,一个微服务对应一个执行器
image.png

Docker 搭建

  1. docker run \
  2. -e PARAMS=" \
  3. --server.port=8080 \
  4. --server.servlet.context-path=/xxl-job-admin \
  5. --spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai \
  6. --spring.datasource.username=root \
  7. --spring.datasource.password=1234
  8. --xxl.job.accessToken=1234 " \
  9. -p 9050:8080 \
  10. -v /usr/local/xxljob/applogs:/data/applogs \
  11. -name xxl-job-admin \
  12. -d xuxueli/xxl-job-admin:2.3.0

spring boot项目对接

现在创建一个springboot项目对接,具体版本以下载的为准

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
</dependency>

读取yml配置文件注册执行器

将服务的执行器注册上去

xxl:
  job:
    # 执行器通讯TOKEN [选填]:非空时启用
    accessToken: 1234
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
    executor:
      # 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
      address: ''
      appname: ${spring.application.name}
      ip: 127.0.0.1
      logpath: ./jobhandler
      logretentiondays: 7
      # 执行器端口
      port: 10610

对应配置文件的实体类,写好配置类才能顺利注册上去,可以直接复制xxljob的官方事例的代码


@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */


}

配置无误 将服务启动就可以在控制台看到注册的执行器
image.png
在控制台的管理界面新建一个任务,注意 jobHandler的值与代码的注解对应,任务参数长度有限,如果要传大数据json就要改表结构
image.png

任务触发时调用方法

@XxlJob的值与任务的JobHandler对应

@Slf4j
@Component
public class DemoJob {

    @XxlJob("demo")
    public void run() {
        //获取参数
        String jobParam = XxlJobHelper.getJobParam();
        log.info("调度成功,参数:{}", jobParam);
    }

}

xxljob api封装

以上的任务都需要手动在页面创建才能触发任务调度,不实用,我们需要在代码中动态创建

用来读取配置文件的实体类

@ConfigurationProperties(prefix = "xxl.job")
@Data
public class XxlJobProperties {

    private Admin admin = new Admin();

    /**
     * 执行器通讯TOKEN [选填]:非空时启用
     */
    private String accessToken;

    /**
     * 调用xxl-job的超时时间,单位秒
     */
    private Integer timeout = 3;

    private Executor executor = new Executor();

    @Data
    public static class Admin {
        /**
         * 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
         */
        private String addresses;
    }

    @Data
    public static class Executor {

        /**
         * 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
         */
        private String address;

        /**
         * 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
         */
        private String ip;

        /**
         * 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
         */
        private String appname;

        /**
         * 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
         */
        private Integer port = 9999;

        /**
         * 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
         */
        private String logpath;

        /**
         * 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
         */
        private Integer logretentiondays = 7;

    }

}

yml的配置可以直接获取

xxl:
  job:
    accessToken: xxxxxxxxxxx
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
    executor:
      address: ''
      appname: ${spring.application.name}
      ip: 127.0.0.1
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 7
      port: 10610

xxljob rest api的参数实体类

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class XxlJobInfo {

    /**
     * 主键ID
     */
    private int id;

    /**
     * appname用来获取jobGroup
     */
    private String appName;

    /**
     * 执行器主键ID
     */
    private int jobGroup;

    /**
     * 任务描述
     */
    private String jobDesc;

    /**
     * 负责人
     */
    private String author;

    /**
     * 报警邮箱
     */
    private String alarmEmail;

    /**
     * 调度类型(CRON)
     */
    private String scheduleType;

    /**
     * 调度配置,值含义取决于调度类型
     */
    private String scheduleConf;

    /**
     * 调度过期策略
     */
    private String misfireStrategy;

    /**
     * 执行器路由策略
     */
    private String executorRouteStrategy;


    /**
     * 执行器,任务参数
     */
    private String executorParam;

    /**
     * 阻塞处理策略
     */
    private String executorBlockStrategy;

    /**
     * 任务执行超时时间,单位秒
     */
    private int executorTimeout;

    /**
     * 失败重试次数
     */
    private int executorFailRetryCount;

    /**
     * GLUE类型    #com.xxl.job.core.glue.GlueTypeEnum,一般为BEAN
     */
    private String glueType;

    /**
     * 执行器,任务Handler名称
     */
    private String executorHandler;

    /**
     * GLUE源代码
     */
    private String glueSource;

    /**
     * GLUE备注
     */
    private String glueRemark;


}

封装xxljob的rest api的工具类

将任务的增删改查 触发 暂停封装

public class XxlJobComponent {

    private String addressUrl;

    private String accessToken;

    private String appName;

    private int timeout;

    private static final int SUCCESS_CODE = 200;

    private static final String URL_SEPARATION = "/";

    public XxlJobComponent(String addressUrl, String accessToken, String appName, int timeout) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;
        if (!this.addressUrl.endsWith(URL_SEPARATION)) {
            this.addressUrl = this.addressUrl + URL_SEPARATION;
        }
        this.appName = appName;
        this.timeout = timeout;
    }

    /**
     * 新增任务
     *
     * @param jobDesc    任务描述
     * @param cron       cron表达式
     * @param param      参数
     * @param jobHandler JobHandler
     * @return jobId
     * @author yudi
     * @date 2021/12/14 15:03
     */
    public boolean addJob(String jobDesc, String cron, String param, String jobHandler) {
        XxlJobInfo jobInfo = buildJobInfo(null, jobDesc, cron, param, jobHandler);
        ReturnT<String> result = XxlJobRemotingUtil.postBody(addressUrl + "jobinfo/api/add", this.accessToken, this.timeout, jobInfo, String.class);
        if (result.getCode() != SUCCESS_CODE) {
            throw new IllegalArgumentException("add_job_error");
        }
        int jobId = Integer.parseInt(result.getContent());
        return jobId != 0;
    }

    /**
     * 更新任务
     *
     * @param jobId      任务id
     * @param jobDesc    任务描述
     * @param cron       cron表达式
     * @param param      参数
     * @param jobHandler JobHandler
     * @author yudi
     * @date 2021/12/14 15:05
     */
    public void updateJob(Integer jobId, String jobDesc, String cron, String param, String jobHandler) {
        XxlJobInfo jobInfo = buildJobInfo(jobId, jobDesc, cron, param, jobHandler);
        ReturnT<String> result = XxlJobRemotingUtil.postBody(addressUrl + "jobinfo/api/update", this.accessToken, this.timeout, jobInfo, String.class);
        if (result.getCode() != SUCCESS_CODE) {
            throw new IllegalArgumentException("update_job_error");
        }
    }

    /**
     * 删除任务
     *
     * @param jobId 资源id
     * @author yudi
     * @date 2021/12/14 15:06
     */
    public void removeJob(Integer jobId) {
        ReturnT<String> result = XxlJobRemotingUtil.postBody(addressUrl + "jobinfo/api/remove", this.accessToken, this.timeout, jobId, String.class);
        if (result.getCode() != SUCCESS_CODE) {
            throw new IllegalArgumentException("remove_job_error");
        }
    }

    /**
     * 开始任务
     *
     * @param jobId 资源id
     * @author yudi
     * @date 2021/12/14 15:06
     */
    public void startJob(Integer jobId) {
        ReturnT<String> result = XxlJobRemotingUtil.postBody(addressUrl + "jobinfo/api/start", this.accessToken, this.timeout, jobId, String.class);
        if (result.getCode() != SUCCESS_CODE) {
            throw new IllegalArgumentException("start_job_error");
        }
    }

    /**
     * 停止任务
     *
     * @param jobId 资源id
     * @author yudi
     * @date 2021/12/14 15:07
     */
    public void stopJob(Integer jobId) {
        ReturnT<String> result = XxlJobRemotingUtil.postBody(addressUrl + "jobinfo/api/stop", this.accessToken, this.timeout, jobId, String.class);
        if (result.getCode() != SUCCESS_CODE) {
            throw new IllegalArgumentException("stop_job_error");
        }
    }

    /**
     * 触发任务
     *
     * @param jobId 资源id
     * @author yudi
     * @date 2021/12/14 15:07
     */
    public void triggerJob(Integer jobId) {
        ReturnT<String> result = XxlJobRemotingUtil.postBody(addressUrl + "jobinfo/api/trigger", this.accessToken, this.timeout, jobId, String.class);
        if (result.getCode() != SUCCESS_CODE) {
            throw new IllegalArgumentException("trigger_job_error");
        }
    }

    private XxlJobInfo buildJobInfo(Integer id, String jobDesc, String cron, String param, String jobHandler) {
        XxlJobInfo jobInfo = new XxlJobInfo();
        if (id != null) {
            jobInfo.setId(id);
        }
        jobInfo.setAppName(appName);
        jobInfo.setJobDesc(jobDesc);
        jobInfo.setAuthor("yudi");
        jobInfo.setScheduleType("CRON");
        jobInfo.setScheduleConf(cron);
        jobInfo.setExecutorRouteStrategy("ROUND");
        jobInfo.setMisfireStrategy("DO_NOTHING");
        jobInfo.setGlueType("BEAN");
        jobInfo.setExecutorHandler(jobHandler);
        jobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
        jobInfo.setExecutorParam(param);
        return jobInfo;
    }
}

springboot自动配置类

@Configuration
@EnableConfigurationProperties(XxlJobProperties.class)
public class XxlJobAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public XxlJobSpringExecutor xxlJobExecutor(XxlJobProperties xxlJobProperties) {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(xxlJobProperties.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppname(xxlJobProperties.getExecutor().getAppname());
        xxlJobSpringExecutor.setIp(xxlJobProperties.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(xxlJobProperties.getExecutor().getPort());
        xxlJobSpringExecutor.setAccessToken(xxlJobProperties.getAccessToken());
        xxlJobSpringExecutor.setLogPath(xxlJobProperties.getExecutor().getLogpath());
        xxlJobSpringExecutor.setLogRetentionDays(xxlJobProperties.getExecutor().getLogretentiondays());
        return xxlJobSpringExecutor;
    }

    @Bean
    public XxlJobComponent xxlJobComponent(XxlJobProperties xxlJobProperties) {
        int timeout = xxlJobProperties.getTimeout() == null ? 3 :  xxlJobProperties.getTimeout();
        return new XxlJobComponent(xxlJobProperties.getAdmin().getAddresses(), xxlJobProperties.getAccessToken(), xxlJobProperties.getExecutor().getAppname(), timeout);
    }
}

这样就可以直接在springboot项目中使用XxlJobComponent注入的对象调用api了