使用MeatQ客户端在工程内引入maven依赖
<dependency><groupId>com.tests.framework</groupId><artifactId>tests-common-metaqclient</artifactId><version>1.0.0-SNAPSHOT</version></dependency>
Producer消息生产者的Spring相关配置如下
<bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean"><property name="zkConnect" value="${appName.mq.zkConnect}" /><property name="zkSessionTimeoutMs" value="${appName.mq.zkSessionTimeoutMs}" /><property name="zkConnectionTimeoutMs" value="${appName.mq.zkConnectionTimeoutMs}" /><property name="zkSyncTimeMs" value="${appName.mq.zkSyncTimeMs}" /></bean><bean id="metaqProducer" class="com.ibm.mq.metaqclient.MetaqProducer" init-method="init"><property name="defaultTopic" value="*" /></bean><!-- message body converter using java serialization. --><bean id="messageBodyConverter"class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/><!-- template to send messages.共享一个MessageProducer来发送多个topic的消息shareProducer=true --><bean id="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate"><property name="shareProducer" value="true" /><property name="defaultTopic" value="*" /></bean>
Producer消息生产者的sessionFactory的Zookeeper配置
appName.mq.zkConnect=139.196.32.146:2181appName.mq.zkSessionTimeoutMs=30000appName.mq.zkConnectionTimeoutMs=30000appName.mq.zkSyncTimeMs=5000
Producer消息生产者的代码实现
- 在代码内使用Spring注入MeatQ消息生产者
@Autowired
MqProducer metaqProducer;
- 使用MeatQ消息生产者发布消息
MqEventBean eventBean = new MqEventBean();
metaqProducer.publish(topic, eventBean);
发送消息统一使用MqEventBean,根据需要填充消息内容
Consumer消息消费者的Spring相关配置如下
<bean id="sessionFactory"class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean"><property name="zkConnect" value="${appName.mq.zkConnect}" /><property name="zkSessionTimeoutMs" value="${appName.mq.zkSessionTimeoutMs}" /><property name="zkConnectionTimeoutMs" value="${appName.mq.zkConnectionTimeoutMs}" /><property name="zkSyncTimeMs" value="${appName.mq.zkSyncTimeMs}" /></bean><bean id="metaqConsumer" class="com.ibm.mq.metaqclient.MeatqConsumer" init-method="init"><property name="defaultGroup" value="app-group" /></bean><bean id="messageListener" class="com.ibm.mq.metaqclient.DateMessageListener"></bean><!-- 各业务具体的metaqConsumerService实现 --><bean id="consumerService" class="com.ibm.demo.mq.service.MyAppMqConsumerService"></bean><!-- message body converter using java serialization. --><bean id="messageBodyConverter"class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/><!-- template to send messages.共享一个MessageProducer来发送多个topic的消息shareProducer=true --><bean id="metaqTemplate"class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate"><property name="messageSessionFactory" ref="sessionFactory" /><property name="messageBodyConverter" ref="messageBodyConverter" /><property name="shareProducer" value="true" /><property name="defaultTopic" value="*" /></bean>
Consumer消息消费者的zookeeper的配置与Producer消息生产者相同。
Consumer消息消费者采用监听方式,获取消息,自动调用各业务的消息处理服务,该服务要继承MqConsumerService接口类,并在Consumer的配置文件内定义该服务,配置如下。
消息处理接口类MqConsumerService
public interface MqConsumerService {/*** 执行取得消息的相应处理** @param eventBean*/void exec(MqEventBean eventBean);}
Consumer消息消费者的代码实现
- 使用Spring注入MeatQ消息消费者
@Autowired
MqConsumer metaqConsumer;
- 使用MeatQ消息消费者订阅消息
metaqConsumer.receive(topic,group);
- 在自定义实现MqConsumerService接口类内,实现消息接受处理。
public class MyAppMqConsumerService implements MqConsumerService {public void exec(MqEventBean eventBean) {if (null != eventBean) {String topic = eventBean.getHeadValue(MqEventBean.HEAD_EVENT_TOPIC);if ("demo-test-01".equals(topic)) {System.out.println("process.........." + topic);} else if ("demo-test-02".equals(topic)) {System.out.println("process.........." + topic);} else {}}}}
使用统一的MqEventBean消息对象生产与消费消息,MqEventBean内包含head与body,实际使用中可根据业务需求填充数据。根据MeatQ的官方文档,目前在MeataQ客户端会判断消息负载PayLoad的大小,当消息内容大于1024*1024(1M),将抛出异常,如果有消息对象大于1M的情况,请分多次发出。
public class MqEventBean implements Serializable {/*** head消息头部信息*/private Hashtable<String, String> head = new Hashtable<String, String>();/*** body消息内容*/private Hashtable<String, Object> body = new Hashtable<String, Object>();(下略)}
API:public interface MqConsumer {/***@param topic@return@throws MetaqConsumerException*/void receive(String topic) throws MetaqConsumerException;/***@param topic@param group@throws MetaqConsumerException*/void receive(String topic, String group) throws MetaqConsumerException;}/**** 功能描述: MQ生产者接口** @author guohuac**/public interface MqProducer {/**** @param topic* @param eventBean* @throws MetaqProducerException*/void publish(String topic, MqEventBean eventBean) throws MetaqProducerException;/*** 使用该方法,topic必须在MqEventBean的head部指定* eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, topic);** @param eventBean* @throws MetaqProducerException*/void publish(MqEventBean eventBean) throws MetaqProducerException;}
