1. ActiveMQ 是什么?


  • ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。
  • ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中仍然扮演着特殊的地位。

2. JMS 规范


概述

  • Java 消息服务(Java Message Service,即 JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序中间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API。

JMS 的对象模型

对象 描述
ConnectionFactory 连接工厂
Connection 连接
Session 会话
Destination 目的
MessageProducer 生产者
MessageConsumer 消费者
Message 消息
Broker 消息中间件的实例(ActiveMQ)

JMS 的消息类型

  1. Point-to-Point(P2P)/点对点

image.png

  1. Publish/Subscribe(Pub/Sub)/主题(发布订阅)

image.png

JMS 的消息结构

image.png

  1. 消息头 | 对象 | 描述 | | —- | —- | | JMSDestination | 消息的目的地,Topic 或者 Queue | | JMSDeliveryMode | 消息的发送模式 | | JMSTimestamp | 消息传递给 Broker 的时间戳,它不是实际发送的时间 | | JMSExpiration | 消息的有效期,在有效期内,消息消费者才可以消费这个消息 | | JMSPriority | 消息的优先级。0-4 为正常的优先级,5-9 为高优先级 | | JMSMessageID | 一个字符串用来唯一标识一个消息 | | JMSReplyTo | 有时消息生产者希望消费者回复一个消息,JMSReplyTo 为一个 Destination,表示需要回复的目的地 | | JMSCorrelationID | 通常用来关联多个 Message | | JMSType | 表示消息体的结构,和 JMS 提供者有关 | | JMSRedelivered | 如果这个值为 true,表示消息是被重新发送了 |

  2. 消息属性

    • 消息属性可以理解为消息的附加消息头,属性名可以自定义。

image.png

  1. 消息体类型 | 类型 | 描述 | | —- | —- | | BytesMessage | 用来传递字节消息 | | MapMessage | 用来传递键值对消息 | | ObjectMessage | 用来传递序列化对象 | | StreamMessage | 用来传递文件等 | | TextMessage | 用来传递字符串 |

3. ActiveMQ 特性


  1. 支持多种编程语言
  2. 支持多种传输协议
  3. 有多种持久化方式

4. ActiveMQ 如何安装


1. 下载 ActiveMQ

  1. 进入 /tmp 目录

    1. cd /tmp
  2. 使用 wget 下载文档

    1. wget -c https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.10/apache-activemq-5.15.10-bin.tar.gz

2. 安装 ActiveMQ

  1. 提取文档

    1. tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /var
  2. 修改 ActiveMQ 的目录名

    1. mv /var/apache-activemq-5.15.10/ /var/activemq/

3. 启动 ActiveMQ

  1. 作为进程后台启动 ActiveMQ
    1. ./bin/activemq start

4. ActiveMQ 服务

前面使用命令行运行 ActiveMQ,但最好的方式是将 ActiveMQ 作为服务启动,使用 system 服务将可以确保 ActiveMQ 能在系统启动时自动启动。

  1. 创建 ActiveMQ 服务

    • 使用 vim 创建一个 systemd 服务文件

      1. vim /usr/lib/systemd/system/activemq.service
    • 内容如下

image.png

  • 修改 /var/activemq/bin/env 配置,修改 JAVA_HOME

image.png

  1. ActiveMQ 服务操作

    • 启动 ActiveMQ 服务

      1. systemctl start activemq
    • 查看服务状态

      systemctl status activemq
      
    • 设置开机自启动

      ln -s /usr/lib/systemd/system/activemq.service /etc/systemd/system/multi-user.target.wants/activemq.service
      systemctl enable activemq
      

5. 防火墙添加 ActiveMQ 的端口

ActiveMQ 启动后,外部还无法访问,还需要在防火墙配置中添加 ActiveMQ 的 Web 管理端口和通讯端口。

  1. 添加端口

    # Web 管理端口默认为 8161,通讯端口默认为 61616
    firewall-cmd --zone=public --add-port=8161/tcp --permanent
    firewall-cmd --zone=public --add-port=61616/tcp --permanent
    
  2. 重启防火墙

    systemctl restart firewalld.service
    

5. ActiveMQ 如何使用


1. 使用 ActiveMQ 的 Web 管理平台

  • ActiveMQ 自带有 Web 管理平台,在浏览器访问 http://服务IP:8161/admin 即可进入。
  • 默认开启了身份校验:

    • 账号:admin
    • 密码:admin

      2. Web 管理配置

  • ActiveMQ 的 Web 管理平台是基于 jetty 运行,因此在 /var/activemq/conf 目录可以看到 jetty 的配置文件。

image.png

  • 修改 Web 管理平台的默认端口,在 /var/activemq/conf/jetty.xml 中。

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
      <!-- the default port number for the web console -->
      <property name="host" value="0.0.0.0"/>
      <property name="port" value="8161"/>
    </bean>
    
  • 删除登录密码,在 /var/activemq/conf/jetty.xml 中。

    <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
      <property name="name" value="BASIC" />
      <property name="roles" value="user,admin" />
      <!-- set authenticate=false to disable login -->
      <property name="authenticate" value="true" />
    </bean>
    
  • 也可通过修改 /var/activemq/conf/jetty-realm.properties 文件进行配置。

3. 在 Java 中使用 ActiveMQ

  • 使用 Jar 包
    • 在 ActiveMQ 的根目录有 activemq-all-xxx.jar 包,将其直接复制到工程即可。
  • Maven 依赖
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>xxx</version>
    </dependency>
    

4. 在 Spring 中使用 ActiveMQ


Producer

public class Producer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 1. 创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");

            // 2. 创建连接对象
            conn = connectionFactory.createConnection();
            conn.start();

            // todo ...
            // 3. 创建会话
            // 第一个参数,是否支持实务,如果为 true,则会忽略第二个参数,被 jms 服务器设置为 SESSION_TRANSACTED
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 4. 创建点对点发送的目标
            Destination destination = session.createQueue("queue1");
            // 创建发布的目标
//            Destination destination = session.createTopic("topic1");

            // 5. 创建生产者消息
            producer = session.createProducer(destination);
            // 设置生产者模式,有两种可选:
            // DeliveryMode.PERSISTENT 当 activemq 关闭的时候,队列数据将会被保存
            // DeliveryMode.NON_PERSISTENT 当 activemq 关闭的时候,队列数据将会被清空
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // 6. 创建一条消息
            String text = "Hello World!";
            TextMessage message = session.createTextMessage(text);

            // 7. 发送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}

Consumer

public class Consumer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn = null;
        Session session = null;
        MessageConsumer consumer = null;

        try {
            // 1. 创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");

            // 2. 创建连接对象
            conn = connectionFactory.createConnection();
            conn.start();

            // 3. 创建会话
            // 第一个参数:是否支持事务,如果为 true,则会忽略第二个参数,被 jms 服务器设置为 SESSION_TRANSACTED
            // 第一个参数为 false 时,第二个参数的值可为 Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE 其中一个。
            // Session.AUTO_ACKNOWLEDGE 为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
            // Session.CLIENT_ACKNOWLEDGE 为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
            // DUPS_OK_ACKNOWLEDGE 允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 4. 创建点对点接收的目标
            Destination destination = session.createQueue("queue1");
            // 创建订阅的目标
//            Destination destination = session.createTopic("topic1");

            // 5. 创建消费者信息
            consumer = session.createConsumer(destination);

            // 6. 接收消息
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                System.out.println("收到文本消息:" + ((TextMessage) message).getText());
            } else {
                System.out.println(message);
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}

7. SpringBoot + ActiveMQ


依赖包

<dependencies>
    <!--直接使用spring-boot-starter-activemq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- MQTT -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
</dependencies>

application.properties

spring.activemq.broker-url=tcp:/127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin

Producer

@SpringBootApplication
public class Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostConstruct
    public void init() {
        jmsTemplate.convertAndSend("queue1", "Hello Spring 3");
    }

    public static void main(String[] args) {
        SpringApplication.run(Producer.class);
    }

}

Consumer

@SpringBootApplication
@EnableJms
public class Consumer {

    @JmsListener(destination = "queue1")
    public void receive(String message) {
        System.out.println("收到消息:" + message);
    }

    public static void main(String[] args) {
        SpringApplication.run(Consumer.class);
    }

}

8. SpringBoot + ActiveMQ 配置类,双模式使用


JmsConfiguration

@Configuration
@EnableJms
public class JmsConfiguration {

    /**
     * 连接工厂
     */
    @Bean
    public ConnectionFactory connectionFactory(@Value("${spring.activemq.broker-url}") String brokerUrl,
                                               @Value("${spring.activemq.user}") String userName,
                                               @Value("${spring.activemq.password}") String password) {
        return new ActiveMQConnectionFactory(userName, password, brokerUrl);
    }

    /**
     * 队列模式的监听容器
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * topic 监听
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        bean.setPubSubDomain(true);
        return bean;
    }

    /**
     * 队列模板
     */
    @Bean
    public JmsTemplate jmsTemplateQueue(ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }

    /**
     * 发布订阅模板
     */
    @Bean
    public JmsTemplate jmsTemplatePublish(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

}

Producer

@SpringBootApplication
public class Producer {

    @Autowired
    private JmsTemplate jmsTemplatePublish;

    @Autowired
    private JmsTemplate jmsTemplateQueue;

    @PostConstruct
    public void send() {
        // 队列模式发送
        jmsTemplateQueue.convertAndSend("queue1", "Hello Spring queue 1");

        // 发布订阅模式发送
        jmsTemplatePublish.send("topic1", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage message = session.createMapMessage();
                message.setString("msg", "Hello Spring xxx");
                return message;
            }
        });
    }

    public static void main(String[] args) {
        SpringApplication.run(Producer.class);
    }
}

Consumer

@SpringBootApplication
public class Consumer {
    @JmsListener(destination = "queue1", containerFactory = "jmsListenerContainerFactoryQueue")
    public void receiveQueue(Message message) throws JMSException {
        if (message instanceof TextMessage) {
            System.out.println("收到文本消息:" + ((TextMessage) message).getText());
        } else if (message instanceof ActiveMQMapMessage) {
            System.out.println("收到Map消息:" + ((ActiveMQMapMessage) message).getContentMap());
        } else {
            System.out.println(message);
        }
    }

    @JmsListener(destination = "topic1", containerFactory = "jmsListenerContainerFactoryTopic")
    public void receiveTopic(Message message) throws JMSException {
        System.out.println("收到的消息:" + message.toString());
        if (message instanceof MapMessage) {
            System.out.println("收到订阅消息:" + ((MapMessage) message).getMapNames());
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Consumer.class);
    }
}

9. 其他资料