1. 消息中间件概念
1. ActiveMQ技术维度 各个框架功能一样 只是落地细节区别
2. 什么场景需要消息中间件
如果没有引入MQ,生产者和消费者互相调用,在大型分布式应用中,系统间的RPC交互繁杂,即每增加一个消费者,生产者都要修改(系统之间的接口耦合比较严重);等待同步消息性存在问题(RPC接口基本上是同步调用,类似“木桶理论”);面对消息多容易冲垮
1. 系统之间直接调用实际工程落地和存在的问题:
2. 面对大流量并发时,容易被冲垮。
3. 等待同步存在性能问题
3. 为什么要使用 MQ ?
没有MQ之前 系统的耦合度高 系统抵御不了高并发。
解决了耦合调用、异步模型、抵御洪峰流量,保护了主业务,消峰。
1. MQ过程
2. 特点
异步处理模式
这样的一种通讯方式,就是所谓的异步,对于系统A来说只要把消息发给MQ就行,然后系统B就会异步处理,系统A不会关心B是否处理完,至于怎么处理都是B的事 和A没有关系 这就是解耦
系统解耦:
发送者不必了解对方,只需要确认消息。
发送者和接受者不必同时在线。
2. MQ下载与安装
ActiveMQ 的官网 : http://activemq.apache.org
下载地址;https://activemq.apache.org/activemq-5015009-release
1. 安装
- 压缩包下载复制到/opy/software文件夹下
- 解压到/opt/modlue 文件夹
- 普通启动
./activemq start/stop/restart
- activemq 默认的进程端口是 61616
判断activemq启动的两种命令:
判断是否有进程 ps -ef | grep active | grep -v grep
判断端口是否被占用 netstat -anp | grep 61616
- 带日志的启动
2.控制台访问
后台已经启动 但是前台看不到啊??
如tomcat 启动有猫
activemq 有图形化界面吗?
- 后台已经启动 前台怎么访问
- 前台怎么连接
端口是8161 默认密码 adminwin是客服端,linux是我们的服务器 需要ping通过
访问的地址:http://192.168.234.100:8161/
3. Java实现active(queue)通讯
1.导入jar包
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
2. JMS总体架构
destination:目的地 有队列Q和主题T 主题是群发 队列单发
3. 连接与消费
1.默认的连接地址格式
2. 消息生产
- 获取连接
- 创建会话
- 通过会话创建队列
- 通过对列创建消息生产者
- 通过会话创建消息
通过消息生产者发送消息
public static final String ACTIVEMQ_URL = "tcp://192/168.234.100:61616"; public static final String QUE_NAME = "que1"; public static void main(String[] args) throws JMSException { // 创建连接工厂 按照给定URL 采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 通过连接工厂,获得connection Connection connection = activeMQConnectionFactory.createConnection(); // 启动访问 connection.start(); // 创建会话 两个参数 第一个叫事务 第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地 (是队列 还是 主题) Queue queue = session.createQueue(QUE_NAME); // 创建消息的生产者 MessageProducer producer = session.createProducer(queue); // 通过使用消息生产者 生产消息返送到队列里面 for (int i = 1; i <= 3; i++) { // 创建消息 TextMessage textMessage = session.createTextMessage("msg-" + i);//一个字符串消息 // 通过消息生产者发送 producer.send(textMessage); } // 关闭资源 session.close(); connection.close(); producer.close(); System.out.println("消息发送完成"); }
3. 消息消费
public static final String ACTIVEMQ_URL = "tcp://192.168.234.100:61616"; public static final String QUE_NAME = "que1"; public static void main(String[] args) throws JMSException { // 创建连接工厂 按照给定URL 采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 通过连接工厂,获得connection Connection connection = activeMQConnectionFactory.createConnection(); // 启动访问 connection.start(); // 创建会话 两个参数 第一个叫事务 第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目的地 (是队列 还是 主题) Queue queue = session.createQueue(QUE_NAME); // 创建消费者 MessageConsumer consumer = session.createConsumer(queue); while (true){ // timeout 选项 多少秒没有消息自动关闭 TextMessage receive = (TextMessage) consumer.receive(); if (receive != null){ System.out.println("消费者接收到消息" + receive.getText()); }else { break; } } consumer.close(); session.close(); connection.close(); }
4. 消费者的细节
不带时间的receive()他会一直等待 阻塞
-
5. 消费者—通过监听的方式
设置消息监听器
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message != null && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read();
6. 多个消费者同时监听 消费情况
先生产 先启动消费者1再启动消费者2
消费者1 YES 都被先启动的消费者消费光了
消费者2 NO
先消费 后生产6条消息 自带负载均衡的思想
4. Java实现(topic)通讯 - 群发
1. 代码
``` public static final String TOP_NAME = “top-atguigu”; // 创建目的地 (是队列 还是 主题) Topic topic = session.createTopic(TOP_NAME);
// 创建消费者 MessageConsumer consumer = session.createConsumer(topic); ===============================其他原样照搬================
<a name="mnOFP"></a>
## 2. 测试
1. 先启动订阅再生产 不然就是废消息 启动2个消费者

2. 发送时 两个消费者获取全部消息

<a name="etJI2"></a>
## 3. Queue 和 Topic 对比

<a name="TmPtA"></a>
# 5.JMS 的实现 以及 产品功能
<a name="FkJnR"></a>
## 1. 什么是javaEE

<a name="ZeDuV"></a>
## 2. JMS Java消息服务
什么是Java消息服务?<br />Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
<a name="aJnXS"></a>
## 3. 几个MQ落地产品比较:

<a name="Ak3xQ"></a>
## 4.JMS组成结构和特点
<a name="LxcdR"></a>
### 1. 四大结构
<br />
<a name="IQolZ"></a>
### 2. JMS Message
<br />
1. 消息头
JMSDestination:消息目的地 主要是指Queue 和 Topic<br />JMSDeliveryMode:消息持久化模式 <br /><br /> 1是不持续 2是持续<br />JMSExpiration:消息过期时间 <br /><br />JMSPriority:消息的优先级 0-9<br />JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。
============send犯法=========== void send(Destination destination,Message message,int deliveryMode,int priority,long timeToLive);
2. 消息体 5种格式
TextMessage:普通字符消息 包含一个String<br />MapMessage:一个Map类型 key为String 值为Java的基本数据类型<br />BytesMessage:二进制数组消息 包含一个bate<br />StreamMessage:Java数据流消息 用标准流操作来循序填充和读取<br />ObiectMessage:对象消息 包含一个可序列化的Java对象<br />发送的消息体类型和接受的类型必须一致<br /><br />
3. 消息属性
发送消息的时候可以带属性 进一步标识 实现 去重 识别 重点标注<br /><br />例子:<br /><br /><br />
<a name="jvmra"></a>
## 5. JMS可靠性
// 创建会话 两个参数 第一个叫事务 第二个叫签收<br />Session session = connection.createSession(false, Session._AUTO_ACKNOWLEDGE_);
<a name="wPR9I"></a>
### 1. 持久化

1. 非持久化 Queue
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 这里将服务器关机 再启动 消息就没有了
2. 持久化(默认)
producer.setDeliveryMode(DeliveryMode.PERSISTENT); 这里将服务器关机 再启动 消息还存在 记录不在了
1. 持久 Topic
Topic持久主要是体现在订阅者 订阅完离线 等再次上线时 会把推送的消息接收
=============消费者======================= public static void main(String[] args) throws JMSException, IOException { System.out.println(“我是2号”); // 创建连接工厂 按照给定URL 采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 通过连接工厂,获得connection Connection connection = activeMQConnectionFactory.createConnection(); connection.setClientID(“z3”); // 创建会话 两个参数 第一个叫事务 第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地 (是队列 还是 主题)
Topic topic = session.createTopic(TOP_NAME);
//订阅
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark");
// 启动
connection.start();
Message message = topicSubscriber.receive();
while (message != null){
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化topic" + textMessage.getText());
message = topicSubscriber.receive(1000L);
}
session.close();
connection.close();
}
演示:<br />消费者先启动一次 进行订阅 再下线 此时显示离线状态<br /><br />生产者发送消息 消费者再上线是否能接收到消息?<br />能 把离线期间的消息收到 并且变成在线状态<br />
<a name="m0dwT"></a>
### 2. 事务
1. 生产者事务
//事务设置 只有true和flase Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 如果设置为true send以后还需要cpmmit 才能到队列中 //提交 才能显示到队列中 session.commit(); session.rollback(); 回退
2. 消费者事务 !!!!!!!!!!!
//如果事务设置成flase 那么正常被消费 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 但是!!! 如果事务被开启那么必须要有提交 软件执行了消息 但是activemq监控台没有收到 下次执行会重复执行消息 一直可以消费 session.commit(); 提交后 正常
<a name="zVn21"></a>
### 3. 签收
Session session = connection.createSession(flase, Session._AUTO_ACKNOWLEDGE_);<br />Session._AUTO_ACKNOWLEDGE :自动签收 !!_<br />Session._CLIENT_ACKNOWLEDGE : 手动签收!!_<br />Session._DUPS_OK_ACKNOWLEDGE: 允许部分重复的签收_<br />Session._SESSION_TRANSACTED : 事务级_<br />_非事务的签收:_<br />_自动签收:默认有消息就接收(默认)_<br />_手动签收:会消费 会重复消费 和事务没提交一样_<br />_需要手动反馈 客户端调用acknowledge方法手动签收_<br /><br />_允许重复签收:多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。_<br />_有事务的签收:_<br />Session session = connection.createSession(true, Session._AUTO_ACKNOWLEDGE_);<br />_手动签收:(事务要更大一些)_<br />_开启了事务 。没有使用了acknowledge方法 而是commit 成功消费到了消息_<br />_开启了事务,使用了acknowledge方法 没有commit 重复消费_<br />_如果事务回滚 消息则被再次传送_
<a name="aC12f"></a>
### 4. 点对点和发布订阅小总结
<a name="qhb0z"></a>
#### 1.点对点
点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。<br />如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收<br />队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势
<a name="hWcSc"></a>
#### 2. 发布订阅
(1)JMS的发布订阅总结<br />JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。<br />主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。<br />主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送
(2)非持久订阅<br />非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。<br />如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。<br /> 一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
(3)持久订阅<br />客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息<br />当持久订阅状态下,不能恢复或重新派送一个未签收的消息。<br />持久订阅才能恢复或重新派送一个未签收的消息。
(4)非持久和持久化订阅如何选择<br />当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。
<a name="tdCmE"></a>
# 6. ActiveMQ的Broker
<a name="CUcOQ"></a>
## 1. 是什么?
相当于一个ActiveMQ服务器实例。说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。
<a name="Von7o"></a>
## 2. 多配置文件启动
1. 复制多台配置文件

2. 选择配置文件启动
./activemq start xbean:file:/opt/module/apache-activemq-5.15.9/conf/activemq2.xml <br />
<a name="SHvkt"></a>
## 3. 嵌入式Broker 相当于内嵌的Active服务器
这样就在本机启动了一个服务器
1. jsion的绑定
2. 编写启动类
<br />
<a name="gzMy8"></a>
# ========中级=========
<a name="dEp1x"></a>
# 1. Spring整合ActiveMQ
<a name="whyGq"></a>
## 1. 建maven工程 添加Spring支持JMS的包
<a name="YvBI0"></a>
## 2. 配置文件
<?xml version=”1.0” encoding=”UTF-8”?>
<a name="VYWXj"></a>
## 3. 提供者
@Service public class SpringMQ_producer { @Autowired private JmsTemplate jmsTemplate; public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext(“applicationContext.xml”); SpringMQ_producer producer = (SpringMQ_producer) ctx.getBean(“springMQ_producer”); producer.jmsTemplate.send((session) -> { TextMessage textMessage = session.createTextMessage(“spring 和 activemq 的整合”); return textMessage; }); System.out.println(“ send task over “); } }
<a name="pF3mF"></a>
## 4. 主题写法
1. 修改这一行 改成主题即可

<a name="YXVGc"></a>
## 5. 配置监听类
1. 配置文件

2. 创建一个myMessageListener实现MessageListener
<a name="rm2vz"></a>
# 2. SpringBoot整合ActiveMQ
<a name="Kylsi"></a>
## 1. 公共配置
1. 创建maven工程

2. pom.xml
选择session activemq5
<a name="fKOnD"></a>
## 2. 队列 生产者
1. yaml 配置文件
server: port: 7777
spring: activemq: broker-url: tcp://192.168.234.100:61616 #自己的服务器地址 user: admin password: admin jms: pub-sub-domain: false # 是否发布订阅 false = Queue true = Topic
定义队列名称
myqueue: boot-active-queue
2. 配置类
@Component public class ConfigBean { @Value(“${myqueue}”) private String myQueue;
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
3. 提供者
@Component public class Queue_produce { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue;
public void produce(){
jmsMessagingTemplate.convertAndSend(queue,"queue队列 "+ UUID.randomUUID().toString().substring(0,6));
}
}
4. 定时投递
//定时投放
@Scheduled(fixedDelay = 3000)
public void produceSch(){
jmsMessagingTemplate.convertAndSend(queue,"queue队列 "+ UUID.randomUUID().toString().substring(0,6));
}
====================== @SpringBootApplication 主启动类开启 @EnableScheduling public class BootMqProduceApplication {
<a name="ZrYiX"></a>
## 3. 队列 消费者
1. yaml 修改端口
server: port: 8888 ===============
spring: activemq: broker-url: tcp://192.168.234.100:61616 #自己的服务器地址 user: admin password: admin jms: pub-sub-domain: false # 是否发布订阅 false = Queue true = Topic
定义队列名称
myqueue: boot-active-queue
2. 消费者 代码
@Component public class Queue_Consumer {
@JmsListener(destination = "${myqueue}")
public void receve(TextMessage textMessage) throws JMSException {
System.out.println("*****消费者收到消息" + textMessage.getText());
}
}
<a name="q8Qbe"></a>
# 3.SpringBoot整合Topic
1. pom
server: port: 7788 spring: activemq: broker-url: tcp://192.168.234.100 user: admin password: admin jms: pub-sub-domain: true =============
定义队列名称
myqueue: boot-active-queue
2. 配置类
@Component public class ConfigBean {
@Value("${mytopic}")
private String topicName;
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
}
3. 实现方法
@Component public class Topic_Product { @Autowired private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Topic topic;
@Scheduled(fixedDelay = 3000)
public void produce(){
jmsMessagingTemplate.convertAndSend(topic,"你好topic");
System.out.println("发送完成");
}
}
4. 消费者 该端口
4. 消费者代码
@Component public class Consumer { // @JmsListener(destination = “${mytopic}”) public void revice(TextMessage textMessage) throws JMSException { System.out.println(“收到的消息”+textMessage.getText()); } } ```