基于 SpringBoot 版的 xxl-job2.0 版本
xxl-job 执行器注册,是执行器发起的,这里就从执行器的配置文件开始阅读:
入口:XxlJobConfig 配置类中的 xxlJobExecutor 实例化中:
// 初始化XxlJobSpringExecutor(Spring项目)
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
项目启动的时候会实例化 xxlJobExecutor,实例化成功之后调用其内部的 start 方法:
public void start() throws Exception {
// 初始化 JobHandler Repository
initJobHandlerRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
super.start();
}
initJobHandlerRepository(applicationContext); 的过程也就是扫描项目中所有的 JobHandler,然后注册到本地缓存中,也就是放在 ConcurrentHashMap
private void initJobHandlerRepository(ApplicationContext applicationContext){
if (applicationContext == null) {
return;
}
// 拿到所有有JobHandler注解的所有Bean
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){
// 拿到我们定义的JobHandler的名称
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
}
registJobHandler(name, handler);
}
}
}
}
registJobHandler(name, handler);就是真正的把遍历的所有 JobHandler 放到本地内存中:
// --------XxlJobExecutor类-------- job handler repository ----------------------
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
接着走 xxlJobExecutor 中的 start 方法: GlueFactory.refreshInstance(1);
/////…….待阅读
接下来继续走 xxlJobExecutor 中的 start 方法:super.start(); 即 父类 XxlJobExecutor 中的 start 方法:
public void start() throws Exception {
// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// init admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
初始化日志路径比较简单,这里不再赘述;
initAdminBizList(adminAddresses, accessToken); 比较重要了,就是初始化 admin-client ,
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
String addressUrl = address.concat(AdminBiz.MAPPING);
if (addressUrl.startsWith("http://")) {
addressUrl = addressUrl.replace("http://", "");
}
if (addressUrl.startsWith("https://")) {
addressUrl = addressUrl.replace("https://", "");
}
// 这里返回的已经不是真正的AdminBiz,而是AdminBiz的代理类
// 遍历所有调度中心地址,对每一个地址都封装一个代理对象
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
初始化调度中心的客户端,供后续交互使用;AdminBiz 的内部方法代码如下所示:
public interface AdminBiz {
public static final String MAPPING = "/api";
// ---------------------- callback ----------------------
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
// ---------------------- registry ----------------------
public ReturnT<String> registry(RegistryParam registryParam);
// ---------------------- registry remove ---------------
public ReturnT<String> registryRemove(RegistryParam registryParam);
}
我们可以这样理解:这个 AdminBiz 就是调度中心的客户端,我们在执行器项目中首先加入 AdminBiz 这个客户端,然后在执行器项目中就可以通过调用 AdminBiz 客户端里面的方法,从而去触发调用调度中心中的具体实现,也就是说:执行器并没有和调度中心直接交互,而是通过 AdminBiz 这个媒介去和调度中心交互的。
注意 AdminBiz 的初始化过程:
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,
AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
创建 XxlRpcReferenceBean 对象,传入对象所需要的参数,然后调用 getObject() 方法:
// 创建一个代理对象,invoke方法在这里并不会真正执行,而只是执行初始化操作
// 在所代理的方法被调用的时候才会执行这里的 invoke 方法。
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String className = method.getDeclaringClass().getName();
// filter method like "Object.toString()"
if (Object.class.getName().equals(className)) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address
String address = routeAddress();
if (address==null || address.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request 封装请求参数
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(method.getName());
xxlRpcRequest.setParameterTypes(method.getParameterTypes());
xxlRpcRequest.setParameters(args);
// send 如果采用异步就走如下
if (CallType.SYNC == callType) {
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
XxlRpcFutureResponseFactory.setInvokerFuture(xxlRpcRequest.getRequestId(), futureResponse);
// do invoke 这里才是真正的调用,即被代理的真正方法
client.asyncSend(address, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
throw new XxlRpcException(e);
} finally{
// future remove
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
}
}
///这里阅读Bean模式的异步调用,去掉其他模式代码...
}
});
}
接着继续看 XxlJobExecutor 中的 start 方法:JobLogFileCleanThread.getInstance().start(logRetentionDays);
这个线程主要做的操作就是定期删除 log ,
//// 具体源码待阅读….
接着继续看 XxlJobExecutor 中的 start 方法:
TriggerCallbackThread.getInstance().start();
这个线程主要做的操作就是定时监控队列中日志情况
//// 具体源码待阅读….
接着继续看 XxlJobExecutor 中的 start 方法:
initRpcProvider(ip, port, appName, accessToken);
这个就非常重要了,主要就是初始化 XxlRpcProviderFactory 对象,并且调用其 start 方法,
这个 start 方法内部逻辑主要是:启动一个定时线程,默认 30s,定时的往调度中心注册自己
所以说执行器自动注册的定时线程主要就是在这里实现的
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
// init invoker factory
xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
// 初始化XxlRpcProviderFactory中的参数
xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
// start 调用start方法,去启动一个执行器自动注册的定时线程
xxlRpcProviderFactory.start(); // 具体代码看如下:
}
执行器启动时自动注册的定时线程:xxlRpcProviderFactory.start() 代码如下:
public void start() throws Exception {
// start server 这里默认启动jetty服务器
server = netType.serverClass.newInstance();
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// 执行器开始把自己注册到调度中心,启动定时注册线程
if (serviceRegistryClass != null) {
// serviceRegistryClass就是前一步传入的ExecutorServiceRegistry.class对象
serviceRegistry = serviceRegistryClass.newInstance();
// 实例化ExecutorServiceRegistry对象,并调用其start方法,传入注册的参数
// 具体逻辑看如下代码
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
String ipPort = IpUtil.getIpPort(ip, port);
for (String serviceKey :serviceData.keySet()) {
serviceRegistry.registry(serviceKey, ipPort);
}
}
}
}
});
//// 待阅读....
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// stop registry
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
String ipPort = IpUtil.getIpPort(ip, port);
for (String serviceKey :serviceData.keySet()) {
serviceRegistry.remove(serviceKey, ipPort);
}
}
serviceRegistry.stop();
}
}
});
server.start(this);
}
ExecutorServiceRegistry 的 start 方法:
public void start(Map<String, String> param) {
// start registry
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
}
接着 ExecutorRegistryThread 线程的 start 方法:
public void start(final String appName, final String address){
// 校验入参...
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
// 封装参数对象
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
// 这里获取 XxlJobExecutor.getAdminBizList() 的值
// 即:执行器启动的时候,初始化的Admin-clinet代理对象列表,
// 所以这里的AdminBiz已经是一个代理对象了
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
// 这里调用registry方法,会执行代理对象AdminBiz中的invoke方法,
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
// registry remove 待分析....
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
});
registryThread.setDaemon(true);
registryThread.start();
}
�那么这里就开始执行 AdminBiz 类中的 invoke 方法,如下(上面贴过代码了,这里在贴一次):
// 在所代理的方法被调用的时候才会执行这里的 invoke 方法。
public Object getObject() {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// className便是AdminBiz了
String className = method.getDeclaringClass().getName();
// filter method like "Object.toString()"
if (Object.class.getName().equals(className)) {
logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
throw new XxlRpcException("xxl-rpc proxy class-method not support");
}
// address 是调度中心的地址:127.0.0.1:8098/api
String address = routeAddress();
if (address==null || address.trim().length()==0) {
throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
}
// request 封装请求参数
/**
* 这里参数包括:className: AdminBiz methodName: register
* parameters: 注册信息,如registerKey,registerValue,等
* 还有一个也重要 requestId: 异步操作中通常需要一个唯一标识来衔接请求和响应
*/
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(method.getName());
xxlRpcRequest.setParameterTypes(method.getParameterTypes());
xxlRpcRequest.setParameters(args);
// send 如果采用异步就走如下
if (CallType.SYNC == callType) {
try {
// future set
XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
XxlRpcFutureResponseFactory.setInvokerFuture(xxlRpcRequest.getRequestId(), futureResponse);
// rpc远程调用,地址:127.0.0.1:8098/api,调用调度中心的api方法,并携带如上封装的参数
client.asyncSend(address, xxlRpcRequest);
// future get
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
if (xxlRpcResponse.getErrorMsg() != null) {
throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
}
return xxlRpcResponse.getResult();
} catch (Exception e) {
throw new XxlRpcException(e);
} finally{
// future remove
XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
}
}
///这里阅读Bean模式的异步调用,去掉其他模式代码...
}
});
}
接着那就来到了调度中心的 /api 方法,看看里面是如何执行的:
@RequestMapping(AdminBiz.MAPPING)
@PermessionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobDynamicScheduler.invokeAdminService(request, response);
}
接着看 invokeAdminService 方法:
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// 调用jettyServerHandler的handle方法,即jetty接收到请求后,由handler去处理
jettyServerHandler.handle(null, new Request(null, null), request, response);
}
�jettyServerHandler 的 handle 方法:
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// request
XxlRpcRequest xxlRpcRequest = parseRequest(request);
// invoke + response
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
writeResponse(baseRequest, response, xxlRpcResponse);
}
xxlRpcProviderFactory.invokeService(xxlRpcRequest); 是核心方法:
public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
// make response
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
// match service bean
String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
Object serviceBean = serviceData.get(serviceKey);
// valid...
// invoke
try {
Class<?> serviceClass = serviceBean.getClass(); // com.xxl.job.admin.service.impl.AdminBizImpl
String methodName = xxlRpcRequest.getMethodName(); // registry
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
// 走到这里,就是去调用AdminBizImpl的registry方法了,这个方法内部就是去注册信息添加到数据库中
Object result = method.invoke(serviceBean, parameters);
xxlRpcResponse.setResult(result);
} catch (Throwable t) {
logger.error("xxl-rpc provider invokeService error.", t);
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
}
return xxlRpcResponse;
}
�到这里,执行器启动定时注册线程,把自己的信息注册到调度中心的全部代码就走完了。