• 异步化消息
  • 使用JMS, RabbitMQ和Kafka发送消息
  • 从代理拉取消息
  • 监听消息

    8.1 使用JMS发送消息

    8.1.1 搭建JMS环境

  1. 添加依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-artemis</artifactId>
    4. </dependency>
  2. 配置文件

    1. spring:
    2. artemis:
    3. host: localhost
    4. port: 61617
    5. user: admin
    6. password: admin

    提示:如果是在本地开发可以不用配置这些属性

8.1.2 使用JmsTemplate发送消息

  • 3个send方法都需要MessageCreator来生成Message对象
  • 3个convertAndSend方法会接受Object对象,并且在幕后自动进行Object转换为Message
  • 3个convertAndSend方法会自动将Object转换为Message,同时还能接受一个MessagePostProcessor对象,在发送之前对Message进行自定义

    1. public class JmsOrderMessagingService implements OrderMessagingService{
    2. private JmsTemplate jmsTemplate;
    3. @Autowired
    4. public JmsOrderMessagingService(JmsTemplate jmsTemplate) {
    5. this.jmsTemplate = jmsTemplate;
    6. }
    7. @Override
    8. public void sendOrder(Order order) {
    9. jmsTemplate.send(new MessageCreator() {
    10. @Override
    11. public Message createMessage(Session session) throws JMSException {
    12. return session.createObjectMessage(order);
    13. }
    14. });
    15. }
    16. }

    使用lambda表达式改写sendOrder方法

    1. @Override
    2. public void sendOrder(Order order) {
    3. jmsTemplate.send(session -> session.createObjectMessage(order));
    4. }

    配置默认的目的地

    1. spring:
    2. jms:
    3. template:
    4. default-destination: tacocloud.order.queue

通过bean的方式配置目的地

  1. @Bean
  2. public Destination orderQueue() {
  3. return new ActiveMQQueue("tacocloud.order.queue");
  4. }

将bean注入

  1. private Destination orderQueue;
  2. @Autowired
  3. public JmsOrderMessagingService(JmsTemplate jmsTemplate, Destination orderQueue) {
  4. this.jmsTemplate = jmsTemplate;
  5. this.orderQueue = orderQueue;
  6. }

还可以用send方法第一个参数,指明目的地

  1. @Override
  2. public void sendOrder(Order order) {
  3. jmsTemplate.send("tacocloud.order.queue", session -> session.createObjectMessage(order));
  4. }

发送消息之前进行转换

利用convertAndSend方法,可以在发送消息之前将java类转化为Message

  1. @Override
  2. public void sendOrder(Order order) {
  3. jmsTemplate.convertAndSend("tacocloud.order.queue", order);
  4. }

配置消息转换器

默认是SimpleMessageConverter转换器,但是可以用下面的方法来替代默认的转换器
注意:是import org.springframework.jms.support.converter.MappingJackson2MessageConverter;包下面的

  1. @Bean
  2. public MappingJackson2MessageConverter messageConverter() {
  3. MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
  4. messageConverter.setTypeIdPropertyName("_typeId");
  5. return messageConverter;
  6. }

默认情况下,它将会包含要转换的类型的全限定类名。但是,这样的话会不太灵活,要求接收端也包含相同的类型,并且具有相同的全限定类名。为了实现更⼤的灵活性,我们可以通过调⽤消息转换器的setTypeIdMappings()⽅法将⼀个合成类型名映射到实际类型上。

  1. @Bean
  2. public MappingJackson2MessageConverter messageConverter() {
  3. MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
  4. messageConverter.setTypeIdPropertyName("_typeId");
  5. HashMap<String, Class<?>> typeIdMappings = new HashMap<>();
  6. typeIdMappings.put("order", Order.class);
  7. messageConverter.setTypeIdMappings(typeIdMappings);
  8. return messageConverter;
  9. }

对消息进行后期处理

可以调用Message对象设置些属性,例如可以设置一下订单来源属性

  1. @Override
  2. public void sendOrder(Order order) {
  3. jmsTemplate.send("tacocloud.order.queue", session -> {
  4. Message message = session.createObjectMessage(order);
  5. message.setStringProperty("X_ORDER_SOURCE", "WEB");
  6. return message;
  7. });
  8. }

convertAndSend方法是在底层创建Message的,但是可以用第三个参数MessagePostProcessor来实现上面的效果

  1. @Override
  2. public void sendOrder(Order order) {
  3. jmsTemplate.convertAndSend("tacocloud.order.queue", order, new MessagePostProcessor() {
  4. @Override
  5. public Message postProcessMessage(Message message) throws JMSException {
  6. message.setStringProperty("X_ORDER_SOURCE", "WEB");
  7. return message;
  8. }
  9. });
  10. }

用lambda表达式简化

  1. @Override
  2. public void sendOrder(Order order) {
  3. jmsTemplate.convertAndSend("tacocloud.order.queue", order, message -> {
  4. message.setStringProperty("X_ORDER_SOURCE", "WEB");
  5. return message;
  6. });
  7. }

8.1.3 接收JMS消息

使⽤JmsTemplate来接收消息

  1. @Component
  2. public class JmsOrderReceiver implements OrderReceiver{
  3. private JmsTemplate jmsTemplate;
  4. private MessageConverter messageConverter;
  5. @Autowired
  6. public JmsOrderReceiver(JmsTemplate jmsTemplate, MessageConverter messageConverter) {
  7. this.jmsTemplate = jmsTemplate;
  8. this.messageConverter = messageConverter;
  9. }
  10. @Override
  11. public Order receiveOrder() throws JMSException {
  12. Message message = jmsTemplate.receive("tacocloud.order.queue");
  13. return (Order) messageConverter.fromMessage(message);
  14. }
  15. }

或者可以用receiveAndConvert

  1. @Override
  2. public Order receiveOrder() throws JMSException {
  3. return (Order) jmsTemplate.receiveAndConvert("tacocloud.order.queue");
  4. }

声明消息监听器

  1. @Component
  2. public class OrderListener {
  3. private KitchenUI ui;
  4. @Autowired
  5. public OrderListener(KitchenUI ui) {
  6. this.ui = ui;
  7. }
  8. @JmsListener(destination = "tacocloud.order.queue")
  9. public void receiveOrder(Order order) {
  10. ui.displayOrder(order);
  11. }
  12. }