xxl-job源码设计 - 图1
目录
xxl-job源码设计 - 图2

  • 通信底层介绍
  • 通信整体流程
  • 惊艳的设计

**

xxl-job源码设计 - 图3
通信底层介绍
xxl-job源码设计 - 图4

xxl-job 使用 netty http 的方式进行通信,虽然也支持 Mina,jetty,netty tcp 等方式,但是代码里面固定写死的是 netty http。

**

xxl-job源码设计 - 图5
通信整体流程
xxl-job源码设计 - 图6

我以调度器通知执行器执行任务为例,绘制的活动图:

xxl-job源码设计 - 图7
活动图

**

xxl-job源码设计 - 图8
惊艳的设计
xxl-job源码设计 - 图9

看完了整个处理流程代码,设计上可以说独具匠心,将 netty,多线程的知识运用得行云流水。

我现在就将这些设计上出彩的点总结如下:

| 使用动态代理模式,隐藏通信细节

xxl-job 定义了两个接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封装了向心跳,暂停,触发执行等操作,AdminBiz 封装了回调,注册,取消注册操作,接口的实现类中,并没有通信相关的处理。

XxlRpcReferenceBean 类的 getObject() 方法会生成一个代理类,这个代理类会进行远程通信。

| 全异步处理

执行器收到消息进行反序列化,并没有同步执行任务代码,而是将任务信息存储在 LinkedBlockingQueue 中,异步线程从这个队列中获取任务信息,然后执行。

而任务的处理结果,也不是说处理完之后,同步返回的,也是放到回调线程的阻塞队列中,异步的将处理结果返回回去。

这样处理的好处就是减少了 netty 工作线程的处理时间,提升了吞吐量。

| 对异步处理的包装

对异步处理进行了包装,代码看起来是同步调用的。

我们看下调度器,XxlJobTrigger 类触发任务执行的代码:

  1. public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
  2. ReturnT<String> runResult = null;
  3. try {
  4. ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
  5. //这里面做了很多异步处理,最终同步得到处理结果
  6. runResult = executorBiz.run(triggerParam);
  7. } catch (Exception e) {
  8. logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
  9. runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
  10. }
  11. StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
  12. runResultSB.append("<br>address:").append(address);
  13. runResultSB.append("<br>code:").append(runResult.getCode());
  14. runResultSB.append("<br>msg:").append(runResult.getMsg());
  15. runResult.setMsg(runResultSB.toString());
  16. return runResult;
  17. }

ExecutorBiz.run 方法我们说过了,是走的动态代理,和执行器进行通信,执行器执行结果也是异步处理完,才返回的,而这里看到的 run 方法是同步等待处理结果返回。

我们看下xxl-job是如何同步获取处理结果的:调度器向执行器发出消息后,该线程阻塞。等到执行器处理完毕后,将处理结果返回,唤醒被阻塞的线程,调用处拿到返回值。

动态代理代码如下:

  1. //代理类中的触发调用
  2. if (CallType.SYNC == callType) {
  3. // future-response set
  4. XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
  5. try {
  6. // do invoke
  7. client.asyncSend(finalAddress, xxlRpcRequest);
  8. // future get
  9. XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
  10. if (xxlRpcResponse.getErrorMsg() != null) {
  11. throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
  12. }
  13. return xxlRpcResponse.getResult();
  14. } catch (Exception e) {
  15. logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
  16. throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
  17. } finally{
  18. // future-response remove
  19. futureResponse.removeInvokerFuture();
  20. }
  21. }

XxlRpcFutureResponse 类中实现了线程的等待,和线程唤醒的处理:

  1. //返回结果,唤醒线程
  2. public void setResponse(XxlRpcResponse response) {
  3. this.response = response;
  4. synchronized (lock) {
  5. done = true;
  6. lock.notifyAll();
  7. }
  8. }
  9. @Override
  10. public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  11. if (!done) {
  12. synchronized (lock) {
  13. try {
  14. if (timeout < 0) {
  15. //线程阻塞
  16. lock.wait();
  17. } else {
  18. long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
  19. lock.wait(timeoutMillis);
  20. }
  21. } catch (InterruptedException e) {
  22. throw e;
  23. }
  24. }
  25. }
  26. if (!done) {
  27. throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
  28. }
  29. return response;
  30. }

有的同学可能会问了,调度器接收到返回结果,怎么确定唤醒哪个线程呢?

每一次远程调用,都会生成 uuid 的请求 id,这个 id 是在整个调用过程中一直传递的,就像一把钥匙,在你回家的的时候,拿着它就带开门。

这里拿着请求 id 这把钥匙,就能找到对应的 XxlRpcFutureResponse,然后调用 setResponse 方法,设置返回值,唤醒线程。

  1. public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
  2. // 通过requestId找到XxlRpcFutureResponse,
  3. final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
  4. if (futureResponse == null) {
  5. return;
  6. }
  7. if (futureResponse.getInvokeCallback()!=null) {
  8. // callback type
  9. try {
  10. executeResponseCallback(new Runnable() {
  11. @Override
  12. public void run() {
  13. if (xxlRpcResponse.getErrorMsg() != null) {
  14. futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
  15. } else {
  16. futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
  17. }
  18. }
  19. });
  20. }catch (Exception e) {
  21. logger.error(e.getMessage(), e);
  22. }
  23. } else {
  24. // 里面调用lock的notify方法
  25. futureResponse.setResponse(xxlRpcResponse);
  26. }
  27. // do remove
  28. futureResponsePool.remove(requestId);
  29. }