1、持久化(宕机重启后依然存在)
// 设置消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
2、可靠消息机制
手动签收
使用手动签收模式:
消息中间件将消息推送给消费者,消费者接收到消息之后,必须是手动发送命令给消息中间件已经消费成功;
public class Consumer {
// mq通讯地址
private static String url = "tcp://127.0.0.1:61616";
// 队列名称
private static String queueName = "my_queue";
public static void main(String[] args) throws JMSException {
System.out.println("我是消费者003");
// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.创建连接
Connection connection = factory.createConnection();
// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收
connection.start();// 启动连接
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 4.创建目标(队列)
Queue queue = session.createQueue(queueName);
// 5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
// 6.启动监听 监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者消息生产者内容:" + textMessage.getText());
// // 手动签收消息,告诉给消息中间件,已经消费成功.
// textMessage.acknowledge();
} catch (Exception e) {
// TODO: handle exception
}
}
});
// 不要关闭连接
}
}
事务方式
public class ProducerTest {
// mq通讯地址
private static String url = "tcp://127.0.0.1:61616";
// 队列名称
private static String queueName = "my_queue";
// 如果生产者以事务形式提交消息,消费者以事务形式接受消息
// 第一次运行消费者 1.可以接受消息成功,但是没有标记为已消费
// 第一次运行消费者 如果有生产者有新的消息继续发送,消费者接受每个消息都commit,标记为已经消费
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.创建连接
Connection connection = factory.createConnection();
// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收 true 表示以事务形式发送消息
connection.start();// 启动连接
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 4.创建目标(队列)
Queue queue = session.createQueue(queueName);
// 5.创建生产者
MessageProducer producer = session.createProducer(queue);
// 设置消息是否 需要持久化 默认 DeliveryMode.NON_PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= 10; i++) {
// 6.创建 消息
TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
// 7.发送消息
producer.send(textMessage);
// 提交事务
session.commit();
}
// 8.关闭连接
connection.close();
System.out.println("消息发送完毕!");
}
}
public class ProducerTest {
// mq通讯地址
private static String url = "tcp://127.0.0.1:61616";
// 队列名称
private static String queueName = "my_queue";
// 如果生产者以事务形式提交消息,消费者以事务形式接受消息
// 第一次运行消费者 1.可以接受消息成功,但是没有标记为已消费
// 第一次运行消费者 如果有生产者有新的消息继续发送,消费者接受每个消息都commit,标记为已经消费
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.创建连接
Connection connection = factory.createConnection();
// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收 true 表示以事务形式发送消息
connection.start();// 启动连接
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 4.创建目标(队列)
Queue queue = session.createQueue(queueName);
// 5.创建生产者
MessageProducer producer = session.createProducer(queue);
// 设置消息是否 需要持久化 默认 DeliveryMode.NON_PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= 10; i++) {
// 6.创建 消息
TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
// 7.发送消息
producer.send(textMessage);
// 提交事务
session.commit();
}
// 8.关闭连接
connection.close();
System.out.println("消息发送完毕!");
}
}
3、SpringBoot整合ActiveMQ
生产者:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<!-- 管理依赖 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.M7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- SpringBoot整合Web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot Activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
<!-- 注意: 这里必须要添加, 否者各种依赖有问题 -->
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
spring:
activemq:
###MQ连接通讯地址
broker-url: tcp://127.0.0.1:61616
###账号
user: admin
###密码
password: admin
###自定义队列
my_queue: springboot2.0-queue
server:
port: 8082
@Component
public class ConfigQueue {
@Value("${my_queue}")
private String myQueue;
// 1.首先需要将队列注入springboot容器中
@Bean
public Queue queue() {
return new ActiveMQQueue(myQueue);
}
}
@Component
public class P2pProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 每隔5秒种时间向队列中发送消息
@Scheduled(fixedDelay = 5000)
public void send() {
String userName = System.currentTimeMillis() + "";
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", userName);
jsonObject.put("email", "yushengjun6442018@163.com");
String msg = jsonObject.toJSONString();
jmsMessagingTemplate.convertAndSend(queue, msg);
System.out.println("采用点对点通讯模式,msg:" + msg);
}
}
@SpringBootApplication
@EnableScheduling
public class AppP2PProducer {
public static void main(String[] args) {
SpringApplication.run(AppP2PProducer.class, args);
}
}
消费者:
依赖与配置文件都和生产者相同
@Component
public class P2PConsumer {
@Autowired
private JavaMailSender javaMailSender;
// 幂等性
@JmsListener(destination = "${my_queue}")
public void receive(String msg) throws Exception {
if (StringUtils.isEmpty(msg)) {
return;
}
// 1.解析json
JSONObject parseObject = JSONObject.parseObject(msg);
String userName = parseObject.getString("userName");
String email = parseObject.getString("email");
sendSimpleMail(email, userName);
System.out.println("采用点对点模式,消费者成功获取到生产者的消息,msg:" + msg);
}
public void sendSimpleMail(String eamil, String userName) throws Exception {
SimpleMailMessage message = new SimpleMailMessage();
// 邮件来自 自己发自己
message.setFrom(eamil);
// 发送给谁
message.setTo(eamil);
// 邮件标题
message.setSubject("蚂蚁课堂|每特教育 新学员提醒");
// 邮件内容
message.setText("祝贺您,成为了我们" + userName + ",学员!");
// 发送邮件
javaMailSender.send(message);
System.out.println("邮件发送完成," + JSONObject.toJSONString(message));
}
}
@SpringBootApplication
public class AppP2PConsumer {
public static void main(String[] args) {
SpringApplication.run(AppP2PConsumer.class, args);
}
}
@SpringBootApplication
@EnableScheduling
public class AppP2PProducer {
public static void main(String[] args) {
SpringApplication.run(AppP2PProducer.class, args);
}
}
@SpringBootApplication
@EnableScheduling
public class AppP2PProducer {
public static void main(String[] args) {
SpringApplication.run(AppP2PProducer.class, args);
}
}
发布与订阅(这里我只写不同的地方)
消费者配置文件要配置开启发布与订阅
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#### 开启发布订阅
jms:
pub-sub-domain: true
topic: spring-topic
server:
port: 8082
@Component
public class ConfigQueue {
@Value("${my_queue}")
private String myQueue;
// 1.首先需要将队列注入springboot容器中
@Bean
public Topic queue() {
return new ActiveMQTopic(myQueue);
}
}
@Component
public class P2pProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
// 每隔5秒种时间向队列中发送消息
@Scheduled(fixedDelay = 5000)
public void send() {
String userName = System.currentTimeMillis() + "";
JSONObject jsonObject = new JSONObject();
jsonObject.put("userName", userName);
jsonObject.put("email", "yushengjun6442018@163.com");
String msg = jsonObject.toJSONString();
jmsMessagingTemplate.convertAndSend(topic, msg);
System.out.println("采用点对点通讯模式,msg:" + msg);
}
}