Using the MetaQ client: add the Maven dependency to your project
<dependency>
    <groupId>com.tests.framework</groupId>
    <artifactId>tests-common-metaqclient</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
Spring configuration for the Producer (message producer)
<bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
    <property name="zkConnect" value="${appName.mq.zkConnect}" />
    <property name="zkSessionTimeoutMs" value="${appName.mq.zkSessionTimeoutMs}" />
    <property name="zkConnectionTimeoutMs" value="${appName.mq.zkConnectionTimeoutMs}" />
    <property name="zkSyncTimeMs" value="${appName.mq.zkSyncTimeMs}" />
</bean>
<bean id="metaqProducer" class="com.ibm.mq.metaqclient.MetaqProducer" init-method="init">
    <property name="defaultTopic" value="*" />
</bean>
<!-- message body converter using java serialization. -->
<bean id="messageBodyConverter"
    class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>
<!-- template to send messages.
     shareProducer=true shares a single MessageProducer for sending messages on multiple topics. -->
<bean id="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
    <property name="shareProducer" value="true" />
    <property name="defaultTopic" value="*" />
</bean>
ZooKeeper settings for the Producer's sessionFactory
appName.mq.zkConnect=139.196.32.146:2181
appName.mq.zkSessionTimeoutMs=30000
appName.mq.zkConnectionTimeoutMs=30000
appName.mq.zkSyncTimeMs=5000
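Before these values reach Spring, it can be useful to sanity-check them. The following is only a minimal sketch and is not part of the MetaQ client: the class `ZkConfigCheck` and its `parse` and `validate` methods are hypothetical names, and the check assumes `zkConnect` is a comma-separated list of `host:port` pairs without a chroot suffix.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper (not part of the MetaQ client): validates the
// appName.mq.* settings shown above.
public class ZkConfigCheck {

    static final String ZK_CONNECT = "appName.mq.zkConnect";
    static final String[] TIMEOUT_KEYS = {
        "appName.mq.zkSessionTimeoutMs",
        "appName.mq.zkConnectionTimeoutMs",
        "appName.mq.zkSyncTimeMs"
    };

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Throws IllegalArgumentException if a setting is missing or malformed.
    static void validate(Properties props) {
        String zkConnect = props.getProperty(ZK_CONNECT);
        if (zkConnect == null || zkConnect.trim().isEmpty()) {
            throw new IllegalArgumentException(ZK_CONNECT + " is missing");
        }
        // Assumption: each entry is host:port, with no chroot path.
        for (String server : zkConnect.split(",")) {
            String[] hostAndPort = server.trim().split(":");
            if (hostAndPort.length != 2 || hostAndPort[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port but got: " + server);
            }
            int port = Integer.parseInt(hostAndPort[1]);
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + server);
            }
        }
        // Every timeout must be a positive integer number of milliseconds.
        for (String key : TIMEOUT_KEYS) {
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException(key + " is missing");
            }
            if (Integer.parseInt(value.trim()) <= 0) {
                throw new IllegalArgumentException(key + " must be positive");
            }
        }
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
            "appName.mq.zkConnect=139.196.32.146:2181",
            "appName.mq.zkSessionTimeoutMs=30000",
            "appName.mq.zkConnectionTimeoutMs=30000",
            "appName.mq.zkSyncTimeMs=5000"));
        validate(props);
        System.out.println("MQ ZooKeeper settings look valid");
    }
}
```

A non-numeric port or timeout surfaces as a `NumberFormatException`, which is itself an `IllegalArgumentException`, so callers only need to catch one exception type.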
Producer code
- Inject the MetaQ message producer with Spring:
@Autowired
MqProducer metaqProducer;
- Publish a message with the MetaQ message producer:
MqEventBean eventBean = new MqEventBean();
metaqProducer.publish(topic, eventBean);
All messages are sent as MqEventBean objects; populate the message content as needed.
Spring configuration for the Consumer (message consumer)
<bean id="sessionFactory"
    class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
    <property name="zkConnect" value="${appName.mq.zkConnect}" />
    <property name="zkSessionTimeoutMs" value="${appName.mq.zkSessionTimeoutMs}" />
    <property name="zkConnectionTimeoutMs" value="${appName.mq.zkConnectionTimeoutMs}" />
    <property name="zkSyncTimeMs" value="${appName.mq.zkSyncTimeMs}" />
</bean>
<bean id="metaqConsumer" class="com.ibm.mq.metaqclient.MeatqConsumer" init-method="init">
    <property name="defaultGroup" value="app-group" />
</bean>
<bean id="messageListener" class="com.ibm.mq.metaqclient.DateMessageListener">
</bean>
<!-- the metaqConsumerService implementation specific to each business module -->
<bean id="consumerService" class="com.ibm.demo.mq.service.MyAppMqConsumerService"></bean>
<!-- message body converter using java serialization. -->
<bean id="messageBodyConverter"
    class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>
<!-- template to send messages.
     shareProducer=true shares a single MessageProducer for sending messages on multiple topics. -->
<bean id="metaqTemplate"
    class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
    <property name="messageSessionFactory" ref="sessionFactory" />
    <property name="messageBodyConverter" ref="messageBodyConverter" />
    <property name="shareProducer" value="true" />
    <property name="defaultTopic" value="*" />
</bean>
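The Consumer's ZooKeeper settings are the same as the Producer's.
The Consumer receives messages through a listener, which automatically invokes the message-handling service of each business module. That service must implement the MqConsumerService interface and must be declared in the Consumer's configuration file, as the consumerService bean is in the configuration above.
The message-handling interface MqConsumerService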
public interface MqConsumerService {
    /**
     * Handles a received message.
     *
     * @param eventBean the received message
     */
    void exec(MqEventBean eventBean);
}
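Consumer code
- Inject the MetaQ message consumer with Spring:
@Autowired
MqConsumer metaqConsumer;
- Subscribe to messages with the MetaQ message consumer:
metaqConsumer.receive(topic, group);
- Handle received messages in your own implementation of the MqConsumerService interface: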
public class MyAppMqConsumerService implements MqConsumerService {
    @Override
    public void exec(MqEventBean eventBean) {
        if (null != eventBean) {
            // The topic is carried in the message head.
            String topic = eventBean.getHeadValue(MqEventBean.HEAD_EVENT_TOPIC);
            if ("demo-test-01".equals(topic)) {
                System.out.println("process.........." + topic);
            } else if ("demo-test-02".equals(topic)) {
                System.out.println("process.........." + topic);
            } else {
                // Other topics are not handled.
            }
        }
    }
}
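As the number of topics grows, the if/else chain above can be replaced with a lookup table from topic to handler. This is a sketch of that alternative, not the client's own mechanism. `MqEventBean` and `MqConsumerService` are redeclared here as simplified stand-ins so the example compiles on its own; the value of `HEAD_EVENT_TOPIC` is assumed, and `TopicDispatchingConsumerService` and `register` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TopicDispatchDemo {

    // Simplified stand-in for the real MqEventBean (head only).
    static class MqEventBean {
        static final String HEAD_EVENT_TOPIC = "topic"; // assumed key value
        private final Hashtable<String, String> head = new Hashtable<>();
        void setHeadValue(String key, String value) { head.put(key, value); }
        String getHeadValue(String key) { return head.get(key); }
    }

    // Stand-in with the same shape as the interface shown earlier.
    interface MqConsumerService {
        void exec(MqEventBean eventBean);
    }

    // Hypothetical service: routes each message to the handler registered for its topic.
    static class TopicDispatchingConsumerService implements MqConsumerService {
        private final Map<String, Consumer<MqEventBean>> handlers = new HashMap<>();

        void register(String topic, Consumer<MqEventBean> handler) {
            handlers.put(topic, handler);
        }

        @Override
        public void exec(MqEventBean eventBean) {
            if (eventBean == null) {
                return;
            }
            String topic = eventBean.getHeadValue(MqEventBean.HEAD_EVENT_TOPIC);
            Consumer<MqEventBean> handler = handlers.get(topic);
            if (handler != null) {
                handler.accept(eventBean);
            }
            // Messages on unregistered topics are ignored, as in the if/else version.
        }
    }

    // Sends three messages through the service and returns the topics that were handled.
    static List<String> demo() {
        List<String> processed = new ArrayList<>();
        TopicDispatchingConsumerService service = new TopicDispatchingConsumerService();
        service.register("demo-test-01", bean -> processed.add("demo-test-01"));
        service.register("demo-test-02", bean -> processed.add("demo-test-02"));
        for (String topic : new String[] {"demo-test-01", "unknown-topic", "demo-test-02"}) {
            MqEventBean bean = new MqEventBean();
            bean.setHeadValue(MqEventBean.HEAD_EVENT_TOPIC, topic);
            service.exec(bean);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Adding a topic then means registering one more handler rather than editing the body of `exec`.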
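Messages are produced and consumed through the shared MqEventBean object. MqEventBean contains a head and a body, which can be populated according to business needs. According to the official MetaQ documentation, the MetaQ client currently checks the size of the message payload and throws an exception when the content exceeds 1024*1024 bytes (1 MB). If a message object is larger than 1 MB, send it in several parts.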
public class MqEventBean implements Serializable {
    /**
     * head: message header information
     */
    private Hashtable<String, String> head = new Hashtable<String, String>();
    /**
     * body: message content
     */
    private Hashtable<String, Object> body = new Hashtable<String, Object>();
    // (remainder omitted)
}
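One way to stay under the 1 MB payload limit is to split the serialized data into chunks before publishing. The following is a minimal sketch of the splitting step only: `PayloadChunker` and `split` are hypothetical names, and how chunks are numbered, carried in MqEventBean, and reassembled on the consumer side is left out. Since the head and serialization overhead also count toward the message size, a chunk size somewhat below the limit is probably safer in practice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the MetaQ client).
public class PayloadChunker {

    // The payload limit quoted in the text: 1024*1024 bytes.
    static final int MAX_PAYLOAD_BYTES = 1024 * 1024;

    // Splits payload into consecutive chunks of at most maxChunkBytes each.
    static List<byte[]> split(byte[] payload, int maxChunkBytes) {
        if (maxChunkBytes <= 0) {
            throw new IllegalArgumentException("maxChunkBytes must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int offset = 0;
        while (offset < payload.length) {
            int length = Math.min(maxChunkBytes, payload.length - offset);
            chunks.add(Arrays.copyOfRange(payload, offset, offset + length));
            offset += length;
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[2 * MAX_PAYLOAD_BYTES + MAX_PAYLOAD_BYTES / 2];
        List<byte[]> chunks = split(payload, MAX_PAYLOAD_BYTES);
        for (int i = 0; i < chunks.size(); i++) {
            System.out.println("chunk " + i + ": " + chunks.get(i).length + " bytes");
        }
    }
}
```

Each chunk could then be placed in its own MqEventBean and published separately, with the chunk index and total count stored in the head so the consumer can reassemble them.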
API:
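public interface MqConsumer {
    /**
     * Subscribes to messages on the given topic.
     *
     * @param topic the topic to subscribe to
     * @throws MetaqConsumerException if subscribing fails
     */
    void receive(String topic) throws MetaqConsumerException;
    /**
     * Subscribes to messages on the given topic as a member of the given group.
     *
     * @param topic the topic to subscribe to
     * @param group the consumer group
     * @throws MetaqConsumerException if subscribing fails
     */
    void receive(String topic, String group) throws MetaqConsumerException;
}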
/**
 * Description: MQ producer interface
 *
 * @author guohuac
 */
public interface MqProducer {
    /**
     * Publishes a message to the given topic.
     *
     * @param topic the topic to publish to
     * @param eventBean the message to publish
     * @throws MetaqProducerException if publishing fails
     */
    void publish(String topic, MqEventBean eventBean) throws MetaqProducerException;
    /**
     * When using this method, the topic must be specified in the head of the MqEventBean:
     * eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, topic);
     *
     * @param eventBean the message to publish
     * @throws MetaqProducerException if publishing fails
     */
    void publish(MqEventBean eventBean) throws MetaqProducerException;
}
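The difference between the two publish overloads can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions, not the real client: every type below is a simplified stand-in declared so the example compiles on its own, the value of `HEAD_EVENT_TOPIC` is assumed, and `InMemoryProducer` (including its choice to throw when the head carries no topic) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

public class PublishOverloadsDemo {

    // Stand-ins for the real client types.
    static class MetaqProducerException extends Exception {
        MetaqProducerException(String message) { super(message); }
    }

    static final class MqConstants {
        static final String HEAD_EVENT_TOPIC = "topic"; // assumed key value
    }

    static class MqEventBean {
        private final Hashtable<String, String> head = new Hashtable<>();
        void setHeadValue(String key, String value) { head.put(key, value); }
        String getHeadValue(String key) { return head.get(key); }
    }

    interface MqProducer {
        void publish(String topic, MqEventBean eventBean) throws MetaqProducerException;
        void publish(MqEventBean eventBean) throws MetaqProducerException;
    }

    // Hypothetical producer that records topics instead of contacting a broker.
    static class InMemoryProducer implements MqProducer {
        final List<String> sentTopics = new ArrayList<>();

        @Override
        public void publish(String topic, MqEventBean eventBean) {
            sentTopics.add(topic);
        }

        @Override
        public void publish(MqEventBean eventBean) throws MetaqProducerException {
            // The single-argument overload reads the topic from the message head.
            String topic = eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC);
            if (topic == null) {
                throw new MetaqProducerException("topic is not set in the message head");
            }
            publish(topic, eventBean);
        }
    }

    public static void main(String[] args) throws MetaqProducerException {
        InMemoryProducer producer = new InMemoryProducer();

        // Overload 1: the topic is passed explicitly.
        producer.publish("demo-test-01", new MqEventBean());

        // Overload 2: the topic is set in the head first.
        MqEventBean eventBean = new MqEventBean();
        eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, "demo-test-02");
        producer.publish(eventBean);

        System.out.println(producer.sentTopics);
    }
}
```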