基于 SpringBoot 版的 xxl-job2.0 版本

    xxl-job 执行器注册,是执行器发起的,这里就从执行器的配置文件开始阅读:

    入口:XxlJobConfig 配置类中的 xxlJobExecutor 实例化中:

    1. // 初始化XxlJobSpringExecutor(Spring项目)
    2. @Bean(initMethod = "start", destroyMethod = "destroy")
    3. public XxlJobSpringExecutor xxlJobExecutor() {
    4. logger.info(">>>>>>>>>>> xxl-job config init.");
    5. XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    6. xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    7. xxlJobSpringExecutor.setAppName(appName);
    8. xxlJobSpringExecutor.setIp(ip);
    9. xxlJobSpringExecutor.setPort(port);
    10. xxlJobSpringExecutor.setAccessToken(accessToken);
    11. xxlJobSpringExecutor.setLogPath(logPath);
    12. xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    13. return xxlJobSpringExecutor;
    14. }

    项目启动的时候会实例化 xxlJobExecutor,实例化成功之后调用其内部的 start 方法:

    1. public void start() throws Exception {
    2. // 初始化 JobHandler Repository
    3. initJobHandlerRepository(applicationContext);
    4. // refresh GlueFactory
    5. GlueFactory.refreshInstance(1);
    6. // super start
    7. super.start();
    8. }

    initJobHandlerRepository(applicationContext); 的过程也就是扫描项目中所有的 JobHandler,然后注册到本地缓存中,也就是放在 ConcurrentHashMap 本地内存中;

    1. private void initJobHandlerRepository(ApplicationContext applicationContext){
    2. if (applicationContext == null) {
    3. return;
    4. }
    5. // 拿到所有有JobHandler注解的所有Bean
    6. Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
    7. if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
    8. for (Object serviceBean : serviceBeanMap.values()) {
    9. if (serviceBean instanceof IJobHandler){
    10. // 拿到我们定义的JobHandler的名称
    11. String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
    12. IJobHandler handler = (IJobHandler) serviceBean;
    13. if (loadJobHandler(name) != null) {
    14. throw new RuntimeException("xxl-job jobhandler naming conflicts.");
    15. }
    16. registJobHandler(name, handler);
    17. }
    18. }
    19. }
    20. }

    registJobHandler(name, handler);就是真正的把遍历的所有 JobHandler 放到本地内存中:

    1. // --------XxlJobExecutor类-------- job handler repository ----------------------
    2. private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    3. public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    4. logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    5. return jobHandlerRepository.put(name, jobHandler);
    6. }
    7. public static IJobHandler loadJobHandler(String name){
    8. return jobHandlerRepository.get(name);
    9. }

    接着走 xxlJobExecutor 中的 start 方法: GlueFactory.refreshInstance(1);

    /////…….待阅读

    接下来继续走 xxlJobExecutor 中的 start 方法:super.start(); 即 父类 XxlJobExecutor 中的 start 方法:

    1. public void start() throws Exception {
    2. // 初始化日志路径
    3. XxlJobFileAppender.initLogPath(logPath);
    4. // init admin-client
    5. initAdminBizList(adminAddresses, accessToken);
    6. // init JobLogFileCleanThread
    7. JobLogFileCleanThread.getInstance().start(logRetentionDays);
    8. // init TriggerCallbackThread
    9. TriggerCallbackThread.getInstance().start();
    10. // init executor-server
    11. port = port>0?port: NetUtil.findAvailablePort(9999);
    12. ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    13. initRpcProvider(ip, port, appName, accessToken);
    14. }

    初始化日志路径比较简单,这里不再赘述;

    initAdminBizList(adminAddresses, accessToken); 比较重要了,就是初始化 admin-client ,

    1. // ---------------------- admin-client (rpc invoker) ----------------------
    2. private static List<AdminBiz> adminBizList;
    3. private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    4. if (adminAddresses!=null && adminAddresses.trim().length()>0) {
    5. for (String address: adminAddresses.trim().split(",")) {
    6. if (address!=null && address.trim().length()>0) {
    7. String addressUrl = address.concat(AdminBiz.MAPPING);
    8. if (addressUrl.startsWith("http://")) {
    9. addressUrl = addressUrl.replace("http://", "");
    10. }
    11. if (addressUrl.startsWith("https://")) {
    12. addressUrl = addressUrl.replace("https://", "");
    13. }
    14. // 这里返回的已经不是真正的AdminBiz,而是AdminBiz的代理类
    15. // 遍历所有调度中心地址,对每一个地址都封装一个代理对象
    16. AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
    17. AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
    18. if (adminBizList == null) {
    19. adminBizList = new ArrayList<AdminBiz>();
    20. }
    21. adminBizList.add(adminBiz);
    22. }
    23. }
    24. }
    25. }
    26. public static List<AdminBiz> getAdminBizList(){
    27. return adminBizList;
    28. }

    初始化调度中心的客户端,供后续交互使用;AdminBiz 的内部方法代码如下所示:

    1. public interface AdminBiz {
    2. public static final String MAPPING = "/api";
    3. // ---------------------- callback ----------------------
    4. public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
    5. // ---------------------- registry ----------------------
    6. public ReturnT<String> registry(RegistryParam registryParam);
    7. // ---------------------- registry remove ---------------
    8. public ReturnT<String> registryRemove(RegistryParam registryParam);
    9. }

    我们可以这样理解:这个 AdminBiz 就是调度中心的客户端,我们在执行器项目中首先加入 AdminBiz 这个客户端,然后在执行器项目中就可以通过调用 AdminBiz 客户端里面的方法,从而去触发调用调度中心中的具体实现,也就是说:执行器并没有和调度中心直接交互,而是通过 AdminBiz 这个媒介去和调度中心交互的。

    注意 AdminBiz 的初始化过程:
    AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
    AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
    创建 XxlRpcReferenceBean 对象,传入对象所需要的参数,然后调用 getObject() 方法:

    1. // 创建一个代理对象,invoke方法在这里并不会真正执行,而只是执行初始化操作
    2. // 在所代理的方法被调用的时候才会执行这里的 invoke 方法。
    3. public Object getObject() {
    4. return Proxy.newProxyInstance(Thread.currentThread()
    5. .getContextClassLoader(), new Class[] { iface },
    6. new InvocationHandler() {
    7. @Override
    8. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    9. String className = method.getDeclaringClass().getName();
    10. // filter method like "Object.toString()"
    11. if (Object.class.getName().equals(className)) {
    12. logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
    13. throw new XxlRpcException("xxl-rpc proxy class-method not support");
    14. }
    15. // address
    16. String address = routeAddress();
    17. if (address==null || address.trim().length()==0) {
    18. throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
    19. }
    20. // request 封装请求参数
    21. XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
    22. xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
    23. xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
    24. xxlRpcRequest.setAccessToken(accessToken);
    25. xxlRpcRequest.setClassName(className);
    26. xxlRpcRequest.setMethodName(method.getName());
    27. xxlRpcRequest.setParameterTypes(method.getParameterTypes());
    28. xxlRpcRequest.setParameters(args);
    29. // send 如果采用异步就走如下
    30. if (CallType.SYNC == callType) {
    31. try {
    32. // future set
    33. XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
    34. XxlRpcFutureResponseFactory.setInvokerFuture(xxlRpcRequest.getRequestId(), futureResponse);
    35. // do invoke 这里才是真正的调用,即被代理的真正方法
    36. client.asyncSend(address, xxlRpcRequest);
    37. // future get
    38. XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
    39. if (xxlRpcResponse.getErrorMsg() != null) {
    40. throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
    41. }
    42. return xxlRpcResponse.getResult();
    43. } catch (Exception e) {
    44. throw new XxlRpcException(e);
    45. } finally{
    46. // future remove
    47. XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
    48. }
    49. }
    50. ///这里阅读Bean模式的异步调用,去掉其他模式代码...
    51. }
    52. });
    53. }

    接着继续看 XxlJobExecutor 中的 start 方法:JobLogFileCleanThread.getInstance().start(logRetentionDays);
    这个线程主要做的操作就是定期删除 log ,
    //// 具体源码待阅读….

    接着继续看 XxlJobExecutor 中的 start 方法:
    TriggerCallbackThread.getInstance().start();
    这个线程主要做的操作就是定时监控队列中日志情况
    //// 具体源码待阅读….

    接着继续看 XxlJobExecutor 中的 start 方法:
    initRpcProvider(ip, port, appName, accessToken);
    这个就非常重要了,主要就是初始化 XxlRpcProviderFactory 对象,并且调用其 start 方法,
    这个 start 方法内部逻辑主要是:启动一个定时线程,默认 30s,定时的往调度中心注册自己
    所以说执行器自动注册的定时线程主要就是在这里实现的

    1. private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
    2. // init invoker factory
    3. xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
    4. // init, provider factory
    5. String address = IpUtil.getIpPort(ip, port);
    6. Map<String, String> serviceRegistryParam = new HashMap<String, String>();
    7. serviceRegistryParam.put("appName", appName);
    8. serviceRegistryParam.put("address", address);
    9. xxlRpcProviderFactory = new XxlRpcProviderFactory();
    10. // 初始化XxlRpcProviderFactory中的参数
    11. xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
    12. // add services
    13. xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
    14. // start 调用start方法,去启动一个执行器自动注册的定时线程
    15. xxlRpcProviderFactory.start(); // 具体代码看如下:
    16. }

    执行器启动时自动注册的定时线程:xxlRpcProviderFactory.start() 代码如下:

    1. public void start() throws Exception {
    2. // start server 这里默认启动jetty服务器
    3. server = netType.serverClass.newInstance();
    4. server.setStartedCallback(new BaseCallback() { // serviceRegistry started
    5. @Override
    6. public void run() throws Exception {
    7. // 执行器开始把自己注册到调度中心,启动定时注册线程
    8. if (serviceRegistryClass != null) {
    9. // serviceRegistryClass就是前一步传入的ExecutorServiceRegistry.class对象
    10. serviceRegistry = serviceRegistryClass.newInstance();
    11. // 实例化ExecutorServiceRegistry对象,并调用其start方法,传入注册的参数
    12. // 具体逻辑看如下代码
    13. serviceRegistry.start(serviceRegistryParam);
    14. if (serviceData.size() > 0) {
    15. String ipPort = IpUtil.getIpPort(ip, port);
    16. for (String serviceKey :serviceData.keySet()) {
    17. serviceRegistry.registry(serviceKey, ipPort);
    18. }
    19. }
    20. }
    21. }
    22. });
    23. //// 待阅读....
    24. server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
    25. @Override
    26. public void run() {
    27. // stop registry
    28. if (serviceRegistry != null) {
    29. if (serviceData.size() > 0) {
    30. String ipPort = IpUtil.getIpPort(ip, port);
    31. for (String serviceKey :serviceData.keySet()) {
    32. serviceRegistry.remove(serviceKey, ipPort);
    33. }
    34. }
    35. serviceRegistry.stop();
    36. }
    37. }
    38. });
    39. server.start(this);
    40. }

    ExecutorServiceRegistry 的 start 方法:

    1. public void start(Map<String, String> param) {
    2. // start registry
    3. ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
    4. }

    接着 ExecutorRegistryThread 线程的 start 方法:

    1. public void start(final String appName, final String address){
    2. // 校验入参...
    3. registryThread = new Thread(new Runnable() {
    4. @Override
    5. public void run() {
    6. // registry
    7. while (!toStop) {
    8. try {
    9. // 封装参数对象
    10. RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
    11. // 这里获取 XxlJobExecutor.getAdminBizList() 的值
    12. // 即:执行器启动的时候,初始化的Admin-clinet代理对象列表,
    13. // 所以这里的AdminBiz已经是一个代理对象了
    14. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
    15. try {
    16. // 这里调用registry方法,会执行代理对象AdminBiz中的invoke方法,
    17. ReturnT<String> registryResult = adminBiz.registry(registryParam);
    18. if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
    19. registryResult = ReturnT.SUCCESS;
    20. logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
    21. break;
    22. } else {
    23. logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
    24. }
    25. } catch (Exception e) {
    26. logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
    27. }
    28. }
    29. } catch (Exception e) {
    30. logger.error(e.getMessage(), e);
    31. }
    32. try {
    33. TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
    34. } catch (InterruptedException e) {
    35. logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
    36. }
    37. }
    38. // registry remove 待分析....
    39. try {
    40. RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
    41. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
    42. try {
    43. ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
    44. if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
    45. registryResult = ReturnT.SUCCESS;
    46. logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
    47. break;
    48. } else {
    49. logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
    50. }
    51. } catch (Exception e) {
    52. logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
    53. }
    54. }
    55. } catch (Exception e) {
    56. logger.error(e.getMessage(), e);
    57. }
    58. logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
    59. }
    60. });
    61. registryThread.setDaemon(true);
    62. registryThread.start();
    63. }

    �那么这里就开始执行 AdminBiz 类中的 invoke 方法,如下(上面贴过代码了,这里在贴一次):

    1. // 在所代理的方法被调用的时候才会执行这里的 invoke 方法。
    2. public Object getObject() {
    3. return Proxy.newProxyInstance(Thread.currentThread()
    4. .getContextClassLoader(), new Class[] { iface },
    5. new InvocationHandler() {
    6. @Override
    7. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    8. // className便是AdminBiz了
    9. String className = method.getDeclaringClass().getName();
    10. // filter method like "Object.toString()"
    11. if (Object.class.getName().equals(className)) {
    12. logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
    13. throw new XxlRpcException("xxl-rpc proxy class-method not support");
    14. }
    15. // address 是调度中心的地址:127.0.0.1:8098/api
    16. String address = routeAddress();
    17. if (address==null || address.trim().length()==0) {
    18. throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
    19. }
    20. // request 封装请求参数
    21. /**
    22. * 这里参数包括:className: AdminBiz methodName: register
    23. * parameters: 注册信息,如registerKey,registerValue,等
    24. * 还有一个也重要 requestId: 异步操作中通常需要一个唯一标识来衔接请求和响应
    25. */
    26. XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
    27. xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
    28. xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
    29. xxlRpcRequest.setAccessToken(accessToken);
    30. xxlRpcRequest.setClassName(className);
    31. xxlRpcRequest.setMethodName(method.getName());
    32. xxlRpcRequest.setParameterTypes(method.getParameterTypes());
    33. xxlRpcRequest.setParameters(args);
    34. // send 如果采用异步就走如下
    35. if (CallType.SYNC == callType) {
    36. try {
    37. // future set
    38. XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
    39. XxlRpcFutureResponseFactory.setInvokerFuture(xxlRpcRequest.getRequestId(), futureResponse);
    40. // rpc远程调用,地址:127.0.0.1:8098/api,调用调度中心的api方法,并携带如上封装的参数
    41. client.asyncSend(address, xxlRpcRequest);
    42. // future get
    43. XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
    44. if (xxlRpcResponse.getErrorMsg() != null) {
    45. throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
    46. }
    47. return xxlRpcResponse.getResult();
    48. } catch (Exception e) {
    49. throw new XxlRpcException(e);
    50. } finally{
    51. // future remove
    52. XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
    53. }
    54. }
    55. ///这里阅读Bean模式的异步调用,去掉其他模式代码...
    56. }
    57. });
    58. }

    接着那就来到了调度中心的 /api 方法,看看里面是如何执行的:

    1. @RequestMapping(AdminBiz.MAPPING)
    2. @PermessionLimit(limit=false)
    3. public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
    4. XxlJobDynamicScheduler.invokeAdminService(request, response);
    5. }

    接着看 invokeAdminService 方法:

    1. public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
    2. // 调用jettyServerHandler的handle方法,即jetty接收到请求后,由handler去处理
    3. jettyServerHandler.handle(null, new Request(null, null), request, response);
    4. }

    �jettyServerHandler 的 handle 方法:

    1. @Override
    2. public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
    3. // request
    4. XxlRpcRequest xxlRpcRequest = parseRequest(request);
    5. // invoke + response
    6. XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
    7. writeResponse(baseRequest, response, xxlRpcResponse);
    8. }

    xxlRpcProviderFactory.invokeService(xxlRpcRequest); 是核心方法:

    1. public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
    2. // make response
    3. XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
    4. xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
    5. // match service bean
    6. String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
    7. Object serviceBean = serviceData.get(serviceKey);
    8. // valid...
    9. // invoke
    10. try {
    11. Class<?> serviceClass = serviceBean.getClass(); // com.xxl.job.admin.service.impl.AdminBizImpl
    12. String methodName = xxlRpcRequest.getMethodName(); // registry
    13. Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
    14. Object[] parameters = xxlRpcRequest.getParameters();
    15. Method method = serviceClass.getMethod(methodName, parameterTypes);
    16. method.setAccessible(true);
    17. // 走到这里,就是去调用AdminBizImpl的registry方法了,这个方法内部就是去注册信息添加到数据库中
    18. Object result = method.invoke(serviceBean, parameters);
    19. xxlRpcResponse.setResult(result);
    20. } catch (Throwable t) {
    21. logger.error("xxl-rpc provider invokeService error.", t);
    22. xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
    23. }
    24. return xxlRpcResponse;
    25. }

    �到这里,执行器启动定时注册线程,把自己的信息注册到调度中心的全部代码就走完了。

    参考:
    xxl-job 源码解读
    xxl-job 官网