调查分析
1.AMQ收发消息的Endpoint(Consumer和Producer)是和Destination绑定的(Topic/Queue), 但是MetaQ看上去不是,可以创建Endpoint后,可以随意指向Destination工作。 针对这一点可以使用Spring 的JMSTemplate+AMQ的PooledConnectionFactory来解决API及缓存Producer的问题。
2.MetaQ中的Consumer是可以以Group为单位,实现Consumer 的负载均衡,这一点在AMQ中可以使用Virtual Topic来实现。但使用VirtualTopic时需要做额外的工作是为Consumer端配置一个默认前缀来表示此Consumer为VirtualTopic 类型的Consumer.
3.AMQ针对对象消息传输时,需要在应用程序的环境变量中对被传输的对象类型做预声明, 例-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=*
实现
使用Spring JMSTemplate在AMQ Bean的初始化
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg index="0" value="${jms.broker.username}"/>
<constructor-arg index="1" value="${jms.broker.password}"/>
<constructor-arg index="2" value="${jms.broker.url}"/>
<property name="useAsyncSend" value="true"/>
</bean>
<!-- ConnectionFactory Definition -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="amqConnectionFactory"/>
</bean>
<!-- Default Destination Queue Definition-->
<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="${jms.broker.defaultTopic}"/>
</bean>
<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="defaultDestination"/>
</bean>
<!-- Message Sender Definition -->
<bean id="amqProducer" class="com.ibm.mq.activemqclient.AMQProducer">
<property name="jmsTemplate" ref="jmsTemplate"/>
<property name="defaultTopic" value="${jms.broker.defaultTopic}"/>
</bean>
实现MqProducer
public class AMQProducer implements MqProducer {
private static final Logger logger = LoggerFactory.getLogger(AMQProducer.class);
private String defaultTopic;
private JmsTemplate jmsTemplate;
public void publish(String topic, final MqEventBean eventBean) throws ProducerException {
eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, topic);
jmsTemplate.convertAndSend(new ActiveMQTopic(topic), eventBean);
}
public void publish(MqEventBean eventBean) throws ProducerException {
String topic;
if(eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC)!=null && !eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC).trim().equals("")) {
topic = eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC);
}
else
{
topic = defaultTopic;
}
publish(topic, eventBean);
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
实现MqConsumer接口
public class AMQConsumer implements MqConsumer {
private static final Logger logger = LoggerFactory.getLogger(AMQConsumer.class);
private static final String prefix = "Foo";
private static final String defaultGoup = "default";
@Autowired
private PooledConnectionFactory connectionFactory;
private int concurrentConsumerCount;
@Autowired
private MessageListener messageListener;
public void receive(String topic) throws ConsumerException {
try {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue =session.createQueue(prefix + "."+defaultGoup + "." +topic);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
}
public void receive(String topic, String group) throws ConsumerException {
try {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(prefix + "."+group + "." +topic);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static String getPrefix() {
return prefix;
}
public static String getDefaultGoup() {
return defaultGoup;
}
public PooledConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(PooledConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public MessageListener getMessageListener() {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
}
实现javax.jms.MessageListener接口
public class DateMessageListener implements MessageListener {
static final Logger logger = LoggerFactory.getLogger(DateMessageListener.class);
@Autowired
private MqConsumerService consumerService;
public void onMessage(Message message) {
try {
logger.debug("DateMessageListener::recieveMessages>>>>>");
if(message instanceof ObjectMessage)
{
MqEventBean eb = (MqEventBean) ((ObjectMessage) message).getObject();
eb.setHeadValue(MqConstants.HEAD_EVENT_ID, message.getJMSMessageID());
consumerService.exec(eb);
}
logger.debug("DateMessageListener::recieveMessages<<<<<");
} catch (Exception e) {
e.printStackTrace();
}
}
public MqConsumerService getConsumerService() {
return consumerService;
}
public void setConsumerService(MqConsumerService consumerService) {
this.consumerService = consumerService;
}
}
配置VirtualTopic对应的Consumer前缀
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="Foo.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>