调查分析
1. AMQ 收发消息的 Endpoint(Consumer 和 Producer)是与 Destination(Topic/Queue)绑定的,而 MetaQ 看上去并非如此:Endpoint 创建之后可以随意指向任意 Destination 工作。针对这一点,可以使用 Spring 的 JmsTemplate 加 AMQ 的 PooledConnectionFactory 来解决 API 差异以及 Producer 缓存的问题。
2. MetaQ 中的 Consumer 可以以 Group 为单位实现负载均衡,这一点在 AMQ 中可以使用 Virtual Topic 来实现。但使用 Virtual Topic 时需要做一项额外工作:为 Consumer 端配置一个默认前缀,用来表示该 Consumer 是 Virtual Topic 类型的 Consumer。
3. AMQ 在传输对象消息(ObjectMessage)时,需要通过应用程序的 JVM 系统属性对被传输对象所在的包做预声明,例如:-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=*(`*` 表示信任所有包,生产环境建议只列出实际需要的包)。
实现
使用 Spring JmsTemplate 时 AMQ 相关 Bean 的初始化配置
<!-- Raw ActiveMQ connection factory: credentials and broker URL come from placeholders; async send is enabled. -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg index="0" value="${jms.broker.username}"/>
    <constructor-arg index="1" value="${jms.broker.password}"/>
    <constructor-arg index="2" value="${jms.broker.url}"/>
    <property name="useAsyncSend" value="true"/>
</bean>

<!-- Pooled wrapper around the raw factory; stop() is invoked when the Spring context shuts down. -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <constructor-arg ref="amqConnectionFactory"/>
</bean>

<!-- Default destination. Note: this is a topic (ActiveMQTopic), not a queue. -->
<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="${jms.broker.defaultTopic}"/>
</bean>

<!-- JmsTemplate wired to the pooled factory and the default destination. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestination" ref="defaultDestination"/>
</bean>

<!-- Message producer; receives the JmsTemplate and the default topic name. -->
<bean id="amqProducer" class="com.ibm.mq.activemqclient.AMQProducer">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="defaultTopic" value="${jms.broker.defaultTopic}"/>
</bean>
实现MqProducer接口
/**
 * {@link MqProducer} implementation that sends event beans to ActiveMQ topics
 * through a {@link JmsTemplate}.
 */
public class AMQProducer implements MqProducer {

    private static final Logger logger = LoggerFactory.getLogger(AMQProducer.class);

    /** Topic name used when an event bean carries no topic in its head. */
    private String defaultTopic;

    /** Template used to convert and send the event beans. */
    private JmsTemplate jmsTemplate;

    /**
     * Stores {@code topic} in the event head under {@code HEAD_EVENT_TOPIC} and
     * sends the bean to the ActiveMQ topic of that name.
     *
     * @param topic     name of the destination topic
     * @param eventBean event to send
     * @throws ProducerException declared by the interface contract
     */
    public void publish(String topic, final MqEventBean eventBean) throws ProducerException {
        eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, topic);
        final ActiveMQTopic destination = new ActiveMQTopic(topic);
        jmsTemplate.convertAndSend(destination, eventBean);
    }

    /**
     * Sends the bean to the topic named in its {@code HEAD_EVENT_TOPIC} head value,
     * falling back to the default topic when that value is null or blank.
     *
     * @param eventBean event to send
     * @throws ProducerException declared by the interface contract
     */
    public void publish(MqEventBean eventBean) throws ProducerException {
        final String headTopic = eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC);
        final boolean topicMissing = headTopic == null || headTopic.trim().isEmpty();
        publish(topicMissing ? defaultTopic : headTopic, eventBean);
    }

    public String getDefaultTopic() {
        return defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}
实现MqConsumer接口
/**
 * {@link MqConsumer} implementation that attaches the configured
 * {@link MessageListener} to a queue named {@code <prefix>.<group>.<topic>}.
 */
public class AMQConsumer implements MqConsumer {

    private static final Logger logger = LoggerFactory.getLogger(AMQConsumer.class);

    /** First segment of every consumer queue name. */
    private static final String prefix = "Foo";

    /** Group name used when the caller does not supply one. */
    private static final String defaultGoup = "default";

    @Autowired
    private PooledConnectionFactory connectionFactory;

    // NOTE(review): not read anywhere in this class — confirm whether it is still needed.
    private int concurrentConsumerCount;

    @Autowired
    private MessageListener messageListener;

    /**
     * Starts consuming {@code topic} as a member of the default group.
     *
     * @param topic topic name used as the last segment of the queue name
     * @throws ConsumerException declared by the interface contract
     */
    public void receive(String topic) throws ConsumerException {
        subscribe(topic, defaultGoup);
    }

    /**
     * Starts consuming {@code topic} as a member of {@code group}.
     *
     * @param topic topic name used as the last segment of the queue name
     * @param group group name used as the middle segment of the queue name
     * @throws ConsumerException declared by the interface contract
     */
    public void receive(String topic, String group) throws ConsumerException {
        subscribe(topic, group);
    }

    /**
     * Opens a connection, registers the listener on the group's queue and starts delivery.
     * On failure the error is logged and the connection is closed rather than leaked.
     */
    private void subscribe(String topic, String group) {
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(prefix + "." + group + "." + topic);
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
            // Delivery to the listener only begins once the connection is started.
            connection.start();
        } catch (JMSException e) {
            // NOTE(review): consider rethrowing as ConsumerException once its constructors are confirmed.
            logger.error("Failed to start consumer for topic '" + topic + "', group '" + group + "'", e);
            closeQuietly(connection);
        }
    }

    /** Closes a connection that could not be fully set up, logging any secondary failure. */
    private void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException closeError) {
            logger.warn("Failed to close JMS connection after setup error", closeError);
        }
    }

    public static String getPrefix() {
        return prefix;
    }

    public static String getDefaultGoup() {
        return defaultGoup;
    }

    public PooledConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(PooledConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public MessageListener getMessageListener() {
        return messageListener;
    }

    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
}
实现javax.jms.MessageListener接口
public class DateMessageListener implements MessageListener {static final Logger logger = LoggerFactory.getLogger(DateMessageListener.class);@Autowiredprivate MqConsumerService consumerService;public void onMessage(Message message) {try {logger.debug("DateMessageListener::recieveMessages>>>>>");if(message instanceof ObjectMessage){MqEventBean eb = (MqEventBean) ((ObjectMessage) message).getObject();eb.setHeadValue(MqConstants.HEAD_EVENT_ID, message.getJMSMessageID());consumerService.exec(eb);}logger.debug("DateMessageListener::recieveMessages<<<<<");} catch (Exception e) {e.printStackTrace();}}public MqConsumerService getConsumerService() {return consumerService;}public void setConsumerService(MqConsumerService consumerService) {this.consumerService = consumerService;}}
配置VirtualTopic对应的Consumer前缀
<destinationInterceptors>
    <virtualDestinationInterceptor>
        <virtualDestinations>
            <!-- name=">" matches every topic; consumer queues use the "Foo.*." prefix,
                 i.e. Foo.<group>.<topic>, matching the queue names built by AMQConsumer. -->
            <virtualTopic name=">" prefix="Foo.*." selectorAware="false"/>
        </virtualDestinations>
    </virtualDestinationInterceptor>
</destinationInterceptors>
