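This article was first published in 泊浮目's Jianshu column: https://www.jianshu.com/nb/21050520

Preface

Event-driven and message-driven designs are both effective ways to decouple components. ZStack, being a large software project, uses both to decouple its architecture.

EventFacade

EventFacade is an interesting component because it is almost self-contained. Interested readers can copy and paste it and, with minor changes, get it working in their own projects.

How to use it

The ZStack repo provides a corresponding test case: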

    package org.zstack.test.core.cloudbus;

    /**
     * Created with IntelliJ IDEA.
     * User: frank
     * Time: 12:38 AM
     * To change this template use File | Settings | File Templates.
     */
    public class TestCanonicalEvent {
        CLogger logger = Utils.getLogger(TestCanonicalEvent.class);
        ComponentLoader loader;
        EventFacade evtf;
        boolean success;

        @Before
        public void setUp() throws Exception {
            BeanConstructor con = new BeanConstructor();
            loader = con.build();
            evtf = loader.getComponent(EventFacade.class);
            ((EventFacadeImpl) evtf).start();
        }

        @Test
        public void test() throws InterruptedException {
            String path = "/test/event";
            evtf.on(path, new EventRunnable() {
                @Override
                public void run() {
                    success = true;
                }
            });
            evtf.fire(path, null);
            TimeUnit.SECONDS.sleep(1);
            Assert.assertTrue(success);
        }
    }

Usage is simple: first register a callback on a path, then fire an event along that path, and the function registered on that path is invoked.
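
To make the register-then-fire flow concrete, here is a minimal sketch of a path-based dispatcher. It is an illustration only, not ZStack code: the class name `MiniEventFacade` and its use of `Consumer<Object>` are assumptions made for this example, and unlike the real EventFacade it delivers events synchronously inside one JVM.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical single-JVM stand-in for EventFacade; not ZStack code. */
class MiniEventFacade {
    /** One registration: the path it listens on and the callback to run. */
    private static final class Registration {
        final String path;
        final Consumer<Object> callback;

        Registration(String path, Consumer<Object> callback) {
            this.path = path;
            this.callback = callback;
        }
    }

    // CopyOnWriteArrayList lets fire() iterate safely while on() runs on another thread.
    private final List<Registration> registrations = new CopyOnWriteArrayList<>();

    /** Registers a callback for events fired on exactly this path. */
    void on(String path, Consumer<Object> callback) {
        registrations.add(new Registration(path, callback));
    }

    /** Runs every callback registered on the path and returns how many ran. */
    int fire(String path, Object data) {
        int invoked = 0;
        for (Registration r : registrations) {
            if (r.path.equals(path)) {
                r.callback.accept(data);
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        MiniEventFacade evtf = new MiniEventFacade();
        evtf.on("/test/event", data -> System.out.println("got event, data=" + data));
        evtf.fire("/test/event", null);
    }
}
```

Because delivery in this sketch is synchronous, no sleep is needed before checking the result. The ZStack test case sleeps for one second, presumably because the real event takes an asynchronous route.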

Interface definition

    package org.zstack.core.cloudbus;

    import java.util.Map;

    /**
     * Created with IntelliJ IDEA.
     * User: frank
     * Time: 11:29 PM
     * To change this template use File | Settings | File Templates.
     */
    public interface EventFacade {
        void on(String path, AutoOffEventCallback cb);
        void on(String path, EventCallback cb);
        void on(String path, EventRunnable runnable);
        void off(AbstractEventFacadeCallback cb);
        void onLocal(String path, AutoOffEventCallback cb);
        void onLocal(String path, EventCallback cb);
        void onLocal(String path, EventRunnable runnable);
        void fire(String path, Object data);
        boolean isFromThisManagementNode(Map tokens);
        String META_DATA_MANAGEMENT_NODE_ID = "metadata::managementNodeId";
        String META_DATA_PATH = "metadata::path";
        String WEBHOOK_TYPE = "CanonicalEvent";
    }

Source code walkthrough

on

    @Override
    public void on(String path, AutoOffEventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, final EventCallback cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

    @Override
    public void on(String path, EventRunnable cb) {
        global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
    }

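The on method simply takes the callback's uniqueIdentity (a UUID) as the key and puts the wrapped callback into the global map as the value. Why key on a UUID rather than on the path? Map keys are unique, so if the path were the key, a second callback registered on the same path would overwrite the first.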
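
The effect of the key choice can be shown with two plain maps. This is a small sketch with a hypothetical class name (`CallbackKeyDemo`), using `Runnable` in place of ZStack's callback types:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical demo of why callbacks are keyed by a unique id instead of by path. */
class CallbackKeyDemo {
    /** Keyed by path: the second put replaces the first callback. */
    static int registerTwiceKeyedByPath() {
        Map<String, Runnable> callbacks = new HashMap<>();
        callbacks.put("/test/event", () -> System.out.println("first"));
        callbacks.put("/test/event", () -> System.out.println("second"));
        return callbacks.size();
    }

    /** Keyed by a per-callback UUID: both callbacks survive. */
    static int registerTwiceKeyedByUuid() {
        Map<String, Runnable> callbacks = new HashMap<>();
        callbacks.put(UUID.randomUUID().toString(), () -> System.out.println("first"));
        callbacks.put(UUID.randomUUID().toString(), () -> System.out.println("second"));
        return callbacks.size();
    }
}
```

With a UUID key the path is no longer available from the key itself, which is consistent with CallbackWrapper carrying the path alongside the callback.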

The class declaration and member fields of EventFacadeImpl:

    public class EventFacadeImpl implements EventFacade, CloudBusEventListener, Component, GlobalApiMessageInterceptor {
        @Autowired
        private CloudBus bus;
        private final Map<String, CallbackWrapper> global = Collections.synchronizedMap(new HashMap<>());
        private final Map<String, CallbackWrapper> local = new ConcurrentHashMap<>();
        private EventSubscriberReceipt unsubscriber;

fire

The corresponding fire method:

    @Override
    public void fire(String path, Object data) {
        assert path != null;
        CanonicalEvent evt = new CanonicalEvent();
        evt.setPath(path);
        evt.setManagementNodeId(Platform.getManagementServerId());
        if (data != null) {
            /*
            if (!TypeUtils.isPrimitiveOrWrapper(data.getClass()) && !data.getClass().isAnnotationPresent(NeedJsonSchema.class)) {
                throw new CloudRuntimeException(String.format("data[%s] passed to canonical event is not annotated by @NeedJsonSchema", data.getClass().getName()));
            }
            */
            evt.setContent(data);
        }
        // find the matching callbacks in the local map and invoke them
        fireLocal(evt);
        // send the event to the matching webhooks
        callWebhooks(evt);
        // publish the event through CloudBus; the CloudBus source is covered later
        bus.publish(evt);
    }

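The difference between onLocal and on

The analysis above does not show how the callbacks in global get triggered. To understand the whole process we have to start from CloudBus, which is covered shortly. Still, it is already possible to guess why on and onLocal are kept apart: one is triggered through the message bus, the other within the current JVM process. In other words, on supports events across a cluster of management nodes (MNs), while onLocal only supports events on a single MN. This follows from ZStack's business scenarios: some things must be done by all MNs together, while others need not be repeated once one MN has done them. For reasons of space the details are omitted here; interested readers can browse the code themselves.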
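
The guess above can be sketched as follows. Everything here is hypothetical (`ClusterEventSketch`, `Bus`, `Node` are names invented for the example), and the in-memory `Bus` merely stands in for a real message bus that reaches every management node:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical model of on vs. onLocal across several management nodes. */
class ClusterEventSketch {
    /** Stand-in for the message bus: every joined node sees every published event. */
    static final class Bus {
        private final List<Node> nodes = new ArrayList<>();

        void join(Node node) {
            nodes.add(node);
        }

        void publish(String path, Object data) {
            for (Node node : nodes) {
                node.deliverGlobal(path, data);
            }
        }
    }

    /** Stand-in for one management node with its own local and global callbacks. */
    static final class Node {
        private final Bus bus;
        private final Map<String, List<Consumer<Object>>> local = new HashMap<>();
        private final Map<String, List<Consumer<Object>>> global = new HashMap<>();

        Node(Bus bus) {
            this.bus = bus;
            bus.join(this);
        }

        void onLocal(String path, Consumer<Object> cb) {
            local.computeIfAbsent(path, k -> new ArrayList<>()).add(cb);
        }

        void on(String path, Consumer<Object> cb) {
            global.computeIfAbsent(path, k -> new ArrayList<>()).add(cb);
        }

        void fire(String path, Object data) {
            dispatch(local, path, data); // only this node's local callbacks
            bus.publish(path, data);     // global callbacks on every node, this one included
        }

        void deliverGlobal(String path, Object data) {
            dispatch(global, path, data);
        }

        private static void dispatch(Map<String, List<Consumer<Object>>> callbacks, String path, Object data) {
            for (Consumer<Object> cb : callbacks.getOrDefault(path, Collections.emptyList())) {
                cb.accept(data);
            }
        }
    }
}
```

When one node fires an event, a callback registered with on on another node runs, while a callback registered with onLocal on that other node does not.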

WebHook

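WebHook is one of the ways ZStack actively pushes notifications to the front end. Once a webhook is registered for an event path, firing an event on that path sends the event content to the corresponding URL. The test cases CanonicalEventWebhookCase and WebhookCase show how to use it correctly.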

CanonicalEventWebhookCase

    class CanonicalEventWebhookCase extends SubCase {
        EnvSpec envSpec

        @Override
        void clean() {
            envSpec.delete()
        }

        @Override
        void setup() {
            INCLUDE_CORE_SERVICES = false
            spring {
                include("webhook.xml")
            }
        }

        String WEBHOOK_PATH = "/canonical-event-webhook"

        void testErrorToCreateWebhookifOpaqueFieldMissing() {
            expect(AssertionError.class) {
                createWebhook {
                    name = "webhook1"
                    url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                    type = EventFacade.WEBHOOK_TYPE
                }
            }
        }

        void testCanonicalEventWithVariableInPath() {
            String path = "/test/{uuid}/event"
            int count = 0
            WebhookInventory hook1 = createWebhook {
                name = "webhook1"
                url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                type = EventFacade.WEBHOOK_TYPE
                opaque = path
            }
            // this webhook will not be called because path unmatching
            WebhookInventory hook2 = createWebhook {
                name = "webhook1"
                url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                type = EventFacade.WEBHOOK_TYPE
                opaque = "/this-path-does-not-match"
            }
            CanonicalEvent evt
            envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                evt = json(e.getBody(), CanonicalEvent.class)
                count ++
                return [:]
            }
            String content = "hello world"
            String eventPath = "/test/${Platform.uuid}/event"
            bean(EventFacade.class).fire(eventPath, content)
            retryInSecs {
                assert count == 1
                assert evt != null
                assert evt.path == eventPath
                assert evt.content == content
                assert evt.managementNodeId == Platform.getManagementServerId()
            }
        }

        void testCanonicalEventUseWebhook() {
            String path = "/test/event"
            WebhookInventory hook1 = createWebhook {
                name = "webhook1"
                url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                type = EventFacade.WEBHOOK_TYPE
                opaque = path
            }
            WebhookInventory hook2 = createWebhook {
                name = "webhook2"
                url = "http://127.0.0.1:8989$WEBHOOK_PATH"
                type = EventFacade.WEBHOOK_TYPE
                opaque = path
            }

            def testFireTwoEvents = {
                List<CanonicalEvent> evts = []
                envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                    CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                    evts.add(evt)
                    return [:]
                }
                String content = "hello world"
                bean(EventFacade.class).fire(path, content)
                retryInSecs {
                    assert evts.size() == 2
                    CanonicalEvent evt1 = evts[0]
                    CanonicalEvent evt2 = evts[1]
                    assert evt1.path == path
                    assert evt1.content == content
                    assert evt1.managementNodeId == Platform.getManagementServerId()
                    assert evt2.path == path
                    assert evt2.content == content
                    assert evt2.managementNodeId == Platform.getManagementServerId()
                }
            }

            def testOneEventsGetAfterDeleteOneHook = {
                deleteWebhook { uuid = hook1.uuid }
                List<CanonicalEvent> evts = []
                envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                    CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                    evts.add(evt)
                    return [:]
                }
                String content = "hello world"
                bean(EventFacade.class).fire(path, content)
                retryInSecs {
                    assert evts.size() == 1
                }
            }

            def testNoEventGetAfterDeleteAllHooks = {
                deleteWebhook { uuid = hook2.uuid }
                List<CanonicalEvent> evts = []
                envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
                    CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
                    evts.add(evt)
                    return [:]
                }
                String content = "hello world"
                bean(EventFacade.class).fire(path, content)
                retryInSecs {
                    assert evts.size() == 0
                }
            }

            testFireTwoEvents()
            testOneEventsGetAfterDeleteOneHook()
            testNoEventGetAfterDeleteAllHooks()
        }

        @Override
        void environment() {
            envSpec = env {
                // nothing
            }
        }

        @Override
        void test() {
            envSpec.create {
                testCanonicalEventUseWebhook()
                testCanonicalEventWithVariableInPath()
                testErrorToCreateWebhookifOpaqueFieldMissing()
            }
        }
    }
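
The case testCanonicalEventWithVariableInPath registers a webhook on /test/{uuid}/event and expects it to receive an event fired on a concrete path. How ZStack matches such paths is not shown in this article. The sketch below is one possible way to do it, assuming that a {name} segment matches exactly one non-empty path segment; the class `PathTemplate` is hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical matcher for event paths with {variable} segments; not ZStack code. */
class PathTemplate {
    private final Pattern pattern;

    PathTemplate(String template) {
        StringBuilder regex = new StringBuilder();
        String[] segments = template.split("/", -1);
        for (int i = 0; i < segments.length; i++) {
            if (i > 0) {
                regex.append('/');
            }
            String segment = segments[i];
            boolean isVariable = segment.length() > 2 && segment.startsWith("{") && segment.endsWith("}");
            // A variable matches one non-empty segment; anything else must match literally.
            regex.append(isVariable ? "[^/]+" : Pattern.quote(segment));
        }
        this.pattern = Pattern.compile(regex.toString());
    }

    /** Returns true if the concrete path fits the template. */
    boolean matches(String path) {
        return pattern.matcher(path).matches();
    }
}
```

Under this assumption, a template without variables degenerates to an exact string comparison, which matches the behaviour exercised by testCanonicalEventUseWebhook.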

WebhookCase

class WebhookCase extends SubCase {
    EnvSpec envSpec

    @Override
    void clean() {
        envSpec.delete()
    }

    @Override
    void setup() {
        INCLUDE_CORE_SERVICES = false
        spring {
            include("webhook.xml")
        }
    }

    @Override
    void environment() {
        envSpec = env {
            // nothing
        }
    }

    void testWebhooksCRUD() {
        WebhookInventory hook = null

        def testCreateWebhook = {
            def params = null

            hook = createWebhook {
                name = "webhook"
                type = "custom-type"
                url = "http://127.0.0.1:8080/webhook"
                description = "desc"
                opaque = "test data"

                params = delegate
            }

            assert dbIsExists(hook.uuid, WebhookVO.class)
            assert hook.name == params.name
            assert hook.type == params.type
            assert hook.url == params.url
            assert hook.description == params.description
            assert hook.opaque == params.opaque
        }

        def testQueryWebhook = {
            List<WebhookInventory> invs = queryWebhook {
                conditions = ["name=${hook.name}"]
            }

            assert invs.size() == 1
            assert invs[0].uuid == hook.uuid
        }

        def testDeleteWebhook = {
            deleteWebhook {
                uuid = hook.uuid
            }

            assert !dbIsExists(hook.uuid, WebhookVO.class)
        }

        testCreateWebhook()
        testQueryWebhook()
        testDeleteWebhook()
    }

    void testInvalidUrl() {
        expect(AssertionError.class) {
            createWebhook {
                name = "webhook"
                type = "custom-type"
                url = "this is not a url"
                description = "desc"
                opaque = "test data"
            }
        }
    }

    @Override
    void test() {
        envSpec.create {
            testWebhooksCRUD()
            testInvalidUrl()
        }
    }
}

CloudBus

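CloudBus is arguably the most important component in ZStack. All communication between ZStack modules is done through Messages, and CloudBus is the medium that carries them. Let's look at its source code next.

This section is best suited to readers who already know something about AMQP. If you do not, you may want to read my blog post "MQ学习小记" first.

How to use it

Let's start with a related test case: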

package org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.TimeUnit;

public class TestCloudBusCall {
    CLogger logger = Utils.getLogger(TestCloudBusCall.class);
    ComponentLoader loader;
    CloudBusIN bus;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }

    }

    public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    class FakeService extends AbstractService {
        @Override
        public boolean start() {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop() {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = new HelloWorldReply();
                r.setGreet(hmsg.getGreet());
                bus.reply(msg, r);
            }
        }

        @Override
        public String getId() {
            return this.getClass().getCanonicalName();
        }

    }

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test() throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        HelloWorldReply r = (HelloWorldReply) bus.call(msg);
        serv.stop();
        Assert.assertEquals("Hello", r.getGreet());
    }
}
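
We register a Service and override its handleMessage method. In the test case the service receives the message and replies, and the assertion passes.

Here is another one: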
~~~java
package org.zstack.test.core.cloudbus;

import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TestCloudBusSendCallback {
    CLogger logger = Utils.getLogger(TestCloudBusSendCallback.class);
    ComponentLoader loader;
    CloudBusIN bus;
    CountDownLatch latch = new CountDownLatch(1);
    boolean isSuccess = false;
    Service serv;

    public static class HelloWorldMsg extends NeedReplyMessage {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }

    }

    public static class HelloWorldReply extends MessageReply {
        private String greet;

        public String getGreet() {
            return greet;
        }

        public void setGreet(String greet) {
            this.greet = greet;
        }
    }

    class FakeService extends AbstractService {
        @Override
        public boolean start() {
            bus.registerService(this);
            bus.activeService(this);
            return true;
        }

        @Override
        public boolean stop() {
            bus.deActiveService(this);
            bus.unregisterService(this);
            return true;
        }

        @Override
        public void handleMessage(Message msg) {
            if (msg.getClass() == HelloWorldMsg.class) {
                HelloWorldMsg hmsg = (HelloWorldMsg) msg;
                HelloWorldReply r = new HelloWorldReply();
                r.setGreet(hmsg.getGreet());
                bus.reply(msg, r);
            }
        }

        @Override
        public String getId() {
            return this.getClass().getCanonicalName();
        }

    }

    @Before
    public void setUp() throws Exception {
        BeanConstructor con = new BeanConstructor();
        loader = con.build();
        bus = loader.getComponent(CloudBusIN.class);
        serv = new FakeService();
        serv.start();
    }

    @Test
    public void test() throws InterruptedException, ClassNotFoundException {
        HelloWorldMsg msg = new HelloWorldMsg();
        msg.setGreet("Hello");
        msg.setServiceId(FakeService.class.getCanonicalName());
        msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
        bus.send(msg, new CloudBusCallBack(null) {
            @Override
            public void run(MessageReply reply) {
                if (reply instanceof HelloWorldReply) {
                    HelloWorldReply hr = (HelloWorldReply) reply;
                    if ("Hello".equals(hr.getGreet())) {
                        isSuccess = true;
                    }
                }
                latch.countDown();
            }
        });
        latch.await(15, TimeUnit.SECONDS);
        serv.stop();
        Assert.assertEquals(true, isSuccess);
    }
}
~~~

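Again a Service is registered, this time together with a CallBack. Running the case shows that the assertion passes, which means the CallBack was invoked.

In short, using CloudBus is simple: register your Service, send a message through CloudBus addressed to that Service, and the Service receives it. Registering a CallBack for the reply is just as easy.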
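
The two styles used in the test cases, a blocking call and a send with a callback, can be related to each other in a few lines. The sketch below is a simplified in-memory model, not the CloudBus implementation: `MiniBus` is a hypothetical class, messages are plain strings, and a thread pool replaces RabbitMQ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical in-memory bus: services are functions from request to reply. */
class MiniBus {
    private final Map<String, Function<String, String>> services = new ConcurrentHashMap<>();
    // Daemon threads so that an idle pool never keeps the JVM alive.
    private final ExecutorService pool = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "mini-bus");
        t.setDaemon(true);
        return t;
    });

    void registerService(String serviceId, Function<String, String> handler) {
        services.put(serviceId, handler);
    }

    void unregisterService(String serviceId) {
        services.remove(serviceId);
    }

    /** Asynchronous style: the callback runs on a pool thread once the service has replied. */
    void send(String serviceId, String request, Consumer<String> callback) {
        Function<String, String> handler = services.get(serviceId);
        if (handler == null) {
            throw new IllegalArgumentException("no such service: " + serviceId);
        }
        pool.execute(() -> callback.accept(handler.apply(request)));
    }

    /** Synchronous style: built on send(), blocks until the reply arrives or the timeout expires. */
    String call(String serviceId, String request, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        send(serviceId, request, reply::complete);
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply from " + serviceId, e);
        }
    }
}
```

Registering an echo handler under the id "hello" and calling `bus.call("hello", "Hello", 10_000)` mirrors TestCloudBusCall; passing a callback to `send` mirrors TestCloudBusSendCallback.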

## Interface definition
Something this convenient is unlikely to be simple on the inside. Let's start with the [interface](https://github.com/zstackio/zstack/blob/d6f511e6c15a2fab3e57a93637ada63cc4b3ee6c/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java):
~~~java
package org.zstack.core.cloudbus;

import org.zstack.header.Component;
import org.zstack.header.Service;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudConfigureFailException;
import org.zstack.header.message.*;

import java.util.List;

public interface CloudBus extends Component {
    void send(Message msg);

    <T extends Message> void send(List<T> msgs);

    void send(NeedReplyMessage msg, CloudBusCallBack callback);

    @Deprecated
    void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack);

    @Deprecated
    void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusListCallBack callBack);

    @Deprecated
    void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusSteppingCallback callback);

    void route(List<Message> msgs);

    void route(Message msg);

    void reply(Message request, MessageReply reply);

    void publish(List<Event> events);

    void publish(Event event);

    MessageReply call(NeedReplyMessage msg);

    <T extends NeedReplyMessage> List<MessageReply> call(List<T> msg);

    void registerService(Service serv) throws CloudConfigureFailException;

    void unregisterService(Service serv);

    EventSubscriberReceipt subscribeEvent(CloudBusEventListener listener, Event...events);

    void dealWithUnknownMessage(Message msg);

    void replyErrorByMessageType(Message msg, Exception e);

    void replyErrorByMessageType(Message msg, String err);

    void replyErrorByMessageType(Message msg, ErrorCode err);

    void logExceptionWithMessageDump(Message msg, Throwable e);

    String makeLocalServiceId(String serviceId);

    void makeLocalServiceId(Message msg, String serviceId);

    String makeServiceIdByManagementNodeId(String serviceId, String managementNodeId);

    void makeServiceIdByManagementNodeId(Message msg, String serviceId, String managementNodeId);

    String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid);

    void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid);

    void installBeforeDeliveryMessageInterceptor(BeforeDeliveryMessageInterceptor interceptor, Class<? extends Message>...classes);

    void installBeforeSendMessageInterceptor(BeforeSendMessageInterceptor interceptor, Class<? extends Message>...classes);

    void installBeforePublishEventInterceptor(BeforePublishEventInterceptor interceptor, Class<? extends Event>...classes);
}
~~~

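The method names in the interface are fairly self-explanatory, so they are not explained further here. Let's begin reading the source.

Source code walkthrough

What does CloudBus do when ZStack starts?

init

init is a hook provided by Spring that is invoked while the bean is being loaded. The declaration can be seen in the XML (default-init-method="init"):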

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/aop
         http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
         http://www.springframework.org/schema/tx 
          http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
          http://zstack.org/schema/zstack 
         http://zstack.org/schema/zstack/plugin.xsd"
    default-init-method="init" default-destroy-method="destroy">

    <bean id="TimeoutManager" class="org.zstack.core.timeout.ApiTimeoutManagerImpl" />

    <bean id="CloudBus" class = "org.zstack.core.cloudbus.CloudBusImpl2" depends-on="ThreadFacade,ThreadAspectj">
        <zstack:plugin>
            <zstack:extension interface="org.zstack.header.managementnode.ManagementNodeChangeListener" order="9999"/>
        </zstack:plugin>
    </bean>

    <bean id="EventFacade" class = "org.zstack.core.cloudbus.EventFacadeImpl">
        <zstack:plugin>
        <zstack:extension interface="org.zstack.header.Component" />
        <zstack:extension interface="org.zstack.header.apimediator.GlobalApiMessageInterceptor" />
    </zstack:plugin>
    </bean>

    <bean id="ResourceDestinationMaker" class="org.zstack.core.cloudbus.ResourceDestinationMakerImpl" />

    <bean id="MessageIntegrityChecker" class="org.zstack.core.cloudbus.MessageIntegrityChecker">
        <zstack:plugin>
            <zstack:extension interface="org.zstack.header.Component" />
        </zstack:plugin>
    </bean>
</beans>

The init method:

    void init() {
        trackerClose = CloudBusGlobalProperty.CLOSE_TRACKER;
        serverIps = CloudBusGlobalProperty.SERVER_IPS;
        tracker = new MessageTracker();

        ConnectionFactory connFactory = new ConnectionFactory();
        List<Address> addresses = CollectionUtils.transformToList(serverIps, new Function<Address, String>() {
            @Override
            public Address call(String arg) {
                return Address.parseAddress(arg);
            }
        });
        connFactory.setAutomaticRecoveryEnabled(true);
        connFactory.setRequestedHeartbeat(CloudBusGlobalProperty.RABBITMQ_HEART_BEAT_TIMEOUT);
        connFactory.setNetworkRecoveryInterval((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_NETWORK_RECOVER_INTERVAL));
        connFactory.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_CONNECTION_TIMEOUT));

        logger.info(String.format("use RabbitMQ server IPs: %s", serverIps));

        try {
            if (CloudBusGlobalProperty.RABBITMQ_USERNAME != null) {
                connFactory.setUsername(CloudBusGlobalProperty.RABBITMQ_USERNAME);
                logger.info(String.format("use RabbitMQ username: %s", CloudBusGlobalProperty.RABBITMQ_USERNAME));
            }
            if (CloudBusGlobalProperty.RABBITMQ_PASSWORD != null) {
                connFactory.setPassword(CloudBusGlobalProperty.RABBITMQ_PASSWORD);
                logger.info("use RabbitMQ password: ******");
            }
            if (CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST != null) {
                connFactory.setVirtualHost(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST);
                logger.info(String.format("use RabbitMQ virtual host: %s", CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST));
            }

            conn = connFactory.newConnection(addresses.toArray(new Address[]{}));
            logger.debug(String.format("rabbitmq connection is established on %s", conn.getAddress()));

            ((Recoverable)conn).addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    logger.info(String.format("rabbitmq connection is recovering on %s", conn.getAddress().toString()));
                }
            });

            channelPool = new ChannelPool(CloudBusGlobalProperty.CHANNEL_POOL_SIZE, conn);
            createExchanges();
            outboundQueue = new BusQueue(makeMessageQueueName(SERVICE_ID), BusExchange.P2P);
            Channel chan = channelPool.acquire();
            chan.queueDeclare(outboundQueue.getName(), false, false, true, queueArguments());
            chan.basicConsume(outboundQueue.getName(), true, consumer);
            chan.queueBind(outboundQueue.getName(), outboundQueue.getBusExchange().toString(), outboundQueue.getBindingKey());
            channelPool.returnChannel(chan);
            maid.construct();
            noRouteEndPoint.construct();
            tracker.construct();
            tracker.trackService(SERVICE_ID);
        } catch (Exception e) {
            throw new CloudRuntimeException(e);
        }
    }

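In short, this function reads the RabbitMQ-related settings from the configuration, initializes the Connection, and builds a channel pool on top of it. It then declares the outbound message queue, starts consuming from it on a channel, and binds it to its exchange. It also constructs EventMaid, noRouteEndPoint and tracker. The latter two are both message consumers and, as their names suggest, one serves the publish/subscribe model (every queue bound to that exchange receives the message) and the other is used for tracking.

start

start is a hook defined by ZStack; it is called when the management node starts up.

    @Override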
    public boolean start() {
        populateExtension();
        prepareStatistics();

        for (Service serv : services) {
            assert serv.getId() != null : String.format("service id can not be null[%s]", serv.getClass().getName());
            registerService(serv);
        }

        jmxf.registerBean("CloudBus", this);

        return true;
    }

Let's go through them one by one:

    private void populateExtension() {
        services = pluginRgty.getExtensionList(Service.class);
        for (ReplyMessagePreSendingExtensionPoint extp : pluginRgty.getExtensionList(ReplyMessagePreSendingExtensionPoint.class)) {
            List<Class> clazzs = extp.getReplyMessageClassForPreSendingExtensionPoint();
            if (clazzs == null || clazzs.isEmpty()) {
                continue;
            }

            for (Class clz : clazzs) {
                if (!(APIEvent.class.isAssignableFrom(clz)) && !(MessageReply.class.isAssignableFrom(clz))) {
                    throw new CloudRuntimeException(String.format("ReplyMessagePreSendingExtensionPoint can only marshal APIEvent or MessageReply. %s claimed by %s is neither APIEvent nor MessageReply",
                            clz.getName(), extp.getClass().getName()));
                }

                List<ReplyMessagePreSendingExtensionPoint> exts = replyMessageMarshaller.get(clz);
                if (exts == null) {
                    exts = new ArrayList<ReplyMessagePreSendingExtensionPoint>();
                    replyMessageMarshaller.put(clz, exts);
                }
                exts.add(extp);
            }
        }
    }

It first collects every extension that implements Service, then loads the extension points that can modify a message reply before it is sent.
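
The second half of populateExtension is a validated multi-map: each reply class maps to the list of extensions interested in it, and classes of the wrong type are rejected. A compact sketch of the same pattern, using `computeIfAbsent` instead of the explicit null check, could look like this. `ReplyExtensionRegistry`, `Reply`, `ApiEvent` and `HelloReply` are placeholder names for the example, and extensions are represented by their names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry mapping reply classes to the extensions that want to see them. */
class ReplyExtensionRegistry {
    /** Placeholder types standing in for MessageReply and APIEvent. */
    static class Reply {}
    static class ApiEvent {}
    static class HelloReply extends Reply {}

    private final Map<Class<?>, List<String>> extensionsByClass = new HashMap<>();

    /** Registers an extension for a reply class, rejecting classes of an unrelated type. */
    void register(Class<?> replyClass, String extensionName) {
        if (!Reply.class.isAssignableFrom(replyClass) && !ApiEvent.class.isAssignableFrom(replyClass)) {
            throw new IllegalArgumentException(
                    replyClass.getName() + " claimed by " + extensionName + " is neither Reply nor ApiEvent");
        }
        // Creates the list on first use, then appends to it.
        extensionsByClass.computeIfAbsent(replyClass, k -> new ArrayList<>()).add(extensionName);
    }

    /** Returns the extensions registered for the class, or an empty list. */
    List<String> extensionsFor(Class<?> replyClass) {
        return extensionsByClass.getOrDefault(replyClass, Collections.emptyList());
    }
}
```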

    private void prepareStatistics() {
        List<Class> needReplyMsgs = BeanUtils.scanClassByType("org.zstack", NeedReplyMessage.class);
        needReplyMsgs = CollectionUtils.transformToList(needReplyMsgs, new Function<Class, Class>() {
            @Override
            public Class call(Class arg) {
                return !APIMessage.class.isAssignableFrom(arg) || APISyncCallMessage.class.isAssignableFrom(arg) ? arg : null;
            }
        });

        for (Class clz : needReplyMsgs) {
            MessageStatistic stat = new MessageStatistic();
            stat.setMessageClassName(clz.getName());
            statistics.put(stat.getMessageClassName(), stat);
        }
    }

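This sets up statistics for every message type that needs a reply.

After that, the collected Services are registered so that messages can be dispatched to them.

Commonly used methods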

CloudBus.makeLocalServiceId

    @Override
    public String makeLocalServiceId(String serviceId) {
        return serviceId + "." + Platform.getManagementServerId();
    }

    @Override
    public void makeLocalServiceId(Message msg, String serviceId) {
        msg.setServiceId(makeLocalServiceId(serviceId));
    }

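As described in "ZStack的伸缩性秘密武器:无状态服务", every management node registers a set of service queues. The service ID therefore has to be assembled in this format so that the message can be received by the service.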

CloudBus.makeTargetServiceIdByResourceUuid

    @Override
    public String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid) {
        DebugUtils.Assert(serviceId!=null, "serviceId cannot be null");
        DebugUtils.Assert(resourceUuid!=null, "resourceUuid cannot be null");
        // get the UUID of the MN that owns the resource
        String mgmtUuid = destMaker.makeDestination(resourceUuid);
        return serviceId + "." + mgmtUuid;
    }

    @Override
    public void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid) {
        String targetService = makeTargetServiceIdByResourceUuid(serviceId, resourceUuid);
        msg.setServiceId(targetService);
    }

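In ZStack, management nodes are very likely deployed as a cluster, with each MN managing different resources. A consistent hash ring is therefore needed to determine which MN a resource belongs to.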
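
The internals of destMaker are not shown in this article, so the following is only a generic consistent-hashing sketch to illustrate the idea. The class `ManagementNodeRing`, the CRC32 hash and the number of virtual nodes are all assumptions chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical consistent hash ring that maps a resource UUID to a management node UUID. */
class ManagementNodeRing {
    // Several virtual points per node spread the load more evenly (the value is an assumption).
    private static final int VIRTUAL_NODES = 100;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(String nodeUuid) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(nodeUuid + "#" + i), nodeUuid);
        }
    }

    void removeNode(String nodeUuid) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            // Two-argument remove only deletes the point if it still belongs to this node.
            ring.remove(hash(nodeUuid + "#" + i), nodeUuid);
        }
    }

    /** Returns the first node at or after the resource's position, wrapping around the ring. */
    String nodeFor(String resourceUuid) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no management node in the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(resourceUuid));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Same "serviceId.nodeUuid" format as makeTargetServiceIdByResourceUuid. */
    String makeTargetServiceId(String serviceId, String resourceUuid) {
        return serviceId + "." + nodeFor(resourceUuid);
    }

    private static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

The useful property is that adding or removing one node only moves the resources that fall next to that node's points on the ring; all other resources keep their owner.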

CloudBus.send

    @Override
    public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
        // give msg a timeout
        evaluateMessageTimeout(msg);
        // create an anonymous inner class that extends Envelope
        Envelope e = new Envelope() {
            // records whether the callback has already been invoked (by ack or by timeout)
            AtomicBoolean called = new AtomicBoolean(false);

            final Envelope self = this;
            // submit a task to the thread pool that fires when the message times out
            TimeoutTaskReceipt timeoutTaskReceipt = thdf.submitTimeoutTask(new Runnable() {
                @Override
                public void run() {
                    self.timeout();
                }
            }, TimeUnit.MILLISECONDS, msg.getTimeout());

            // called when the reply to msg arrives
            @Override
            public void ack(MessageReply reply) {
                // record how long msg took (statistics)
                count(msg);
                // remove the record from the map by msg's unique UUID
                envelopes.remove(msg.getId());
                // if the CAS fails, the callback has already been invoked (for example by timeout); return
                if (!called.compareAndSet(false, true)) {
                    return;
                }
                // cancel the timeout task
                timeoutTaskReceipt.cancel();
                // invoke the registered callback
                callback.run(reply);
            }

            // logic invoked when the message times out
            @Override
            public void timeout() {
                // remove the record from the map by msg's unique UUID
                envelopes.remove(msg.getId());
                // return if the callback has already been invoked
                if (!called.compareAndSet(false, true)) {
                    return;
                }
                // build a timeout reply internally and hand it to the callback
                callback.run(createTimeoutReply(msg));
            }

            // used by getWaitingReplyMessageStatistic
            @Override
            List<Message> getRequests() {
                List<Message> requests = new ArrayList<Message>();
                requests.add(msg);
                return requests;
            }
        };

        // put msg's unique UUID and the envelope just built into the envelopes map
        envelopes.put(msg.getId(), e);
        // send the message
        send(msg, false);
    }
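
The core of this method is a race between two events, the reply arriving and the timeout firing, with an AtomicBoolean making sure the callback runs exactly once. The sketch below isolates that pattern using only the JDK. `ReplyTracker` and the string-based reply are simplifications invented for the example, and a `ScheduledExecutorService` stands in for ZStack's thread facade.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical tracker: each pending request gets an envelope that is acked or timed out once. */
class ReplyTracker {
    static final String TIMEOUT = "TIMEOUT";

    private final Map<String, Envelope> envelopes = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "reply-timeout");
        t.setDaemon(true);
        return t;
    });

    private final class Envelope {
        final String msgId;
        final Consumer<String> callback;
        final AtomicBoolean called = new AtomicBoolean(false);
        volatile ScheduledFuture<?> timeoutTask;

        Envelope(String msgId, Consumer<String> callback) {
            this.msgId = msgId;
            this.callback = callback;
        }

        void ack(String reply) {
            envelopes.remove(msgId);
            if (!called.compareAndSet(false, true)) {
                return; // the timeout won the race
            }
            ScheduledFuture<?> task = timeoutTask;
            if (task != null) {
                task.cancel(false);
            }
            callback.accept(reply);
        }

        void timeout() {
            envelopes.remove(msgId);
            if (!called.compareAndSet(false, true)) {
                return; // the reply won the race
            }
            callback.accept(TIMEOUT);
        }
    }

    /** Starts waiting for a reply to msgId; the callback receives the reply or TIMEOUT. */
    void track(String msgId, long timeoutMillis, Consumer<String> callback) {
        Envelope e = new Envelope(msgId, callback);
        envelopes.put(msgId, e);
        e.timeoutTask = timer.schedule(e::timeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when a reply arrives; returns false if nobody is waiting for it any more. */
    boolean onReply(String msgId, String reply) {
        Envelope e = envelopes.get(msgId);
        if (e == null) {
            return false;
        }
        e.ack(reply);
        return true;
    }
}
```

Whichever of ack and timeout flips the flag first gets to run the callback; the loser returns without side effects, so a late reply after a timeout is silently dropped.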

The private method send

    private void send(Message msg, Boolean noNeedReply) {
        // the service ID of msg must not be null, otherwise the message cannot be delivered
        if (msg.getServiceId() == null) {
            throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
        }
        // build the basic properties of msg
        basicProperty(msg);
        // set the header entries of msg
        msg.putHeaderEntry(CORRELATION_ID, msg.getId());
        // set the queue that replies to this message are sent to
        msg.putHeaderEntry(REPLY_TO, outboundQueue.getBindingKey());
        if (msg instanceof APIMessage) {
            // API always need reply
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, Boolean.FALSE.toString());
        } else if (msg instanceof NeedReplyMessage) {
            // for NeedReplyMessage sent without requiring receiver to reply,
            // mark it, then it will not be tracked and replied
            msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
        }

        buildRequestMessageMetaData(msg);
        wire.send(msg);
    }

This method is shared logic: every message passes through it before being sent out via RabbitMQ, so it deserves a closer look.

    protected void basicProperty(Message msg) {
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        msg.setAMQPProperties(builder.deliveryMode(1).expiration(String.valueOf(TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.MESSAGE_TTL))).build());
    }

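This function sets the message's basic AMQP properties: delivery mode 1 (non-persistent) and an expiration derived from CloudBusGlobalProperty.MESSAGE_TTL.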

Next, look at the buildRequestMessageMetaData method:

    private void buildRequestMessageMetaData(Message msg) {
        if (msg instanceof APIMessage || (msg instanceof NeedReplyMessage && !Boolean.valueOf((String)msg.getHeaderEntry(NO_NEED_REPLY_MSG)))) {
            RequestMessageMetaData metaData;
            if (msg instanceof LockResourceMessage) {
                LockResourceMessage lmsg = (LockResourceMessage) msg;
                LockMessageMetaData lmetaData = new LockMessageMetaData();
                lmetaData.unlockKey = lmsg.getUnlockKey();
                lmetaData.reason = lmsg.getReason();
                lmetaData.senderManagementUuid = Platform.getManagementServerId();
                metaData = lmetaData;
            } else {
                metaData = new RequestMessageMetaData();
            }

            metaData.needApiEvent = msg instanceof APIMessage && !(msg instanceof APISyncCallMessage);
            metaData.msgId = msg.getId();
            metaData.replyTo = msg.getHeaderEntry(REPLY_TO);
            metaData.timeout = msg instanceof NeedReplyMessage ? ((NeedReplyMessage) msg).getTimeout() : null;
            metaData.serviceId = msg.getServiceId();
            metaData.messageName = msg.getClass().getName();
            metaData.className = metaData.getClass().getName();
            msg.getAMQPHeaders().put(MESSAGE_META_DATA, JSONObjectUtil.toJsonString(metaData));
        }
    }

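buildRequestMessageMetaData collects the metadata the message needs from msg, serializes it to JSON, and puts it into the message's real AMQP headers under MESSAGE_META_DATA.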

Then comes wire.send:

        public void send(Message msg) {
            // for unit test finding invocation chain
            MessageCommandRecorder.record(msg.getClass());

            List<BeforeSendMessageInterceptor> interceptors = beforeSendMessageInterceptors.get(msg.getClass());
            if (interceptors != null) {
                for (BeforeSendMessageInterceptor interceptor : interceptors) {
                    interceptor.intercept(msg);

                    /*
                    if (logger.isTraceEnabled()) {
                        logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
                    }
                    */
                }
            }

            for (BeforeSendMessageInterceptor interceptor : beforeSendMessageInterceptorsForAll) {
                interceptor.intercept(msg);

                /*
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
                }
                */
            }

            send(msg, true);
        }

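The logic is straightforward:

  1. Record the message in the invocation chain (used by unit tests)
  2. Run the before-send interceptors registered for this message class
  3. Run the before-send interceptors registered for all messages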
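
The two interceptor passes above can be sketched in isolation. This is a simplified illustration with hypothetical names (`InterceptorChain`, `register`, `registerForAll`), using `Consumer<Object>` in place of BeforeSendMessageInterceptor. Like the original, it looks up class-specific interceptors by the exact runtime class of the message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical miniature of the two interceptor registries used by wire.send.
final class InterceptorChain {
    private final Map<Class<?>, List<Consumer<Object>>> perClass = new HashMap<>();
    private final List<Consumer<Object>> forAll = new ArrayList<>();

    void register(Class<?> type, Consumer<Object> interceptor) {
        perClass.computeIfAbsent(type, k -> new ArrayList<>()).add(interceptor);
    }

    void registerForAll(Consumer<Object> interceptor) {
        forAll.add(interceptor);
    }

    // Class-specific interceptors run first, then the global ones.
    void beforeSend(Object msg) {
        List<Consumer<Object>> specific = perClass.get(msg.getClass());
        if (specific != null) {
            for (Consumer<Object> i : specific) {
                i.accept(msg);
            }
        }
        for (Consumer<Object> i : forAll) {
            i.accept(msg);
        }
    }

    static List<String> demo() {
        List<String> order = new ArrayList<>();
        InterceptorChain chain = new InterceptorChain();
        chain.registerForAll(m -> order.add("all"));
        chain.register(String.class, m -> order.add("string-only"));
        chain.beforeSend("hello"); // a String: specific first, then global
        chain.beforeSend(42);      // an Integer: only the global interceptor
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```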

Finally it calls send(msg, true):

       public void send(final Message msg, boolean makeQueueName) {
            /*
            StopWatch watch = new StopWatch();
            watch.start();
            */
            String serviceId = msg.getServiceId();
            if (makeQueueName) { 
                // resolve the real queue name
                serviceId = makeMessageQueueName(serviceId);
            }
            // build json schema
            buildSchema(msg);
            // copy required information from the current thread context into msg; this is how the uuid carried by each API call is passed along
            evalThreadContextToMessage(msg);

            if (logger.isTraceEnabled() && logMessage(msg)) {
                logger.trace(String.format("[msg send]: %s", wire.dumpMessage(msg)));
            }

            // acquire a channel from the channel pool
            Channel chan = channelPool.acquire();
            try {
                // explained separately below
                new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();
                /*
                watch.stop();
                logger.debug(String.format("sending %s cost %sms", msg.getClass().getName(), watch.getTime()));
                */
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                // return the channel to the pool
                channelPool.returnChannel(chan);
            }
        }

Now let's analyze new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send(); on its own:

        private class RecoverableSend {
            Channel chan;
            byte[] data;
            String serviceId;
            Message msg;
            BusExchange exchange;

            RecoverableSend(Channel chan, Message msg, String serviceId, BusExchange exchange) throws IOException {
                data = compressMessageIfNeeded(msg);
                this.chan = chan;
                this.serviceId = serviceId;
                this.msg = msg;
                this.exchange = exchange;
            }

            void send() throws IOException {
                try {
                    chan.basicPublish(exchange.toString(), serviceId,
                            true, msg.getAMQPProperties(), data);
                } catch (ShutdownSignalException e) {
                    if (!(conn instanceof AutorecoveringConnection) || serverIps.size() <= 1 || !Platform.IS_RUNNING) {
                        // the connection is not recoverable
                        throw e;
                    }

                    logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
                            "we are doing recoverable send right now", e.getMessage()));

                    if (!recoverSend()) {
                        throw e;
                    }
                }
            }

            private byte[] compressMessageIfNeeded(Message msg) throws IOException {
                if (!CloudBusGlobalProperty.COMPRESS_NON_API_MESSAGE || msg instanceof APIEvent || msg instanceof APIMessage) {
                    return gson.toJson(msg, Message.class).getBytes();
                }

                msg.getAMQPHeaders().put(AMQP_PROPERTY_HEADER__COMPRESSED, "true");
                return Compresser.deflate(gson.toJson(msg, Message.class).getBytes());
            }

            private boolean recoverSend() throws IOException {
                int interval = conn.getHeartbeat() / 2;
                interval = interval > 0 ? interval : 1;
                int count = 0;

                // as the connection is lost, there is no need to wait heart beat missing 8 times
                // so we use reflection to fast the process
                RecoveryAwareAMQConnection delegate = FieldUtils.getFieldValue("delegate", conn);
                DebugUtils.Assert(delegate != null, "cannot get RecoveryAwareAMQConnection");
                Field _missedHeartbeats = FieldUtils.getField("_missedHeartbeats", RecoveryAwareAMQConnection.class);
                DebugUtils.Assert(_missedHeartbeats!=null, "cannot find _missedHeartbeats");
                _missedHeartbeats.setAccessible(true);
                try {
                    _missedHeartbeats.set(delegate, 100);
                } catch (IllegalAccessException e) {
                    throw new CloudRuntimeException(e);
                }

                while (count < CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES) {
                    try {
                        TimeUnit.SECONDS.sleep(interval);
                    } catch (InterruptedException e1) {
                        logger.warn(e1.getMessage());
                    }

                    try {
                        chan.basicPublish(exchange.toString(), serviceId,
                                true, msg.getAMQPProperties(), data);
                        return true;
                    } catch (ShutdownSignalException e) {
                        logger.warn(String.format("recoverable send fails %s times, will continue to retry %s times; %s",
                                count, CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES-count, e.getMessage()));
                        count ++;
                    }
                }

                return false;
            }
        }
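
Stripped of the RabbitMQ details, recoverSend is a bounded retry loop: wait, try again, and stop after a fixed number of failures. Below is a rough stand-alone sketch with hypothetical names (`BoundedRetry`, `retry`). It differs from the original in two labeled ways: it treats any exception as retryable (the original only retries on ShutdownSignalException), and it gives up when interrupted instead of logging and continuing.

```java
import java.util.concurrent.Callable;

// Hypothetical helper showing the shape of the recoverSend loop.
final class BoundedRetry {
    // Sleeps intervalMillis before each attempt, like the original loop.
    // Returns true on the first success, false once maxTimes attempts failed.
    static boolean retry(Callable<Void> action, int maxTimes, long intervalMillis) {
        int count = 0;
        while (count < maxTimes) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // assumption of this sketch: stop retrying when interrupted
                Thread.currentThread().interrupt();
                return false;
            }
            try {
                action.call();
                return true;
            } catch (Exception e) {
                // assumption of this sketch: every exception is retryable
                count++;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        boolean ok = retry(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("connection down");
            }
            return null;
        }, 5, 10);
        System.out.println(ok + " after " + attempts[0] + " attempts");
    }
}
```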

The core of the code is:

                    chan.basicPublish(exchange.toString(), serviceId,
                            true, msg.getAMQPProperties(), data);

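It publishes the message to RabbitMQ using the exchange, the routing key (here the service id), the message's basic properties, and the already serialized message body.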

Let's look at the method signature:

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

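When the mandatory flag is true and the exchange cannot find any queue that matches its type and the message's routing key, the broker returns the message to the producer via basic.return. When mandatory is false, the broker simply drops the message in that situation.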
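
To make the mandatory semantics concrete, here is a toy in-memory model of a direct exchange. It is not the RabbitMQ client API: `ToyDirectExchange`, its `returned` list (standing in for basic.return), and its `dropped` counter are hypothetical names for this sketch. In the real Java client, returned messages are delivered to a ReturnListener registered on the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model only: one queue per routing key, no network, no broker.
final class ToyDirectExchange {
    private final Map<String, Queue<String>> bindings = new HashMap<>();
    final List<String> returned = new ArrayList<>(); // stands in for basic.return
    int dropped = 0;

    void bind(String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayDeque<>());
    }

    void publish(String routingKey, String body, boolean mandatory) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue != null) {
            queue.add(body);    // routable: enqueue as usual
        } else if (mandatory) {
            returned.add(body); // unroutable and mandatory: hand back to the producer
        } else {
            dropped++;          // unroutable and not mandatory: silently discard
        }
    }

    int depth(String routingKey) {
        Queue<String> queue = bindings.get(routingKey);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        ToyDirectExchange ex = new ToyDirectExchange();
        ex.bind("svc.a");
        ex.publish("svc.a", "m1", true);
        ex.publish("svc.b", "m2", true);
        ex.publish("svc.b", "m3", false);
        System.out.println(ex.depth("svc.a") + " queued, " + ex.returned + " returned, " + ex.dropped + " dropped");
    }
}
```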

There is also an overload that takes an immediate flag:

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param immediate true if the 'immediate' flag is to be
     * set. Note that the RabbitMQ server does not support this flag.
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;

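When the immediate flag is true and the exchange, while routing the message to its queue(s), finds that a queue has no consumer, the message is not put on that queue. If none of the queues matched by the routing key has a consumer, the message is returned to the producer via basic.return. As the Javadoc above notes, the RabbitMQ server does not support this flag.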

CloudBus.reply

    @Override
    public void reply(Message request, MessageReply reply) {
        if (Boolean.valueOf((String) request.getHeaderEntry(NO_NEED_REPLY_MSG))) {
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("%s in message%s is set, drop reply%s", NO_NEED_REPLY_MSG,
                        wire.dumpMessage(request), wire.dumpMessage(reply)));
            }

            return;
        }

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        reply.setAMQPProperties(builder.deliveryMode(1).build());
        reply.getHeaders().put(IS_MESSAGE_REPLY, Boolean.TRUE.toString());
        reply.putHeaderEntry(CORRELATION_ID, request.getId());
        reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));

        buildResponseMessageMetaData(reply);
        if (request instanceof NeedReplyMessage) {
            callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
        }
        wire.send(reply, false);
    }

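The other properties were all covered earlier. reply.setServiceId((String) request.getHeaderEntry(REPLY_TO)); routes the reply to the queue named in the request's REPLY_TO header, which send set to the sender's outboundQueue binding key; the sender then uses the correlationId to match the reply to the original request.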

callReplyPreSendingExtensions may modify the reply as needed. After that comes wire.send, which was analyzed above.
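
The REPLY_TO plus CORRELATION_ID scheme boils down to a map of pending requests keyed by message id. The sketch below shows only that matching step, with hypothetical names (`ReplyCorrelator`, `send`, `onReply`) and plain strings in place of messages. ZStack's real bookkeeping lives in the envelopes map shown earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: match an incoming reply to the request that is waiting for it.
final class ReplyCorrelator {
    private final Map<String, Consumer<String>> pending = new ConcurrentHashMap<>();

    // Remember the callback under the message id before the request goes out.
    void send(String msgId, Consumer<String> onReply) {
        pending.put(msgId, onReply);
    }

    // Called when a reply arrives; correlationId is the id of the original request.
    // Returns false when nobody is waiting (unknown id, or already answered).
    boolean onReply(String correlationId, String body) {
        Consumer<String> callback = pending.remove(correlationId);
        if (callback == null) {
            return false;
        }
        callback.accept(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        correlator.send("id-1", body -> System.out.println("got " + body));
        correlator.onReply("id-1", "ok");
    }
}
```

Removing the entry before running the callback means a duplicate reply for the same id finds nothing and is ignored.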

CloudBus.publish

    @Override
    public void publish(Event event) {
        if (event instanceof APIEvent) {
            APIEvent aevt = (APIEvent) event;
            DebugUtils.Assert(aevt.getApiId() != null, String.format("apiId of %s cannot be null", aevt.getClass().getName()));
        }
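        // same idea as basicProperty for messages, shown earlier
        eventProperty(event);
        // build the metadata
        buildResponseMessageMetaData(event);
        // discussed in the reply section above
        callReplyPreSendingExtensions(event, null);
        // call the beforeEventPublishInterceptors; c tracks the current interceptor so it can be reported if one throws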
        BeforePublishEventInterceptor c = null;
        try {
            List<BeforePublishEventInterceptor> is = beforeEventPublishInterceptors.get(event.getClass());
            if (is != null) {
                for (BeforePublishEventInterceptor i : is) {
                    c = i;
                    i.beforePublishEvent(event);
                }
            }

            for (BeforePublishEventInterceptor i : beforeEventPublishInterceptorsForAll)  {
                c = i;
                i.beforePublishEvent(event);
            }
        } catch (StopRoutingException e) {
            if (logger.isTraceEnabled()) {
                logger.trace(String.format("BeforePublishEventInterceptor[%s] stop publishing event: %s",
                        c == null ? "null" : c.getClass().getName(), JSONObjectUtil.toJsonString(event)));
            }

            return;
        }

        wire.publish(event);
    }

Next, the wire.publish method:

        public void publish(Event evt) {
            /*
            StopWatch watch = new StopWatch();
            watch.start();
            */

            buildSchema(evt);

            evalThreadContextToMessage(evt);

            if (logger.isTraceEnabled() && logMessage(evt)) {
                logger.trace(String.format("[event publish]: %s", wire.dumpMessage(evt)));
            }

            Channel chan = channelPool.acquire();
            try {
                new RecoverableSend(chan, evt, evt.getType().toString(), BusExchange.BROADCAST).send();
                /*
                watch.stop();
                logger.debug(String.format("sending %s cost %sms", evt.getClass().getName(), watch.getTime()));
                */
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            } finally {
                channelPool.returnChannel(chan);
            }
        }

Most of it is the same as send. The Event class, however, defines two categories from which each Type is built:

package org.zstack.header.message;

import org.zstack.header.rest.APINoSee;

public abstract class Event extends Message {
    /**
     * @ignore
     */
    @APINoSee
    private String avoidKey;

    public String getAvoidKey() {
        return avoidKey;
    }

    public void setAvoidKey(String avoidKey) {
        this.avoidKey = avoidKey;
    }

    public abstract Type getType();

    public abstract String getSubCategory();

    public static final String BINDING_KEY_PERFIX = "key.event.";

    public static enum Category {
        LOCAL,
        API,
    }

    public static class Type {
        private final String _name;

        public Type(Category ctg, String subCtg) {
            _name = BINDING_KEY_PERFIX + ctg.toString() + "." + subCtg;
        }

        @Override
        public String toString() {
            return _name;
        }

        @Override
        public int hashCode() {
            return _name.hashCode();
        }

        @Override
        public boolean equals(Object t) {
            if (!(t instanceof Type)) {
                return false;
            }

            Type type = (Type) t;
            return _name.equals(type.toString());
        }
    }
}

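These are LOCAL and API. As the names suggest, one is used to answer API messages and the other to publish local messages. To understand the details, though, we need to look at EventMaid.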
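
One detail worth spelling out is how a Type's name, which also serves as the routing key, is composed from the prefix, the category, and the sub-category. The tiny sketch below mirrors the constructor of Event.Type. `BindingKeys` and the sub-category value used in `main` are made up for illustration.

```java
// Hypothetical mirror of how Event.Type builds its name.
final class BindingKeys {
    enum Category { LOCAL, API }

    static final String PREFIX = "key.event.";

    // prefix + category name + "." + sub-category, as in the Type constructor
    static String bindingKey(Category category, String subCategory) {
        return PREFIX + category + "." + subCategory;
    }

    public static void main(String[] args) {
        // "demo" is an invented sub-category
        System.out.println(bindingKey(Category.API, "demo"));
    }
}
```

Because the sub-category is part of the key, there can be many distinct event types even though there are only two categories.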

EventMaid

    private class EventMaid extends AbstractConsumer {
        Map<String, List<EventListenerWrapper>> listeners = new ConcurrentHashMap<String, List<EventListenerWrapper>>();
        Channel eventChan;
        String queueName = makeEventQueueName(String.format("eventMaid.%s", Platform.getUuid()));

        public void construct() {
            try {
                eventChan = conn.createChannel();
                eventChan.queueDeclare(queueName, false, false, true, queueArguments());
                eventChan.basicConsume(queueName, true, this);
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

        public void destruct() {
            try {
                eventChan.close();
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }


        public void listen(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List<EventListenerWrapper> lst = listeners.get(type);
                    if (lst == null) {
                        lst = new CopyOnWriteArrayList<EventListenerWrapper>();
                        listeners.put(type, lst);
                        eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[listening event]: %s", type));
                    }

                    if (!lst.contains(l)) {
                        lst.add(l);
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

        public void unlisten(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List<EventListenerWrapper> lst = listeners.get(type);
                    if (lst == null) {
                        return;
                    }

                    lst.remove(l);
                    if (lst.isEmpty()) {
                        listeners.remove(type);
                        eventChan.queueUnbind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[unlistening event]: %s", type));
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

        @SyncThread(level = 10)
        @MessageSafe
        private void dispatch(Event evt, EventListenerWrapper l) {
            setThreadLoggingContext(evt);

            l.callEventListener(evt);
        }


        private void handle(Event evt) {
            String type = evt.getType().toString();
            List<EventListenerWrapper> lst = listeners.get(type);
            if (lst == null) {
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
            }

            for (EventListenerWrapper l : lst) {
                dispatch(evt, l);
            }
        }

        @Override
        public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
            Event evt = null;
            try {
                evt = (Event) wire.toMessage(bytes, basicProperties);
                handle(evt);
            } catch (final Throwable t) {
                final Event fevt = evt;
                throwableSafe(new Runnable() {
                    @Override
                    public void run() {
                        if (fevt != null) {
                            logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                        } else {
                            logger.warn(String.format("unhandled throwable"), t);
                        }
                    }
                });
            }
        }
    }

This code is best read starting from handleDelivery:

        @Override
        public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
            Event evt = null;
            try {
                evt = (Event) wire.toMessage(bytes, basicProperties);
                handle(evt);
            } catch (final Throwable t) {
                final Event fevt = evt;
                throwableSafe(new Runnable() {
                    @Override
                    public void run() {
                        if (fevt != null) {
                            logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
                        } else {
                            logger.warn(String.format("unhandled throwable"), t);
                        }
                    }
                });
            }
        }

As you can see, this overrides handleDelivery from the Consumer interface. Here is its Javadoc:

   /**
     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;

This is what lets the EventMaid object receive messages. Inside the try block, the bytes are converted into an Event, which is then passed to handle.

        private void handle(Event evt) {
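            // as noted earlier, a Type is built from a category (API or LOCAL) plus a sub-category
            String type = evt.getType().toString();
            // so the listener list is looked up by the full type string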
            List<EventListenerWrapper> lst = listeners.get(type);
            if (lst == null) {
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
            }

            for (EventListenerWrapper l : lst) {
                // continue to dispatch
                dispatch(evt, l);
            }
        }

        @SyncThread(level = 10)
        @MessageSafe
        private void dispatch(Event evt, EventListenerWrapper l) {
            setThreadLoggingContext(evt);
            // continue to the listener wrapper, shown next
            l.callEventListener(evt);
        }

    @Override
    public EventSubscriberReceipt subscribeEvent(final CloudBusEventListener listener, final Event... events) {
        final EventListenerWrapper wrapper = new EventListenerWrapper() {
            @Override
            public void callEventListener(Event e) {
                // delegate to the listener's own handling logic; unlisten if it returns true
                if (listener.handleEvent(e)) {
                    maid.unlisten(e, this);
                }
            }
        };
        // register the wrapper for every event passed in
        for (Event e : events) {
            maid.listen(e, wrapper);
        }

        return new EventSubscriberReceipt() {
            @Override
            public void unsubscribe(Event e) {
                maid.unlisten(e, wrapper);
            }

            @Override
            public void unsubscribeAll() {
                for (Event e : events) {
                    maid.unlisten(e, wrapper);
                }
            }
        };
    }

Now look at listen again:

        public void listen(Event evt, EventListenerWrapper l) {
            String type = evt.getType().toString();
            try {
                synchronized (listeners) {
                    List<EventListenerWrapper> lst = listeners.get(type);
                    if (lst == null) {
                        lst = new CopyOnWriteArrayList<EventListenerWrapper>();
                        listeners.put(type, lst);
                        eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
                        logger.debug(String.format("[listening event]: %s", type));
                    }

                    if (!lst.contains(l)) {
                        lst.add(l);
                    }
                }
            } catch (IOException e) {
                throw new CloudRuntimeException(e);
            }
        }

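listen first takes the lock on listeners and looks up the list for the given type. If there is none yet, it creates a new CopyOnWriteArrayList, puts it into the map, and binds the EventMaid queue to the BROADCAST exchange with the type as the binding key, so that events of that type start arriving. Finally, if the list does not already contain the submitted EventListenerWrapper, it is added. Because the list is a CopyOnWriteArrayList, handle can iterate over it safely while listeners are being added or removed.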

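After all this, some readers may be a little lost, so here is a short summary of how the EventMaid logic is invoked:

  • When each ZStack Component starts, it subscribes to events on CloudBus.
  • When CloudBus receives an event to publish, it delivers the event to every CloudBusEventListener subscribed to that event type, and each listener decides for itself whether to handle it.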

This is how CloudBus and EventFacade work together.
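
The subscribe and dispatch flow, including the rule that a listener returning true is removed, can be condensed into a few lines. This is a simplified in-process sketch with hypothetical names (`MiniEventBus`, `subscribe`, `publish`) and `Predicate<String>` standing in for CloudBusEventListener. There is no RabbitMQ and no thread pool involved.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical in-process miniature of subscribeEvent plus EventMaid.handle.
final class MiniEventBus {
    private final Map<String, List<Predicate<String>>> listeners = new ConcurrentHashMap<>();

    void subscribe(String type, Predicate<String> listener) {
        listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Delivers payload to every listener of the type and returns how many were called.
    // A listener that returns true is unsubscribed, mirroring handleEvent.
    int publish(String type, String payload) {
        List<Predicate<String>> lst = listeners.get(type);
        if (lst == null) {
            return 0;
        }
        int delivered = 0;
        // CopyOnWriteArrayList iterates over a snapshot, so removing here is safe
        for (Predicate<String> l : lst) {
            delivered++;
            if (l.test(payload)) {
                lst.remove(l);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        bus.subscribe("demo", p -> true);  // one-shot listener
        bus.subscribe("demo", p -> false); // stays subscribed
        System.out.println(bus.publish("demo", "first") + " then " + bus.publish("demo", "second"));
    }
}
```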

Summary

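In this article we walked through the source code of the ZStack components that provide message-driven features. The APIs of both components are clearly pleasant to use, concise and easy to follow. Still, a few points in the implementation could be improved: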

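  • handleEvent returns a boolean, and returning true cancels the listener; the semantics are not easy to understand.
  • listen guards listeners with a synchronized block; since listeners is already a ConcurrentHashMap, its atomic operations could be used instead to increase throughput.
  • The values of listeners could just as well be a Set, with CopyOnWriteArraySet in place of CopyOnWriteArrayList. In listen we can see that l is added only when lst does not contain it, which shows that lst is not supposed to hold duplicates.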
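
The last two suggestions could look roughly like the sketch below. It is only an illustration under stated assumptions: `ListenerRegistry` and `boundTypes` are hypothetical names, `boundTypes.add` stands in for the eventChan.queueBind call, and only the listen side is covered. An unlisten that removes empty sets and unbinds the queue would need extra care to stay race-free.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical registry: ConcurrentHashMap atomics instead of synchronized,
// and a Set so duplicate listeners are rejected by the collection itself.
final class ListenerRegistry<L> {
    private final Map<String, Set<L>> listeners = new ConcurrentHashMap<>();
    // records which types were "bound"; stands in for eventChan.queueBind(...)
    final List<String> boundTypes = new CopyOnWriteArrayList<>();

    // Returns true if the listener was newly added, false if it was already present.
    boolean listen(String type, L listener) {
        // the mapping function runs at most once per absent key
        Set<L> set = listeners.computeIfAbsent(type, t -> {
            boundTypes.add(t);
            return new CopyOnWriteArraySet<>();
        });
        return set.add(listener);
    }

    int count(String type) {
        Set<L> set = listeners.get(type);
        return set == null ? 0 : set.size();
    }

    public static void main(String[] args) {
        ListenerRegistry<String> registry = new ListenerRegistry<>();
        registry.listen("key.event.API.demo", "listener-a");
        registry.listen("key.event.API.demo", "listener-a"); // duplicate, ignored
        System.out.println(registry.count("key.event.API.demo") + " listener(s), bound " + registry.boundTypes);
    }
}
```

One trade-off to keep in mind: doing real network I/O such as a queue bind inside the computeIfAbsent mapping function would block other updates that hit the same part of the map, so the simple synchronized block in the original is a defensible choice.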