1、持久化(宕机重启后依然存在)

    1. // 设置消息持久化
    2. producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    2、可靠消息机制
    手动签收
    使用手动签收模式:
    消息中间件将消息推送给消费者,消费者接收到消息之后,必须是手动发送命令给消息中间件已经消费成功;

    1. public class Consumer {
    2. // mq通讯地址
    3. private static String url = "tcp://127.0.0.1:61616";
    4. // 队列名称
    5. private static String queueName = "my_queue";
    6. public static void main(String[] args) throws JMSException {
    7. System.out.println("我是消费者003");
    8. // 1.创建连接工厂 吗,密码采用默认密码admin 和admin
    9. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
    10. // 2.创建连接
    11. Connection connection = factory.createConnection();
    12. // 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收
    13. connection.start();// 启动连接
    14. final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    15. // 4.创建目标(队列)
    16. Queue queue = session.createQueue(queueName);
    17. // 5.创建消费者
    18. MessageConsumer consumer = session.createConsumer(queue);
    19. // 6.启动监听 监听消息
    20. consumer.setMessageListener(new MessageListener() {
    21. public void onMessage(Message message) {
    22. try {
    23. TextMessage textMessage = (TextMessage) message;
    24. System.out.println("消费者消息生产者内容:" + textMessage.getText());
    25. // // 手动签收消息,告诉给消息中间件,已经消费成功.
    26. // textMessage.acknowledge();
    27. } catch (Exception e) {
    28. // TODO: handle exception
    29. }
    30. }
    31. });
    32. // 不要关闭连接
    33. }
    34. }

    事务方式

    1. public class ProducerTest {
    2. // mq通讯地址
    3. private static String url = "tcp://127.0.0.1:61616";
    4. // 队列名称
    5. private static String queueName = "my_queue";
    6. // 如果生产者以事务形式提交消息,消费者以事务形式接受消息
    7. // 第一次运行消费者 1.可以接受消息成功,但是没有标记为已消费
    8. // 第一次运行消费者 如果有生产者有新的消息继续发送,消费者接受每个消息都commit,标记为已经消费
    9. public static void main(String[] args) throws JMSException {
    10. // 1.创建连接工厂 吗,密码采用默认密码admin 和admin
    11. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
    12. // 2.创建连接
    13. Connection connection = factory.createConnection();
    14. // 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收 true 表示以事务形式发送消息
    15. connection.start();// 启动连接
    16. Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    17. // 4.创建目标(队列)
    18. Queue queue = session.createQueue(queueName);
    19. // 5.创建生产者
    20. MessageProducer producer = session.createProducer(queue);
    21. // 设置消息是否 需要持久化 默认 DeliveryMode.NON_PERSISTENT
    22. producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    23. for (int i = 1; i <= 10; i++) {
    24. // 6.创建 消息
    25. TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
    26. // 7.发送消息
    27. producer.send(textMessage);
    28. // 提交事务
    29. session.commit();
    30. }
    31. // 8.关闭连接
    32. connection.close();
    33. System.out.println("消息发送完毕!");
    34. }
    35. }
    public class ProducerTest {
        // mq通讯地址
        private static String url = "tcp://127.0.0.1:61616";
        // 队列名称
        private static String queueName = "my_queue";
    
        // 如果生产者以事务形式提交消息,消费者以事务形式接受消息
        // 第一次运行消费者 1.可以接受消息成功,但是没有标记为已消费
        // 第一次运行消费者 如果有生产者有新的消息继续发送,消费者接受每个消息都commit,标记为已经消费
        public static void main(String[] args) throws JMSException {
            // 1.创建连接工厂 吗,密码采用默认密码admin 和admin
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
            // 2.创建连接
            Connection connection = factory.createConnection();
            // 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收 true 表示以事务形式发送消息
            connection.start();// 启动连接
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 4.创建目标(队列)
            Queue queue = session.createQueue(queueName);
            // 5.创建生产者
            MessageProducer producer = session.createProducer(queue);
            // 设置消息是否 需要持久化 默认 DeliveryMode.NON_PERSISTENT
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
            for (int i = 1; i <= 10; i++) {
                // 6.创建 消息
                TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
                // 7.发送消息
                producer.send(textMessage);
                // 提交事务
                session.commit();
            }
            // 8.关闭连接
            connection.close();
            System.out.println("消息发送完毕!");
        }
    
    }
    

    3、SpringBoot整合ActiveMQ
    生产者:

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
        </parent>
        <!-- 管理依赖 -->
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Finchley.M7</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
        <dependencies>
            <!-- SpringBoot整合Web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- SpringBoot Activemq -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    
        </dependencies>
        <!-- 注意: 这里必须要添加, 否者各种依赖有问题 -->
        <repositories>
            <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones</name>
                <url>https://repo.spring.io/libs-milestone</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
    
    spring:
      activemq:
      ###MQ连接通讯地址
        broker-url: tcp://127.0.0.1:61616
      ###账号
        user: admin
      ###密码  
        password: admin
    
    
    ###自定义队列    
    my_queue: springboot2.0-queue
    server:
      port: 8082
    
    @Component
    public class ConfigQueue {
        @Value("${my_queue}")
        private String myQueue;
    
        // 1.首先需要将队列注入springboot容器中
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(myQueue);
        }
    
    
    }
    
    @Component
    public class P2pProducer {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
        @Autowired
        private Queue queue;
    
        // 每隔5秒种时间向队列中发送消息
        @Scheduled(fixedDelay = 5000)
        public void send() {
            String userName = System.currentTimeMillis() + "";
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userName", userName);
            jsonObject.put("email", "yushengjun6442018@163.com");
            String msg = jsonObject.toJSONString();
            jmsMessagingTemplate.convertAndSend(queue, msg);
            System.out.println("采用点对点通讯模式,msg:" + msg);
        }
    }
    
    @SpringBootApplication
    @EnableScheduling
    public class AppP2PProducer {
    
        public static void main(String[] args) {
            SpringApplication.run(AppP2PProducer.class, args);
        }
    
    }
    

    消费者:
    依赖与配置文件都和生产者相同

    @Component
    public class P2PConsumer {
        @Autowired
        private JavaMailSender javaMailSender;
    
        // 幂等性
        @JmsListener(destination = "${my_queue}")
        public void receive(String msg) throws Exception {
            if (StringUtils.isEmpty(msg)) {
                return;
            }
            // 1.解析json
            JSONObject parseObject = JSONObject.parseObject(msg);
            String userName = parseObject.getString("userName");
            String email = parseObject.getString("email");
            sendSimpleMail(email, userName);
            System.out.println("采用点对点模式,消费者成功获取到生产者的消息,msg:" + msg);
    
        }
    
        public void sendSimpleMail(String eamil, String userName) throws Exception {
            SimpleMailMessage message = new SimpleMailMessage();
            // 邮件来自 自己发自己
            message.setFrom(eamil);
            // 发送给谁
            message.setTo(eamil);
            // 邮件标题
            message.setSubject("蚂蚁课堂|每特教育 新学员提醒");
            // 邮件内容
            message.setText("祝贺您,成为了我们" + userName + ",学员!");
            // 发送邮件
            javaMailSender.send(message);
            System.out.println("邮件发送完成," + JSONObject.toJSONString(message));
        }
    
    }
    
    @SpringBootApplication
    public class AppP2PConsumer {
    
        public static void main(String[] args) {
            SpringApplication.run(AppP2PConsumer.class, args);
    
        }
    
    }
    
    @SpringBootApplication
    @EnableScheduling
    public class AppP2PProducer {
    
        public static void main(String[] args) {
            SpringApplication.run(AppP2PProducer.class, args);
        }
    
    }
    
    @SpringBootApplication
    @EnableScheduling
    public class AppP2PProducer {
    
        public static void main(String[] args) {
            SpringApplication.run(AppP2PProducer.class, args);
        }
    
    }
    

    发布与订阅(这里我只写不同的地方)
    消费者配置文件要配置开启发布与订阅

    spring:
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    #### 开启发布订阅   
      jms: 
        pub-sub-domain: true
    topic: spring-topic
    server:
      port: 8082
    
    @Component
        public class ConfigQueue {
            @Value("${my_queue}")
            private String myQueue;
    
            // 1.首先需要将队列注入springboot容器中
            @Bean
            public Topic queue() {
                return new ActiveMQTopic(myQueue);
            }
    
    
        }
    
    @Component
    public class P2pProducer {
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
        @Autowired
        private Topic topic;
    
        // 每隔5秒种时间向队列中发送消息
        @Scheduled(fixedDelay = 5000)
        public void send() {
            String userName = System.currentTimeMillis() + "";
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("userName", userName);
            jsonObject.put("email", "yushengjun6442018@163.com");
            String msg = jsonObject.toJSONString();
            jmsMessagingTemplate.convertAndSend(topic, msg);
            System.out.println("采用点对点通讯模式,msg:" + msg);
        }
    }
    

    ActiveMQ02.docxsk.zip