基于 SpringBoot 版的 xxl-job2.0 版本
xxl-job 执行器注册,是执行器发起的,这里就从执行器的配置文件开始阅读:
入口:XxlJobConfig 配置类中的 xxlJobExecutor 实例化中:
// 初始化XxlJobSpringExecutor(Spring项目)@Bean(initMethod = "start", destroyMethod = "destroy")public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppName(appName);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
项目启动的时候会实例化 xxlJobExecutor,实例化成功之后调用其内部的 start 方法:
public void start() throws Exception {// 初始化 JobHandler RepositoryinitJobHandlerRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super startsuper.start();}
initJobHandlerRepository(applicationContext); 的过程就是扫描项目中所有带 @JobHandler 注解的 Bean,然后把它们注册到本地缓存中,也就是放进一个 ConcurrentHashMap 里:
private void initJobHandlerRepository(ApplicationContext applicationContext){if (applicationContext == null) {return;}// 拿到所有有JobHandler注解的所有BeanMap<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);if (serviceBeanMap!=null && serviceBeanMap.size()>0) {for (Object serviceBean : serviceBeanMap.values()) {if (serviceBean instanceof IJobHandler){// 拿到我们定义的JobHandler的名称String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();IJobHandler handler = (IJobHandler) serviceBean;if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler naming conflicts.");}registJobHandler(name, handler);}}}}
registJobHandler(name, handler);就是真正的把遍历的所有 JobHandler 放到本地内存中:
// --------XxlJobExecutor类-------- job handler repository ----------------------private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}
接着走 xxlJobExecutor 中的 start 方法: GlueFactory.refreshInstance(1);
/////…….待阅读
接下来继续走 xxlJobExecutor 中的 start 方法:super.start(); 即 父类 XxlJobExecutor 中的 start 方法:
public void start() throws Exception {// 初始化日志路径XxlJobFileAppender.initLogPath(logPath);// init admin-clientinitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();initRpcProvider(ip, port, appName, accessToken);}
初始化日志路径比较简单,这里不再赘述;
initAdminBizList(adminAddresses, accessToken); 比较重要,它负责初始化 admin-client(调度中心的客户端代理):
// ---------------------- admin-client (rpc invoker) ----------------------private static List<AdminBiz> adminBizList;private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {String addressUrl = address.concat(AdminBiz.MAPPING);if (addressUrl.startsWith("http://")) {addressUrl = addressUrl.replace("http://", "");}if (addressUrl.startsWith("https://")) {addressUrl = addressUrl.replace("https://", "");}// 这里返回的已经不是真正的AdminBiz,而是AdminBiz的代理类// 遍历所有调度中心地址,对每一个地址都封装一个代理对象AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}
初始化调度中心的客户端,供后续交互使用;AdminBiz 的内部方法代码如下所示:
public interface AdminBiz {public static final String MAPPING = "/api";// ---------------------- callback ----------------------public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);// ---------------------- registry ----------------------public ReturnT<String> registry(RegistryParam registryParam);// ---------------------- registry remove ---------------public ReturnT<String> registryRemove(RegistryParam registryParam);}
我们可以这样理解:这个 AdminBiz 就是调度中心的客户端,我们在执行器项目中首先加入 AdminBiz 这个客户端,然后在执行器项目中就可以通过调用 AdminBiz 客户端里面的方法,从而去触发调用调度中心中的具体实现,也就是说:执行器并没有和调度中心直接交互,而是通过 AdminBiz 这个媒介去和调度中心交互的。
注意 AdminBiz 的初始化过程:
// Builds the reference bean for one admin address and casts the object returned by
// getObject() to AdminBiz.
// NOTE(review): 10000 is presumably the call timeout in milliseconds; confirm against
// the XxlRpcReferenceBean constructor, which is not shown here.
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
创建 XxlRpcReferenceBean 对象,传入对象所需要的参数,然后调用 getObject() 方法:
// 创建一个代理对象,invoke方法在这里并不会真正执行,而只是执行初始化操作// 在所代理的方法被调用的时候才会执行这里的 invoke 方法。public Object getObject() {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { iface },new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String className = method.getDeclaringClass().getName();// filter method like "Object.toString()"if (Object.class.getName().equals(className)) {logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());throw new XxlRpcException("xxl-rpc proxy class-method not support");}// addressString address = routeAddress();if (address==null || address.trim().length()==0) {throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");}// request 封装请求参数XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();xxlRpcRequest.setRequestId(UUID.randomUUID().toString());xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());xxlRpcRequest.setAccessToken(accessToken);xxlRpcRequest.setClassName(className);xxlRpcRequest.setMethodName(method.getName());xxlRpcRequest.setParameterTypes(method.getParameterTypes());xxlRpcRequest.setParameters(args);// send 如果采用异步就走如下if (CallType.SYNC == callType) {try {// future setXxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);XxlRpcFutureResponseFactory.setInvokerFuture(xxlRpcRequest.getRequestId(), futureResponse);// do invoke 这里才是真正的调用,即被代理的真正方法client.asyncSend(address, xxlRpcRequest);// future getXxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);if (xxlRpcResponse.getErrorMsg() != null) {throw new XxlRpcException(xxlRpcResponse.getErrorMsg());}return xxlRpcResponse.getResult();} catch (Exception e) {throw new XxlRpcException(e);} finally{// future removeXxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());}}///这里阅读Bean模式的异步调用,去掉其他模式代码...}});}
接着继续看 XxlJobExecutor 中的 start 方法:JobLogFileCleanThread.getInstance().start(logRetentionDays);
这个线程主要做的操作就是定期删除 log ,
//// 具体源码待阅读….
接着继续看 XxlJobExecutor 中的 start 方法:
TriggerCallbackThread.getInstance().start();
这个线程主要做的操作就是定时监控队列中日志情况
//// 具体源码待阅读….
接着继续看 XxlJobExecutor 中的 start 方法:
initRpcProvider(ip, port, appName, accessToken);
这个就非常重要了,主要就是初始化 XxlRpcProviderFactory 对象,并且调用其 start 方法,
这个 start 方法内部逻辑主要是:启动一个定时线程,默认 30s,定时的往调度中心注册自己
所以说执行器自动注册的定时线程主要就是在这里实现的
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {// init invoker factoryxxlRpcInvokerFactory = new XxlRpcInvokerFactory();// init, provider factoryString address = IpUtil.getIpPort(ip, port);Map<String, String> serviceRegistryParam = new HashMap<String, String>();serviceRegistryParam.put("appName", appName);serviceRegistryParam.put("address", address);xxlRpcProviderFactory = new XxlRpcProviderFactory();// 初始化XxlRpcProviderFactory中的参数xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);// add servicesxxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());// start 调用start方法,去启动一个执行器自动注册的定时线程xxlRpcProviderFactory.start(); // 具体代码看如下:}
执行器启动时自动注册的定时线程:xxlRpcProviderFactory.start() 代码如下:
public void start() throws Exception {// start server 这里默认启动jetty服务器server = netType.serverClass.newInstance();server.setStartedCallback(new BaseCallback() { // serviceRegistry started@Overridepublic void run() throws Exception {// 执行器开始把自己注册到调度中心,启动定时注册线程if (serviceRegistryClass != null) {// serviceRegistryClass就是前一步传入的ExecutorServiceRegistry.class对象serviceRegistry = serviceRegistryClass.newInstance();// 实例化ExecutorServiceRegistry对象,并调用其start方法,传入注册的参数// 具体逻辑看如下代码serviceRegistry.start(serviceRegistryParam);if (serviceData.size() > 0) {String ipPort = IpUtil.getIpPort(ip, port);for (String serviceKey :serviceData.keySet()) {serviceRegistry.registry(serviceKey, ipPort);}}}}});//// 待阅读....server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped@Overridepublic void run() {// stop registryif (serviceRegistry != null) {if (serviceData.size() > 0) {String ipPort = IpUtil.getIpPort(ip, port);for (String serviceKey :serviceData.keySet()) {serviceRegistry.remove(serviceKey, ipPort);}}serviceRegistry.stop();}}});server.start(this);}
ExecutorServiceRegistry 的 start 方法:
public void start(Map<String, String> param) {// start registryExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));}
接着 ExecutorRegistryThread 线程的 start 方法:
public void start(final String appName, final String address){// 校验入参...registryThread = new Thread(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {// 封装参数对象RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);// 这里获取 XxlJobExecutor.getAdminBizList() 的值// 即:执行器启动的时候,初始化的Admin-clinet代理对象列表,// 所以这里的AdminBiz已经是一个代理对象了for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {// 这里调用registry方法,会执行代理对象AdminBiz中的invoke方法,ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {logger.error(e.getMessage(), e);}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}// registry remove 待分析....try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, 
registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}} catch (Exception e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});registryThread.setDaemon(true);registryThread.start();}
那么这里就开始执行 AdminBiz 代理对象(即 InvocationHandler)的 invoke 方法,如下(上面贴过代码了,这里再贴一次):
// 在所代理的方法被调用的时候才会执行这里的 invoke 方法。public Object getObject() {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { iface },new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// className便是AdminBiz了String className = method.getDeclaringClass().getName();// filter method like "Object.toString()"if (Object.class.getName().equals(className)) {logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());throw new XxlRpcException("xxl-rpc proxy class-method not support");}// address 是调度中心的地址:127.0.0.1:8098/apiString address = routeAddress();if (address==null || address.trim().length()==0) {throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");}// request 封装请求参数/*** 这里参数包括:className: AdminBiz methodName: register* parameters: 注册信息,如registerKey,registerValue,等* 还有一个也重要 requestId: 异步操作中通常需要一个唯一标识来衔接请求和响应*/XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();xxlRpcRequest.setRequestId(UUID.randomUUID().toString());xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());xxlRpcRequest.setAccessToken(accessToken);xxlRpcRequest.setClassName(className);xxlRpcRequest.setMethodName(method.getName());xxlRpcRequest.setParameterTypes(method.getParameterTypes());xxlRpcRequest.setParameters(args);// send 如果采用异步就走如下if (CallType.SYNC == callType) {try {// future setXxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);XxlRpcFutureResponseFactory.setInvokerFuture(xxlRpcRequest.getRequestId(), futureResponse);// rpc远程调用,地址:127.0.0.1:8098/api,调用调度中心的api方法,并携带如上封装的参数client.asyncSend(address, xxlRpcRequest);// future getXxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);if (xxlRpcResponse.getErrorMsg() != null) {throw new XxlRpcException(xxlRpcResponse.getErrorMsg());}return xxlRpcResponse.getResult();} catch (Exception e) {throw new XxlRpcException(e);} 
finally{// future removeXxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());}}///这里阅读Bean模式的异步调用,去掉其他模式代码...}});}
接着那就来到了调度中心的 /api 方法,看看里面是如何执行的:
/**
 * HTTP entry point mapped to {@code AdminBiz.MAPPING}; hands the raw request and
 * response to the admin service invoker.
 *
 * @param request  the incoming servlet request
 * @param response the servlet response to write to
 * @throws IOException      propagated from the invoker
 * @throws ServletException propagated from the invoker
 */
@RequestMapping(AdminBiz.MAPPING)
@PermessionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response)
        throws IOException, ServletException {
    XxlJobDynamicScheduler.invokeAdminService(request, response);
}
接着看 invokeAdminService 方法:
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {// 调用jettyServerHandler的handle方法,即jetty接收到请求后,由handler去处理jettyServerHandler.handle(null, new Request(null, null), request, response);}
jettyServerHandler 的 handle 方法:
@Overridepublic void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {// requestXxlRpcRequest xxlRpcRequest = parseRequest(request);// invoke + responseXxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);writeResponse(baseRequest, response, xxlRpcResponse);}
xxlRpcProviderFactory.invokeService(xxlRpcRequest); 是核心方法:
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {// make responseXxlRpcResponse xxlRpcResponse = new XxlRpcResponse();xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());// match service beanString serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());Object serviceBean = serviceData.get(serviceKey);// valid...// invoketry {Class<?> serviceClass = serviceBean.getClass(); // com.xxl.job.admin.service.impl.AdminBizImplString methodName = xxlRpcRequest.getMethodName(); // registryClass<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);// 走到这里,就是去调用AdminBizImpl的registry方法了,这个方法内部就是去注册信息添加到数据库中Object result = method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);} catch (Throwable t) {logger.error("xxl-rpc provider invokeService error.", t);xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));}return xxlRpcResponse;}
到这里,执行器启动定时注册线程,把自己的信息注册到调度中心的全部代码就走完了。
