使用MeatQ客户端在工程内引入maven依赖

    1. <dependency>
    2. <groupId>com.tests.framework</groupId>
    3. <artifactId>tests-common-metaqclient</artifactId>
    4. <version>1.0.0-SNAPSHOT</version>
    5. </dependency>

    Producer消息生产者的Spring相关配置如下

    1. <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
    2. <property name="zkConnect" value="${appName.mq.zkConnect}" />
    3. <property name="zkSessionTimeoutMs" value="${appName.mq.zkSessionTimeoutMs}" />
    4. <property name="zkConnectionTimeoutMs" value="${appName.mq.zkConnectionTimeoutMs}" />
    5. <property name="zkSyncTimeMs" value="${appName.mq.zkSyncTimeMs}" />
    6. </bean>
    7. <bean id="metaqProducer" class="com.ibm.mq.metaqclient.MetaqProducer" init-method="init">
    8. <property name="defaultTopic" value="*" />
    9. </bean>
    10. <!-- message body converter using java serialization. -->
    11. <bean id="messageBodyConverter"
    12. class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>
    13. <!-- template to send messages.
    14. 共享一个MessageProducer来发送多个topic的消息shareProducer=true -->
    15. <bean id="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
    16. <property name="shareProducer" value="true" />
    17. <property name="defaultTopic" value="*" />
    18. </bean>

    Producer消息生产者的sessionFactory的Zookeeper配置

    1. appName.mq.zkConnect=139.196.32.146:2181
    2. appName.mq.zkSessionTimeoutMs=30000
    3. appName.mq.zkConnectionTimeoutMs=30000
    4. appName.mq.zkSyncTimeMs=5000

    Producer消息生产者的代码实现

    1. 在代码内使用Spring注入MeatQ消息生产者

    @Autowired
    MqProducer metaqProducer;

    1. 使用MeatQ消息生产者发布消息

    MqEventBean eventBean = new MqEventBean();
    metaqProducer.publish(topic, eventBean);
    发送消息统一使用MqEventBean,根据需要填充消息内容

    Consumer消息消费者的Spring相关配置如下

    1. <bean id="sessionFactory"
    2. class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
    3. <property name="zkConnect" value="${appName.mq.zkConnect}" />
    4. <property name="zkSessionTimeoutMs" value="${appName.mq.zkSessionTimeoutMs}" />
    5. <property name="zkConnectionTimeoutMs" value="${appName.mq.zkConnectionTimeoutMs}" />
    6. <property name="zkSyncTimeMs" value="${appName.mq.zkSyncTimeMs}" />
    7. </bean>
    8. <bean id="metaqConsumer" class="com.ibm.mq.metaqclient.MeatqConsumer" init-method="init">
    9. <property name="defaultGroup" value="app-group" />
    10. </bean>
    11. <bean id="messageListener" class="com.ibm.mq.metaqclient.DateMessageListener">
    12. </bean>
    13. <!-- 各业务具体的metaqConsumerService实现 -->
    14. <bean id="consumerService" class="com.ibm.demo.mq.service.MyAppMqConsumerService"></bean>
    15. <!-- message body converter using java serialization. -->
    16. <bean id="messageBodyConverter"
    17. class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>
    18. <!-- template to send messages.
    19. 共享一个MessageProducer来发送多个topic的消息shareProducer=true -->
    20. <bean id="metaqTemplate"
    21. class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
    22. <property name="messageSessionFactory" ref="sessionFactory" />
    23. <property name="messageBodyConverter" ref="messageBodyConverter" />
    24. <property name="shareProducer" value="true" />
    25. <property name="defaultTopic" value="*" />
    26. </bean>

    Consumer消息消费者的zookeeper的配置与Producer消息生产者相同。

    Consumer消息消费者采用监听方式,获取消息,自动调用各业务的消息处理服务,该服务要继承MqConsumerService接口类,并在Consumer的配置文件内定义该服务,配置如下。

    消息处理接口类MqConsumerService

    1. public interface MqConsumerService {
    2. /**
    3. * 执行取得消息的相应处理
    4. *
    5. * @param eventBean
    6. */
    7. void exec(MqEventBean eventBean);
    8. }

    Consumer消息消费者的代码实现

    1. 使用Spring注入MeatQ消息消费者

    @Autowired
    MqConsumer metaqConsumer;

    1. 使用MeatQ消息消费者订阅消息

    metaqConsumer.receive(topic,group);

    1. 在自定义实现MqConsumerService接口类内,实现消息接受处理。
    1. public class MyAppMqConsumerService implements MqConsumerService {
    2. public void exec(MqEventBean eventBean) {
    3. if (null != eventBean) {
    4. String topic = eventBean.getHeadValue(MqEventBean.HEAD_EVENT_TOPIC);
    5. if ("demo-test-01".equals(topic)) {
    6. System.out.println("process.........." + topic);
    7. } else if ("demo-test-02".equals(topic)) {
    8. System.out.println("process.........." + topic);
    9. } else {
    10. }
    11. }
    12. }
    13. }

    使用统一的MqEventBean消息对象生产与消费消息,MqEventBean内包含head与body,实际使用中可根据业务需求填充数据。根据MeatQ的官方文档,目前在MeataQ客户端会判断消息负载PayLoad的大小,当消息内容大于1024*1024(1M),将抛出异常,如果有消息对象大于1M的情况,请分多次发出。

    1. public class MqEventBean implements Serializable {
    2. /**
    3. * head消息头部信息
    4. */
    5. private Hashtable<String, String> head = new Hashtable<String, String>();
    6. /**
    7. * body消息内容
    8. */
    9. private Hashtable<String, Object> body = new Hashtable<String, Object>();
    10. (下略)
    11. }
    1. API:
    2. public interface MqConsumer {
    3. /**
    4. *
    5. @param topic
    6. @return
    7. @throws MetaqConsumerException
    8. */
    9. void receive(String topic) throws MetaqConsumerException;
    10. /**
    11. *
    12. @param topic
    13. @param group
    14. @throws MetaqConsumerException
    15. */
    16. void receive(String topic, String group) throws MetaqConsumerException;
    17. }
    18. /**
    19. *
    20. * 功能描述: MQ生产者接口
    21. *
    22. * @author guohuac
    23. *
    24. */
    25. public interface MqProducer {
    26. /**
    27. *
    28. * @param topic
    29. * @param eventBean
    30. * @throws MetaqProducerException
    31. */
    32. void publish(String topic, MqEventBean eventBean) throws MetaqProducerException;
    33. /**
    34. * 使用该方法,topic必须在MqEventBean的head部指定
    35. * eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, topic);
    36. *
    37. * @param eventBean
    38. * @throws MetaqProducerException
    39. */
    40. void publish(MqEventBean eventBean) throws MetaqProducerException;
    41. }