调查分析

1. AMQ 收发消息的 Endpoint(Consumer 和 Producer)是与 Destination(Topic/Queue)绑定的;而 MetaQ 看上去并非如此,Endpoint 创建之后可以随意指向任意 Destination 工作。针对这一点,可以使用 Spring 的 JmsTemplate 配合 AMQ 的 PooledConnectionFactory,来解决 API 差异以及 Producer 缓存的问题。

2. MetaQ 中的 Consumer 可以以 Group 为单位实现负载均衡,这一点在 AMQ 中可以使用 Virtual Topic 来实现。但使用 Virtual Topic 时需要做一项额外工作:为 Consumer 端配置一个默认前缀,用来表示该 Consumer 是 Virtual Topic 类型的 Consumer。

3. AMQ 在传输对象消息(ObjectMessage)时,需要通过 JVM 系统属性对允许反序列化的对象所在的包做预声明,例如:-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=*(`*` 表示信任所有包,生产环境建议改为列出具体的包名。)

实现

使用 Spring JmsTemplate 初始化 AMQ 相关的 Bean

  1. <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- Raw ActiveMQ connection factory; the three constructor args are user name, password and broker URL -->
  2. <constructor-arg index="0" value="${jms.broker.username}"/>
  3. <constructor-arg index="1" value="${jms.broker.password}"/>
  4. <constructor-arg index="2" value="${jms.broker.url}"/>
  5. <property name="useAsyncSend" value="true"/> <!-- Enables ActiveMQ asynchronous sends for producers created from this factory -->
  6. </bean>
  7. <!-- Pooled ConnectionFactory wrapping amqConnectionFactory; stop() is invoked when the Spring context shuts down -->
  8. <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
  9. <constructor-arg ref="amqConnectionFactory"/>
  10. </bean>
  11. <!-- Default destination definition: this is a topic (ActiveMQTopic), not a queue -->
  12. <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQTopic">
  13. <constructor-arg index="0" value="${jms.broker.defaultTopic}"/>
  14. </bean>
  15. <!-- JmsTemplate definition: sends through the pooled factory, falling back to defaultDestination -->
  16. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  17. <property name="connectionFactory" ref="connectionFactory"/>
  18. <property name="defaultDestination" ref="defaultDestination"/>
  19. </bean>
  20. <!-- Message sender definition: AMQProducer receives the template and the same default topic name as defaultDestination -->
  21. <bean id="amqProducer" class="com.ibm.mq.activemqclient.AMQProducer">
  22. <property name="jmsTemplate" ref="jmsTemplate"/>
  23. <property name="defaultTopic" value="${jms.broker.defaultTopic}"/>
  24. </bean>

实现MqProducer

  1. public class AMQProducer implements MqProducer {
  2. private static final Logger logger = LoggerFactory.getLogger(AMQProducer.class);
  3. private String defaultTopic;
  4. private JmsTemplate jmsTemplate;
  5. public void publish(String topic, final MqEventBean eventBean) throws ProducerException {
  6. eventBean.setHeadValue(MqConstants.HEAD_EVENT_TOPIC, topic);
  7. jmsTemplate.convertAndSend(new ActiveMQTopic(topic), eventBean);
  8. }
  9. public void publish(MqEventBean eventBean) throws ProducerException {
  10. String topic;
  11. if(eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC)!=null && !eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC).trim().equals("")) {
  12. topic = eventBean.getHeadValue(MqConstants.HEAD_EVENT_TOPIC);
  13. }
  14. else
  15. {
  16. topic = defaultTopic;
  17. }
  18. publish(topic, eventBean);
  19. }
  20. public String getDefaultTopic() {
  21. return defaultTopic;
  22. }
  23. public void setDefaultTopic(String defaultTopic) {
  24. this.defaultTopic = defaultTopic;
  25. }
  26. public JmsTemplate getJmsTemplate() {
  27. return jmsTemplate;
  28. }
  29. public void setJmsTemplate(JmsTemplate jmsTemplate) {
  30. this.jmsTemplate = jmsTemplate;
  31. }
  32. }

实现MqConsumer接口

  1. public class AMQConsumer implements MqConsumer {
  2. private static final Logger logger = LoggerFactory.getLogger(AMQConsumer.class);
  3. private static final String prefix = "Foo";
  4. private static final String defaultGoup = "default";
  5. @Autowired
  6. private PooledConnectionFactory connectionFactory;
  7. private int concurrentConsumerCount;
  8. @Autowired
  9. private MessageListener messageListener;
  10. public void receive(String topic) throws ConsumerException {
  11. try {
  12. Connection connection = connectionFactory.createConnection();
  13. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  14. Queue queue =session.createQueue(prefix + "."+defaultGoup + "." +topic);
  15. MessageConsumer consumer = session.createConsumer(queue);
  16. consumer.setMessageListener(messageListener);
  17. connection.start();
  18. } catch (JMSException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. public void receive(String topic, String group) throws ConsumerException {
  23. try {
  24. Connection connection = connectionFactory.createConnection();
  25. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  26. Queue queue = session.createQueue(prefix + "."+group + "." +topic);
  27. MessageConsumer consumer = session.createConsumer(queue);
  28. consumer.setMessageListener(messageListener);
  29. connection.start();
  30. } catch (JMSException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. public static String getPrefix() {
  35. return prefix;
  36. }
  37. public static String getDefaultGoup() {
  38. return defaultGoup;
  39. }
  40. public PooledConnectionFactory getConnectionFactory() {
  41. return connectionFactory;
  42. }
  43. public void setConnectionFactory(PooledConnectionFactory connectionFactory) {
  44. this.connectionFactory = connectionFactory;
  45. }
  46. public MessageListener getMessageListener() {
  47. return messageListener;
  48. }
  49. public void setMessageListener(MessageListener messageListener) {
  50. this.messageListener = messageListener;
  51. }
  52. }

实现javax.jms.MessageListener接口

  1. public class DateMessageListener implements MessageListener {
  2. static final Logger logger = LoggerFactory.getLogger(DateMessageListener.class);
  3. @Autowired
  4. private MqConsumerService consumerService;
  5. public void onMessage(Message message) {
  6. try {
  7. logger.debug("DateMessageListener::recieveMessages>>>>>");
  8. if(message instanceof ObjectMessage)
  9. {
  10. MqEventBean eb = (MqEventBean) ((ObjectMessage) message).getObject();
  11. eb.setHeadValue(MqConstants.HEAD_EVENT_ID, message.getJMSMessageID());
  12. consumerService.exec(eb);
  13. }
  14. logger.debug("DateMessageListener::recieveMessages<<<<<");
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. public MqConsumerService getConsumerService() {
  20. return consumerService;
  21. }
  22. public void setConsumerService(MqConsumerService consumerService) {
  23. this.consumerService = consumerService;
  24. }
  25. }

配置VirtualTopic对应的Consumer前缀

  1. <destinationInterceptors> <!-- Broker-side configuration (activemq.xml): declares which topics are virtual topics and how their consumer queues are named -->
  2. <virtualDestinationInterceptor>
  3. <virtualDestinations>
  4. <virtualTopic name=">" prefix="Foo.*." selectorAware="false"/> <!-- name=">" is the match-everything wildcard, so every topic is treated as virtual; prefix "Foo.*." lines up with the Foo.[group].[topic] queue names built by AMQConsumer -->
  5. </virtualDestinations>
  6. </virtualDestinationInterceptor>
  7. </destinationInterceptors>